Integrate AnomalyArmor with dbt to add data quality gates before and after your transformations. This guide covers common patterns for ensuring data quality throughout your dbt workflow.

Prerequisites

  • AnomalyArmor account with connected data source
  • dbt project configured
  • Python 3.9+ (for SDK) or shell access (for CLI)
  • API key with read-only scope (or read-write for triggering checks)

Installation

pip install anomalyarmor-cli dbt-core
Configure your API key:
export ARMOR_API_KEY="aa_live_your_key_here"
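
A missing key is easier to diagnose before any check runs than in the middle of a pipeline. The sketch below is one way to fail early from Python. It assumes the key is read from the `ARMOR_API_KEY` environment variable exported above; the helper name `require_api_key` is hypothetical and not part of the AnomalyArmor SDK.

```python
import os
from typing import Mapping


def require_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return the API key from the environment, or raise a clear error.

    Hypothetical helper: it only inspects the environment and makes no API call.
    """
    key = env.get("ARMOR_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "ARMOR_API_KEY is not set; export it before running quality gates."
        )
    return key
```

Calling `require_api_key()` at the top of a gate script turns a missing key into a single readable error message.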

Pattern 1: Pre-run Quality Gate

Check data quality before running dbt:

Using CLI (Shell Script)

#!/bin/bash
# pre_dbt_check.sh
set -e

echo "Running pre-dbt quality checks..."

# Check freshness of source tables
armor freshness check snowflake.raw.stripe.payments || {
    echo "Source data is stale. Aborting dbt run."
    exit 1
}

armor freshness check snowflake.raw.crm.customers || {
    echo "Source data is stale. Aborting dbt run."
    exit 1
}

echo "All source tables are fresh. Starting dbt..."
dbt run
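
The script above repeats the same block for each table. If you prefer to keep the table list in one place without using the SDK, the same gate can be sketched in Python by driving the CLI through `subprocess`. This assumes, as the shell script does, that `armor freshness check` exits with a non-zero status for a stale table; the function name `find_stale_tables` is hypothetical.

```python
import subprocess
from typing import Callable, Iterable, List


def find_stale_tables(
    tables: Iterable[str],
    run: Callable[..., subprocess.CompletedProcess] = subprocess.run,
) -> List[str]:
    """Return the tables whose CLI freshness check exited non-zero.

    `run` is injectable so the function can be exercised without the CLI.
    """
    stale = []
    for table in tables:
        result = run(["armor", "freshness", "check", table])
        if result.returncode != 0:
            stale.append(table)
    return stale
```

A non-zero exit can also mean an authentication or lookup error rather than stale data, so treat the returned list as "checks that did not pass" and read the CLI output before drawing conclusions.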

Using Python

# pre_dbt_check.py
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError
import subprocess
import sys

def check_sources_and_run_dbt():
    client = Client()

    sources = [
        "snowflake.raw.stripe.payments",
        "snowflake.raw.crm.customers",
        "snowflake.raw.shopify.orders",
    ]

    print("Checking source freshness...")
    for source in sources:
        try:
            status = client.freshness.require_fresh(source)
            print(f"  [OK] {source} ({status.hours_since_update:.1f}h old)")
        except StalenessError as e:
            print(f"  [STALE] {source} ({e.hours_since_update:.1f}h old)")
            print("\nAborting: Source data is stale.")
            sys.exit(1)

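    print("\nAll sources fresh. Running dbt...")
    # Return dbt's exit code instead of raising CalledProcessError
    result = subprocess.run(["dbt", "run"])
    return result.returncode

if __name__ == "__main__":
    # Propagate dbt's exit code to the calling shell or scheduler
    sys.exit(check_sources_and_run_dbt())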

Pattern 2: Post-run Validation

Validate output quality after dbt completes:
# post_dbt_validate.py
from anomalyarmor import Client

def validate_dbt_outputs():
    client = Client()
    asset_id = "your-asset-uuid"

    print("Validating dbt outputs...")

    # Check validity rules on transformed tables
    validity = client.validity.summary(asset_id)
    if validity.failing > 0:
        print(f"[WARN] {validity.failing} validity rules failing")

        # Get details of failing rules
        rules = client.validity.list(asset_id)
        for rule in rules:
            result = client.validity.check(asset_id, rule.uuid)
            if result.status == "fail":
                print(f"  - {rule.name} on {rule.column_name}")
                print(f"    Invalid: {result.invalid_count} ({result.invalid_percent:.2f}%)")

        return False
    else:
        print(f"[OK] All {validity.total_rules} validity rules passing")

    # Check referential integrity
    ref = client.referential.summary(asset_id)
    if ref.failing_checks > 0:
        print(f"[WARN] {ref.failing_checks} referential checks failing")
        return False
    else:
        print(f"[OK] All {ref.total_checks} referential checks passing")

    return True

if __name__ == "__main__":
    if not validate_dbt_outputs():
        print("\nPost-dbt validation failed!")
        raise SystemExit(1)  # exit() is a site helper intended for interactive use
    print("\nAll validations passed!")

Pattern 3: dbt run-operation Hook

Note: shell() is not a built-in dbt Jinja function. This pattern requires a custom macro or package that provides shell execution. For a simpler, more portable solution, consider the wrapper script approach (Pattern 4).
Create a dbt macro that calls AnomalyArmor:
-- macros/armor_quality_gate.sql
{% macro armor_freshness_check(table_name) %}
    {% set result = run_query("SELECT 1") %}
    {{ log("Checking freshness for " ~ table_name, info=True) }}

    {# Call CLI from dbt - requires custom shell() macro #}
    {% set check_result = shell("armor freshness check " ~ table_name ~ " 2>&1 || echo STALE") %}

    {% if "STALE" in check_result %}
        {{ exceptions.raise_compiler_error("Data quality check failed: " ~ table_name ~ " is stale") }}
    {% endif %}
{% endmacro %}
Then in your model:
-- models/marts/orders_mart.sql
{{ config(pre_hook=armor_freshness_check('snowflake.raw.stripe.payments')) }}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('stg_customers') }} c ON o.customer_id = c.customer_id

Pattern 4: Full dbt Wrapper Script

A comprehensive wrapper that handles pre-checks, dbt run, and post-validation:
#!/usr/bin/env python
# dbt_with_quality_gates.py
"""
Run dbt with AnomalyArmor quality gates.

Usage:
    python dbt_with_quality_gates.py run
    python dbt_with_quality_gates.py run --select marts.*
"""

import argparse
import subprocess
import sys
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

# Configuration
ASSET_ID = "your-asset-uuid"
SOURCE_TABLES = [
    "snowflake.raw.stripe.payments",
    "snowflake.raw.crm.customers",
]

def pre_checks(client: Client) -> bool:
    """Run pre-dbt quality checks."""
    print("\n=== Pre-dbt Quality Checks ===\n")

    # Check source freshness
    all_fresh = True
    for table in SOURCE_TABLES:
        try:
            status = client.freshness.require_fresh(table)
            print(f"[FRESH] {table} ({status.hours_since_update:.1f}h)")
        except StalenessError as e:
            print(f"[STALE] {table} ({e.hours_since_update:.1f}h)")
            all_fresh = False

    return all_fresh

def run_dbt(args: list[str]) -> int:
    """Execute dbt with provided arguments."""
    print("\n=== Running dbt ===\n")
    cmd = ["dbt"] + args
    result = subprocess.run(cmd)
    return result.returncode

def post_checks(client: Client) -> bool:
    """Run post-dbt quality checks."""
    print("\n=== Post-dbt Quality Checks ===\n")

    passed = True

    # Validity checks
    validity = client.validity.summary(ASSET_ID)
    if validity.failing == 0:
        print(f"[PASS] Validity: {validity.total_rules} rules")
    else:
        print(f"[FAIL] Validity: {validity.failing}/{validity.total_rules} failing")
        passed = False

    # Referential integrity
    ref = client.referential.summary(ASSET_ID)
    if ref.failing_checks == 0:
        print(f"[PASS] Referential: {ref.total_checks} checks")
    else:
        print(f"[FAIL] Referential: {ref.failing_checks}/{ref.total_checks} failing")
        passed = False

    # Metrics anomalies
    metrics = client.metrics.summary(ASSET_ID)
    if metrics.failing == 0:
        print(f"[PASS] Metrics: {metrics.active_metrics} monitored")
    else:
        print(f"[WARN] Metrics: {metrics.failing} anomalies detected")
        # Don't fail on metric anomalies, just warn

    return passed

def main():
    # allow_abbrev=False keeps dbt flags from being mistaken for wrapper flags
    parser = argparse.ArgumentParser(
        description="Run dbt with quality gates", allow_abbrev=False
    )
    parser.add_argument("dbt_command", help="dbt command (run, build, test)")
    parser.add_argument("--skip-pre", action="store_true", help="Skip pre-checks")
    parser.add_argument("--skip-post", action="store_true", help="Skip post-checks")
    # Flags the wrapper does not define (e.g. --select) are forwarded to dbt;
    # plain parse_args() would reject them as unrecognized arguments
    args, dbt_extra = parser.parse_known_args()

    client = Client()
    dbt_args = [args.dbt_command] + dbt_extra

    # Pre-checks
    if not args.skip_pre:
        if not pre_checks(client):
            print("\nPre-checks failed. Use --skip-pre to bypass.")
            sys.exit(1)

    # Run dbt
    dbt_exit_code = run_dbt(dbt_args)
    if dbt_exit_code != 0:
        print(f"\ndbt exited with code {dbt_exit_code}")
        sys.exit(dbt_exit_code)

    # Post-checks
    if not args.skip_post:
        if not post_checks(client):
            print("\nPost-checks failed. Review quality issues.")
            sys.exit(1)

    print("\n=== All checks passed! ===")

if __name__ == "__main__":
    main()
Run it:
# Full run with all checks
python dbt_with_quality_gates.py run

# Run specific models
python dbt_with_quality_gates.py run --select marts.orders_mart

# Skip pre-checks (for development)
python dbt_with_quality_gates.py run --skip-pre
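
Commands such as `run --select marts.orders_mart` only work if the wrapper forwards flags it does not recognize to dbt. A minimal sketch of that split, using the standard library's `argparse.ArgumentParser.parse_known_args`; the function name `split_wrapper_args` is hypothetical.

```python
import argparse
from typing import List, Optional, Tuple


def split_wrapper_args(
    argv: Optional[List[str]] = None,
) -> Tuple[argparse.Namespace, List[str]]:
    """Separate the wrapper's own flags from the arguments meant for dbt."""
    # allow_abbrev=False stops argparse from guessing at partial flag names
    parser = argparse.ArgumentParser(
        description="Run dbt with quality gates", allow_abbrev=False
    )
    parser.add_argument("dbt_command", help="dbt command (run, build, test)")
    parser.add_argument("--skip-pre", action="store_true")
    parser.add_argument("--skip-post", action="store_true")

    # Anything the parser does not recognize is returned, in order, as passthrough
    args, passthrough = parser.parse_known_args(argv)
    return args, [args.dbt_command] + passthrough
```

For example, `split_wrapper_args(["run", "--select", "marts.orders_mart", "--skip-pre"])` sets `skip_pre` on the namespace and leaves `["run", "--select", "marts.orders_mart"]` as the dbt argument list.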

Pattern 5: dbt Cloud Webhook Integration

For dbt Cloud, use webhooks to trigger AnomalyArmor checks:
# webhook_handler.py (Flask example)
from flask import Flask, request, jsonify
from anomalyarmor import Client

app = Flask(__name__)

def send_alert(message: str) -> None:
    """Placeholder: replace with your Slack, PagerDuty, etc. integration."""
    print(f"[ALERT] {message}")

@app.route("/dbt-webhook", methods=["POST"])
def handle_dbt_webhook():
    # Tolerate empty or non-JSON bodies instead of raising
    payload = request.get_json(silent=True) or {}
    event_type = payload.get("eventType")
    run_status = payload.get("data", {}).get("runStatus")

    # dbt Cloud reports finished job runs with the event type "job.run.completed"
    if event_type == "job.run.completed" and run_status == "Success":
        # dbt run completed successfully, run post-checks
        client = Client()
        asset_id = "your-asset-uuid"

        validity = client.validity.summary(asset_id)
        ref = client.referential.summary(asset_id)

        if validity.failing > 0 or ref.failing_checks > 0:
            # Alert your team via Slack, PagerDuty, etc.
            send_alert("dbt run completed but quality issues found")

    return jsonify({"status": "ok"})
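
A public webhook endpoint should confirm that a request really came from dbt Cloud before acting on it. The sketch below assumes dbt Cloud's documented signing scheme, in which the `Authorization` header carries the hex HMAC-SHA256 of the raw request body keyed with the webhook's secret; verify this against your dbt Cloud webhook settings. The function name `is_valid_signature` is hypothetical.

```python
import hashlib
import hmac


def is_valid_signature(secret: str, raw_body: bytes, auth_header: str) -> bool:
    """Check that auth_header equals HMAC-SHA256(secret, raw_body) in hex."""
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(
        expected.encode("utf-8"), (auth_header or "").encode("utf-8")
    )
```

In a Flask handler, the raw body is available from `request.get_data()` and the header from `request.headers.get("Authorization", "")`; returning a 401 response when the check fails keeps unsigned requests from triggering checks.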

Pattern 6: Upload Lineage from dbt

Upload your dbt manifest.json to populate data lineage in AnomalyArmor. This lets you visualize the full DAG, run impact analysis, and check upstream freshness before transformations.
# Generate the manifest
dbt parse

# Upload to AnomalyArmor
curl -X POST \
  "https://api.anomalyarmor.ai/api/v1/assets/$ASSET_ID/lineage/upload" \
  -H "Authorization: Bearer $ARMOR_API_KEY" \
  -F "file=@target/manifest.json"
Or add it as a post-run step in your dbt wrapper:
from anomalyarmor import Client

client = Client()

def upload_lineage(asset_id: str, manifest_path: str = "target/manifest.json"):
    """Upload dbt manifest to sync lineage after a dbt run."""
    with open(manifest_path, "rb") as f:
        result = client.lineage.upload(asset_id=asset_id, file=f)
    print(f"Lineage synced: {result.sync_stats['nodes_created']} nodes, "
          f"{result.sync_stats['edges_created']} edges")
For dbt Cloud users, sync lineage directly without file uploads:
result = client.lineage.sync_dbt_cloud(
    asset_id=ASSET_ID,
    account_id="12345",
    api_token="dbtc_your_token_here",
    job_id="67890",
)
See the full Lineage Upload Guide for detailed setup, CI/CD integration, and manual lineage options.
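
Before uploading, it can help to confirm that the manifest actually contains the DAG you expect. The sketch below counts nodes, sources, and dependency edges locally. It relies on the dbt manifest layout, where top-level `nodes` and `sources` are dictionaries and each node lists its parents under `depends_on.nodes`; the function names are hypothetical, and the counts are a local sanity check that will not necessarily match the `sync_stats` AnomalyArmor reports.

```python
import json
from typing import Any, Dict


def summarize_manifest(manifest: Dict[str, Any]) -> Dict[str, int]:
    """Count nodes, sources, and parent-to-child edges in a dbt manifest dict."""
    nodes = manifest.get("nodes", {})
    sources = manifest.get("sources", {})
    # Each entry in depends_on.nodes is one upstream dependency (one edge)
    edges = sum(
        len(node.get("depends_on", {}).get("nodes", [])) for node in nodes.values()
    )
    return {"nodes": len(nodes), "sources": len(sources), "edges": edges}


def summarize_manifest_file(path: str = "target/manifest.json") -> Dict[str, int]:
    """Load a manifest from disk and summarize it."""
    with open(path, encoding="utf-8") as f:
        return summarize_manifest(json.load(f))
```

An edge count of zero usually means the manifest came from a project with no models or from the wrong target directory.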

Best Practices

1. Scope Your Checks

Don’t check everything. Focus on critical paths:
# Check only critical source tables
CRITICAL_SOURCES = [
    "production.stripe.payments",  # Revenue-critical
    "production.core.users",       # Identity-critical
]

2. Set Appropriate Thresholds

Configure checks with realistic thresholds:
# Allow older data in non-critical tables
client.freshness.require_fresh(
    "analytics.events",
    max_age_hours=48,  # More lenient for analytics
)
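
One way to keep thresholds reviewable is to store them in a single table-to-limit mapping rather than scattering them across calls. The sketch below shows the comparison such a threshold implies. The table names, hour values, and the `is_stale` helper are illustrative assumptions; the function works on a timestamp you supply and makes no API call.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical per-table limits, in hours; unlisted tables use the default
MAX_AGE_HOURS = {
    "production.stripe.payments": 6,   # Revenue-critical: strict
    "analytics.events": 48,            # Analytics: lenient
}
DEFAULT_MAX_AGE_HOURS = 24


def is_stale(table: str, last_updated: datetime, now: Optional[datetime] = None) -> bool:
    """Return True if the table's last update is older than its configured limit."""
    now = now or datetime.now(timezone.utc)
    limit = timedelta(hours=MAX_AGE_HOURS.get(table, DEFAULT_MAX_AGE_HOURS))
    return now - last_updated > limit
```

The same mapping can feed the `max_age_hours` argument shown above, so the limits live in one place.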

3. Fail Fast, But Not Always

# Critical checks should fail the pipeline
if not critical_checks_pass():
    sys.exit(1)

# Non-critical checks can warn without failing
if not advisory_checks_pass():
    log_warning("Advisory checks failed, continuing anyway")
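
The snippet above leaves the check functions undefined. A runnable sketch of the same policy follows, with checks passed in as named callables that return `True` on success. The `run_gates` name and its return shape are assumptions made for illustration.

```python
from typing import Callable, Dict, List, Tuple

Check = Callable[[], bool]


def run_gates(critical: Dict[str, Check], advisory: Dict[str, Check]) -> Tuple[int, List[str]]:
    """Return (exit_code, advisory_warnings).

    The first failing critical check stops evaluation with exit code 1.
    Advisory failures are collected and reported but never change the exit code.
    """
    for name, check in critical.items():
        if not check():
            print(f"[FAIL] {name}")
            return 1, []

    warnings = [name for name, check in advisory.items() if not check()]
    for name in warnings:
        print(f"[WARN] {name} failed, continuing anyway")
    return 0, warnings
```

A caller would typically pass the first element of the result to `sys.exit` and forward the warnings to a log or chat channel.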

4. Cache API Calls

For large dbt projects, minimize API calls:
# Get summary once, not per-model
validity_summary = client.validity.summary(asset_id)
# Then use summary.failing, summary.passing, etc.
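
When several steps in one run need the same summary, a small time-bounded cache avoids repeated calls. The sketch below wraps any fetch function that takes an asset ID; the `SummaryCache` class and its five-minute default are assumptions, not part of the SDK.

```python
import time
from typing import Any, Callable, Dict, Tuple


class SummaryCache:
    """Cache fetch(asset_id) results for ttl_seconds."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, asset_id: str) -> Any:
        now = self._clock()
        hit = self._entries.get(asset_id)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # Still fresh: no call made
        value = self._fetch(asset_id)
        self._entries[asset_id] = (now, value)
        return value
```

With the SDK calls used in this guide, that would look like `cache = SummaryCache(client.validity.summary)` followed by `cache.get(asset_id)` wherever the summary is needed. Keep the TTL short relative to your dbt run so post-run checks do not read pre-run results.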

Troubleshooting

“Asset not found” errors

Ensure your table names match AnomalyArmor’s qualified names exactly:
# List your assets to see exact names
armor assets list --source snowflake
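
Most mismatches are case differences or small typos. Given the names from the listing above, a helper like the following can suggest the closest match. It uses only the standard library's `difflib`; the `suggest_asset_names` function is hypothetical, and the known names must be supplied by you (for example, copied from the CLI output).

```python
import difflib
from typing import Iterable, List


def suggest_asset_names(requested: str, known: Iterable[str], limit: int = 3) -> List[str]:
    """Suggest known qualified names that resemble the requested one."""
    known = list(known)

    # A case-only mismatch is the most common cause, so check it first
    by_lower = {name.lower(): name for name in known}
    if requested.lower() in by_lower:
        return [by_lower[requested.lower()]]

    # Otherwise fall back to similarity ranking, best match first
    return difflib.get_close_matches(requested, known, n=limit, cutoff=0.6)
```

An empty result suggests the table is not registered under that data source at all, rather than being misspelled.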

Slow checks

For faster feedback, use summary endpoints instead of individual checks:
# Fast: single API call
summary = client.validity.summary(asset_id)
print(f"Failing: {summary.failing}")

# Slow: N API calls
for rule in client.validity.list(asset_id):
    result = client.validity.check(asset_id, rule.uuid)  # Avoid in loops

Next Steps

  • GitHub Actions: Run checks in CI/CD
  • Airflow Integration: Orchestrate with Airflow
  • Validity API: Full validity API reference
  • Metrics API: Track data metrics