Complete reference for all anomalyarmor-cli SDK classes and methods.
Client
The main entry point for the SDK.
from anomalyarmor import Client
client = Client(
api_key="aa_live_xxx", # Or use ARMOR_API_KEY env var
api_url="https://app.anomalyarmor.ai/api/v1", # Optional
timeout=30, # Request timeout in seconds
)
Constructor
| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | str \| None | None | API key. Falls back to ARMOR_API_KEY env var |
| api_url | str \| None | Production URL | Base URL for API requests |
| timeout | int \| None | 30 | Request timeout in seconds |
Context Manager
with Client() as client:
assets = client.assets.list()
# Connection automatically closed
client.assets
Interact with data assets (tables, views, models).
assets.list()
List assets with optional filters.
assets = client.assets.list(
source="snowflake", # Filter by source
asset_type="table", # Filter by type
search="orders", # Search in names
limit=50, # Max results (default 50, max 100)
offset=0, # Skip N results
)
Returns: list[Asset]
assets.get()
Get a specific asset by ID or qualified name.
asset = client.assets.get("snowflake.prod.warehouse.orders")
# Or by UUID
asset = client.assets.get("550e8400-e29b-41d4-a716-446655440000")
Returns: Asset
Raises: NotFoundError if asset doesn’t exist
client.freshness
Monitor data freshness.
freshness.summary()
Get aggregate freshness statistics.
summary = client.freshness.summary()
print(f"Fresh: {summary.fresh}/{summary.total_assets}")
print(f"Fresh rate: {summary.fresh_percentage}%")
Returns: FreshnessSummary
freshness.list()
List freshness status for all assets.
statuses = client.freshness.list(
status="stale", # Filter: "fresh", "stale", "unknown"
limit=50,
offset=0,
)
Returns: list[FreshnessStatus]
freshness.get()
Get freshness status for a specific asset.
status = client.freshness.get("snowflake.prod.warehouse.orders")
print(f"Fresh: {status.is_fresh}")
print(f"Last updated: {status.last_updated}")
print(f"Hours since update: {status.hours_since_update}")
Returns: FreshnessStatus
freshness.require_fresh()
Require an asset to be fresh, raising an error if stale. This is the recommended gate pattern for pipelines.
from anomalyarmor.exceptions import StalenessError
try:
client.freshness.require_fresh(
"snowflake.prod.warehouse.orders",
max_age_hours=24, # Optional custom threshold
)
print("Data is fresh!")
except StalenessError as e:
print(f"Stale: {e.hours_since_update}h old")
raise
Parameters:
asset_id (str): Asset qualified name or UUID
max_age_hours (float | None): Custom threshold. Uses asset’s configured threshold if not provided.
Returns: FreshnessStatus if fresh
Raises: StalenessError if stale, NotFoundError if not found
freshness.refresh()
Trigger a freshness check.
result = client.freshness.refresh("snowflake.prod.warehouse.orders")
print(f"Job ID: {result['job_id']}")
Returns: dict with job_id, status, message
Raises: NotFoundError, AuthorizationError (requires read-write scope)
client.schema
Monitor schema drift.
schema.summary()
Get schema drift summary statistics.
summary = client.schema.summary()
print(f"Changes last 24h: {summary.changes_last_24h}")
Returns: SchemaSummary
schema.changes()
List recent schema changes.
changes = client.schema.changes(
asset_id="snowflake.prod.warehouse.orders", # Optional filter
change_type="column_added", # Optional filter
limit=50,
offset=0,
)
for change in changes:
print(f"{change.qualified_name}: {change.change_type}")
Returns: list[SchemaChange]
schema.refresh()
Trigger a schema check.
result = client.schema.refresh("snowflake.prod.warehouse.orders")
Returns: dict with job_id, status
client.lineage
Explore data dependencies.
lineage.list()
List assets with lineage information.
assets = client.lineage.list(
source="snowflake",
has_upstream=True,
has_downstream=True,
limit=50,
)
Returns: list[LineageAsset]
lineage.get()
Get lineage for a specific asset.
lineage = client.lineage.get(
"snowflake.prod.warehouse.orders",
direction="both", # "upstream", "downstream", or "both"
depth=2, # Levels to traverse (1-5)
)
for upstream in lineage.upstream:
print(f"<- {upstream.qualified_name}")
for downstream in lineage.downstream:
print(f"-> {downstream.qualified_name}")
Returns: Lineage
client.tags
Manage data classification tags.
tags.list()
List tags for an asset.
tags = client.tags.list(
asset="postgresql.analytics", # Asset ID or qualified name
category="business", # Optional filter
)
for tag in tags:
print(f"{tag.name} on {tag.object_path}")
Returns: list[Tag]
tags.create()
Create a tag on a database object.
tag = client.tags.create(
asset="postgresql.analytics",
name="pii_data",
object_path="gold.customers", # Required: schema.table or schema.table.column
object_type="table", # "table" or "column" (default: "table")
category="governance", # "business", "technical", "governance"
description="Contains customer PII", # Optional
)
Returns: Tag
tags.apply()
Apply multiple tags to multiple objects.
result = client.tags.apply(
asset="postgresql.analytics",
tag_names=["pii", "gdpr"], # Required
object_paths=["gold.customers", "gold.orders"], # Required
category="governance",
)
print(f"Applied: {result.applied}, Failed: {result.failed}")
Returns: BulkApplyResult
client.intelligence
Query the AI knowledge base about your data.
intelligence.ask()
Ask a question about an asset’s data.
answer = client.intelligence.ask(
asset="postgresql.analytics",
question="What tables contain customer data?",
)
print(answer.answer)
print(f"Confidence: {answer.confidence}")
print(f"Sources: {answer.sources}")
Returns: IntelligenceAnswer
Raises: NotFoundError if asset not found, ValidationError if intelligence not generated
intelligence.generate()
Generate AI intelligence for an asset (async job).
result = client.intelligence.generate(
asset="postgresql.analytics",
)
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
Returns: dict with job_id, status
Requires schema discovery to be run first. Use the UI or API to discover schema before generating intelligence.
client.jobs
Monitor async job status.
jobs.status()
Get status of an async job.
status = client.jobs.status("job_abc123")
print(f"Status: {status.status}")
print(f"Progress: {status.progress}%")
if status.error:
print(f"Error: {status.error}")
Returns: JobStatus
client.metrics
Monitor data quality metrics like row counts, null percentages, and more.
metrics.summary()
Get metrics summary for an asset.
summary = client.metrics.summary("asset-uuid")
print(f"Active: {summary.active_metrics}/{summary.total_metrics}")
print(f"Health: {summary.health_percentage}%")
Returns: MetricsSummary
metrics.list()
List metrics for an asset.
metrics = client.metrics.list(
"asset-uuid",
metric_type="null_percent", # Optional filter
is_active=True, # Optional filter
limit=50,
offset=0,
)
for m in metrics:
print(f"{m.table_path}.{m.column_name}: {m.metric_type}")
Returns: list[MetricDefinition]
metrics.get()
Get metric details with optional snapshots.
metric = client.metrics.get(
"asset-uuid",
"metric-uuid",
include_snapshots=True,
snapshot_limit=30,
)
print(f"Type: {metric.metric_type}")
print(f"Table: {metric.table_path}")
Returns: MetricDefinition
metrics.create()
Create a new metric. Requires read-write scope.
metric = client.metrics.create(
"asset-uuid",
metric_type="null_percent",
table_path="catalog.schema.table",
column_name="email",
capture_interval="daily",
)
print(f"Created: {metric.id}")
Parameters:
asset_id (str): Asset UUID
metric_type (str): row_count, null_percent, distinct_count, etc.
table_path (str): Full table path
column_name (str | None): Column name for column metrics
capture_interval (str): hourly, daily, weekly (default: daily)
sensitivity (float): Anomaly detection sensitivity (default: 1.0)
Returns: MetricDefinition
metrics.update()
Update a metric. Requires read-write scope.
metric = client.metrics.update(
"asset-uuid",
"metric-uuid",
is_active=False,
sensitivity=2.0,
)
Returns: MetricDefinition
metrics.delete()
Delete a metric. Requires read-write scope.
client.metrics.delete("asset-uuid", "metric-uuid")
metrics.capture()
Trigger an immediate metric capture. Requires read-write scope.
result = client.metrics.capture("asset-uuid", "metric-uuid")
print(f"Captured {result.get('snapshot_count', 0)} snapshots")
Returns: dict with snapshot_count and snapshots
metrics.snapshots()
List historical snapshots for a metric.
snapshots = client.metrics.snapshots(
"asset-uuid",
"metric-uuid",
limit=100,
)
for s in snapshots:
print(f"{s.captured_at}: {s.value}")
Returns: list[MetricSnapshot]
client.validity
Define and enforce data validity rules.
validity.summary()
Get validity summary for an asset.
summary = client.validity.summary("asset-uuid")
print(f"Total rules: {summary.total_rules}")
print(f"Passing: {summary.passing}, Failing: {summary.failing}")
Returns: ValiditySummary
validity.list()
List validity rules for an asset.
rules = client.validity.list(
"asset-uuid",
rule_type="NOT_NULL", # Optional filter
is_active=True, # Optional filter
limit=50,
)
for r in rules:
print(f"{r.column_name}: {r.rule_type}")
Returns: list[ValidityRule]
validity.get()
Get validity rule details.
rule = client.validity.get("asset-uuid", "rule-uuid")
print(f"Type: {rule.rule_type}")
print(f"Severity: {rule.severity}")
Returns: ValidityRule
validity.create()
Create a new validity rule. Requires read-write scope.
# NOT_NULL rule
rule = client.validity.create(
"asset-uuid",
rule_type="NOT_NULL",
table_path="catalog.schema.table",
column_name="email",
severity="critical",
)
# REGEX rule
regex_rule = client.validity.create(
"asset-uuid",
rule_type="REGEX",
table_path="catalog.schema.table",
column_name="email",
rule_config={"pattern": r"^[\w.-]+@[\w.-]+\.\w+$"},
)
Parameters:
asset_id (str): Asset UUID
rule_type (str): NOT_NULL, UNIQUE, REGEX, RANGE, ENUM, etc.
table_path (str): Full table path
column_name (str | None): Column name
rule_config (dict | None): Rule-specific configuration
severity (str): info, warning, critical (default: warning)
check_interval (str): hourly, daily, weekly (default: daily)
Returns: ValidityRule
validity.update()
Update a validity rule. Requires read-write scope.
rule = client.validity.update(
"asset-uuid",
"rule-uuid",
severity="critical",
is_active=True,
)
Returns: ValidityRule
validity.delete()
Delete a validity rule. Requires read-write scope.
client.validity.delete("asset-uuid", "rule-uuid")
validity.check()
Trigger an immediate validity check. Requires read-write scope.
result = client.validity.check(
"asset-uuid",
"rule-uuid",
sample_limit=20,
)
if result.status == "fail":
print(f"Invalid: {result.invalid_count} ({result.invalid_percent:.2f}%)")
Returns: ValidityCheckResult
validity.results()
List historical check results.
results = client.validity.results(
"asset-uuid",
"rule-uuid",
limit=30,
)
for r in results:
print(f"{r.checked_at}: {r.status}")
Returns: list[ValidityCheckResult]
client.referential
Monitor referential integrity between tables.
referential.summary()
Get referential summary for an asset.
summary = client.referential.summary("asset-uuid")
print(f"Total: {summary.total_checks}")
print(f"Passing: {summary.passing_checks}, Failing: {summary.failing_checks}")
Returns: ReferentialSummary
referential.list()
List referential checks for an asset.
checks = client.referential.list(
"asset-uuid",
is_active=True,
limit=50,
)
for c in checks:
print(f"{c.child_column_name} -> {c.parent_column_name}")
Returns: list[ReferentialCheck]
referential.get()
Get referential check details.
check = client.referential.get("asset-uuid", "check-uuid")
print(f"FK: {check.child_table_path}.{check.child_column_name}")
print(f"PK: {check.parent_table_path}.{check.parent_column_name}")
Returns: ReferentialCheck
referential.create()
Create a new referential check. Requires read-write scope.
check = client.referential.create(
"asset-uuid",
child_table_path="catalog.schema.orders",
child_column_name="customer_id",
parent_table_path="catalog.schema.customers",
parent_column_name="id",
name="Orders -> Customers FK",
max_orphan_count=0,
)
Parameters:
asset_id (str): Asset UUID
child_table_path (str): Child table path (contains FK)
child_column_name (str): FK column name
parent_table_path (str): Parent table path (contains PK)
parent_column_name (str): PK column name
name (str | None): Check name
max_orphan_count (int | None): Alert threshold for orphan count
max_orphan_percent (float | None): Alert threshold for orphan %
Returns: ReferentialCheck
referential.update()
Update a referential check. Requires read-write scope.
check = client.referential.update(
"asset-uuid",
"check-uuid",
max_orphan_percent=0.5,
capture_interval="hourly",
)
Returns: ReferentialCheck
referential.delete()
Delete a referential check. Requires read-write scope.
client.referential.delete("asset-uuid", "check-uuid")
referential.execute()
Execute a referential check immediately. Requires read-write scope.
result = client.referential.execute("asset-uuid", "check-uuid")
if result.status == "fail":
print(f"Orphans: {result.orphan_count} ({result.orphan_percent:.2f}%)")
Returns: ReferentialCheckResult
referential.results()
List historical check results.
results = client.referential.results(
"asset-uuid",
"check-uuid",
limit=30,
)
for r in results:
print(f"{r.created_at}: {r.status} ({r.orphan_count} orphans)")
Returns: list[ReferentialCheckResult]
client.alerts
Query alert history.
alerts.summary()
Get alert summary statistics.
summary = client.alerts.summary()
print(f"Triggered: {summary.triggered}")
print(f"Last 24h: {summary.triggered_last_24h}")
Returns: AlertsSummary
alerts.list()
List alerts with filters.
alerts = client.alerts.list(
status="triggered", # "triggered", "acknowledged", "resolved"
alert_type="freshness", # "freshness", "schema_drift", "row_count"
asset_id="snowflake.prod.warehouse.orders",
limit=50,
)
Returns: list[Alert]
alerts.rules()
List configured alert rules.
rules = client.alerts.rules()
for rule in rules:
print(f"{rule.name}: {rule.alert_type}")
Returns: list[AlertRule]
client.api_keys
Manage API keys (requires admin scope).
api_keys.list()
List your organization’s API keys.
keys = client.api_keys.list(
include_revoked=False,
limit=50,
)
Returns: list[APIKey]
api_keys.create()
Create a new API key.
The full key is only returned once. Store it securely!
new_key = client.api_keys.create(
name="Airflow Production",
scope="read-only", # "read-only", "read-write", "admin"
)
print(f"Key: {new_key.key}") # Save this!
Returns: CreatedAPIKey (includes full key)
api_keys.get()
Get details of a specific key.
key = client.api_keys.get(key_id)
print(f"{key.name}: {key.scope}")
Returns: APIKey (without full key)
api_keys.revoke()
Revoke an API key. This cannot be undone.
client.api_keys.revoke(key_id)
Returns: APIKey
api_keys.usage()
Get API key usage and limits.
usage = client.api_keys.usage()
print(f"Keys: {usage['current_count']}/{usage['max_keys']}")
print(f"Rate limit: {usage['rate_limit_per_min']}/min")
Returns: dict
Models
Asset
class Asset:
id: str
qualified_name: str
name: str
asset_type: str # "table", "view", "model"
source: str
database: str | None
schema_name: str | None
description: str | None
row_count: int | None
column_count: int | None
tags: list[str]
created_at: datetime
updated_at: datetime
FreshnessStatus
class FreshnessStatus:
asset_id: str
qualified_name: str
status: str # "fresh", "stale", "unknown"
is_fresh: bool
is_stale: bool
last_updated: datetime | None
hours_since_update: float | None
staleness_threshold_hours: float | None
checked_at: datetime
APIKey
class APIKey:
id: str
name: str
key_prefix: str # e.g., "aa_live_abc1"
key_suffix: str # e.g., "xy9z"
scope: str
created_at: datetime
last_used_at: datetime | None
revoked_at: datetime | None
CreatedAPIKey
class CreatedAPIKey:
id: str
name: str
key: str # Full key - only shown once!
scope: str
created_at: datetime
Tag
class Tag:
id: str
name: str
category: str # "business", "technical", "governance"
object_path: str | None # e.g., "gold.customers"
object_type: str | None # "table" or "column"
description: str | None
created_at: datetime | None
BulkApplyResult
class BulkApplyResult:
applied: int # Number of tags successfully applied
failed: int # Number that failed
total: int # Total attempted
IntelligenceAnswer
class IntelligenceAnswer:
answer: str # AI-generated answer
confidence: str # "high", "medium", "low"
sources: str # Data sources used
JobStatus
class JobStatus:
job_id: str
status: str # "pending", "running", "completed", "failed"
progress: int # 0-100
error: str | None
created_at: datetime
completed_at: datetime | None
MetricsSummary
class MetricsSummary:
total_metrics: int
active_metrics: int
total_checks: int
passing: int
failing: int
warning: int
error: int
health_percentage: float
MetricDefinition
class MetricDefinition:
id: str # Public UUID
internal_id: int
asset_id: int
table_path: str
column_name: str | None
metric_type: str # row_count, null_percent, etc.
capture_interval: str
sensitivity: int
is_active: bool
created_at: datetime | None
MetricSnapshot
class MetricSnapshot:
id: int
metric_definition_id: int
value: float
captured_at: datetime
is_anomaly: bool
z_score: float | None
status: str | None # PASS, FAIL, WARNING
ValiditySummary
class ValiditySummary:
total_rules: int
passing: int
failing: int
error: int
ValidityRule
class ValidityRule:
id: int
uuid: str
table_path: str
column_name: str | None
rule_type: str # NOT_NULL, UNIQUE, REGEX, etc.
rule_config: dict | None
name: str | None
severity: str # info, warning, critical
is_active: bool
check_interval: str
ValidityCheckResult
class ValidityCheckResult:
id: int
validity_rule_id: int
status: str # pass, fail, error
total_rows: int
invalid_count: int
invalid_percent: float
invalid_samples: dict | None
checked_at: datetime
ReferentialSummary
class ReferentialSummary:
total_checks: int
active_checks: int
passing_checks: int
failing_checks: int
last_check_at: datetime | None
ReferentialCheck
class ReferentialCheck:
id: str # Public UUID
internal_id: int
asset_id: int
child_table_path: str
child_column_name: str
parent_table_path: str
parent_column_name: str
name: str | None
capture_interval: str
max_orphan_count: int | None
max_orphan_percent: float | None
is_active: bool
ReferentialCheckResult
class ReferentialCheckResult:
id: int
referential_check_id: int
status: str # pass, fail, error
orphan_count: int
orphan_percent: float
total_child_rows: int
orphan_sample: list | None
created_at: datetime