Integrate AnomalyArmor with dbt to add data quality gates before and after your transformations. This guide covers common patterns for ensuring data quality throughout your dbt workflow.
The shell() function is not a built-in dbt Jinja function. This pattern requires a custom macro
or package that provides shell execution capabilities. Consider using the wrapper script approach
(Pattern 4) for a simpler, more portable solution.
-- macros/armor_quality_gate.sql
{# Fail compilation if a source table is stale, by shelling out to the armor CLI. #}
{# NOTE: shell() is NOT a built-in dbt function — a custom macro/package must provide it. #}
{% macro armor_freshness_check(table_name) %}
    {# Trivial query; presumably a connectivity probe — result is unused. TODO confirm it is needed. #}
    {% set result = run_query("SELECT 1") %}
    {{ log("Checking freshness for " ~ table_name, info=True) }}
    {# Call CLI from dbt - requires custom shell() macro #}
    {# `|| echo STALE` guarantees a sentinel string on any CLI failure. #}
    {% set check_result = shell("armor freshness check " ~ table_name ~ " 2>&1 || echo STALE") %}
    {% if "STALE" in check_result %}
        {{ exceptions.raise_compiler_error("Data quality check failed: " ~ table_name ~ " is stale") }}
    {% endif %}
{% endmacro %}
A comprehensive wrapper that handles pre-checks, dbt run, and post-validation:
#!/usr/bin/env python
# dbt_with_quality_gates.py
"""Run dbt with AnomalyArmor quality gates.

Usage:
    python dbt_with_quality_gates.py run
    python dbt_with_quality_gates.py run --select marts.*
"""
import argparse
import subprocess
import sys

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

# Configuration
ASSET_ID = "your-asset-uuid"
SOURCE_TABLES = [
    "snowflake.raw.stripe.payments",
    "snowflake.raw.crm.customers",
]


def pre_checks(client: Client) -> bool:
    """Run pre-dbt quality checks.

    Checks freshness for every table in SOURCE_TABLES; returns True only
    when all of them are fresh (no early exit, so every table is reported).
    """
    print("\n=== Pre-dbt Quality Checks ===\n")
    # Check source freshness
    all_fresh = True
    for table in SOURCE_TABLES:
        try:
            status = client.freshness.require_fresh(table)
            print(f"[FRESH] {table} ({status.hours_since_update:.1f}h)")
        except StalenessError as e:
            print(f"[STALE] {table} ({e.hours_since_update:.1f}h)")
            all_fresh = False
    return all_fresh


def run_dbt(args: list[str]) -> int:
    """Execute dbt with provided arguments and return its exit code."""
    print("\n=== Running dbt ===\n")
    cmd = ["dbt"] + args
    # List form (shell=False) avoids shell-injection via user-supplied args.
    result = subprocess.run(cmd)
    return result.returncode


def post_checks(client: Client) -> bool:
    """Run post-dbt quality checks.

    Validity and referential-integrity failures fail the gate; metric
    anomalies only warn.
    """
    print("\n=== Post-dbt Quality Checks ===\n")
    passed = True

    # Validity checks
    validity = client.validity.summary(ASSET_ID)
    if validity.failing == 0:
        print(f"[PASS] Validity: {validity.total_rules} rules")
    else:
        print(f"[FAIL] Validity: {validity.failing}/{validity.total_rules} failing")
        passed = False

    # Referential integrity
    ref = client.referential.summary(ASSET_ID)
    if ref.failing_checks == 0:
        print(f"[PASS] Referential: {ref.total_checks} checks")
    else:
        print(f"[FAIL] Referential: {ref.failing_checks}/{ref.total_checks} failing")
        passed = False

    # Metrics anomalies
    metrics = client.metrics.summary(ASSET_ID)
    if metrics.failing == 0:
        print(f"[PASS] Metrics: {metrics.active_metrics} monitored")
    else:
        print(f"[WARN] Metrics: {metrics.failing} anomalies detected")
        # Don't fail on metric anomalies, just warn

    return passed


def main() -> None:
    """Parse CLI args and run pre-checks, dbt, then post-checks."""
    parser = argparse.ArgumentParser(description="Run dbt with quality gates")
    parser.add_argument("dbt_command", help="dbt command (run, build, test)")
    parser.add_argument("dbt_args", nargs="*", help="Additional dbt arguments")
    parser.add_argument("--skip-pre", action="store_true", help="Skip pre-checks")
    parser.add_argument("--skip-post", action="store_true", help="Skip post-checks")
    args = parser.parse_args()

    client = Client()
    dbt_args = [args.dbt_command] + args.dbt_args

    # Pre-checks
    if not args.skip_pre:
        if not pre_checks(client):
            print("\nPre-checks failed. Use --skip-pre to bypass.")
            sys.exit(1)

    # Run dbt — propagate dbt's own exit code so CI sees the real failure.
    dbt_exit_code = run_dbt(dbt_args)
    if dbt_exit_code != 0:
        print(f"\ndbt exited with code {dbt_exit_code}")
        sys.exit(dbt_exit_code)

    # Post-checks
    if not args.skip_post:
        if not post_checks(client):
            print("\nPost-checks failed. Review quality issues.")
            sys.exit(1)

    print("\n=== All checks passed! ===")


if __name__ == "__main__":
    main()
Run it:
# Full run with all checks
python dbt_with_quality_gates.py run

# Run specific models
python dbt_with_quality_gates.py run --select marts.orders_mart

# Skip pre-checks (for development)
python dbt_with_quality_gates.py run --skip-pre
Upload your dbt manifest.json to populate data lineage in AnomalyArmor. This lets you visualize the full DAG, run impact analysis, and check upstream freshness before transformations.
# Generate the manifest
dbt parse

# Upload to AnomalyArmor
curl -X POST \
  "https://api.anomalyarmor.ai/api/v1/assets/$ASSET_ID/lineage/upload" \
  -H "Authorization: Bearer $ARMOR_API_KEY" \
  -F "file=@target/manifest.json"
Or add it as a post-run step in your dbt wrapper:
from anomalyarmor import Client

client = Client()


def upload_lineage(asset_id: str, manifest_path: str = "target/manifest.json") -> None:
    """Upload dbt manifest to sync lineage after a dbt run."""
    # Manifest must be opened in binary mode for the multipart upload.
    with open(manifest_path, "rb") as f:
        result = client.lineage.upload(asset_id=asset_id, file=f)
    print(f"Lineage synced: {result.sync_stats['nodes_created']} nodes, "
          f"{result.sync_stats['edges_created']} edges")
For dbt Cloud users, sync lineage directly without file uploads:
# Sync lineage straight from dbt Cloud — no manifest file upload needed.
result = client.lineage.sync_dbt_cloud(
    asset_id=ASSET_ID,
    account_id="12345",
    api_token="dbtc_your_token_here",
    job_id="67890",
)
See the full Lineage Upload Guide for detailed setup, CI/CD integration, and manual lineage options.
# Allow small percentage of stale data in non-critical tables
client.freshness.require_fresh(
    "analytics.events",
    max_age_hours=48,  # More lenient for analytics
)
# Critical checks should fail the pipeline
if not critical_checks_pass():
    sys.exit(1)

# Non-critical checks can warn without failing
if not advisory_checks_pass():
    log_warning("Advisory checks failed, continuing anyway")
For faster feedback, use summary endpoints instead of individual checks:
# Fast: single API call
summary = client.validity.summary(asset_id)
print(f"Failing: {summary.failing}")

# Slow: N API calls
for rule in client.validity.list(asset_id):
    result = client.validity.check(asset_id, rule.uuid)  # Avoid in loops