For LLM agents: documentation index at
/llms.txt, full text at
/llms-full.txt. Append .md to any page URL for plain markdown.
Complete reference for all anomalyarmor-cli SDK classes and methods.
Client
The main entry point for the SDK.
from anomalyarmor import Client
client = Client(
api_key="aa_live_xxx", # Or use ARMOR_API_KEY env var
api_url="https://app.anomalyarmor.ai/api/v1", # Optional
timeout=30, # Request timeout in seconds
)
Constructor
| Parameter | Type | Default | Description |
|---|
api_key | str | None | None | API key. Falls back to ARMOR_API_KEY env var |
api_url | str | None | Production URL | Base URL for API requests |
timeout | int | None | 30 | Request timeout in seconds |
Context Manager
with Client() as client:
assets = client.assets.list()
# Connection automatically closed
client.assets
Interact with data assets (tables, views, models).
assets.list()
List assets with optional filters.
assets = client.assets.list(
source="snowflake", # Filter by source
asset_type="table", # Filter by type
search="orders", # Search in names
limit=50, # Max results (default 50, max 100)
offset=0, # Skip N results
)
Returns: list[Asset]
assets.get()
Get a specific asset by ID or qualified name.
asset = client.assets.get("snowflake.prod.warehouse.orders")
# Or by UUID
asset = client.assets.get("550e8400-e29b-41d4-a716-446655440000")
Returns: Asset
Raises: NotFoundError if asset doesn’t exist
client.freshness
Monitor data freshness.
freshness.summary()
Get aggregate freshness statistics.
summary = client.freshness.summary()
print(f"Fresh: {summary.fresh}/{summary.total_assets}")
print(f"Fresh rate: {summary.fresh_percentage}%")
Returns: FreshnessSummary
freshness.list()
List freshness status for all assets.
statuses = client.freshness.list(
status="stale", # Filter: "fresh", "stale", "unknown"
limit=50,
offset=0,
)
Returns: list[FreshnessStatus]
freshness.get()
Get freshness status for a specific asset.
status = client.freshness.get("snowflake.prod.warehouse.orders")
print(f"Fresh: {status.is_fresh}")
print(f"Last updated: {status.last_updated}")
print(f"Hours since update: {status.hours_since_update}")
Returns: FreshnessStatus
freshness.require_fresh()
Require an asset to be fresh, raising an error if stale. This is the recommended gate pattern for pipelines.
from anomalyarmor.exceptions import StalenessError
try:
client.freshness.require_fresh(
"snowflake.prod.warehouse.orders",
max_age_hours=24, # Optional custom threshold
)
print("Data is fresh!")
except StalenessError as e:
print(f"Stale: {e.hours_since_update}h old")
raise
Parameters:
asset_id (str): Asset qualified name or UUID
max_age_hours (float | None): Custom threshold. Uses asset’s configured threshold if not provided.
Returns: FreshnessStatus if fresh
Raises: StalenessError if stale, NotFoundError if not found
freshness.refresh()
Trigger a freshness check.
result = client.freshness.refresh("snowflake.prod.warehouse.orders")
print(f"Job ID: {result['job_id']}")
Returns: dict with job_id, status, message
Raises: NotFoundError, AuthorizationError (requires read-write scope)
client.schema
Monitor schema drift.
schema.summary()
Get schema drift summary statistics.
summary = client.schema.summary()
print(f"Changes last 24h: {summary.changes_last_24h}")
Returns: SchemaSummary
schema.changes()
List recent schema changes.
changes = client.schema.changes(
asset_id="snowflake.prod.warehouse.orders", # Optional filter
change_type="column_added", # Optional filter
limit=50,
offset=0,
)
for change in changes:
print(f"{change.qualified_name}: {change.change_type}")
Returns: list[SchemaChange]
schema.refresh()
Trigger a schema check.
result = client.schema.refresh("snowflake.prod.warehouse.orders")
Returns: dict with job_id, status
client.lineage
Explore data dependencies.
lineage.list()
List assets with lineage information.
assets = client.lineage.list(
source="snowflake",
has_upstream=True,
has_downstream=True,
limit=50,
)
Returns: list[LineageAsset]
lineage.get()
Get lineage for a specific asset.
lineage = client.lineage.get(
"snowflake.prod.warehouse.orders",
direction="both", # "upstream", "downstream", or "both"
depth=2, # Levels to traverse (1-5)
)
for upstream in lineage.upstream:
print(f"<- {upstream.qualified_name}")
for downstream in lineage.downstream:
print(f"-> {downstream.qualified_name}")
Returns: Lineage
Manage data classification tags.
List tags for an asset.
tags = client.tags.list(
asset="postgresql.analytics", # Asset ID or qualified name
category="business", # Optional filter
)
for tag in tags:
print(f"{tag.name} on {tag.object_path}")
Returns: list[Tag]
Create a tag on a database object.
tag = client.tags.create(
asset="postgresql.analytics",
name="pii_data",
object_path="gold.customers", # Required: schema.table or schema.table.column
object_type="table", # "table" or "column" (default: "table")
category="governance", # "business", "technical", "governance"
description="Contains customer PII", # Optional
)
Returns: Tag
Apply multiple tags to multiple objects.
result = client.tags.apply(
asset="postgresql.analytics",
tag_names=["pii", "gdpr"], # Required
object_paths=["gold.customers", "gold.orders"], # Required
category="governance",
)
print(f"Applied: {result.applied}, Failed: {result.failed}")
Returns: BulkApplyResult
client.intelligence
Query the AI knowledge base about your data.
intelligence.ask()
Ask a question about an asset’s data.
answer = client.intelligence.ask(
asset="postgresql.analytics",
question="What tables contain customer data?",
)
print(answer.answer)
print(f"Confidence: {answer.confidence}")
print(f"Sources: {answer.sources}")
Returns: IntelligenceAnswer
Raises: NotFoundError if asset not found, ValidationError if intelligence not generated
intelligence.generate()
Generate AI intelligence for an asset (async job).
result = client.intelligence.generate(
asset="postgresql.analytics",
)
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
Returns: dict with job_id, status
Requires asset discovery to be run first. Use the UI or API to discover schema before generating intelligence.
client.investigations
Run structured root-cause investigations against an asset and return a citable EvidenceCapsule. Each piece of evidence links back to the underlying alert, schema event, metric, or lineage edge so the answer is auditable, not free-form prose.
investigations.explain()
Run a synchronous investigation against an asset. The same correlator pipeline that powers alert-driven investigations runs against the asset and returns an EvidenceCapsule. No row is persisted.
capsule = client.investigations.explain(
asset_id="11111111-1111-1111-1111-111111111111",
question="Why is the orders table stale?",
event_type="freshness",
)
print(capsule.root_cause_hypothesis)
print(f"Confidence: {capsule.confidence:.0%}")
for ev in capsule.triggers:
citation = f"{ev.source}:{ev.source_id}" if ev.source_id else "(legacy)"
print(f"- [{citation}] {ev.summary}")
print(capsule.suggested_fix)
Arguments:
| Name | Type | Required | Description |
|---|
asset_id | str (UUID) | yes | Public UUID of the asset to investigate. |
question | str | no | Anchors the capsule. Defaults to one derived from event_type. |
event_type | str | no | One of anomaly, freshness, schema_change, validity. Defaults to anomaly. |
Returns: EvidenceCapsule
Raises: NotFoundError if the asset is not found, ValidationError if asset_id is not a valid UUID.
investigations.get()
Fetch a persisted investigation (one created when an alert fires) and project it into the same capsule shape.
capsule = client.investigations.get("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")
print(capsule.root_cause_hypothesis)
print(capsule.investigation_id) # the same UUID you passed in
Arguments:
| Name | Type | Required | Description |
|---|
investigation_id | str (UUID) | yes | Public UUID of an AlertInvestigation. |
Returns: EvidenceCapsule
Raises: NotFoundError if no investigation matches.
EvidenceCapsule shape
| Field | Type | Description |
|---|
investigation_id | str | None | UUID of the backing AlertInvestigation. None for ad-hoc explain() calls. |
question | str | The question the capsule answers. |
root_cause_hypothesis | str | None | One-line natural-language hypothesis. |
triggers | list[Evidence] | Upstream signals that set the incident off. |
consequences | list[Evidence] | Downstream impact. Populated for alert-anchored investigations, empty for explain(). |
confidence | float | None | Aggregated confidence, 0.0 to 1.0. |
suggested_fix | str | None | Action hint. |
open_questions | list[str] | Anything the agent could not verify. |
Evidence shape
Each row in triggers / consequences:
| Field | Type | Description |
|---|
event_type | str | E.g. schema_change, freshness_cascade, metric_anomaly. |
confidence | float | Per-evidence confidence, 0.0 to 1.0. |
timestamp | datetime | When the evidence was observed. |
summary | str | One-line human description. |
details | dict | Source-specific structured payload (e.g. {"before": ..., "after": ...} for schema changes). |
source | str | None | One of alert, schema_change, intelligence, metric, lineage. None for legacy rows written before evidence citations were added. |
source_id | str | None | Public UUID of the cited record. Resolves to a deep-link in the UI. |
investigations.explain() replaces a previous five-step recipe (combining client.health.summary(), client.freshness.status(), client.lineage.get(), client.intelligence.ask(), and client.alerts.list()). The backend already runs those five in parallel inside the correlator pipeline; the SDK now returns the aggregated, citable result directly.
client.jobs
Monitor async job status.
jobs.status()
Get status of an async job.
status = client.jobs.status("job_abc123")
print(f"Status: {status.status}")
print(f"Progress: {status.progress}%")
if status.error:
print(f"Error: {status.error}")
Returns: JobStatus
client.metrics
Monitor data quality metrics like row counts, null percentages, and more.
metrics.summary()
Get metrics summary for an asset.
summary = client.metrics.summary("asset-uuid")
print(f"Active: {summary.active_metrics}/{summary.total_metrics}")
print(f"Health: {summary.health_percentage}%")
Returns: MetricsSummary
metrics.list()
List metrics for an asset.
metrics = client.metrics.list(
"asset-uuid",
metric_type="null_percent", # Optional filter
is_active=True, # Optional filter
limit=50,
offset=0,
)
for m in metrics:
print(f"{m.table_path}.{m.column_name}: {m.metric_type}")
Returns: list[MetricDefinition]
metrics.get()
Get metric details with optional snapshots.
metric = client.metrics.get(
"asset-uuid",
"metric-uuid",
include_snapshots=True,
snapshot_limit=30,
)
print(f"Type: {metric.metric_type}")
print(f"Table: {metric.table_path}")
Returns: MetricDefinition
metrics.create()
Create a new metric. Requires read-write scope.
metric = client.metrics.create(
"asset-uuid",
metric_type="null_percent",
table_path="catalog.schema.table",
column_name="email",
capture_interval="daily",
)
print(f"Created: {metric.id}")
Parameters:
asset_id (str): Asset UUID
metric_type (str): row_count, null_percent, distinct_count, etc.
table_path (str): Full table path
column_name (str | None): Column name for column metrics
capture_interval (str): hourly, daily, weekly (default: daily)
sensitivity (float): Anomaly detection sensitivity (default: 1.0)
Returns: MetricDefinition
metrics.update()
Update a metric. Requires read-write scope.
metric = client.metrics.update(
"asset-uuid",
"metric-uuid",
is_active=False,
sensitivity=2.0,
)
Returns: MetricDefinition
metrics.delete()
Delete a metric. Requires read-write scope.
client.metrics.delete("asset-uuid", "metric-uuid")
metrics.capture()
Trigger an immediate metric capture. Requires read-write scope.
result = client.metrics.capture("asset-uuid", "metric-uuid")
print(f"Captured {result.get('snapshot_count', 0)} snapshots")
Returns: dict with snapshot_count and snapshots
metrics.snapshots()
List historical snapshots for a metric.
snapshots = client.metrics.snapshots(
"asset-uuid",
"metric-uuid",
limit=100,
)
for s in snapshots:
print(f"{s.captured_at}: {s.value}")
Returns: list[MetricSnapshot]
client.validity
Define and enforce data validity rules.
validity.summary()
Get validity summary for an asset.
summary = client.validity.summary("asset-uuid")
print(f"Total rules: {summary.total_rules}")
print(f"Passing: {summary.passing}, Failing: {summary.failing}")
Returns: ValiditySummary
validity.list()
List validity rules for an asset.
rules = client.validity.list(
"asset-uuid",
rule_type="NOT_NULL", # Optional filter
is_active=True, # Optional filter
limit=50,
)
for r in rules:
print(f"{r.column_name}: {r.rule_type}")
Returns: list[ValidityRule]
validity.get()
Get validity rule details.
rule = client.validity.get("asset-uuid", "rule-uuid")
print(f"Type: {rule.rule_type}")
print(f"Severity: {rule.severity}")
Returns: ValidityRule
validity.create()
Create a new validity rule. Requires read-write scope.
# NOT_NULL rule
rule = client.validity.create(
"asset-uuid",
rule_type="NOT_NULL",
table_path="catalog.schema.table",
column_name="email",
severity="critical",
)
# REGEX rule
regex_rule = client.validity.create(
"asset-uuid",
rule_type="REGEX",
table_path="catalog.schema.table",
column_name="email",
rule_config={"pattern": r"^[\w.-]+@[\w.-]+\.\w+$"},
)
Parameters:
asset_id (str): Asset UUID
rule_type (str): NOT_NULL, UNIQUE, REGEX, RANGE, ENUM, etc.
table_path (str): Full table path
column_name (str | None): Column name
rule_config (dict | None): Rule-specific configuration
severity (str): info, warning, critical (default: warning)
check_interval (str): hourly, daily, weekly (default: daily)
Returns: ValidityRule
validity.update()
Update a validity rule. Requires read-write scope.
rule = client.validity.update(
"asset-uuid",
"rule-uuid",
severity="critical",
is_active=True,
)
Returns: ValidityRule
validity.delete()
Delete a validity rule. Requires read-write scope.
client.validity.delete("asset-uuid", "rule-uuid")
validity.check()
Trigger an immediate validity check. Requires read-write scope.
result = client.validity.check(
"asset-uuid",
"rule-uuid",
sample_limit=20,
)
if result.status == "fail":
print(f"Invalid: {result.invalid_count} ({result.invalid_percent:.2f}%)")
Returns: ValidityCheckResult
validity.results()
List historical check results.
results = client.validity.results(
"asset-uuid",
"rule-uuid",
limit=30,
)
for r in results:
print(f"{r.checked_at}: {r.status}")
Returns: list[ValidityCheckResult]
client.referential
Monitor referential integrity between tables.
referential.summary()
Get referential summary for an asset.
summary = client.referential.summary("asset-uuid")
print(f"Total: {summary.total_checks}")
print(f"Passing: {summary.passing_checks}, Failing: {summary.failing_checks}")
Returns: ReferentialSummary
referential.list()
List referential checks for an asset.
checks = client.referential.list(
"asset-uuid",
is_active=True,
limit=50,
)
for c in checks:
print(f"{c.child_column_name} -> {c.parent_column_name}")
Returns: list[ReferentialCheck]
referential.get()
Get referential check details.
check = client.referential.get("asset-uuid", "check-uuid")
print(f"FK: {check.child_table_path}.{check.child_column_name}")
print(f"PK: {check.parent_table_path}.{check.parent_column_name}")
Returns: ReferentialCheck
referential.create()
Create a new referential check. Requires read-write scope.
check = client.referential.create(
"asset-uuid",
child_table_path="catalog.schema.orders",
child_column_name="customer_id",
parent_table_path="catalog.schema.customers",
parent_column_name="id",
name="Orders -> Customers FK",
max_orphan_count=0,
)
Parameters:
asset_id (str): Asset UUID
child_table_path (str): Child table path (contains FK)
child_column_name (str): FK column name
parent_table_path (str): Parent table path (contains PK)
parent_column_name (str): PK column name
name (str | None): Check name
max_orphan_count (int | None): Alert threshold for orphan count
max_orphan_percent (float | None): Alert threshold for orphan %
Returns: ReferentialCheck
referential.update()
Update a referential check. Requires read-write scope.
check = client.referential.update(
"asset-uuid",
"check-uuid",
max_orphan_percent=0.5,
capture_interval="hourly",
)
Returns: ReferentialCheck
referential.delete()
Delete a referential check. Requires read-write scope.
client.referential.delete("asset-uuid", "check-uuid")
referential.execute()
Execute a referential check immediately. Requires read-write scope.
result = client.referential.execute("asset-uuid", "check-uuid")
if result.status == "fail":
print(f"Orphans: {result.orphan_count} ({result.orphan_percent:.2f}%)")
Returns: ReferentialCheckResult
referential.results()
List historical check results.
results = client.referential.results(
"asset-uuid",
"check-uuid",
limit=30,
)
for r in results:
print(f"{r.created_at}: {r.status} ({r.orphan_count} orphans)")
Returns: list[ReferentialCheckResult]
client.alerts
Query alert history.
alerts.summary()
Get alert summary statistics.
summary = client.alerts.summary()
print(f"Triggered: {summary.triggered}")
print(f"Last 24h: {summary.triggered_last_24h}")
Returns: AlertsSummary
alerts.list()
List alerts with filters.
alerts = client.alerts.list(
status="triggered", # "triggered", "acknowledged", "resolved"
alert_type="freshness", # "freshness", "schema_change", "row_count"
asset_id="snowflake.prod.warehouse.orders",
limit=50,
)
Returns: list[Alert]
alerts.rules()
List configured alert rules.
rules = client.alerts.rules()
for rule in rules:
print(f"{rule.name}: {rule.alert_type}")
Returns: list[AlertRule]
client.api_keys
Manage API keys (requires admin scope).
api_keys.list()
List your organization’s API keys.
keys = client.api_keys.list(
include_revoked=False,
limit=50,
)
Returns: list[APIKey]
api_keys.create()
Create a new API key.
The full key is only returned once. Store it securely!
new_key = client.api_keys.create(
name="Airflow Production",
scope="read-only", # "read-only", "read-write", "admin"
)
print(f"Key: {new_key.key}") # Save this!
Returns: CreatedAPIKey (includes full key)
api_keys.get()
Get details of a specific key.
key = client.api_keys.get(key_id)
print(f"{key.name}: {key.scope}")
Returns: APIKey (without full key)
api_keys.revoke()
Revoke an API key. This cannot be undone.
client.api_keys.revoke(key_id)
Returns: APIKey
api_keys.usage()
Get API key usage and limits.
usage = client.api_keys.usage()
print(f"Keys: {usage['current_count']}/{usage['max_keys']}")
print(f"Rate limit: {usage['rate_limit_per_min']}/min")
Returns: dict
Models
Asset
class Asset:
id: str
qualified_name: str
name: str
asset_type: str # "table", "view", "model"
source: str
database: str | None
schema_name: str | None
description: str | None
row_count: int | None
column_count: int | None
tags: list[str]
created_at: datetime
updated_at: datetime
FreshnessStatus
class FreshnessStatus:
asset_id: str
qualified_name: str
status: str # "fresh", "stale", "unknown"
is_fresh: bool
is_stale: bool
last_updated: datetime | None
hours_since_update: float | None
staleness_threshold_hours: float | None
checked_at: datetime
APIKey
class APIKey:
id: str
name: str
key_prefix: str # e.g., "aa_live_abc1"
key_suffix: str # e.g., "xy9z"
scope: str
created_at: datetime
last_used_at: datetime | None
revoked_at: datetime | None
CreatedAPIKey
class CreatedAPIKey:
id: str
name: str
key: str # Full key - only shown once!
scope: str
created_at: datetime
Tag
class Tag:
id: str
name: str
category: str # "business", "technical", "governance"
object_path: str | None # e.g., "gold.customers"
object_type: str | None # "table" or "column"
description: str | None
created_at: datetime | None
BulkApplyResult
class BulkApplyResult:
applied: int # Number of tags successfully applied
failed: int # Number that failed
total: int # Total attempted
IntelligenceAnswer
class IntelligenceAnswer:
answer: str # AI-generated answer
confidence: str # "high", "medium", "low"
sources: str # Data sources used
JobStatus
class JobStatus:
job_id: str
status: str # "pending", "running", "completed", "failed"
progress: int # 0-100
error: str | None
created_at: datetime
completed_at: datetime | None
MetricsSummary
class MetricsSummary:
total_metrics: int
active_metrics: int
total_checks: int
passing: int
failing: int
warning: int
error: int
health_percentage: float
MetricDefinition
class MetricDefinition:
id: str # Public UUID
internal_id: int
asset_id: int
table_path: str
column_name: str | None
metric_type: str # row_count, null_percent, etc.
capture_interval: str
sensitivity: int
is_active: bool
created_at: datetime | None
MetricSnapshot
class MetricSnapshot:
id: int
metric_definition_id: int
value: float
captured_at: datetime
is_anomaly: bool
z_score: float | None
status: str | None # PASS, FAIL, WARNING
ValiditySummary
class ValiditySummary:
total_rules: int
passing: int
failing: int
error: int
ValidityRule
class ValidityRule:
id: int
uuid: str
table_path: str
column_name: str | None
rule_type: str # NOT_NULL, UNIQUE, REGEX, etc.
rule_config: dict | None
name: str | None
severity: str # info, warning, critical
is_active: bool
check_interval: str
ValidityCheckResult
class ValidityCheckResult:
id: int
validity_rule_id: int
status: str # pass, fail, error
total_rows: int
invalid_count: int
invalid_percent: float
invalid_samples: dict | None
checked_at: datetime
ReferentialSummary
class ReferentialSummary:
total_checks: int
active_checks: int
passing_checks: int
failing_checks: int
last_check_at: datetime | None
ReferentialCheck
class ReferentialCheck:
id: str # Public UUID
internal_id: int
asset_id: int
child_table_path: str
child_column_name: str
parent_table_path: str
parent_column_name: str
name: str | None
capture_interval: str
max_orphan_count: int | None
max_orphan_percent: float | None
is_active: bool
ReferentialCheckResult
class ReferentialCheckResult:
id: int
referential_check_id: int
status: str # pass, fail, error
orphan_count: int
orphan_percent: float
total_child_rows: int
orphan_sample: list | None
created_at: datetime