anomalyarmor SDK Reference

Complete reference for all anomalyarmor-cli SDK classes and methods.

Client

The main entry point for the SDK.
from anomalyarmor import Client

client = Client(
    api_key="aa_live_xxx",  # Or use ARMOR_API_KEY env var
    api_url="https://app.anomalyarmor.ai/api/v1",  # Optional
    timeout=30,  # Request timeout in seconds
)

Constructor

| Parameter | Type        | Default        | Description                                  |
|-----------|-------------|----------------|----------------------------------------------|
| api_key   | str \| None | None           | API key. Falls back to ARMOR_API_KEY env var |
| api_url   | str \| None | Production URL | Base URL for API requests                    |
| timeout   | int \| None | 30             | Request timeout in seconds                   |

Context Manager

with Client() as client:
    assets = client.assets.list()
# Connection automatically closed

client.assets

Interact with data assets (tables, views, models).

assets.list()

List assets with optional filters.
assets = client.assets.list(
    source="snowflake",      # Filter by source
    asset_type="table",      # Filter by type
    search="orders",         # Search in names
    limit=50,                # Max results (default 50, max 100)
    offset=0,                # Skip N results
)
Returns: list[Asset]

assets.get()

Get a specific asset by ID or qualified name.
asset = client.assets.get("snowflake.prod.warehouse.orders")
# Or by UUID
asset = client.assets.get("550e8400-e29b-41d4-a716-446655440000")
Returns: Asset. Raises: NotFoundError if the asset doesn't exist.

client.freshness

Monitor data freshness.

freshness.summary()

Get aggregate freshness statistics.
summary = client.freshness.summary()
print(f"Fresh: {summary.fresh}/{summary.total_assets}")
print(f"Fresh rate: {summary.fresh_percentage}%")
Returns: FreshnessSummary

freshness.list()

List freshness status for all assets.
statuses = client.freshness.list(
    status="stale",     # Filter: "fresh", "stale", "unknown"
    limit=50,
    offset=0,
)
Returns: list[FreshnessStatus]

freshness.get()

Get freshness status for a specific asset.
status = client.freshness.get("snowflake.prod.warehouse.orders")
print(f"Fresh: {status.is_fresh}")
print(f"Last updated: {status.last_updated}")
print(f"Hours since update: {status.hours_since_update}")
Returns: FreshnessStatus

freshness.require_fresh()

Require an asset to be fresh, raising an error if stale. This is the recommended gate pattern for pipelines.
from anomalyarmor.exceptions import StalenessError

try:
    client.freshness.require_fresh(
        "snowflake.prod.warehouse.orders",
        max_age_hours=24,  # Optional custom threshold
    )
    print("Data is fresh!")
except StalenessError as e:
    print(f"Stale: {e.hours_since_update}h old")
    raise
Parameters:
  • asset_id (str): Asset qualified name or UUID
  • max_age_hours (float | None): Custom threshold. Uses asset’s configured threshold if not provided.
Returns: FreshnessStatus if fresh. Raises: StalenessError if stale, NotFoundError if the asset is not found.

freshness.refresh()

Trigger a freshness check.
result = client.freshness.refresh("snowflake.prod.warehouse.orders")
print(f"Job ID: {result['job_id']}")
Returns: dict with job_id, status, message. Raises: NotFoundError if the asset is not found, AuthorizationError if the API key lacks read-write scope.

client.schema

Monitor schema drift.

schema.summary()

Get schema drift summary statistics.
summary = client.schema.summary()
print(f"Changes last 24h: {summary.changes_last_24h}")
Returns: SchemaSummary

schema.changes()

List recent schema changes.
changes = client.schema.changes(
    asset_id="snowflake.prod.warehouse.orders",  # Optional filter
    change_type="column_added",  # Optional filter
    limit=50,
    offset=0,
)
for change in changes:
    print(f"{change.qualified_name}: {change.change_type}")
Returns: list[SchemaChange]

schema.refresh()

Trigger a schema check.
result = client.schema.refresh("snowflake.prod.warehouse.orders")
Returns: dict with job_id, status

client.lineage

Explore data dependencies.

lineage.list()

List assets with lineage information.
assets = client.lineage.list(
    source="snowflake",
    has_upstream=True,
    has_downstream=True,
    limit=50,
)
Returns: list[LineageAsset]

lineage.get()

Get lineage for a specific asset.
lineage = client.lineage.get(
    "snowflake.prod.warehouse.orders",
    direction="both",  # "upstream", "downstream", or "both"
    depth=2,           # Levels to traverse (1-5)
)

for upstream in lineage.upstream:
    print(f"<- {upstream.qualified_name}")

for downstream in lineage.downstream:
    print(f"-> {downstream.qualified_name}")
Returns: Lineage

client.tags

Manage data classification tags.

tags.list()

List tags for an asset.
tags = client.tags.list(
    asset="postgresql.analytics",  # Asset ID or qualified name
    category="business",           # Optional filter
)
for tag in tags:
    print(f"{tag.name} on {tag.object_path}")
Returns: list[Tag]

tags.create()

Create a tag on a database object.
tag = client.tags.create(
    asset="postgresql.analytics",
    name="pii_data",
    object_path="gold.customers",          # Required: schema.table or schema.table.column
    object_type="table",                   # "table" or "column" (default: "table")
    category="governance",                 # "business", "technical", "governance"
    description="Contains customer PII",   # Optional
)
Returns: Tag

tags.apply()

Apply multiple tags to multiple objects.
result = client.tags.apply(
    asset="postgresql.analytics",
    tag_names=["pii", "gdpr"],                           # Required
    object_paths=["gold.customers", "gold.orders"],      # Required
    category="governance",
)
print(f"Applied: {result.applied}, Failed: {result.failed}")
Returns: BulkApplyResult

client.intelligence

Query the AI knowledge base about your data.

intelligence.ask()

Ask a question about an asset’s data.
answer = client.intelligence.ask(
    asset="postgresql.analytics",
    question="What tables contain customer data?",
)
print(answer.answer)
print(f"Confidence: {answer.confidence}")
print(f"Sources: {answer.sources}")
Returns: IntelligenceAnswer. Raises: NotFoundError if the asset is not found, ValidationError if intelligence has not been generated for the asset.

intelligence.generate()

Generate AI intelligence for an asset (async job).
result = client.intelligence.generate(
    asset="postgresql.analytics",
)
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
Returns: dict with job_id, status
Requires schema discovery to be run first. Use the UI or API to discover schema before generating intelligence.

client.jobs

Monitor async job status.

jobs.status()

Get status of an async job.
status = client.jobs.status("job_abc123")
print(f"Status: {status.status}")
print(f"Progress: {status.progress}%")
if status.error:
    print(f"Error: {status.error}")
Returns: JobStatus

client.metrics

Monitor data quality metrics like row counts, null percentages, and more.

metrics.summary()

Get metrics summary for an asset.
summary = client.metrics.summary("asset-uuid")
print(f"Active: {summary.active_metrics}/{summary.total_metrics}")
print(f"Health: {summary.health_percentage}%")
Returns: MetricsSummary

metrics.list()

List metrics for an asset.
metrics = client.metrics.list(
    "asset-uuid",
    metric_type="null_percent",  # Optional filter
    is_active=True,              # Optional filter
    limit=50,
    offset=0,
)
for m in metrics:
    print(f"{m.table_path}.{m.column_name}: {m.metric_type}")
Returns: list[MetricDefinition]

metrics.get()

Get metric details with optional snapshots.
metric = client.metrics.get(
    "asset-uuid",
    "metric-uuid",
    include_snapshots=True,
    snapshot_limit=30,
)
print(f"Type: {metric.metric_type}")
print(f"Table: {metric.table_path}")
Returns: MetricDefinition

metrics.create()

Create a new metric. Requires read-write scope.
metric = client.metrics.create(
    "asset-uuid",
    metric_type="null_percent",
    table_path="catalog.schema.table",
    column_name="email",
    capture_interval="daily",
)
print(f"Created: {metric.id}")
Parameters:
  • asset_id (str): Asset UUID
  • metric_type (str): row_count, null_percent, distinct_count, etc.
  • table_path (str): Full table path
  • column_name (str | None): Column name for column metrics
  • capture_interval (str): hourly, daily, weekly (default: daily)
  • sensitivity (float): Anomaly detection sensitivity (default: 1.0)
Returns: MetricDefinition

metrics.update()

Update a metric. Requires read-write scope.
metric = client.metrics.update(
    "asset-uuid",
    "metric-uuid",
    is_active=False,
    sensitivity=2.0,
)
Returns: MetricDefinition

metrics.delete()

Delete a metric. Requires read-write scope.
client.metrics.delete("asset-uuid", "metric-uuid")

metrics.capture()

Trigger an immediate metric capture. Requires read-write scope.
result = client.metrics.capture("asset-uuid", "metric-uuid")
print(f"Captured {result.get('snapshot_count', 0)} snapshots")
Returns: dict with snapshot_count and snapshots

metrics.snapshots()

List historical snapshots for a metric.
snapshots = client.metrics.snapshots(
    "asset-uuid",
    "metric-uuid",
    limit=100,
)
for s in snapshots:
    print(f"{s.captured_at}: {s.value}")
Returns: list[MetricSnapshot]

client.validity

Define and enforce data validity rules.

validity.summary()

Get validity summary for an asset.
summary = client.validity.summary("asset-uuid")
print(f"Total rules: {summary.total_rules}")
print(f"Passing: {summary.passing}, Failing: {summary.failing}")
Returns: ValiditySummary

validity.list()

List validity rules for an asset.
rules = client.validity.list(
    "asset-uuid",
    rule_type="NOT_NULL",  # Optional filter
    is_active=True,        # Optional filter
    limit=50,
)
for r in rules:
    print(f"{r.column_name}: {r.rule_type}")
Returns: list[ValidityRule]

validity.get()

Get validity rule details.
rule = client.validity.get("asset-uuid", "rule-uuid")
print(f"Type: {rule.rule_type}")
print(f"Severity: {rule.severity}")
Returns: ValidityRule

validity.create()

Create a new validity rule. Requires read-write scope.
# NOT_NULL rule
rule = client.validity.create(
    "asset-uuid",
    rule_type="NOT_NULL",
    table_path="catalog.schema.table",
    column_name="email",
    severity="critical",
)

# REGEX rule
regex_rule = client.validity.create(
    "asset-uuid",
    rule_type="REGEX",
    table_path="catalog.schema.table",
    column_name="email",
    rule_config={"pattern": r"^[\w.-]+@[\w.-]+\.\w+$"},
)
Parameters:
  • asset_id (str): Asset UUID
  • rule_type (str): NOT_NULL, UNIQUE, REGEX, RANGE, ENUM, etc.
  • table_path (str): Full table path
  • column_name (str | None): Column name
  • rule_config (dict | None): Rule-specific configuration
  • severity (str): info, warning, critical (default: warning)
  • check_interval (str): hourly, daily, weekly (default: daily)
Returns: ValidityRule

validity.update()

Update a validity rule. Requires read-write scope.
rule = client.validity.update(
    "asset-uuid",
    "rule-uuid",
    severity="critical",
    is_active=True,
)
Returns: ValidityRule

validity.delete()

Delete a validity rule. Requires read-write scope.
client.validity.delete("asset-uuid", "rule-uuid")

validity.check()

Trigger an immediate validity check. Requires read-write scope.
result = client.validity.check(
    "asset-uuid",
    "rule-uuid",
    sample_limit=20,
)
if result.status == "fail":
    print(f"Invalid: {result.invalid_count} ({result.invalid_percent:.2f}%)")
Returns: ValidityCheckResult

validity.results()

List historical check results.
results = client.validity.results(
    "asset-uuid",
    "rule-uuid",
    limit=30,
)
for r in results:
    print(f"{r.checked_at}: {r.status}")
Returns: list[ValidityCheckResult]

client.referential

Monitor referential integrity between tables.

referential.summary()

Get referential summary for an asset.
summary = client.referential.summary("asset-uuid")
print(f"Total: {summary.total_checks}")
print(f"Passing: {summary.passing_checks}, Failing: {summary.failing_checks}")
Returns: ReferentialSummary

referential.list()

List referential checks for an asset.
checks = client.referential.list(
    "asset-uuid",
    is_active=True,
    limit=50,
)
for c in checks:
    print(f"{c.child_column_name} -> {c.parent_column_name}")
Returns: list[ReferentialCheck]

referential.get()

Get referential check details.
check = client.referential.get("asset-uuid", "check-uuid")
print(f"FK: {check.child_table_path}.{check.child_column_name}")
print(f"PK: {check.parent_table_path}.{check.parent_column_name}")
Returns: ReferentialCheck

referential.create()

Create a new referential check. Requires read-write scope.
check = client.referential.create(
    "asset-uuid",
    child_table_path="catalog.schema.orders",
    child_column_name="customer_id",
    parent_table_path="catalog.schema.customers",
    parent_column_name="id",
    name="Orders -> Customers FK",
    max_orphan_count=0,
)
Parameters:
  • asset_id (str): Asset UUID
  • child_table_path (str): Child table path (contains FK)
  • child_column_name (str): FK column name
  • parent_table_path (str): Parent table path (contains PK)
  • parent_column_name (str): PK column name
  • name (str | None): Check name
  • max_orphan_count (int | None): Alert threshold for orphan count
  • max_orphan_percent (float | None): Alert threshold for orphan %
Returns: ReferentialCheck

referential.update()

Update a referential check. Requires read-write scope.
check = client.referential.update(
    "asset-uuid",
    "check-uuid",
    max_orphan_percent=0.5,
    capture_interval="hourly",
)
Returns: ReferentialCheck

referential.delete()

Delete a referential check. Requires read-write scope.
client.referential.delete("asset-uuid", "check-uuid")

referential.execute()

Execute a referential check immediately. Requires read-write scope.
result = client.referential.execute("asset-uuid", "check-uuid")
if result.status == "fail":
    print(f"Orphans: {result.orphan_count} ({result.orphan_percent:.2f}%)")
Returns: ReferentialCheckResult

referential.results()

List historical check results.
results = client.referential.results(
    "asset-uuid",
    "check-uuid",
    limit=30,
)
for r in results:
    print(f"{r.created_at}: {r.status} ({r.orphan_count} orphans)")
Returns: list[ReferentialCheckResult]

client.alerts

Query alert history.

alerts.summary()

Get alert summary statistics.
summary = client.alerts.summary()
print(f"Triggered: {summary.triggered}")
print(f"Last 24h: {summary.triggered_last_24h}")
Returns: AlertsSummary

alerts.list()

List alerts with filters.
alerts = client.alerts.list(
    status="triggered",  # "triggered", "acknowledged", "resolved"
    alert_type="freshness",  # "freshness", "schema_drift", "row_count"
    asset_id="snowflake.prod.warehouse.orders",
    limit=50,
)
Returns: list[Alert]

alerts.rules()

List configured alert rules.
rules = client.alerts.rules()
for rule in rules:
    print(f"{rule.name}: {rule.alert_type}")
Returns: list[AlertRule]

client.api_keys

Manage API keys (requires admin scope).

api_keys.list()

List your organization’s API keys.
keys = client.api_keys.list(
    include_revoked=False,
    limit=50,
)
Returns: list[APIKey]

api_keys.create()

Create a new API key.
The full key is only returned once. Store it securely!
new_key = client.api_keys.create(
    name="Airflow Production",
    scope="read-only",  # "read-only", "read-write", "admin"
)
print(f"Key: {new_key.key}")  # Save this!
Returns: CreatedAPIKey (includes full key)

api_keys.get()

Get details of a specific key.
key = client.api_keys.get(key_id)
print(f"{key.name}: {key.scope}")
Returns: APIKey (without full key)

api_keys.revoke()

Revoke an API key. This cannot be undone.
client.api_keys.revoke(key_id)
Returns: APIKey

api_keys.usage()

Get API key usage and limits.
usage = client.api_keys.usage()
print(f"Keys: {usage['current_count']}/{usage['max_keys']}")
print(f"Rate limit: {usage['rate_limit_per_min']}/min")
Returns: dict

Models

Asset

class Asset:
    id: str
    qualified_name: str
    name: str
    asset_type: str  # "table", "view", "model"
    source: str
    database: str | None
    schema_name: str | None
    description: str | None
    row_count: int | None
    column_count: int | None
    tags: list[str]
    created_at: datetime
    updated_at: datetime

FreshnessStatus

class FreshnessStatus:
    asset_id: str
    qualified_name: str
    status: str  # "fresh", "stale", "unknown"
    is_fresh: bool
    is_stale: bool
    last_updated: datetime | None
    hours_since_update: float | None
    staleness_threshold_hours: float | None
    checked_at: datetime

APIKey

class APIKey:
    id: str
    name: str
    key_prefix: str  # e.g., "aa_live_abc1"
    key_suffix: str  # e.g., "xy9z"
    scope: str
    created_at: datetime
    last_used_at: datetime | None
    revoked_at: datetime | None

CreatedAPIKey

class CreatedAPIKey:
    id: str
    name: str
    key: str  # Full key - only shown once!
    scope: str
    created_at: datetime

Tag

class Tag:
    id: str
    name: str
    category: str  # "business", "technical", "governance"
    object_path: str | None  # e.g., "gold.customers"
    object_type: str | None  # "table" or "column"
    description: str | None
    created_at: datetime | None

BulkApplyResult

class BulkApplyResult:
    applied: int   # Number of tags successfully applied
    failed: int    # Number that failed
    total: int     # Total attempted

IntelligenceAnswer

class IntelligenceAnswer:
    answer: str           # AI-generated answer
    confidence: str       # "high", "medium", "low"
    sources: str          # Data sources used

JobStatus

class JobStatus:
    job_id: str
    status: str      # "pending", "running", "completed", "failed"
    progress: int    # 0-100
    error: str | None
    created_at: datetime
    completed_at: datetime | None

MetricsSummary

class MetricsSummary:
    total_metrics: int
    active_metrics: int
    total_checks: int
    passing: int
    failing: int
    warning: int
    error: int
    health_percentage: float

MetricDefinition

class MetricDefinition:
    id: str              # Public UUID
    internal_id: int
    asset_id: int
    table_path: str
    column_name: str | None
    metric_type: str     # row_count, null_percent, etc.
    capture_interval: str
    sensitivity: int
    is_active: bool
    created_at: datetime | None

MetricSnapshot

class MetricSnapshot:
    id: int
    metric_definition_id: int
    value: float
    captured_at: datetime
    is_anomaly: bool
    z_score: float | None
    status: str | None   # PASS, FAIL, WARNING

ValiditySummary

class ValiditySummary:
    total_rules: int
    passing: int
    failing: int
    error: int

ValidityRule

class ValidityRule:
    id: int
    uuid: str
    table_path: str
    column_name: str | None
    rule_type: str       # NOT_NULL, UNIQUE, REGEX, etc.
    rule_config: dict | None
    name: str | None
    severity: str        # info, warning, critical
    is_active: bool
    check_interval: str

ValidityCheckResult

class ValidityCheckResult:
    id: int
    validity_rule_id: int
    status: str          # pass, fail, error
    total_rows: int
    invalid_count: int
    invalid_percent: float
    invalid_samples: dict | None
    checked_at: datetime

ReferentialSummary

class ReferentialSummary:
    total_checks: int
    active_checks: int
    passing_checks: int
    failing_checks: int
    last_check_at: datetime | None

ReferentialCheck

class ReferentialCheck:
    id: str              # Public UUID
    internal_id: int
    asset_id: int
    child_table_path: str
    child_column_name: str
    parent_table_path: str
    parent_column_name: str
    name: str | None
    capture_interval: str
    max_orphan_count: int | None
    max_orphan_percent: float | None
    is_active: bool

ReferentialCheckResult

class ReferentialCheckResult:
    id: int
    referential_check_id: int
    status: str          # pass, fail, error
    orphan_count: int
    orphan_percent: float
    total_child_rows: int
    orphan_sample: list | None
    created_at: datetime