# Airflow Integration

> Use AnomalyArmor in Apache Airflow DAGs

Integrate AnomalyArmor with Apache Airflow to gate pipelines on data quality and freshness.

## Installation

Install the SDK in your Airflow environment:

```bash theme={null}
pip install anomalyarmor-cli
```

## Configuration

Provide your API key as `ARMOR_API_KEY`, either as an Airflow Variable or as an environment variable set on every worker that runs these tasks:

```bash theme={null}
# Airflow Variable
airflow variables set ARMOR_API_KEY "aa_live_xxx"

# Or environment variable
export ARMOR_API_KEY="aa_live_xxx"
```
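Every example on this page calls `Client()` without arguments, which suggests the SDK reads `ARMOR_API_KEY` from the process environment. If you store the key only as an Airflow Variable, that assumption means the SDK may not see it. The hypothetical helper below is a minimal sketch: it prefers the environment variable, falls back to an Airflow Variable, and exports the value so a later `Client()` call can find it. Both the env-var lookup behavior of `Client` and the helper's name are assumptions, not documented SDK behavior.

```python
import os
from typing import Callable, MutableMapping, Optional

ENV_NAME = "ARMOR_API_KEY"


def resolve_armor_api_key(
    environ: Optional[MutableMapping[str, str]] = None,
    variable_get: Optional[Callable[[str], Optional[str]]] = None,
) -> str:
    """Hypothetical helper: return the API key and make sure it is in `environ`.

    Lookup order: existing environment variable, then an Airflow Variable.
    Pass `variable_get` to use another secrets source (or a stub in tests).
    """
    env = os.environ if environ is None else environ
    key = env.get(ENV_NAME)
    if key:
        return key

    getter = variable_get
    if getter is None:
        # Imported lazily so this module still loads outside Airflow.
        from airflow.models import Variable

        def _airflow_variable_get(name: str) -> Optional[str]:
            # Airflow 2.x signature: Variable.get(key, default_var=...)
            return Variable.get(name, default_var=None)

        getter = _airflow_variable_get

    key = getter(ENV_NAME)
    if not key:
        raise RuntimeError(f"{ENV_NAME} is not set as an env var or Airflow Variable")

    # Assumption: Client() reads ARMOR_API_KEY from the environment.
    env[ENV_NAME] = key
    return key
```

Call it at the top of a task callable, before constructing `Client()`.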

## Pre-flight Freshness Check

The most common pattern is a gate task that fails when upstream data is stale, so downstream tasks do not run on stale inputs:

```python theme={null}
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_upstream_freshness():
    """Gate: Fail task if upstream data is stale."""
    client = Client()

    # This raises StalenessError if data is stale
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    print("Upstream data is fresh, proceeding...")

def run_transformation():
    """Main transformation logic."""
    print("Running dbt models...")
    # subprocess.run(["dbt", "run", "--select", "orders_mart"])

with DAG(
    "orders_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

    freshness_gate = PythonOperator(
        task_id="check_freshness",
        python_callable=check_upstream_freshness,
    )

    transform = PythonOperator(
        task_id="run_transformation",
        python_callable=run_transformation,
    )

    freshness_gate >> transform
```

<Tip>
  Use an API key with `read-only` scope for pre-flight checks. You only need `read-write` scope to trigger refreshes.
</Tip>

## Check Multiple Sources

Verify all upstream dependencies before running:

```python theme={null}
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def check_all_upstream():
    """Check all upstream sources are fresh."""
    client = Client()

    upstream_tables = [
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
        "snowflake.prod.warehouse.products",
    ]

    stale_tables = []
    for table in upstream_tables:
        try:
            client.freshness.require_fresh(table)
        except StalenessError:
            stale_tables.append(table)

    if stale_tables:
        raise Exception(f"Stale upstream data: {stale_tables}")

    print("All upstream sources are fresh!")
```
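The loop above checks tables one at a time. The sketch below keeps the same collect-then-fail idea but runs the checks concurrently with the standard library's `ThreadPoolExecutor`. It takes a generic `check(table)` callable that raises on staleness (for example `client.freshness.require_fresh`); `check_tables_concurrently` is a hypothetical name, and whether one SDK client can safely be shared across threads is an assumption you should verify.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, Tuple, Type


def check_tables_concurrently(
    tables: Iterable[str],
    check: Callable[[str], object],
    failure_types: Tuple[Type[BaseException], ...] = (Exception,),
    max_workers: int = 4,
) -> None:
    """Run `check` on every table, then raise once listing all failures."""
    table_list = list(tables)
    failures: Dict[str, BaseException] = {}

    def run(table: str):
        try:
            check(table)
            return table, None
        except failure_types as exc:  # e.g. (StalenessError,)
            return table, exc

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, so the report is deterministic.
        for table, exc in pool.map(run, table_list):
            if exc is not None:
                failures[table] = exc

    if failures:
        raise RuntimeError(f"Stale upstream data: {sorted(failures)}")
```

In a task you might call `check_tables_concurrently(upstream_tables, client.freshness.require_fresh, (StalenessError,))`. Exceptions outside `failure_types`, such as an authentication error, propagate instead of being reported as stale tables.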

## Trigger Freshness Check

Run a freshness check on demand, wait for it to complete, then gate on the result. Triggering a check requires a `read-write` key:

```python theme={null}
from anomalyarmor import Client

def refresh_and_check():
    """Trigger freshness check, then verify."""
    client = Client()

    # Trigger refresh and wait
    result = client.freshness.refresh(
        "snowflake.prod.warehouse.orders",
        wait=True,
    )

    print(f"Refresh job {result.job_id}: {result.status}")

    # Now check freshness
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
```

## Sensor Pattern

Wait for data to become fresh:

```python theme={null}
from airflow.sensors.python import PythonSensor
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

def is_data_fresh():
    """Return True when data is fresh."""
    client = Client()
    try:
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
        return True
    except StalenessError:
        return False

# Define inside your `with DAG(...)` block
freshness_sensor = PythonSensor(
    task_id="wait_for_fresh_data",
    python_callable=is_data_fresh,
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,       # Fail after 1 hour
    mode="reschedule",  # Release the worker slot between checks
)
```
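To reason about `poke_interval` and `timeout` together: the callable is polled every `poke_interval` seconds until it returns `True`, and the sensor fails once `timeout` seconds have elapsed. The sketch below is a simplified model of that loop, not Airflow's implementation (which also handles reschedule mode, `soft_fail`, and exponential backoff). `poke_until` and `SensorTimeout` are hypothetical names; the clock and sleep functions are injectable so the logic can be exercised without waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is not met within `timeout` seconds."""


def poke_until(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll `condition` until it is true; return how many pokes it took."""
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)
```

In this model, `poke_interval=300` with `timeout=3600` means at most 13 freshness API calls per sensor run: one at the start and one every five minutes through the one-hour mark.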

## Check Lineage

Instead of hard-coding table names, discover upstream dependencies through the lineage API and check each one:

```python theme={null}
from anomalyarmor import Client

def check_upstream_via_lineage():
    """Check all upstream sources via lineage API."""
    client = Client()

    # Get upstream dependencies
    lineage = client.lineage.get("snowflake.prod.mart.orders_summary")

    print(f"Checking {len(lineage.upstream)} upstream sources...")

    for upstream in lineage.upstream:
        client.freshness.require_fresh(upstream.qualified_name)
        print(f"  {upstream.qualified_name}")

    print("All upstream sources are fresh!")
```
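The example above checks the `lineage.upstream` list returned for one table; this page does not say whether that list holds only direct parents or every ancestor. If it is direct-only and you want the full chain, a breadth-first walk is one way to collect it. In this sketch `get_upstream(name)` is a hypothetical callable returning the direct upstream names of a table, and the visited set guards against cycles and shared ancestors.

```python
from collections import deque
from typing import Callable, Iterable, List


def all_upstream(root: str, get_upstream: Callable[[str], Iterable[str]]) -> List[str]:
    """Return every transitive upstream of `root` in breadth-first order."""
    seen = {root}
    order: List[str] = []
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for parent in get_upstream(node):
            if parent not in seen:  # skip shared ancestors and cycles
                seen.add(parent)
                order.append(parent)
                queue.append(parent)
    return order
```

With the SDK, `get_upstream` could be `lambda name: [u.qualified_name for u in client.lineage.get(name).upstream]`, and you would call `client.freshness.require_fresh` on each returned name. This issues one lineage request per visited table.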

## Error Handling

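Treat each error type differently: fail on stale data or bad credentials, wait and retry on rate limits, and retry other API errors a few times: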

```python theme={null}
from anomalyarmor import Client
from anomalyarmor.exceptions import (
    StalenessError,
    AuthenticationError,
    RateLimitError,
    ArmorError,
)
import time

def check_with_retry():
    """Check freshness with error handling."""
    client = Client()

    max_retries = 3
    for attempt in range(max_retries):
        try:
            client.freshness.require_fresh("snowflake.prod.warehouse.orders")
            return  # Success

        except StalenessError as e:
            # Data is stale - this is expected, fail the task
            raise Exception(f"Data is stale: last updated {e.last_updated}")

        except RateLimitError as e:
            # Rate limited - retry after waiting
            if attempt < max_retries - 1:
                print(f"Rate limited, waiting {e.retry_after}s...")
                time.sleep(e.retry_after)
            else:
                raise

        except AuthenticationError:
            # Auth error - likely config issue
            raise Exception("Invalid ARMOR_API_KEY")

        except ArmorError as e:
            # Other API error - retry
            if attempt < max_retries - 1:
                print(f"API error, retrying: {e}")
                time.sleep(10)
            else:
                raise
```
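The loop above pauses a fixed 10 seconds between generic API errors. Below is a sketch of a reusable variant: it retries only the exception types you pass, re-raises immediately on types listed in `give_up_on`, uses an exception's `retry_after` attribute when present (as the `RateLimitError` handler above does), and otherwise backs off exponentially up to a cap. `call_with_retries` and its parameters are hypothetical.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    give_up_on: Tuple[Type[BaseException], ...] = (),
    max_attempts: int = 3,
    base_delay: float = 10.0,
    max_delay: float = 120.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `fn`, making at most `max_attempts` attempts on retryable errors."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except give_up_on:
            # Checked first, so subclasses of a retryable base still fail fast.
            raise
        except retryable as exc:
            if attempt == max_attempts - 1:
                raise
            # Prefer a server-provided hint such as RateLimitError.retry_after.
            delay = getattr(exc, "retry_after", None)
            if delay is None:
                delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(delay)
    raise AssertionError("unreachable")
```

For example: `call_with_retries(lambda: client.freshness.require_fresh("snowflake.prod.warehouse.orders"), retryable=(RateLimitError, ArmorError), give_up_on=(StalenessError, AuthenticationError))`. If `StalenessError` and `AuthenticationError` subclass `ArmorError` (not confirmed on this page), listing them in `give_up_on` keeps them from being retried.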

## Complete DAG Example

Full example with freshness gate, transformation, and post-run schema check:

```python theme={null}
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

def check_upstream():
    client = Client()
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    client.freshness.require_fresh("snowflake.prod.warehouse.customers")

def trigger_schema_check():
    client = Client()
    result = client.schema.refresh(
        "snowflake.prod.mart.orders_summary",
        wait=True,
    )
    print(f"Schema check: {result.status}")

with DAG(
    "orders_mart_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["data-quality", "orders"],
) as dag:

    check_freshness = PythonOperator(
        task_id="check_upstream_freshness",
        python_callable=check_upstream,
    )

    run_dbt = BashOperator(
        task_id="run_dbt_models",
        bash_command="cd /dbt && dbt run --select orders_mart",
    )

    verify_schema = PythonOperator(
        task_id="verify_schema",
        python_callable=trigger_schema_check,
    )

    check_freshness >> run_dbt >> verify_schema
```
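Because `check_upstream()` constructs `Client()` itself, testing it requires a live API key. One option, sketched here, is to accept the client as an optional parameter so tests can pass a stub. `check_upstream_with`, `StubClient`, `StubFreshness`, and `FakeStalenessError` are hypothetical; the stub mimics only the `freshness.require_fresh` call used on this page, and real code would see the SDK's `StalenessError` instead of the local stand-in.

```python
from typing import Iterable, List, Set

UPSTREAM_TABLES = (
    "snowflake.prod.warehouse.orders",
    "snowflake.prod.warehouse.customers",
)


class FakeStalenessError(Exception):
    """Local stand-in for anomalyarmor.exceptions.StalenessError."""


class StubFreshness:
    def __init__(self, stale: Set[str]):
        self.stale = stale
        self.checked: List[str] = []

    def require_fresh(self, table: str) -> None:
        self.checked.append(table)
        if table in self.stale:
            raise FakeStalenessError(table)


class StubClient:
    def __init__(self, stale: Iterable[str] = ()):
        self.freshness = StubFreshness(set(stale))


def check_upstream_with(client=None, tables: Iterable[str] = UPSTREAM_TABLES) -> None:
    """Same gate as check_upstream(), but the client can be injected."""
    if client is None:
        from anomalyarmor import Client  # real SDK only when no stub is given

        client = Client()
    for table in tables:
        client.freshness.require_fresh(table)
```

`python_callable=check_upstream_with` still works in the DAG: Airflow calls it with no arguments, so it builds the real client.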

## Common Questions

### How do I gate an Airflow DAG on data freshness using AnomalyArmor?

Put a `PythonOperator` or `PythonSensor` at the start of your DAG that calls `client.freshness.require_fresh("...")` from the AnomalyArmor Python SDK. It raises `StalenessError` when the data is stale, which fails the task so downstream tasks do not run. See "Pre-flight Freshness Check" and "Sensor Pattern" above for full examples.

### Does the AnomalyArmor Python SDK work with Airflow 2 and MWAA?

Yes. The SDK is a pure-Python package installed with pip (see Installation above) and works in any Airflow environment that supports pip-installed dependencies: Airflow 2.x, Astronomer, MWAA, and Airflow on Kubernetes. On MWAA, add the package to your environment's `requirements.txt`.

### Where should I store my AnomalyArmor API key in Airflow?

In an Airflow Connection (Extra JSON field) or a Variable, never in DAG code. On MWAA, use AWS Secrets Manager as Airflow's secrets backend so keys can rotate without redeploying DAGs.

### Can AnomalyArmor trigger Airflow DAG runs when data quality fails?

Yes, via webhooks. Configure an AnomalyArmor alert with a webhook destination pointing at Airflow's REST API (e.g., `POST /api/v1/dags/{dag_id}/dagRuns`). Airflow's REST API requires authentication, so the webhook must send credentials your deployment accepts. A common use is triggering a remediation DAG when a critical freshness or schema-drift alert fires.

## Next Steps

<CardGroup cols={2}>
  <Card title="Python SDK" icon="python" href="/sdk/overview">
    SDK reference and patterns
  </Card>

  <Card title="Freshness API" icon="clock" href="/api/freshness">
    Freshness endpoint details
  </Card>

  <Card title="Lineage API" icon="diagram-project" href="/api/lineage">
    Explore data dependencies
  </Card>

  <Card title="Alerts" icon="bell" href="/alerts/overview">
    Set up freshness alerts
  </Card>
</CardGroup>
