Integrate AnomalyArmor with Apache Airflow to gate pipelines on data quality and freshness.

Installation

Install the SDK in your Airflow environment:
pip install anomalyarmor-cli

Configuration

Set ARMOR_API_KEY as an Airflow variable or environment variable:
# Airflow Variable
airflow variables set ARMOR_API_KEY "aa_live_xxx"

# Or environment variable
export ARMOR_API_KEY="aa_live_xxx"
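The examples on this page construct Client() with no arguments, which suggests the SDK reads ARMOR_API_KEY from the environment. Airflow does not export Variables into the environment on its own, so if the key lives only in an Airflow Variable, something has to bridge the two. Below is a minimal sketch of one way to do that. It assumes Client() picks up ARMOR_API_KEY from os.environ. The helper name ensure_armor_api_key is hypothetical, and Variable.get(..., default_var=None) is the Airflow 2.x signature.

```python
import os
from typing import Optional

ENV_VAR = "ARMOR_API_KEY"


def ensure_armor_api_key(name: str = ENV_VAR) -> Optional[str]:
    """Hypothetical helper: make the API key visible to Client().

    Prefers an existing environment variable; otherwise falls back to an
    Airflow Variable of the same name and exports it into os.environ.
    Assumption: the AnomalyArmor Client reads ARMOR_API_KEY from the environment.
    """
    key = os.environ.get(name)
    if key:
        return key

    try:
        # Airflow 2.x location; only importable inside an Airflow environment.
        from airflow.models import Variable
    except ImportError:
        return None

    key = Variable.get(name, default_var=None)
    if key:
        # Export so a later Client() call (assumed to read the env var) finds it.
        os.environ[name] = key
    return key
```

Call ensure_armor_api_key() at the start of a task callable, before constructing Client().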

Pre-flight Freshness Check

The most common pattern: fail the task if upstream data is stale.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_upstream_freshness():
    """Gate: Fail task if upstream data is stale."""
    client = Client()

    # This raises StalenessError if data is stale
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    print("Upstream data is fresh, proceeding...")

def run_transformation():
    """Main transformation logic."""
    print("Running dbt models...")
    # subprocess.run(["dbt", "run", "--select", "orders_mart"])

with DAG(
    "orders_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

    freshness_gate = PythonOperator(
        task_id="check_freshness",
        python_callable=check_upstream_freshness,
    )

    transform = PythonOperator(
        task_id="run_transformation",
        python_callable=run_transformation,
    )

    freshness_gate >> transform

Use an API key with read-only scope for pre-flight checks. A read-write key is needed only if the task triggers refreshes (see Trigger Freshness Check below).
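Raising from the gate fails the task, and if retries are configured Airflow will retry it. If you would rather stop the run without spending retries, Airflow's AirflowFailException fails a task immediately with no retry, and AirflowSkipException marks it skipped so downstream tasks with the default all_success trigger rule are skipped too. The sketch below wraps the gate accordingly. It takes the client as an argument so it can be exercised without a live API, and it defines stand-in exception classes when Airflow or the SDK is not installed (those fallbacks exist only so the snippet runs standalone). gate_on_freshness and its on_stale parameter are hypothetical names.

```python
try:
    from airflow.exceptions import AirflowFailException, AirflowSkipException
except ImportError:  # stand-ins so the sketch runs outside Airflow
    class AirflowFailException(Exception):
        pass

    class AirflowSkipException(Exception):
        pass

try:
    from anomalyarmor.exceptions import StalenessError
except ImportError:  # stand-in so the sketch runs without the SDK
    class StalenessError(Exception):
        pass


def gate_on_freshness(client, table: str, on_stale: str = "fail") -> None:
    """Hypothetical gate: translate StalenessError into an Airflow outcome.

    on_stale="fail": fail the task immediately, without using retries.
    on_stale="skip": mark the task skipped so downstream tasks are skipped.
    """
    if on_stale not in ("fail", "skip"):
        raise ValueError(f"on_stale must be 'fail' or 'skip', got {on_stale!r}")
    try:
        client.freshness.require_fresh(table)
    except StalenessError as exc:
        message = f"{table} is stale: {exc}"
        if on_stale == "skip":
            raise AirflowSkipException(message) from exc
        raise AirflowFailException(message) from exc
```

In the DAG above, this could replace the callable, e.g. python_callable=lambda: gate_on_freshness(Client(), "snowflake.prod.warehouse.orders", on_stale="skip").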

Check Multiple Sources

Verify all upstream dependencies before running:
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_all_upstream():
    """Check all upstream sources are fresh."""
    client = Client()

    upstream_tables = [
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
        "snowflake.prod.warehouse.products",
    ]

    stale_tables = []
    for table in upstream_tables:
        try:
            client.freshness.require_fresh(table)
        except StalenessError:
            stale_tables.append(table)

    if stale_tables:
        raise Exception(f"Stale upstream data: {stale_tables}")

    print("All upstream sources are fresh!")
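The loop above checks tables one at a time. With many upstream tables, the checks can run concurrently with the standard library's ThreadPoolExecutor. Here is a minimal sketch, assuming each check is an independent, I/O-bound API call. This page does not say whether a single Client is safe to share across threads, so the sketch takes a check callable and leaves that choice to you (for example, one Client() per call). find_stale_tables is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List

try:
    from anomalyarmor.exceptions import StalenessError
except ImportError:  # stand-in so the sketch runs without the SDK
    class StalenessError(Exception):
        pass


def find_stale_tables(
    check: Callable[[str], None],
    tables: Iterable[str],
    max_workers: int = 4,
) -> List[str]:
    """Hypothetical helper: run check(table) concurrently, return stale tables.

    A table counts as stale if check() raises StalenessError. Any other
    exception (auth, rate limit, ...) propagates to the caller.
    Results keep the input order.
    """
    tables = list(tables)

    def is_stale(table: str) -> bool:
        try:
            check(table)
        except StalenessError:
            return True
        return False

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map re-raises a worker's exception when its result is consumed.
        flags = list(pool.map(is_stale, tables))
    return [table for table, stale in zip(tables, flags) if stale]
```

For example: stale_tables = find_stale_tables(lambda table: Client().freshness.require_fresh(table), upstream_tables), then raise if the list is non-empty, as above.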

Trigger Freshness Check

Trigger a freshness check and wait for completion:
from anomalyarmor import Client

def refresh_and_check():
    """Trigger freshness check, then verify."""
    client = Client()

    # Trigger refresh and wait
    result = client.freshness.refresh(
        "snowflake.prod.warehouse.orders",
        wait=True,
    )

    print(f"Refresh job {result.job_id}: {result.status}")

    # Now check freshness
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
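Triggering a refresh on every run costs an API call and needs a read-write key even when the data is already fresh. One alternative is to check first and trigger a refresh only when the current status reports stale data. The minimal sketch below uses only the require_fresh and refresh(..., wait=True) calls shown above. refresh_if_stale is a hypothetical helper, and it assumes a completed refresh job updates the freshness status that require_fresh reads.

```python
try:
    from anomalyarmor.exceptions import StalenessError
except ImportError:  # stand-in so the sketch runs without the SDK
    class StalenessError(Exception):
        pass


def refresh_if_stale(client, table: str) -> bool:
    """Hypothetical helper: trigger a freshness check only when status is stale.

    Returns False if the table was already fresh (no refresh triggered),
    True if a refresh was triggered and the table is now fresh.
    Raises StalenessError if the table is still stale after the refresh.
    """
    try:
        client.freshness.require_fresh(table)
        return False
    except StalenessError:
        pass

    # Trigger a new freshness check and block until the job finishes.
    result = client.freshness.refresh(table, wait=True)
    print(f"Refresh job {result.job_id}: {result.status}")

    # Assumes the job updated the status; raises again if still stale.
    client.freshness.require_fresh(table)
    return True
```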

Sensor Pattern

Wait for data to become fresh:
from airflow.sensors.python import PythonSensor
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def is_data_fresh():
    """Return True when data is fresh."""
    client = Client()
    try:
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
        return True
    except StalenessError:
        return False

freshness_sensor = PythonSensor(
    task_id="wait_for_fresh_data",
    python_callable=is_data_fresh,
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,       # Timeout after 1 hour
    mode="poke",        # Holds a worker slot between pokes; "reschedule" frees it
)
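To wait on several upstream tables with one sensor, the probe can return True only once every table passes. Here is a minimal sketch. make_all_fresh_probe is a hypothetical helper, and client_factory would typically be the SDK's Client class. The probe creates a new client on each poke. That also suits mode="reschedule", where each poke runs as a separate task execution and cannot keep state from the previous one.

```python
from typing import Any, Callable, Iterable

try:
    from anomalyarmor.exceptions import StalenessError
except ImportError:  # stand-in so the sketch runs without the SDK
    class StalenessError(Exception):
        pass


def make_all_fresh_probe(
    client_factory: Callable[[], Any],
    tables: Iterable[str],
) -> Callable[[], bool]:
    """Hypothetical factory: build a PythonSensor callable for several tables."""
    tables = list(tables)

    def probe() -> bool:
        client = client_factory()
        for table in tables:
            try:
                client.freshness.require_fresh(table)
            except StalenessError:
                print(f"Still stale: {table}")
                return False  # the sensor pokes again after poke_interval
        return True

    return probe
```

Pass the result as the sensor's callable, e.g. python_callable=make_all_fresh_probe(Client, ["snowflake.prod.warehouse.orders", "snowflake.prod.warehouse.customers"]).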

Check Lineage

Verify all upstream dependencies using lineage:
from anomalyarmor import Client

def check_upstream_via_lineage():
    """Check all upstream sources via lineage API."""
    client = Client()

    # Get upstream dependencies
    lineage = client.lineage.get("snowflake.prod.mart.orders_summary")

    print(f"Checking {len(lineage.upstream)} upstream sources...")

    for upstream in lineage.upstream:
        client.freshness.require_fresh(upstream.qualified_name)
        print(f"  fresh: {upstream.qualified_name}")

    print("All upstream sources are fresh!")
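The loop above stops at the first stale upstream table. It also passes without checking anything if lineage.upstream comes back empty, for example if lineage has not been captured for that table. The minimal sketch below reports every stale upstream and treats an empty upstream list as an error. It uses the lineage.get, upstream, and qualified_name names from the example above. check_lineage_upstreams is a hypothetical helper.

```python
from typing import Any, List

try:
    from anomalyarmor.exceptions import StalenessError
except ImportError:  # stand-in so the sketch runs without the SDK
    class StalenessError(Exception):
        pass


def check_lineage_upstreams(client: Any, table: str) -> List[str]:
    """Hypothetical helper: check every upstream of `table`, failing loudly on gaps.

    Returns the upstream names that were checked. Raises RuntimeError if
    lineage reports no upstreams or if any upstream is stale.
    """
    lineage = client.lineage.get(table)
    names = [upstream.qualified_name for upstream in lineage.upstream]
    if not names:
        # An empty list would otherwise let the gate pass vacuously.
        raise RuntimeError(f"No upstream lineage found for {table}")

    stale = []
    for name in names:
        try:
            client.freshness.require_fresh(name)
        except StalenessError:
            stale.append(name)

    if stale:
        raise RuntimeError(f"Stale upstream data for {table}: {stale}")
    return names
```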

Error Handling

Handle different error types appropriately:
from anomalyarmor import Client
from anomalyarmor.exceptions import (
    StalenessError,
    AuthenticationError,
    RateLimitError,
    ArmorError,
)
import time

def check_with_retry():
    """Check freshness with error handling."""
    client = Client()

    max_retries = 3
    for attempt in range(max_retries):
        try:
            client.freshness.require_fresh("snowflake.prod.warehouse.orders")
            return  # Success

        except StalenessError as e:
            # Data is stale: an expected outcome, so fail the task immediately
            raise Exception(f"Data is stale: last updated {e.last_updated}") from e

        except RateLimitError as e:
            # Rate limited: retry after the suggested wait
            if attempt < max_retries - 1:
                print(f"Rate limited, waiting {e.retry_after}s...")
                time.sleep(e.retry_after)
            else:
                raise

        except AuthenticationError as e:
            # Auth error: likely a configuration issue, so retrying will not help
            raise Exception("Invalid ARMOR_API_KEY") from e

        except ArmorError as e:
            # Other API error - retry
            if attempt < max_retries - 1:
                print(f"API error, retrying: {e}")
                time.sleep(10)
            else:
                raise
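The example above waits a fixed 10 seconds between retries for generic API errors. A common refinement is exponential backoff with jitter, so that many tasks hitting the same outage do not retry in lockstep. Below is a generic sketch that is not part of the SDK. retry_with_backoff is a hypothetical helper. It has a non_retryable parameter because StalenessError and AuthenticationError may well be subclasses of ArmorError, and those should never be retried. The sleep parameter exists so the function can be tested without waiting. This sketch ignores RateLimitError.retry_after, so keep the dedicated branch above if you want to honor it.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    non_retryable: Tuple[Type[BaseException], ...] = (),
    max_attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: call fn(), retrying with exponential backoff and full jitter.

    Exceptions in non_retryable are re-raised immediately, even when they
    subclass a retryable type. The final attempt's exception propagates.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts - 1):
        try:
            return fn()
        except non_retryable:
            raise
        except retryable:
            # Full jitter: wait a random time up to the capped exponential delay.
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))

    # Last attempt: any error propagates to the caller.
    return fn()
```

Applied to the check above, this might look like retry_with_backoff(lambda: client.freshness.require_fresh("snowflake.prod.warehouse.orders"), retryable=(ArmorError,), non_retryable=(StalenessError, AuthenticationError)).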

Complete DAG Example

Full example with freshness gate, transformation, and post-run schema check:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

def check_upstream():
    client = Client()
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    client.freshness.require_fresh("snowflake.prod.warehouse.customers")

def trigger_schema_check():
    client = Client()
    result = client.schema.refresh(
        "snowflake.prod.mart.orders_summary",
        wait=True,
    )
    print(f"Schema check: {result.status}")

with DAG(
    "orders_mart_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["data-quality", "orders"],
) as dag:

    check_freshness = PythonOperator(
        task_id="check_upstream_freshness",
        python_callable=check_upstream,
    )

    run_dbt = BashOperator(
        task_id="run_dbt_models",
        bash_command="cd /dbt && dbt run --select orders_mart",
    )

    verify_schema = PythonOperator(
        task_id="verify_schema",
        python_callable=trigger_schema_check,
    )

    check_freshness >> run_dbt >> verify_schema

Common Questions

How do I gate an Airflow DAG on data freshness using AnomalyArmor?

Use a PythonOperator at the start of your DAG (or a PythonSensor, if you want to wait rather than fail) that calls client.freshness.require_fresh("...") from the AnomalyArmor Python SDK. require_fresh raises StalenessError when the table is stale, which fails the task and keeps downstream tasks from running. See the Pre-flight Freshness Check section above for a full example.

Does the AnomalyArmor Python SDK work with Airflow 2 and MWAA?

Yes. The SDK is a pure-Python package (see Installation above) and works in any Airflow environment that supports pip-installed dependencies: Airflow 2.x, Astronomer, MWAA, and Airflow on Kubernetes. On MWAA, add the package to requirements.txt.

Where should I store my AnomalyArmor API key in Airflow?

In an Airflow Variable, an Airflow Connection (Extra JSON field), or an environment variable on the workers; never hard-code it in DAG code. For MWAA, use AWS Secrets Manager as Airflow’s secrets backend so keys can rotate without redeploying DAGs.

Can AnomalyArmor trigger Airflow DAG runs when data quality fails?

Yes, via webhooks. Configure an AnomalyArmor alert with a webhook destination pointing at Airflow’s REST API (e.g., POST /api/v1/dags/{dag_id}/dagRuns). A typical use is triggering a remediation DAG when a critical freshness or schema-drift alert fires.

Next Steps

Python SDK: SDK reference and patterns
Freshness API: Freshness endpoint details
Lineage API: Explore data dependencies
Alerts: Set up freshness alerts