Skip to main content
The anomalyarmor-cli package provides a Python SDK and CLI for programmatic access to AnomalyArmor.

Installation

pip install anomalyarmor-cli
Requires Python 3.9 or higher.

Quick Start

from anomalyarmor import Client

# Initialize - uses ARMOR_API_KEY env var
client = Client()

# Or pass key directly
client = Client(api_key="aa_live_xxx")

# List assets
assets = client.assets.list(source="snowflake", limit=10)
for asset in assets:
    print(asset.qualified_name)

# Check freshness (raises StalenessError if stale)
client.freshness.require_fresh("snowflake.prod.warehouse.orders")

Configuration

Environment Variables

VariableDescription
ARMOR_API_KEYYour API key (recommended)
ARMOR_API_URLCustom API URL (optional)
export ARMOR_API_KEY="aa_live_your_key_here"

Client Options

from anomalyarmor import Client

client = Client(
    api_key="aa_live_xxx",      # Or use ARMOR_API_KEY env var
    api_url="https://...",       # Custom API URL (optional)
    timeout=30,                  # Request timeout in seconds
)

Resources

The client provides access to all AnomalyArmor resources:
client.assets      # Data assets
client.freshness   # Freshness monitoring
client.schema      # Schema drift detection
client.lineage     # Data lineage
client.alerts      # Alert history
client.metrics     # Data quality metrics
client.api_keys    # API key management (admin scope)

Common Patterns

Airflow Pre-flight Check

Gate your pipeline on data freshness:
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_upstream_freshness():
    client = Client()

    try:
        # Raises StalenessError if stale
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
        print("Data is fresh, proceeding...")
    except StalenessError as e:
        print(f"Data is stale: {e}")
        raise  # Fail the task

List and Filter Assets

from anomalyarmor import Client

client = Client()

# All Snowflake tables
tables = client.assets.list(source="snowflake", type="table")

# Paginate through all assets
offset = 0
while True:
    assets = client.assets.list(limit=100, offset=offset)
    if not assets:
        break
    for asset in assets:
        process(asset)
    offset += 100

Trigger and Wait for Refresh

from anomalyarmor import Client

client = Client()

# Trigger freshness check and wait for completion
result = client.freshness.refresh(
    "snowflake.prod.warehouse.orders",
    wait=True  # Block until complete
)
print(f"Job {result.job_id}: {result.status}")

Check Lineage Before Running

from anomalyarmor import Client

client = Client()

# Get upstream dependencies
lineage = client.lineage.get("snowflake.prod.warehouse.orders")

# Check all upstream sources are fresh
for upstream in lineage.upstream:
    client.freshness.require_fresh(upstream.qualified_name)

print("All upstream sources are fresh!")

Monitor Data Quality Metrics

from anomalyarmor import Client

client = Client()
asset_id = "550e8400-e29b-41d4-a716-446655440000"

# Create a row count metric with anomaly detection
metric = client.metrics.create(
    asset_id,
    metric_type="row_count",
    table_path="snowflake.prod.warehouse.orders",
    capture_interval="daily",
    sensitivity=2.0,  # Alert on 2+ standard deviations
)

# Get metrics summary
summary = client.metrics.summary(asset_id)
print(f"Health: {summary.health_percentage}%")

# Check for recent anomalies
snapshots = client.metrics.snapshots(asset_id, metric.id, limit=7)
anomalies = [s for s in snapshots if s.is_anomaly]
if anomalies:
    print(f"Found {len(anomalies)} anomalies in the last 7 days")

Exception Handling

from anomalyarmor import Client
from anomalyarmor.exceptions import (
    StalenessError,      # Data is stale
    AuthenticationError, # Invalid/missing API key
    NotFoundError,       # Asset not found
    RateLimitError,      # Rate limit exceeded
    ValidationError,     # Invalid parameters
    ServerError,         # Server error
    ArmorError,          # Base exception
)

client = Client()

try:
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
except StalenessError as e:
    print(f"Data is stale: last updated {e.last_updated}")
except AuthenticationError:
    print("Invalid API key")
except RateLimitError as e:
    print(f"Rate limited, retry after {e.retry_after} seconds")
except ArmorError as e:
    print(f"API error: {e}")

Context Manager

The client supports context manager for automatic cleanup:
from anomalyarmor import Client

with Client() as client:
    assets = client.assets.list()
    # Connection automatically closed

Type Hints

The SDK is fully typed for IDE support:
from anomalyarmor import Client
from anomalyarmor.models import Asset, FreshnessStatus

client = Client()

asset: Asset = client.assets.get("snowflake.prod.warehouse.orders")
status: FreshnessStatus = client.freshness.get(asset.qualified_name)

print(status.is_fresh)  # IDE knows this is bool
print(status.last_updated)  # IDE knows this is datetime

Next Steps

SDK Reference

Complete method reference

CLI Guide

Command-line interface

Airflow Integration

Use in Airflow DAGs

API Reference

REST API documentation