# Apache Airflow Integration
Integrate AnomalyArmor with Apache Airflow to gate pipelines on data quality and freshness.

Installation

Install the SDK in your Airflow environment:
pip install anomalyarmor-cli

Configuration

Set ARMOR_API_KEY as an Airflow variable or environment variable:
# Airflow Variable
airflow variables set ARMOR_API_KEY "aa_live_xxx"

# Or environment variable
export ARMOR_API_KEY="aa_live_xxx"

Pre-flight Freshness Check

The most common pattern: fail the task if upstream data is stale.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_upstream_freshness():
    """Pre-flight gate: raise if the upstream orders table is stale."""
    armor = Client()

    # require_fresh raises StalenessError when the source is stale,
    # which fails this Airflow task and blocks everything downstream.
    armor.freshness.require_fresh("snowflake.prod.warehouse.orders")
    print("Upstream data is fresh, proceeding...")

def run_transformation():
    """Execute the pipeline's main transformation step."""
    # Placeholder: in a real pipeline this would shell out to dbt, e.g.
    # subprocess.run(["dbt", "run", "--select", "orders_mart"])
    print("Running dbt models...")

with DAG(
    "orders_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

    # Gate task: fails fast when upstream data is stale.
    gate = PythonOperator(
        task_id="check_freshness",
        python_callable=check_upstream_freshness,
    )

    # Main work: runs only after the gate has passed.
    transform_task = PythonOperator(
        task_id="run_transformation",
        python_callable=run_transformation,
    )

    # The freshness check strictly precedes the transformation.
    gate >> transform_task
Tip: use a read-only API key scope for pre-flight checks; a read-write scope is only needed when triggering refreshes.

Check Multiple Sources

Verify all upstream dependencies before running:
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_all_upstream():
    """Verify that every upstream dependency is fresh before running."""
    armor = Client()

    sources = (
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
        "snowflake.prod.warehouse.products",
    )

    # Collect every stale source instead of failing on the first one,
    # so the error message names all offenders at once.
    stale_tables = []
    for source in sources:
        try:
            armor.freshness.require_fresh(source)
        except StalenessError:
            stale_tables.append(source)

    if stale_tables:
        raise Exception(f"Stale upstream data: {stale_tables}")

    print("All upstream sources are fresh!")

Trigger Freshness Check

Trigger a freshness check and wait for completion:
from anomalyarmor import Client

def refresh_and_check():
    """Trigger a freshness refresh, block until done, then verify."""
    armor = Client()
    target = "snowflake.prod.warehouse.orders"

    # Kick off a refresh job and block until it completes.
    result = armor.freshness.refresh(target, wait=True)

    print(f"Refresh job {result.job_id}: {result.status}")

    # Verify the refreshed source; raises if it is still stale.
    armor.freshness.require_fresh(target)

Sensor Pattern

Wait for data to become fresh:
from airflow.sensors.python import PythonSensor
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def is_data_fresh():
    """Poke callable: True once the orders table is fresh, else False."""
    client = Client()
    try:
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    except StalenessError:
        # Not fresh yet — tell the sensor to keep waiting.
        return False
    return True

# Blocking wait: succeeds as soon as is_data_fresh() returns True.
freshness_sensor = PythonSensor(
    task_id="wait_for_fresh_data",
    python_callable=is_data_fresh,
    mode="poke",
    poke_interval=300,  # poll every 5 minutes
    timeout=3600,       # give up after 1 hour
)

Check Lineage

Verify all upstream dependencies using lineage:
from anomalyarmor import Client

def check_upstream_via_lineage():
    """Discover upstream sources via the lineage API and check each one."""
    client = Client()

    # Resolve the mart's upstream dependency graph.
    graph = client.lineage.get("snowflake.prod.mart.orders_summary")

    print(f"Checking {len(graph.upstream)} upstream sources...")

    # require_fresh raises on the first stale dependency it hits.
    for source in graph.upstream:
        client.freshness.require_fresh(source.qualified_name)
        print(f"  {source.qualified_name}")

    print("All upstream sources are fresh!")

Error Handling

Handle different error types appropriately:
from anomalyarmor import Client
from anomalyarmor.exceptions import (
    StalenessError,
    AuthenticationError,
    RateLimitError,
    ArmorError,
)
import time

def check_with_retry():
    """Check freshness with retries for transient errors.

    Raises:
        Exception: when the data is stale or the API key is invalid.
        RateLimitError / ArmorError: when retries are exhausted.
    """
    client = Client()

    max_retries = 3
    for attempt in range(max_retries):
        try:
            client.freshness.require_fresh("snowflake.prod.warehouse.orders")
            return  # Success

        except StalenessError as e:
            # Data is stale - this is expected, fail the task.
            # Chain the original error so the task log keeps its details.
            raise Exception(f"Data is stale: last updated {e.last_updated}") from e

        except RateLimitError as e:
            # Rate limited - honor the server-provided backoff, then retry.
            if attempt < max_retries - 1:
                print(f"Rate limited, waiting {e.retry_after}s...")
                time.sleep(e.retry_after)
            else:
                raise

        except AuthenticationError as e:
            # Auth error - config problem; retrying will not help.
            raise Exception("Invalid ARMOR_API_KEY") from e

        except ArmorError as e:
            # Other API error - retry with a fixed backoff.
            if attempt < max_retries - 1:
                print(f"API error, retrying: {e}")
                time.sleep(10)
            else:
                raise

Complete DAG Example

Full example with freshness gate, transformation, and post-run schema check:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

# Task-level settings applied to every task in the DAG below.
default_args = {
    "retries": 1,  # retry each failed task once
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes between attempts
}

def check_upstream():
    """Fail fast if either upstream warehouse table is stale."""
    armor = Client()
    # Checked in order; the first stale table raises and fails the task.
    for table in (
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
    ):
        armor.freshness.require_fresh(table)

def trigger_schema_check():
    """Run a schema check on the mart and report its outcome."""
    armor = Client()
    # wait=True blocks until the schema-check job finishes.
    outcome = armor.schema.refresh(
        "snowflake.prod.mart.orders_summary",
        wait=True,
    )
    print(f"Schema check: {outcome.status}")

with DAG(
    "orders_mart_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["data-quality", "orders"],
) as dag:

    # Step 1: gate on upstream freshness.
    gate = PythonOperator(
        task_id="check_upstream_freshness",
        python_callable=check_upstream,
    )

    # Step 2: build the mart with dbt.
    dbt_task = BashOperator(
        task_id="run_dbt_models",
        bash_command="cd /dbt && dbt run --select orders_mart",
    )

    # Step 3: verify the resulting schema.
    schema_task = PythonOperator(
        task_id="verify_schema",
        python_callable=trigger_schema_check,
    )

    gate >> dbt_task >> schema_task

Next Steps

Python SDK

SDK reference and patterns

Freshness API

Freshness endpoint details

Lineage API

Explore data dependencies

Alerts

Set up freshness alerts