Installation
Install the SDK in your Airflow environment:

pip install anomalyarmor-cli
Configuration
Set ARMOR_API_KEY as an Airflow variable or an environment variable:
# Airflow Variable
airflow variables set ARMOR_API_KEY "aa_live_xxx"
# Or environment variable
export ARMOR_API_KEY="aa_live_xxx"
Pre-flight Freshness Check
The most common pattern: fail the task if upstream data is stale.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError
def check_upstream_freshness():
    """Pre-flight gate: raise if the upstream orders table is stale.

    require_fresh raises StalenessError when the monitored table has
    not been updated within its configured freshness window, which
    fails the Airflow task and blocks downstream work.
    """
    armor = Client()
    # Raises StalenessError on stale data; reaching the print means fresh.
    armor.freshness.require_fresh("snowflake.prod.warehouse.orders")
    print("Upstream data is fresh, proceeding...")
def run_transformation():
    """Main transformation step: run the dbt models for this pipeline."""
    print("Running dbt models...")
    # subprocess.run(["dbt", "run", "--select", "orders_mart"])
with DAG(
    "orders_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    # Gate task: fails fast when upstream data is stale.
    gate = PythonOperator(
        task_id="check_freshness",
        python_callable=check_upstream_freshness,
    )
    # Main transformation; only reached when the gate passes.
    build = PythonOperator(
        task_id="run_transformation",
        python_callable=run_transformation,
    )
    gate >> build
Use a read-only scope for pre-flight checks. You only need read-write scope if you are triggering refreshes.

Check Multiple Sources
Verify all upstream dependencies before running:

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError
def check_all_upstream():
    """Verify that every upstream source is fresh before proceeding.

    Checks all tables (rather than failing on the first stale one) so
    the error message reports the complete set of offenders.
    """
    client = Client()
    upstream_tables = [
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
        "snowflake.prod.warehouse.products",
    ]

    def _is_stale(table):
        # require_fresh raises StalenessError when the table is stale.
        try:
            client.freshness.require_fresh(table)
        except StalenessError:
            return True
        return False

    stale_tables = [t for t in upstream_tables if _is_stale(t)]
    if stale_tables:
        raise Exception(f"Stale upstream data: {stale_tables}")
    print("All upstream sources are fresh!")
Trigger Freshness Check
Trigger a freshness check and wait for completion:

from anomalyarmor import Client
def refresh_and_check():
    """Trigger a freshness refresh, wait for it, then verify freshness."""
    table = "snowflake.prod.warehouse.orders"
    client = Client()
    # Kick off a refresh job and block until it completes.
    outcome = client.freshness.refresh(
        table,
        wait=True,
    )
    print(f"Refresh job {outcome.job_id}: {outcome.status}")
    # Confirm the table is now within its freshness window.
    client.freshness.require_fresh(table)
Sensor Pattern
Wait for data to become fresh:

from airflow.sensors.python import PythonSensor
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError
def is_data_fresh():
    """Sensor poke callable: True once the orders table is fresh."""
    client = Client()
    try:
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    except StalenessError:
        # Still stale; the sensor will poke again later.
        return False
    return True
# Poll until upstream data becomes fresh, then let downstream tasks run.
freshness_sensor = PythonSensor(
    task_id="wait_for_fresh_data",
    python_callable=is_data_fresh,
    mode="poke",
    poke_interval=300,  # poll every 5 minutes
    timeout=3600,  # give up after 1 hour
)
Check Lineage
Verify all upstream dependencies using lineage:

from anomalyarmor import Client
def check_upstream_via_lineage():
    """Walk the lineage graph and require freshness on every upstream."""
    client = Client()
    target = "snowflake.prod.mart.orders_summary"
    # Resolve the upstream dependency set from the lineage API.
    lineage = client.lineage.get(target)
    print(f"Checking {len(lineage.upstream)} upstream sources...")
    for source in lineage.upstream:
        # Raises on the first stale dependency, failing the task.
        client.freshness.require_fresh(source.qualified_name)
        print(f" {source.qualified_name}")
    print("All upstream sources are fresh!")
Error Handling
Handle different error types appropriately:

from anomalyarmor import Client
from anomalyarmor.exceptions import (
StalenessError,
AuthenticationError,
RateLimitError,
ArmorError,
)
import time
def check_with_retry():
    """Check freshness of the orders table with typed error handling.

    Behavior per error type:
      - StalenessError: data is genuinely stale — fail the task immediately.
      - RateLimitError: honor the server-provided backoff, then retry.
      - AuthenticationError: configuration problem — fail immediately.
      - ArmorError: other API failure — retry with a fixed delay.

    Raises:
        Exception: when the data is stale or the API key is invalid
            (chained to the original SDK error for debuggability).
        RateLimitError / ArmorError: re-raised once retries are exhausted.
    """
    client = Client()
    max_retries = 3
    for attempt in range(max_retries):
        try:
            client.freshness.require_fresh("snowflake.prod.warehouse.orders")
            return  # Success
        except StalenessError as e:
            # Data is stale - this is expected, fail the task.
            # Chain the cause so the SDK error stays visible in logs.
            raise Exception(f"Data is stale: last updated {e.last_updated}") from e
        except RateLimitError as e:
            # Rate limited - retry after waiting
            if attempt < max_retries - 1:
                print(f"Rate limited, waiting {e.retry_after}s...")
                time.sleep(e.retry_after)
            else:
                raise
        except AuthenticationError as e:
            # Auth error - likely config issue; retrying cannot help.
            raise Exception("Invalid ARMOR_API_KEY") from e
        except ArmorError as e:
            # Other API error - retry
            if attempt < max_retries - 1:
                print(f"API error, retrying: {e}")
                time.sleep(10)
            else:
                raise
Complete DAG Example
Full example with freshness gate, transformation, and post-run schema check:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError
# Required for retry_delay below; the example's imports only brought in
# datetime, so timedelta would otherwise raise a NameError at parse time.
from datetime import timedelta

# Applied to every task in the DAG: one retry, 5 minutes apart.
default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
def check_upstream():
    """Fail the task unless both upstream warehouse tables are fresh."""
    client = Client()
    # Checked in order; the first stale table raises and fails the task.
    for table in (
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
    ):
        client.freshness.require_fresh(table)
def trigger_schema_check():
    """Run a blocking schema check against the orders mart."""
    client = Client()
    # wait=True blocks until the schema-check job finishes.
    outcome = client.schema.refresh(
        "snowflake.prod.mart.orders_summary",
        wait=True,
    )
    print(f"Schema check: {outcome.status}")
with DAG(
    "orders_mart_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["data-quality", "orders"],
) as dag:
    # 1. Gate: verify upstream freshness before spending compute.
    gate_task = PythonOperator(
        task_id="check_upstream_freshness",
        python_callable=check_upstream,
    )
    # 2. Build: run the dbt models for the orders mart.
    dbt_task = BashOperator(
        task_id="run_dbt_models",
        bash_command="cd /dbt && dbt run --select orders_mart",
    )
    # 3. Verify: confirm the produced schema after the build.
    schema_task = PythonOperator(
        task_id="verify_schema",
        python_callable=trigger_schema_check,
    )
    gate_task >> dbt_task >> schema_task
Next Steps
Python SDK
SDK reference and patterns
Freshness API
Freshness endpoint details
Lineage API
Explore data dependencies
Alerts
Set up freshness alerts
