This guide walks you through building your first programmatic integration with AnomalyArmor. By the end, you’ll have a working data quality check that can run in your pipeline.

Prerequisites

  • An AnomalyArmor account with at least one connected data source
  • Python 3.9+ installed
  • An API key (create in Settings > API Keys)

Step 1: Install the SDK

pip install anomalyarmor-cli

Step 2: Configure Authentication

Store your API key securely. You have two options:

Option A: Environment variable (recommended for CI/CD)
export ARMOR_API_KEY="aa_live_your_key_here"

Option B: Config file (for local development)
armor auth login
# Follow the prompts to enter your API key
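If you want to fail fast with a clear message when the key is missing, you can resolve it yourself before constructing the client. This is a standalone sketch: `resolve_api_key` is a hypothetical helper, not part of the SDK, and the SDK's own lookup behavior may differ.

```python
import os

def resolve_api_key() -> str:
    """Read the API key from the environment, failing fast with a clear message."""
    key = os.environ.get("ARMOR_API_KEY")
    if not key:
        raise RuntimeError(
            "ARMOR_API_KEY is not set; export it or run `armor auth login`"
        )
    return key
```

Failing here, rather than deep inside the first API call, makes misconfigured CI jobs much easier to diagnose.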

Step 3: Verify Connection

Test that everything is working:
from anomalyarmor import Client

client = Client()

# List your data assets
assets = client.assets.list(limit=5)
for asset in assets:
    print(f"{asset.qualified_name} ({asset.asset_type})")
You should see a list of your connected tables and views.

Step 4: Check Data Freshness

The most common integration pattern is checking data freshness before running a pipeline. Here’s a complete example:
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def run_pipeline():
    client = Client()

    # Check that source data is fresh before processing
    try:
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
        print("Data is fresh, proceeding with pipeline...")
        # Your pipeline logic here
    except StalenessError as e:
        print(f"Pipeline aborted: data is {e.hours_since_update:.1f}h stale")
        raise

if __name__ == "__main__":
    run_pipeline()
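The `hours_since_update` value on `StalenessError` is just the elapsed time since the table's last recorded update. As a standalone illustration of that calculation (plain Python, not SDK code):

```python
from datetime import datetime, timezone
from typing import Optional

def hours_since(last_update: datetime, now: Optional[datetime] = None) -> float:
    """Elapsed hours between last_update and now (use timezone-aware datetimes)."""
    now = now or datetime.now(timezone.utc)
    return (now - last_update).total_seconds() / 3600.0
```

For example, a table last updated at 00:00 UTC checked at 06:30 UTC is 6.5 hours stale.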

Step 5: Add Data Quality Checks

Expand your integration with validity and referential integrity checks:
from anomalyarmor import Client

client = Client()
asset_id = "your-asset-uuid"  # Get from assets.list()

# Check freshness
summary = client.freshness.summary()
print(f"Overall freshness: {summary.fresh_percentage}%")

# Check validity rules
validity_summary = client.validity.summary(asset_id)
if validity_summary.failing > 0:
    print(f"Warning: {validity_summary.failing} validity rules failing")
    # List failing rules
    rules = client.validity.list(asset_id)
    for rule in rules:
        result = client.validity.check(asset_id, rule.uuid)
        if result.status == "fail":
            print(f"  - {rule.name}: {result.invalid_count} invalid records")

# Check referential integrity
ref_summary = client.referential.summary(asset_id)
if ref_summary.failing_checks > 0:
    print(f"Warning: {ref_summary.failing_checks} referential checks failing")

Step 6: Create a Quality Gate

Combine all checks into a single quality gate function:
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def quality_gate(asset_id: str, critical_tables: list[str]) -> bool:
    """
    Run all data quality checks before pipeline execution.
    Returns True if all checks pass, False otherwise.

    Note: list[str] type hint requires Python 3.9+. For earlier versions,
    use typing.List[str] instead.
    """
    client = Client()
    passed = True

    # 1. Check freshness of critical tables
    for table in critical_tables:
        try:
            client.freshness.require_fresh(table)
            print(f"[PASS] Freshness: {table}")
        except StalenessError as e:
            print(f"[FAIL] Freshness: {table} ({e.hours_since_update:.1f}h stale)")
            passed = False

    # 2. Check validity rules
    validity = client.validity.summary(asset_id)
    if validity.failing == 0:
        print(f"[PASS] Validity: {validity.total_rules} rules")
    else:
        print(f"[FAIL] Validity: {validity.failing}/{validity.total_rules} rules failing")
        passed = False

    # 3. Check referential integrity
    ref = client.referential.summary(asset_id)
    if ref.failing_checks == 0:
        print(f"[PASS] Referential: {ref.total_checks} checks")
    else:
        print(f"[FAIL] Referential: {ref.failing_checks}/{ref.total_checks} failing")
        passed = False

    return passed


# Usage
if __name__ == "__main__":
    asset = "your-asset-uuid"
    tables = [
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
    ]

    if quality_gate(asset, tables):
        print("\nAll quality checks passed! Running pipeline...")
        # run_pipeline()
    else:
        print("\nQuality checks failed. Pipeline aborted.")
        raise SystemExit(1)

Step 7: CLI Integration

For shell scripts and CI/CD, use the CLI directly:
#!/bin/bash
set -e

# Check freshness (exits 1 if stale)
armor freshness check snowflake.prod.warehouse.orders

# If we get here, data is fresh
echo "Data quality checks passed!"
dbt run
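With `set -e`, a stale table aborts the whole script immediately. If you instead want to branch on the result (log, skip a step, notify), test the exit code explicitly. This sketch assumes the CLI's usual convention (exit 0 = fresh, non-zero = stale); `check` is a runnable stand-in so the pattern works anywhere.

```shell
#!/bin/sh
# Exit-code gate without `set -e`: branch on the check's status instead of
# aborting the script on the first failure.
check() { true; }   # stand-in for: armor freshness check snowflake.prod.warehouse.orders

if check; then
    echo "Data is fresh, running downstream job"
else
    echo "Data is stale, skipping downstream job" >&2
    exit 1
fi
```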

Common Patterns

Pattern 1: Pre-ETL Validation

Run checks before ETL starts:
# At the start of your ETL
client.freshness.require_fresh("source_table")
client.validity.check(asset_id, "not_null_rule")
# Then run ETL...

Pattern 2: Post-ETL Validation

Verify output quality after ETL:
# After ETL completes
result = client.validity.check(asset_id, "uniqueness_rule")
if result.status == "fail":
    rollback_etl()
    alert_team(f"ETL produced {result.invalid_count} duplicate records")

Pattern 3: Continuous Monitoring

Schedule regular quality checks:
# In a scheduled job (e.g., Airflow, cron)
summary = client.metrics.summary(asset_id)
if summary.failing > 0:
    alert_team(f"{summary.failing} metrics failing")

Next Steps

  • dbt Integration: add quality gates to dbt workflows
  • GitHub Actions: run checks in CI/CD pipelines
  • Airflow Integration: integrate with Apache Airflow
  • API Reference: full API documentation