from anomalyarmor import Clientclient = Client( api_key="aa_live_xxx", # Or use ARMOR_API_KEY env var api_url="https://...", # Custom API URL (optional) timeout=30, # Request timeout in seconds)
from anomalyarmor import Clientfrom anomalyarmor.exceptions import StalenessErrordef check_upstream_freshness(): client = Client() try: # Raises StalenessError if stale client.freshness.require_fresh("snowflake.prod.warehouse.orders") print("Data is fresh, proceeding...") except StalenessError as e: print(f"Data is stale: {e}") raise # Fail the task
from anomalyarmor import Clientclient = Client()# All Snowflake tablestables = client.assets.list(source="snowflake", type="table")# Paginate through all assetsoffset = 0while True: assets = client.assets.list(limit=100, offset=offset) if not assets: break for asset in assets: process(asset) offset += 100
from anomalyarmor import Clientclient = Client()# Get upstream dependencieslineage = client.lineage.get("snowflake.prod.warehouse.orders")# Check all upstream sources are freshfor upstream in lineage.upstream: client.freshness.require_fresh(upstream.qualified_name)print("All upstream sources are fresh!")
from anomalyarmor import Clientclient = Client()asset_id = "550e8400-e29b-41d4-a716-446655440000"# Create a row count metric with anomaly detectionmetric = client.metrics.create( asset_id, metric_type="row_count", table_path="snowflake.prod.warehouse.orders", capture_interval="daily", sensitivity=2.0, # Alert on 2+ standard deviations)# Get metrics summarysummary = client.metrics.summary(asset_id)print(f"Health: {summary.health_percentage}%")# Check for recent anomaliessnapshots = client.metrics.snapshots(asset_id, metric.id, limit=7)anomalies = [s for s in snapshots if s.is_anomaly]if anomalies: print(f"Found {len(anomalies)} anomalies in the last 7 days")
from anomalyarmor import Clientfrom anomalyarmor.exceptions import ( StalenessError, # Data is stale AuthenticationError, # Invalid/missing API key NotFoundError, # Asset not found RateLimitError, # Rate limit exceeded ValidationError, # Invalid parameters ServerError, # Server error ArmorError, # Base exception)client = Client()try: client.freshness.require_fresh("snowflake.prod.warehouse.orders")except StalenessError as e: print(f"Data is stale: last updated {e.last_updated}")except AuthenticationError: print("Invalid API key")except RateLimitError as e: print(f"Rate limited, retry after {e.retry_after} seconds")except ArmorError as e: print(f"API error: {e}")
from anomalyarmor import Clientfrom anomalyarmor.models import Asset, FreshnessStatusclient = Client()asset: Asset = client.assets.get("snowflake.prod.warehouse.orders")status: FreshnessStatus = client.freshness.get(asset.qualified_name)print(status.is_fresh) # IDE knows this is boolprint(status.last_updated) # IDE knows this is datetime