Integrate AnomalyArmor with Apache Airflow to gate pipelines on data quality and freshness.
Installation
Install the SDK in your Airflow environment:
pip install anomalyarmor-cli
Configuration
Set ARMOR_API_KEY as an Airflow variable or environment variable:
# Airflow Variable
airflow variables set ARMOR_API_KEY "aa_live_xxx"
# Or environment variable
export ARMOR_API_KEY="aa_live_xxx"
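How the client picks up the key is not spelled out here. The following is a minimal sketch, assuming the SDK reads ARMOR_API_KEY from the process environment. The helpers `resolve_api_key` and `export_api_key` are hypothetical and not part of the AnomalyArmor SDK; they prefer the environment variable and fall back to the Airflow Variable of the same name through Airflow's `Variable.get`.

```python
import os


def resolve_api_key(env=None):
    """Return ARMOR_API_KEY from the environment, else from Airflow Variables.

    Hypothetical helper, not part of the AnomalyArmor SDK.
    """
    env = os.environ if env is None else env
    key = env.get("ARMOR_API_KEY")
    if key:
        return key
    # Imported lazily so this module can still be imported outside Airflow.
    from airflow.models import Variable

    return Variable.get("ARMOR_API_KEY")


def export_api_key(env=None):
    """Copy the resolved key into the environment and return it.

    Assumption: the SDK client reads ARMOR_API_KEY from the environment.
    """
    env = os.environ if env is None else env
    env["ARMOR_API_KEY"] = resolve_api_key(env)
    return env["ARMOR_API_KEY"]
```

Calling `export_api_key()` at the top of a task callable, before `Client()` is constructed, would make either storage option work under that assumption.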
Pre-flight Freshness Check
The most common pattern: fail the task if upstream data is stale.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError


def check_upstream_freshness():
    """Gate: Fail task if upstream data is stale."""
    client = Client()
    # This raises StalenessError if data is stale
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    print("Upstream data is fresh, proceeding...")


def run_transformation():
    """Main transformation logic."""
    print("Running dbt models...")
    # subprocess.run(["dbt", "run", "--select", "orders_mart"])


with DAG(
    "orders_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    freshness_gate = PythonOperator(
        task_id="check_freshness",
        python_callable=check_upstream_freshness,
    )
    transform = PythonOperator(
        task_id="run_transformation",
        python_callable=run_transformation,
    )
    freshness_gate >> transform
Use an API key with read-only scope for pre-flight checks. Read-write scope is only needed if the task triggers refreshes.
Check Multiple Sources
Verify all upstream dependencies before running:
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError


def check_all_upstream():
    """Check all upstream sources are fresh."""
    client = Client()
    upstream_tables = [
        "snowflake.prod.warehouse.orders",
        "snowflake.prod.warehouse.customers",
        "snowflake.prod.warehouse.products",
    ]
    stale_tables = []
    for table in upstream_tables:
        try:
            client.freshness.require_fresh(table)
        except StalenessError:
            stale_tables.append(table)
    if stale_tables:
        raise Exception(f"Stale upstream data: {stale_tables}")
    print("All upstream sources are fresh!")
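To exercise this collect-then-fail loop without calling the API, the check can be passed in as an argument. The sketch below is one way to do that. `find_stale`, `FakeStalenessError`, and `fake_require_fresh` are hypothetical names used only for illustration and are not part of the SDK.

```python
from typing import Callable, Iterable, List, Type


def find_stale(
    tables: Iterable[str],
    require_fresh: Callable[[str], None],
    stale_error: Type[Exception],
) -> List[str]:
    """Run require_fresh on every table; return the ones that raised stale_error."""
    stale = []
    for table in tables:
        try:
            require_fresh(table)
        except stale_error:
            stale.append(table)
    return stale


# Stand-ins for the SDK, used only to run the helper offline.
class FakeStalenessError(Exception):
    pass


def fake_require_fresh(table: str) -> None:
    # Pretend that only the customers table is stale.
    if table.endswith(".customers"):
        raise FakeStalenessError(table)


stale = find_stale(
    ["snowflake.prod.warehouse.orders", "snowflake.prod.warehouse.customers"],
    fake_require_fresh,
    FakeStalenessError,
)
print(stale)
```

Inside a task, the same helper would be called as `find_stale(upstream_tables, client.freshness.require_fresh, StalenessError)`, and the task would raise if the returned list is non-empty.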
Trigger Freshness Check
Trigger a freshness check and wait for completion:
from anomalyarmor import Client


def refresh_and_check():
    """Trigger freshness check, then verify."""
    client = Client()
    # Trigger refresh and wait
    result = client.freshness.refresh(
        "snowflake.prod.warehouse.orders",
        wait=True,
    )
    print(f"Refresh job {result.job_id}: {result.status}")
    # Now check freshness
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
Sensor Pattern
Wait for data to become fresh:
from airflow.sensors.python import PythonSensor
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError


def is_data_fresh():
    """Return True when data is fresh."""
    client = Client()
    try:
        client.freshness.require_fresh("snowflake.prod.warehouse.orders")
        return True
    except StalenessError:
        return False


freshness_sensor = PythonSensor(
    task_id="wait_for_fresh_data",
    python_callable=is_data_fresh,
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,  # Timeout after 1 hour
    mode="poke",
)
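In "poke" mode the sensor holds a worker slot for the whole wait. Airflow sensors also accept mode="reschedule", which releases the slot between checks and is generally suggested when the poke interval is longer than about a minute. The sketch below picks the mode from the interval. `sensor_kwargs` is a hypothetical helper, and the 60-second threshold is an assumption, not an Airflow rule.

```python
def sensor_kwargs(poke_interval: int, timeout: int) -> dict:
    """Build poke_interval/timeout/mode keyword arguments for a sensor.

    Hypothetical helper: chooses "reschedule" for intervals of a minute or
    more (an assumed threshold) and "poke" for shorter ones.
    """
    if poke_interval <= 0:
        raise ValueError("poke_interval must be positive")
    if timeout < poke_interval:
        raise ValueError("timeout must be at least one poke_interval")
    mode = "reschedule" if poke_interval >= 60 else "poke"
    return {"poke_interval": poke_interval, "timeout": timeout, "mode": mode}


kwargs = sensor_kwargs(poke_interval=300, timeout=3600)
print(kwargs)
```

The result can be unpacked into the sensor, for example `PythonSensor(task_id="wait_for_fresh_data", python_callable=is_data_fresh, **kwargs)`. Because `is_data_fresh` keeps no state between calls, it behaves the same in either mode.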
Check Lineage
Verify all upstream dependencies using lineage:
from anomalyarmor import Client


def check_upstream_via_lineage():
    """Check all upstream sources via lineage API."""
    client = Client()
    # Get upstream dependencies
    lineage = client.lineage.get("snowflake.prod.mart.orders_summary")
    print(f"Checking {len(lineage.upstream)} upstream sources...")
    for upstream in lineage.upstream:
        client.freshness.require_fresh(upstream.qualified_name)
        print(f"  {upstream.qualified_name}")
    print("All upstream sources are fresh!")
Error Handling
Handle different error types appropriately:
from anomalyarmor import Client
from anomalyarmor.exceptions import (
    StalenessError,
    AuthenticationError,
    RateLimitError,
    ArmorError,
)
import time


def check_with_retry():
    """Check freshness with error handling."""
    client = Client()
    max_retries = 3
    for attempt in range(max_retries):
        try:
            client.freshness.require_fresh("snowflake.prod.warehouse.orders")
            return  # Success
        except StalenessError as e:
            # Data is stale - this is expected, fail the task
            raise Exception(f"Data is stale: last updated {e.last_updated}")
        except RateLimitError as e:
            # Rate limited - retry after waiting
            if attempt < max_retries - 1:
                print(f"Rate limited, waiting {e.retry_after}s...")
                time.sleep(e.retry_after)
            else:
                raise
        except AuthenticationError:
            # Auth error - likely config issue
            raise Exception("Invalid ARMOR_API_KEY")
        except ArmorError as e:
            # Other API error - retry
            if attempt < max_retries - 1:
                print(f"API error, retrying: {e}")
                time.sleep(10)
            else:
                raise
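The retry branches above wait a fixed time between attempts. As an alternative, the sketch below uses exponential backoff, which is a swapped-in technique and not what the example above does. `call_with_backoff` is a hypothetical helper. The sleep function is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    retryable: Tuple[Type[Exception], ...],
    max_retries: int = 3,
    base_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only on the given exception types.

    The delay doubles after each failed attempt: base_delay, 2x, 4x, ...
    Any other exception, and the final retryable one, propagates unchanged.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return fn()
        except retryable:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

A task could call `call_with_backoff(lambda: client.freshness.require_fresh(table), (RateLimitError,))`. Only exception types that are safe to retry should be listed: the ordering of the except clauses above suggests StalenessError may be a subclass of ArmorError, so passing ArmorError here would probably retry on stale data as well.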
Complete DAG Example
Full example with freshness gate, transformation, and post-run schema check:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from anomalyarmor import Client

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def check_upstream():
    """Gate: raises StalenessError if either upstream table is stale."""
    client = Client()
    client.freshness.require_fresh("snowflake.prod.warehouse.orders")
    client.freshness.require_fresh("snowflake.prod.warehouse.customers")


def trigger_schema_check():
    """Post-run: trigger a schema check on the output table and wait for it."""
    client = Client()
    result = client.schema.refresh(
        "snowflake.prod.mart.orders_summary",
        wait=True,
    )
    print(f"Schema check: {result.status}")


with DAG(
    "orders_mart_pipeline",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["data-quality", "orders"],
) as dag:
    check_freshness = PythonOperator(
        task_id="check_upstream_freshness",
        python_callable=check_upstream,
    )
    run_dbt = BashOperator(
        task_id="run_dbt_models",
        bash_command="cd /dbt && dbt run --select orders_mart",
    )
    verify_schema = PythonOperator(
        task_id="verify_schema",
        python_callable=trigger_schema_check,
    )
    check_freshness >> run_dbt >> verify_schema
Next Steps
Python SDK: SDK reference and patterns
Freshness API: Freshness endpoint details
Lineage API: Explore data dependencies
Alerts: Set up freshness alerts
Common Questions
How do I gate an Airflow DAG on data freshness using AnomalyArmor?
Use a PythonOperator or PythonSensor at the start of your DAG that calls client.freshness.require_fresh("...") from the AnomalyArmor Python SDK. The call raises StalenessError when the data is stale, which fails the task; with the default trigger rule, downstream tasks then do not run. See the "Pre-flight Freshness Check" and "Sensor Pattern" sections above for full examples.
Does the AnomalyArmor Python SDK work with Airflow 2 and MWAA?
Yes. The SDK is a pure-Python package (pip install anomalyarmor-cli) and works in any Airflow environment that supports pip-installed dependencies: Airflow 2.x, Astronomer, MWAA, and Airflow on Kubernetes. MWAA requires adding the package to requirements.txt.
Where should I store my AnomalyArmor API key in Airflow?
In an Airflow Connection (Extra JSON field) or a Variable, never in DAG code. For MWAA, use AWS Secrets Manager with Airflow's Secrets Backend so keys rotate without redeploying DAGs.
Can AnomalyArmor trigger Airflow DAG runs when data quality fails?
Yes, via webhooks. Configure an AnomalyArmor alert with a webhook destination pointing at Airflow's REST API (e.g., POST /api/v1/dags/{dag_id}/dagRuns). Most teams use this pattern to trigger remediation DAGs when a critical freshness or schema-drift alert fires.
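For the webhook pattern mentioned under Common Questions, it can help to see what a request to Airflow's dagRuns endpoint looks like before configuring the alert. The sketch below builds such a request without sending it. `build_dag_run_request` is a hypothetical helper. Basic authentication assumes the Airflow deployment has its basic-auth API backend enabled. The contents of `conf` are illustrative, since the payload AnomalyArmor sends is not described here.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request


def build_dag_run_request(base_url, dag_id, conf, username, password):
    """Build (but do not send) a POST that triggers a DAG run.

    Targets /api/v1/dags/{dag_id}/dagRuns on the Airflow REST API.
    Assumes basic auth is enabled on the Airflow API.
    """
    url = f"{base_url.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return Request(
        url,
        data=json.dumps({"conf": conf}).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


req = build_dag_run_request(
    "http://localhost:8080/",
    "remediate_orders",  # hypothetical remediation DAG id
    {"table": "snowflake.prod.warehouse.orders"},  # illustrative conf
    "airflow_user",
    "airflow_password",
)
print(req.get_method(), req.full_url)
```

Passing the request to `urllib.request.urlopen(req)` would send it. The triggered DAG can read the `conf` dictionary from its run context.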