Skip to main content
Complete reference for all anomalyarmor-cli SDK classes and methods.

Client

The main entry point for the SDK.
from anomalyarmor import Client

client = Client(
    api_key="aa_live_xxx",  # Or use ARMOR_API_KEY env var
    api_url="https://app.anomalyarmor.ai/api/v1",  # Optional
    timeout=30,  # Request timeout in seconds
)

Constructor

ParameterTypeDefaultDescription
api_keystr | NoneNoneAPI key. Falls back to ARMOR_API_KEY env var
api_urlstr | NoneProduction URLBase URL for API requests
timeoutint | None30Request timeout in seconds

Context Manager

with Client() as client:
    assets = client.assets.list()
# Connection automatically closed

client.assets

Interact with data assets (tables, views, models).

assets.list()

List assets with optional filters.
assets = client.assets.list(
    source="snowflake",      # Filter by source
    asset_type="table",      # Filter by type
    search="orders",         # Search in names
    limit=50,                # Max results (default 50, max 100)
    offset=0,                # Skip N results
)
Returns: list[Asset]

assets.get()

Get a specific asset by ID or qualified name.
asset = client.assets.get("snowflake.prod.warehouse.orders")
# Or by UUID
asset = client.assets.get("550e8400-e29b-41d4-a716-446655440000")
Returns: Asset Raises: NotFoundError if asset doesn’t exist

client.freshness

Monitor data freshness.

freshness.summary()

Get aggregate freshness statistics.
summary = client.freshness.summary()
print(f"Fresh: {summary.fresh}/{summary.total_assets}")
print(f"Fresh rate: {summary.fresh_percentage}%")
Returns: FreshnessSummary

freshness.list()

List freshness status for all assets.
statuses = client.freshness.list(
    status="stale",     # Filter: "fresh", "stale", "unknown"
    limit=50,
    offset=0,
)
Returns: list[FreshnessStatus]

freshness.get()

Get freshness status for a specific asset.
status = client.freshness.get("snowflake.prod.warehouse.orders")
print(f"Fresh: {status.is_fresh}")
print(f"Last updated: {status.last_updated}")
print(f"Hours since update: {status.hours_since_update}")
Returns: FreshnessStatus

freshness.require_fresh()

Require an asset to be fresh, raising an error if stale. This is the recommended gate pattern for pipelines.
from anomalyarmor.exceptions import StalenessError

try:
    client.freshness.require_fresh(
        "snowflake.prod.warehouse.orders",
        max_age_hours=24,  # Optional custom threshold
    )
    print("Data is fresh!")
except StalenessError as e:
    print(f"Stale: {e.hours_since_update}h old")
    raise
Parameters:
  • asset_id (str): Asset qualified name or UUID
  • max_age_hours (float | None): Custom threshold. Uses asset’s configured threshold if not provided.
Returns: FreshnessStatus if fresh Raises: StalenessError if stale, NotFoundError if not found

freshness.refresh()

Trigger a freshness check.
result = client.freshness.refresh("snowflake.prod.warehouse.orders")
print(f"Job ID: {result['job_id']}")
Returns: dict with job_id, status, message Raises: NotFoundError, AuthorizationError (requires read-write scope)

client.schema

Monitor schema drift.

schema.summary()

Get schema drift summary statistics.
summary = client.schema.summary()
print(f"Changes last 24h: {summary.changes_last_24h}")
Returns: SchemaSummary

schema.changes()

List recent schema changes.
changes = client.schema.changes(
    asset_id="snowflake.prod.warehouse.orders",  # Optional filter
    change_type="column_added",  # Optional filter
    limit=50,
    offset=0,
)
for change in changes:
    print(f"{change.qualified_name}: {change.change_type}")
Returns: list[SchemaChange]

schema.refresh()

Trigger a schema check.
result = client.schema.refresh("snowflake.prod.warehouse.orders")
Returns: dict with job_id, status

client.lineage

Explore data dependencies.

lineage.list()

List assets with lineage information.
assets = client.lineage.list(
    source="snowflake",
    has_upstream=True,
    has_downstream=True,
    limit=50,
)
Returns: list[LineageAsset]

lineage.get()

Get lineage for a specific asset.
lineage = client.lineage.get(
    "snowflake.prod.warehouse.orders",
    direction="both",  # "upstream", "downstream", or "both"
    depth=2,           # Levels to traverse (1-5)
)

for upstream in lineage.upstream:
    print(f"<- {upstream.qualified_name}")

for downstream in lineage.downstream:
    print(f"-> {downstream.qualified_name}")
Returns: Lineage

client.tags

Manage data classification tags.

tags.list()

List tags for an asset.
tags = client.tags.list(
    asset="postgresql.analytics",  # Asset ID or qualified name
    category="business",           # Optional filter
)
for tag in tags:
    print(f"{tag.name} on {tag.object_path}")
Returns: list[Tag]

tags.create()

Create a tag on a database object.
tag = client.tags.create(
    asset="postgresql.analytics",
    name="pii_data",
    object_path="gold.customers",          # Required: schema.table or schema.table.column
    object_type="table",                   # "table" or "column" (default: "table")
    category="governance",                 # "business", "technical", "governance"
    description="Contains customer PII",   # Optional
)
Returns: Tag

tags.apply()

Apply multiple tags to multiple objects.
result = client.tags.apply(
    asset="postgresql.analytics",
    tag_names=["pii", "gdpr"],                           # Required
    object_paths=["gold.customers", "gold.orders"],      # Required
    category="governance",
)
print(f"Applied: {result.applied}, Failed: {result.failed}")
Returns: BulkApplyResult

client.intelligence

Query the AI knowledge base about your data.

intelligence.ask()

Ask a question about an asset’s data.
answer = client.intelligence.ask(
    asset="postgresql.analytics",
    question="What tables contain customer data?",
)
print(answer.answer)
print(f"Confidence: {answer.confidence}")
print(f"Sources: {answer.sources}")
Returns: IntelligenceAnswer Raises: NotFoundError if asset not found, ValidationError if intelligence not generated

intelligence.generate()

Generate AI intelligence for an asset (async job).
result = client.intelligence.generate(
    asset="postgresql.analytics",
)
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
Returns: dict with job_id, status
Requires asset discovery to be run first. Use the UI or API to discover schema before generating intelligence.

client.investigations

Run structured root-cause investigations against an asset and return a citable EvidenceCapsule. Each piece of evidence links back to the underlying alert, schema event, metric, or lineage edge so the answer is auditable, not free-form prose.
EvidenceCapsule value: one typed call replaces a five-step recipe and returns root cause, confidence, and citation chips that link back to source rows.

investigations.explain()

Run a synchronous investigation against an asset. The same correlator pipeline that powers alert-driven investigations runs against the asset and returns an EvidenceCapsule. No row is persisted.
capsule = client.investigations.explain(
    asset_id="11111111-1111-1111-1111-111111111111",
    question="Why is the orders table stale?",
    event_type="freshness",
)

print(capsule.root_cause_hypothesis)
print(f"Confidence: {capsule.confidence:.0%}")

for ev in capsule.triggers:
    citation = f"{ev.source}:{ev.source_id}" if ev.source_id else "(legacy)"
    print(f"- [{citation}] {ev.summary}")

print(capsule.suggested_fix)
Arguments:
NameTypeRequiredDescription
asset_idstr (UUID)yesPublic UUID of the asset to investigate.
questionstrnoAnchors the capsule. Defaults to one derived from event_type.
event_typestrnoOne of anomaly, freshness, schema_change, validity. Defaults to anomaly.
Returns: EvidenceCapsule Raises: NotFoundError if the asset is not found, ValidationError if asset_id is not a valid UUID.

investigations.get()

Fetch a persisted investigation (one created when an alert fires) and project it into the same capsule shape.
capsule = client.investigations.get("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")
print(capsule.root_cause_hypothesis)
print(capsule.investigation_id)  # the same UUID you passed in
Arguments:
NameTypeRequiredDescription
investigation_idstr (UUID)yesPublic UUID of an AlertInvestigation.
Returns: EvidenceCapsule Raises: NotFoundError if no investigation matches.

EvidenceCapsule shape

FieldTypeDescription
investigation_idstr | NoneUUID of the backing AlertInvestigation. None for ad-hoc explain() calls.
questionstrThe question the capsule answers.
root_cause_hypothesisstr | NoneOne-line natural-language hypothesis.
triggerslist[Evidence]Upstream signals that set the incident off.
consequenceslist[Evidence]Downstream impact. Populated for alert-anchored investigations, empty for explain().
confidencefloat | NoneAggregated confidence, 0.0 to 1.0.
suggested_fixstr | NoneAction hint.
open_questionslist[str]Anything the agent could not verify.

Evidence shape

Each row in triggers / consequences:
FieldTypeDescription
event_typestrE.g. schema_change, freshness_cascade, metric_anomaly.
confidencefloatPer-evidence confidence, 0.0 to 1.0.
timestampdatetimeWhen the evidence was observed.
summarystrOne-line human description.
detailsdictSource-specific structured payload (e.g. {"before": ..., "after": ...} for schema changes).
sourcestr | NoneOne of alert, schema_change, intelligence, metric, lineage. None for legacy rows written before evidence citations were added.
source_idstr | NonePublic UUID of the cited record. Resolves to a deep-link in the UI.
investigations.explain() replaces a previous five-step recipe (combining client.health.summary(), client.freshness.status(), client.lineage.get(), client.intelligence.ask(), and client.alerts.list()). The backend already runs those five in parallel inside the correlator pipeline; the SDK now returns the aggregated, citable result directly.

client.jobs

Monitor async job status.

jobs.status()

Get status of an async job.
status = client.jobs.status("job_abc123")
print(f"Status: {status.status}")
print(f"Progress: {status.progress}%")
if status.error:
    print(f"Error: {status.error}")
Returns: JobStatus

client.metrics

Monitor data quality metrics like row counts, null percentages, and more.

metrics.summary()

Get metrics summary for an asset.
summary = client.metrics.summary("asset-uuid")
print(f"Active: {summary.active_metrics}/{summary.total_metrics}")
print(f"Health: {summary.health_percentage}%")
Returns: MetricsSummary

metrics.list()

List metrics for an asset.
metrics = client.metrics.list(
    "asset-uuid",
    metric_type="null_percent",  # Optional filter
    is_active=True,              # Optional filter
    limit=50,
    offset=0,
)
for m in metrics:
    print(f"{m.table_path}.{m.column_name}: {m.metric_type}")
Returns: list[MetricDefinition]

metrics.get()

Get metric details with optional snapshots.
metric = client.metrics.get(
    "asset-uuid",
    "metric-uuid",
    include_snapshots=True,
    snapshot_limit=30,
)
print(f"Type: {metric.metric_type}")
print(f"Table: {metric.table_path}")
Returns: MetricDefinition

metrics.create()

Create a new metric. Requires read-write scope.
metric = client.metrics.create(
    "asset-uuid",
    metric_type="null_percent",
    table_path="catalog.schema.table",
    column_name="email",
    capture_interval="daily",
)
print(f"Created: {metric.id}")
Parameters:
  • asset_id (str): Asset UUID
  • metric_type (str): row_count, null_percent, distinct_count, etc.
  • table_path (str): Full table path
  • column_name (str | None): Column name for column metrics
  • capture_interval (str): hourly, daily, weekly (default: daily)
  • sensitivity (float): Anomaly detection sensitivity (default: 1.0)
Returns: MetricDefinition

metrics.update()

Update a metric. Requires read-write scope.
metric = client.metrics.update(
    "asset-uuid",
    "metric-uuid",
    is_active=False,
    sensitivity=2.0,
)
Returns: MetricDefinition

metrics.delete()

Delete a metric. Requires read-write scope.
client.metrics.delete("asset-uuid", "metric-uuid")

metrics.capture()

Trigger an immediate metric capture. Requires read-write scope.
result = client.metrics.capture("asset-uuid", "metric-uuid")
print(f"Captured {result.get('snapshot_count', 0)} snapshots")
Returns: dict with snapshot_count and snapshots

metrics.snapshots()

List historical snapshots for a metric.
snapshots = client.metrics.snapshots(
    "asset-uuid",
    "metric-uuid",
    limit=100,
)
for s in snapshots:
    print(f"{s.captured_at}: {s.value}")
Returns: list[MetricSnapshot]

client.validity

Define and enforce data validity rules.

validity.summary()

Get validity summary for an asset.
summary = client.validity.summary("asset-uuid")
print(f"Total rules: {summary.total_rules}")
print(f"Passing: {summary.passing}, Failing: {summary.failing}")
Returns: ValiditySummary

validity.list()

List validity rules for an asset.
rules = client.validity.list(
    "asset-uuid",
    rule_type="NOT_NULL",  # Optional filter
    is_active=True,        # Optional filter
    limit=50,
)
for r in rules:
    print(f"{r.column_name}: {r.rule_type}")
Returns: list[ValidityRule]

validity.get()

Get validity rule details.
rule = client.validity.get("asset-uuid", "rule-uuid")
print(f"Type: {rule.rule_type}")
print(f"Severity: {rule.severity}")
Returns: ValidityRule

validity.create()

Create a new validity rule. Requires read-write scope.
# NOT_NULL rule
rule = client.validity.create(
    "asset-uuid",
    rule_type="NOT_NULL",
    table_path="catalog.schema.table",
    column_name="email",
    severity="critical",
)

# REGEX rule
regex_rule = client.validity.create(
    "asset-uuid",
    rule_type="REGEX",
    table_path="catalog.schema.table",
    column_name="email",
    rule_config={"pattern": r"^[\w.-]+@[\w.-]+\.\w+$"},
)
Parameters:
  • asset_id (str): Asset UUID
  • rule_type (str): NOT_NULL, UNIQUE, REGEX, RANGE, ENUM, etc.
  • table_path (str): Full table path
  • column_name (str | None): Column name
  • rule_config (dict | None): Rule-specific configuration
  • severity (str): info, warning, critical (default: warning)
  • check_interval (str): hourly, daily, weekly (default: daily)
Returns: ValidityRule

validity.update()

Update a validity rule. Requires read-write scope.
rule = client.validity.update(
    "asset-uuid",
    "rule-uuid",
    severity="critical",
    is_active=True,
)
Returns: ValidityRule

validity.delete()

Delete a validity rule. Requires read-write scope.
client.validity.delete("asset-uuid", "rule-uuid")

validity.check()

Trigger an immediate validity check. Requires read-write scope.
result = client.validity.check(
    "asset-uuid",
    "rule-uuid",
    sample_limit=20,
)
if result.status == "fail":
    print(f"Invalid: {result.invalid_count} ({result.invalid_percent:.2f}%)")
Returns: ValidityCheckResult

validity.results()

List historical check results.
results = client.validity.results(
    "asset-uuid",
    "rule-uuid",
    limit=30,
)
for r in results:
    print(f"{r.checked_at}: {r.status}")
Returns: list[ValidityCheckResult]

client.referential

Monitor referential integrity between tables.

referential.summary()

Get referential summary for an asset.
summary = client.referential.summary("asset-uuid")
print(f"Total: {summary.total_checks}")
print(f"Passing: {summary.passing_checks}, Failing: {summary.failing_checks}")
Returns: ReferentialSummary

referential.list()

List referential checks for an asset.
checks = client.referential.list(
    "asset-uuid",
    is_active=True,
    limit=50,
)
for c in checks:
    print(f"{c.child_column_name} -> {c.parent_column_name}")
Returns: list[ReferentialCheck]

referential.get()

Get referential check details.
check = client.referential.get("asset-uuid", "check-uuid")
print(f"FK: {check.child_table_path}.{check.child_column_name}")
print(f"PK: {check.parent_table_path}.{check.parent_column_name}")
Returns: ReferentialCheck

referential.create()

Create a new referential check. Requires read-write scope.
check = client.referential.create(
    "asset-uuid",
    child_table_path="catalog.schema.orders",
    child_column_name="customer_id",
    parent_table_path="catalog.schema.customers",
    parent_column_name="id",
    name="Orders -> Customers FK",
    max_orphan_count=0,
)
Parameters:
  • asset_id (str): Asset UUID
  • child_table_path (str): Child table path (contains FK)
  • child_column_name (str): FK column name
  • parent_table_path (str): Parent table path (contains PK)
  • parent_column_name (str): PK column name
  • name (str | None): Check name
  • max_orphan_count (int | None): Alert threshold for orphan count
  • max_orphan_percent (float | None): Alert threshold for orphan %
Returns: ReferentialCheck

referential.update()

Update a referential check. Requires read-write scope.
check = client.referential.update(
    "asset-uuid",
    "check-uuid",
    max_orphan_percent=0.5,
    capture_interval="hourly",
)
Returns: ReferentialCheck

referential.delete()

Delete a referential check. Requires read-write scope.
client.referential.delete("asset-uuid", "check-uuid")

referential.execute()

Execute a referential check immediately. Requires read-write scope.
result = client.referential.execute("asset-uuid", "check-uuid")
if result.status == "fail":
    print(f"Orphans: {result.orphan_count} ({result.orphan_percent:.2f}%)")
Returns: ReferentialCheckResult

referential.results()

List historical check results.
results = client.referential.results(
    "asset-uuid",
    "check-uuid",
    limit=30,
)
for r in results:
    print(f"{r.created_at}: {r.status} ({r.orphan_count} orphans)")
Returns: list[ReferentialCheckResult]

client.alerts

Query alert history.

alerts.summary()

Get alert summary statistics.
summary = client.alerts.summary()
print(f"Triggered: {summary.triggered}")
print(f"Last 24h: {summary.triggered_last_24h}")
Returns: AlertsSummary

alerts.list()

List alerts with filters.
alerts = client.alerts.list(
    status="triggered",  # "triggered", "acknowledged", "resolved"
    alert_type="freshness",  # "freshness", "schema_change", "row_count"
    asset_id="snowflake.prod.warehouse.orders",
    limit=50,
)
Returns: list[Alert]

alerts.rules()

List configured alert rules.
rules = client.alerts.rules()
for rule in rules:
    print(f"{rule.name}: {rule.alert_type}")
Returns: list[AlertRule]

client.api_keys

Manage API keys (requires admin scope).

api_keys.list()

List your organization’s API keys.
keys = client.api_keys.list(
    include_revoked=False,
    limit=50,
)
Returns: list[APIKey]

api_keys.create()

Create a new API key.
The full key is only returned once. Store it securely!
new_key = client.api_keys.create(
    name="Airflow Production",
    scope="read-only",  # "read-only", "read-write", "admin"
)
print(f"Key: {new_key.key}")  # Save this!
Returns: CreatedAPIKey (includes full key)

api_keys.get()

Get details of a specific key.
key = client.api_keys.get(key_id)
print(f"{key.name}: {key.scope}")
Returns: APIKey (without full key)

api_keys.revoke()

Revoke an API key. This cannot be undone.
client.api_keys.revoke(key_id)
Returns: APIKey

api_keys.usage()

Get API key usage and limits.
usage = client.api_keys.usage()
print(f"Keys: {usage['current_count']}/{usage['max_keys']}")
print(f"Rate limit: {usage['rate_limit_per_min']}/min")
Returns: dict

Models

Asset

class Asset:
    id: str
    qualified_name: str
    name: str
    asset_type: str  # "table", "view", "model"
    source: str
    database: str | None
    schema_name: str | None
    description: str | None
    row_count: int | None
    column_count: int | None
    tags: list[str]
    created_at: datetime
    updated_at: datetime

FreshnessStatus

class FreshnessStatus:
    asset_id: str
    qualified_name: str
    status: str  # "fresh", "stale", "unknown"
    is_fresh: bool
    is_stale: bool
    last_updated: datetime | None
    hours_since_update: float | None
    staleness_threshold_hours: float | None
    checked_at: datetime

APIKey

class APIKey:
    id: str
    name: str
    key_prefix: str  # e.g., "aa_live_abc1"
    key_suffix: str  # e.g., "xy9z"
    scope: str
    created_at: datetime
    last_used_at: datetime | None
    revoked_at: datetime | None

CreatedAPIKey

class CreatedAPIKey:
    id: str
    name: str
    key: str  # Full key - only shown once!
    scope: str
    created_at: datetime

Tag

class Tag:
    id: str
    name: str
    category: str  # "business", "technical", "governance"
    object_path: str | None  # e.g., "gold.customers"
    object_type: str | None  # "table" or "column"
    description: str | None
    created_at: datetime | None

BulkApplyResult

class BulkApplyResult:
    applied: int   # Number of tags successfully applied
    failed: int    # Number that failed
    total: int     # Total attempted

IntelligenceAnswer

class IntelligenceAnswer:
    answer: str           # AI-generated answer
    confidence: str       # "high", "medium", "low"
    sources: str          # Data sources used

JobStatus

class JobStatus:
    job_id: str
    status: str      # "pending", "running", "completed", "failed"
    progress: int    # 0-100
    error: str | None
    created_at: datetime
    completed_at: datetime | None

MetricsSummary

class MetricsSummary:
    total_metrics: int
    active_metrics: int
    total_checks: int
    passing: int
    failing: int
    warning: int
    error: int
    health_percentage: float

MetricDefinition

class MetricDefinition:
    id: str              # Public UUID
    internal_id: int
    asset_id: int
    table_path: str
    column_name: str | None
    metric_type: str     # row_count, null_percent, etc.
    capture_interval: str
    sensitivity: int
    is_active: bool
    created_at: datetime | None

MetricSnapshot

class MetricSnapshot:
    id: int
    metric_definition_id: int
    value: float
    captured_at: datetime
    is_anomaly: bool
    z_score: float | None
    status: str | None   # PASS, FAIL, WARNING

ValiditySummary

class ValiditySummary:
    total_rules: int
    passing: int
    failing: int
    error: int

ValidityRule

class ValidityRule:
    id: int
    uuid: str
    table_path: str
    column_name: str | None
    rule_type: str       # NOT_NULL, UNIQUE, REGEX, etc.
    rule_config: dict | None
    name: str | None
    severity: str        # info, warning, critical
    is_active: bool
    check_interval: str

ValidityCheckResult

class ValidityCheckResult:
    id: int
    validity_rule_id: int
    status: str          # pass, fail, error
    total_rows: int
    invalid_count: int
    invalid_percent: float
    invalid_samples: dict | None
    checked_at: datetime

ReferentialSummary

class ReferentialSummary:
    total_checks: int
    active_checks: int
    passing_checks: int
    failing_checks: int
    last_check_at: datetime | None

ReferentialCheck

class ReferentialCheck:
    id: str              # Public UUID
    internal_id: int
    asset_id: int
    child_table_path: str
    child_column_name: str
    parent_table_path: str
    parent_column_name: str
    name: str | None
    capture_interval: str
    max_orphan_count: int | None
    max_orphan_percent: float | None
    is_active: bool

ReferentialCheckResult

class ReferentialCheckResult:
    id: int
    referential_check_id: int
    status: str          # pass, fail, error
    orphan_count: int
    orphan_percent: float
    total_child_rows: int
    orphan_sample: list | None
    created_at: datetime