# dbt Integration

> Add data quality gates to your dbt workflows

Integrate AnomalyArmor with dbt to add data quality gates before and after your transformations. This guide covers common patterns for ensuring data quality throughout your dbt workflow.

## Prerequisites

* AnomalyArmor account with connected data source
* dbt project configured
* Python 3.9+ (for SDK) or shell access (for CLI)
* API key with `read-only` scope (or `read-write` for triggering checks)

## Installation

```bash theme={null}
pip install anomalyarmor-cli dbt-core
```

Configure your API key:

```bash theme={null}
export ARMOR_API_KEY="aa_live_your_key_here"
```

## Pattern 1: Pre-run Quality Gate

Check data quality before running dbt:

### Using CLI (Shell Script)

```bash theme={null}
#!/bin/bash
# pre_dbt_check.sh
set -e

echo "Running pre-dbt quality checks..."

# Check freshness of source tables
armor freshness check snowflake.raw.stripe.payments || {
    echo "Source data is stale. Aborting dbt run."
    exit 1
}

armor freshness check snowflake.raw.crm.customers || {
    echo "Source data is stale. Aborting dbt run."
    exit 1
}

echo "All source tables are fresh. Starting dbt..."
dbt run
```

### Using Python

```python theme={null}
# pre_dbt_check.py
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError
import subprocess
import sys

def check_sources_and_run_dbt():
    client = Client()

    sources = [
        "snowflake.raw.stripe.payments",
        "snowflake.raw.crm.customers",
        "snowflake.raw.shopify.orders",
    ]

    print("Checking source freshness...")
    for source in sources:
        try:
            status = client.freshness.require_fresh(source)
            print(f"  [OK] {source} ({status.hours_since_update:.1f}h old)")
        except StalenessError as e:
            print(f"  [STALE] {source} ({e.hours_since_update:.1f}h old)")
            print("\nAborting: Source data is stale.")
            sys.exit(1)

    print("\nAll sources fresh. Running dbt...")
    result = subprocess.run(["dbt", "run"], check=True)
    return result.returncode

if __name__ == "__main__":
    check_sources_and_run_dbt()
```

## Pattern 2: Post-run Validation

Validate output quality after dbt completes:

```python theme={null}
# post_dbt_validate.py
from anomalyarmor import Client

def validate_dbt_outputs():
    client = Client()
    asset_id = "your-asset-uuid"

    print("Validating dbt outputs...")

    # Check validity rules on transformed tables
    validity = client.validity.summary(asset_id)
    if validity.failing > 0:
        print(f"[WARN] {validity.failing} validity rules failing")

        # Get details of failing rules
        rules = client.validity.list(asset_id)
        for rule in rules:
            result = client.validity.check(asset_id, rule.uuid)
            if result.status == "fail":
                print(f"  - {rule.name} on {rule.column_name}")
                print(f"    Invalid: {result.invalid_count} ({result.invalid_percent:.2f}%)")

        return False
    else:
        print(f"[OK] All {validity.total_rules} validity rules passing")

    # Check referential integrity
    ref = client.referential.summary(asset_id)
    if ref.failing_checks > 0:
        print(f"[WARN] {ref.failing_checks} referential checks failing")
        return False
    else:
        print(f"[OK] All {ref.total_checks} referential checks passing")

    return True

if __name__ == "__main__":
    if not validate_dbt_outputs():
        print("\nPost-dbt validation failed!")
        # SystemExit does not depend on the `site`-provided exit() helper
        raise SystemExit(1)
    print("\nAll validations passed!")
```

## Pattern 3: dbt run-operation Hook

Create a dbt macro that calls AnomalyArmor:

<Note>
  The `shell()` function is not a built-in dbt Jinja function. This pattern requires a custom macro
  or package that provides shell execution capabilities. Consider using the wrapper script approach
  (Pattern 4) for a simpler, more portable solution.
</Note>

```sql theme={null}
-- macros/armor_quality_gate.sql
{% macro armor_freshness_check(table_name) %}
    {% set result = run_query("SELECT 1") %}
    {{ log("Checking freshness for " ~ table_name, info=True) }}

    {# Call CLI from dbt - requires custom shell() macro #}
    {% set check_result = shell("armor freshness check " ~ table_name ~ " 2>&1 || echo STALE") %}

    {% if "STALE" in check_result %}
        {{ exceptions.raise_compiler_error("Data quality check failed: " ~ table_name ~ " is stale") }}
    {% endif %}
{% endmacro %}
```

Then in your model:

```sql theme={null}
-- models/marts/orders_mart.sql
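{# Quote the hook so the macro renders when the hook runs, not while the model is parsed #}
{{ config(pre_hook="{{ armor_freshness_check('snowflake.raw.stripe.payments') }}") }}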

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('stg_customers') }} c ON o.customer_id = c.customer_id
```
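
Because `shell()` is not built in, the same gate can also live outside dbt. The sketch below is one hedged way to do that in Python. It assumes, as the Pattern 1 shell script does, that `armor freshness check <table>` exits non-zero when a table is stale. The `stale_tables` helper and its `run` parameter are illustrative names, not part of the AnomalyArmor SDK.

```python
import subprocess
from typing import Any, Callable, Sequence

# Any callable that accepts an argv list and returns an object with a
# `returncode` attribute. subprocess.run satisfies this.
Runner = Callable[[Sequence[str]], Any]


def stale_tables(tables: Sequence[str], run: Runner = subprocess.run) -> list[str]:
    """Return the tables for which `armor freshness check` exits non-zero.

    Assumption: a non-zero exit code means "stale", as in the Pattern 1 script.
    """
    stale = []
    for table in tables:
        result = run(["armor", "freshness", "check", table])
        if result.returncode != 0:
            stale.append(table)
    return stale
```

Call `stale_tables([...])` before invoking `dbt run` and abort when the returned list is non-empty. Passing a fake `run` callable keeps the helper testable without the CLI installed.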

## Pattern 4: Full dbt Wrapper Script

A wrapper that handles pre-checks, dbt run, and post-validation:

```python theme={null}
#!/usr/bin/env python
# dbt_with_quality_gates.py
"""
Run dbt with AnomalyArmor quality gates.

Usage:
    python dbt_with_quality_gates.py run
    python dbt_with_quality_gates.py run --select marts.*
"""

import argparse
import subprocess
import sys
from anomalyarmor import Client
from anomalyarmor.exceptions import StalenessError

# Configuration
ASSET_ID = "your-asset-uuid"
SOURCE_TABLES = [
    "snowflake.raw.stripe.payments",
    "snowflake.raw.crm.customers",
]

def pre_checks(client: Client) -> bool:
    """Run pre-dbt quality checks."""
    print("\n=== Pre-dbt Quality Checks ===\n")

    # Check source freshness
    all_fresh = True
    for table in SOURCE_TABLES:
        try:
            status = client.freshness.require_fresh(table)
            print(f"[FRESH] {table} ({status.hours_since_update:.1f}h)")
        except StalenessError as e:
            print(f"[STALE] {table} ({e.hours_since_update:.1f}h)")
            all_fresh = False

    return all_fresh

def run_dbt(args: list[str]) -> int:
    """Execute dbt with provided arguments."""
    print("\n=== Running dbt ===\n")
    cmd = ["dbt"] + args
    result = subprocess.run(cmd)
    return result.returncode

def post_checks(client: Client) -> bool:
    """Run post-dbt quality checks."""
    print("\n=== Post-dbt Quality Checks ===\n")

    passed = True

    # Validity checks
    validity = client.validity.summary(ASSET_ID)
    if validity.failing == 0:
        print(f"[PASS] Validity: {validity.total_rules} rules")
    else:
        print(f"[FAIL] Validity: {validity.failing}/{validity.total_rules} failing")
        passed = False

    # Referential integrity
    ref = client.referential.summary(ASSET_ID)
    if ref.failing_checks == 0:
        print(f"[PASS] Referential: {ref.total_checks} checks")
    else:
        print(f"[FAIL] Referential: {ref.failing_checks}/{ref.total_checks} failing")
        passed = False

    # Metrics anomalies
    metrics = client.metrics.summary(ASSET_ID)
    if metrics.failing == 0:
        print(f"[PASS] Metrics: {metrics.active_metrics} monitored")
    else:
        print(f"[WARN] Metrics: {metrics.failing} anomalies detected")
        # Don't fail on metric anomalies, just warn

    return passed

def main():
    parser = argparse.ArgumentParser(description="Run dbt with quality gates")
    parser.add_argument("dbt_command", help="dbt command (run, build, test)")
    parser.add_argument("--skip-pre", action="store_true", help="Skip pre-checks")
    parser.add_argument("--skip-post", action="store_true", help="Skip post-checks")
    # parse_known_args() leaves flags argparse does not recognize (such as
    # --select) in dbt_extra, so they can be forwarded to dbt unchanged.
    args, dbt_extra = parser.parse_known_args()

    client = Client()
    dbt_args = [args.dbt_command] + dbt_extra

    # Pre-checks
    if not args.skip_pre:
        if not pre_checks(client):
            print("\nPre-checks failed. Use --skip-pre to bypass.")
            sys.exit(1)

    # Run dbt
    dbt_exit_code = run_dbt(dbt_args)
    if dbt_exit_code != 0:
        print(f"\ndbt exited with code {dbt_exit_code}")
        sys.exit(dbt_exit_code)

    # Post-checks
    if not args.skip_post:
        if not post_checks(client):
            print("\nPost-checks failed. Review quality issues.")
            sys.exit(1)

    print("\n=== All checks passed! ===")

if __name__ == "__main__":
    main()
```

Run it:

```bash theme={null}
# Full run with all checks
python dbt_with_quality_gates.py run

# Run specific models
python dbt_with_quality_gates.py run --select marts.orders_mart

# Skip pre-checks (for development)
python dbt_with_quality_gates.py run --skip-pre
```

## Pattern 5: dbt Cloud Webhook Integration

For dbt Cloud, use webhooks to trigger AnomalyArmor checks:

```python theme={null}
# webhook_handler.py (Flask example)
from flask import Flask, request, jsonify
from anomalyarmor import Client

app = Flask(__name__)

def send_alert(message: str) -> None:
    """Placeholder: replace with your Slack, PagerDuty, or email integration."""
    print(f"[ALERT] {message}")

@app.route("/dbt-webhook", methods=["POST"])
def handle_dbt_webhook():
    # silent=True returns None instead of raising on a missing or invalid body
    payload = request.get_json(silent=True) or {}
    event_type = payload.get("eventType")
    run_status = payload.get("data", {}).get("runStatus")

    # dbt Cloud reports a finished job run as "job.run.completed"
    if event_type == "job.run.completed" and run_status == "Success":
        # dbt run completed successfully, run post-checks
        client = Client()
        asset_id = "your-asset-uuid"

        validity = client.validity.summary(asset_id)
        ref = client.referential.summary(asset_id)

        if validity.failing > 0 or ref.failing_checks > 0:
            # Alert your team via Slack, PagerDuty, etc.
            send_alert("dbt run completed but quality issues found")

    return jsonify({"status": "ok"})
```
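
A public webhook endpoint should confirm that requests really come from dbt Cloud before acting on them. The sketch below assumes the scheme described in dbt Cloud's webhook documentation: the `Authorization` header carries a hex-encoded HMAC-SHA256 of the raw request body, keyed with the webhook's secret. Check the current dbt Cloud docs before relying on it. `is_valid_signature` is an illustrative helper, not part of any SDK.

```python
import hashlib
import hmac


def is_valid_signature(body: bytes, auth_header: str, secret: str) -> bool:
    """Compare the Authorization header with an HMAC-SHA256 of the raw body.

    Assumption: the sender signs the exact request bytes with the shared
    secret and sends the hex digest in the Authorization header.
    """
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, so it does not leak how many
    # leading characters of the signature matched.
    return hmac.compare_digest(
        expected.encode("utf-8"), (auth_header or "").encode("utf-8")
    )
```

In a Flask handler, `request.get_data()` gives the raw body and `request.headers.get("Authorization", "")` the header. Rejecting the request with a 401 when the check fails keeps unsigned payloads from triggering quality checks.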

## Pattern 6: Upload Lineage from dbt

Upload your dbt `manifest.json` to populate data lineage in AnomalyArmor. This lets you visualize the full DAG, run impact analysis, and check upstream freshness before transformations.

```bash theme={null}
# Generate the manifest
dbt parse

# Upload to AnomalyArmor
curl -X POST \
  "https://api.anomalyarmor.ai/api/v1/assets/$ASSET_ID/lineage/upload" \
  -H "Authorization: Bearer $ARMOR_API_KEY" \
  -F "file=@target/manifest.json"
```

Or add it as a post-run step in your dbt wrapper:

```python theme={null}
from anomalyarmor import Client

client = Client()

def upload_lineage(asset_id: str, manifest_path: str = "target/manifest.json"):
    """Upload dbt manifest to sync lineage after a dbt run."""
    with open(manifest_path, "rb") as f:
        result = client.lineage.upload(asset_id=asset_id, file=f)
    print(f"Lineage synced: {result.sync_stats['nodes_created']} nodes, "
          f"{result.sync_stats['edges_created']} edges")
```

For dbt Cloud users, sync lineage directly without file uploads:

```python theme={null}
result = client.lineage.sync_dbt_cloud(
    asset_id=ASSET_ID,
    account_id="12345",
    api_token="dbtc_your_token_here",
    job_id="67890",
)
```

<Tip>
  See the full [Lineage Upload Guide](/guides/lineage-upload) for detailed setup, CI/CD integration, and manual lineage options.
</Tip>
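
The same `manifest.json` can also drive the pre-run freshness gate, so the list of source tables does not have to be maintained by hand. The sketch below reads the manifest's `sources` section and builds qualified names. The `<prefix>.<database>.<schema>.<table>` format is an assumption inferred from examples such as `snowflake.raw.stripe.payments`; confirm the exact names (including letter case) against your asset list. `source_tables` and `load_manifest` are illustrative helpers.

```python
import json
from pathlib import Path


def source_tables(manifest: dict, prefix: str = "snowflake") -> list[str]:
    """Build `<prefix>.<database>.<schema>.<table>` names from dbt sources.

    Assumption: AnomalyArmor qualified names follow this four-part pattern.
    """
    names = []
    for source in manifest.get("sources", {}).values():
        # `identifier` is the physical table name; fall back to the logical name
        table = source.get("identifier") or source["name"]
        names.append(f"{prefix}.{source['database']}.{source['schema']}.{table}")
    return sorted(names)


def load_manifest(path: str = "target/manifest.json") -> dict:
    """Read the manifest that `dbt parse` writes to the target directory."""
    return json.loads(Path(path).read_text())
```

A wrapper script could call `source_tables(load_manifest())` once at startup and pass the result to its freshness checks.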

## Best Practices

### 1. Scope Your Checks

Don't check everything. Focus on critical paths:

```python theme={null}
# Check only critical source tables
CRITICAL_SOURCES = [
    "production.stripe.payments",  # Revenue-critical
    "production.core.users",       # Identity-critical
]
```

### 2. Set Appropriate Thresholds

Configure checks with realistic thresholds:

```python theme={null}
# Allow older data in non-critical tables
client.freshness.require_fresh(
    "analytics.events",
    max_age_hours=48,  # More lenient for analytics
)
```
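
When several tables need different limits, a lookup table keeps the thresholds in one place. This is a minimal sketch: the threshold values and the `check_all` helper are hypothetical, and the only SDK behavior it relies on is the `max_age_hours` keyword shown above.

```python
from typing import Callable

# Hypothetical thresholds: table name -> maximum acceptable age in hours.
MAX_AGE_HOURS = {
    "production.stripe.payments": 6,   # revenue-critical, keep tight
    "analytics.events": 48,            # more lenient for analytics
}
DEFAULT_MAX_AGE_HOURS = 24


def check_all(require_fresh: Callable[..., object], tables: list[str]) -> list[str]:
    """Call require_fresh once per table and return the tables that raised."""
    failed = []
    for table in tables:
        limit = MAX_AGE_HOURS.get(table, DEFAULT_MAX_AGE_HOURS)
        try:
            require_fresh(table, max_age_hours=limit)
        except Exception:  # narrow this to StalenessError in real code
            failed.append(table)
    return failed
```

With the SDK, the call would look like `check_all(client.freshness.require_fresh, tables)`. Tables missing from the mapping fall back to the default limit.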

### 3. Fail Fast, But Not Always

```python theme={null}
# Critical checks should fail the pipeline
if not critical_checks_pass():
    sys.exit(1)

# Non-critical checks can warn without failing
if not advisory_checks_pass():
    log_warning("Advisory checks failed, continuing anyway")
```
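
One way to make the critical/advisory split concrete is to tag each check and derive the exit code from the tags. The sketch below is illustrative only; `Check` and `evaluate` are hypothetical names, and each check is any zero-argument callable that returns `True` on success.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Check:
    name: str
    run: Callable[[], bool]  # returns True when the check passes
    critical: bool = True


def evaluate(checks: list[Check]) -> int:
    """Run every check and return a process exit code (0 means continue)."""
    exit_code = 0
    for check in checks:
        if check.run():
            print(f"[PASS] {check.name}")
        elif check.critical:
            print(f"[FAIL] {check.name}")
            exit_code = 1
        else:
            print(f"[WARN] {check.name} (advisory, continuing)")
    return exit_code
```

This version runs all checks before deciding, so a single pipeline run reports every problem. To stop at the first critical failure instead, return `1` directly from the `elif` branch.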

### 4. Cache API Calls

For large dbt projects, minimize API calls:

```python theme={null}
# Get summary once, not per-model
validity_summary = client.validity.summary(asset_id)
# Then reuse validity_summary.failing, validity_summary.total_rules, etc.
```
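
If several steps of a pipeline need the same summary, a small per-run cache avoids repeating the call. This is a sketch under the assumption that a summary does not need to change within one pipeline run; `SummaryCache` is a hypothetical helper that wraps any one-argument fetch function.

```python
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class SummaryCache(Generic[T]):
    """Memoize one summary per asset ID for the lifetime of the object."""

    def __init__(self, fetch: Callable[[str], T]) -> None:
        self._fetch = fetch
        self._cache: Dict[str, T] = {}

    def get(self, asset_id: str) -> T:
        # Only the first request for an asset reaches the API
        if asset_id not in self._cache:
            self._cache[asset_id] = self._fetch(asset_id)
        return self._cache[asset_id]
```

For example, `validity = SummaryCache(client.validity.summary)` followed by `validity.get(asset_id).failing` in each step. Create a fresh cache after `dbt run` so post-run checks do not read pre-run results.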

## Troubleshooting

### "Asset not found" errors

Ensure your table names match AnomalyArmor's qualified names exactly:

```bash theme={null}
# List your assets to see exact names
armor assets list --source snowflake
```
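
Most mismatches are small: a missing schema segment, a singular/plural slip, or different letter case. The sketch below suggests near matches from a list of known names using the standard library's `difflib`. It assumes you have already copied the names from the asset list into a Python list; `suggest_names` is an illustrative helper.

```python
import difflib


def suggest_names(wanted: str, known: list[str], limit: int = 3) -> list[str]:
    """Return known asset names that closely resemble `wanted`."""
    # Compare case-insensitively, then map back to the original spelling
    lowered = {name.lower(): name for name in known}
    matches = difflib.get_close_matches(
        wanted.lower(), list(lowered), n=limit, cutoff=0.6
    )
    return [lowered[m] for m in matches]
```

Printing the suggestions next to the "Asset not found" error usually makes the correct qualified name obvious.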

### Slow checks

For faster feedback, use summary endpoints instead of individual checks:

```python theme={null}
# Fast: single API call
summary = client.validity.summary(asset_id)
print(f"Failing: {summary.failing}")

# Slow: N API calls
for rule in client.validity.list(asset_id):
    result = client.validity.check(asset_id, rule.uuid)  # Avoid in loops
```
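
The two approaches combine well: use the summary as a cheap gate and only pay for per-rule calls when it reports failures. A minimal sketch, using the `summary`, `list`, and `check` calls shown above; `failing_rule_details` is a hypothetical helper and `validity` stands for `client.validity`.

```python
def failing_rule_details(validity, asset_id: str) -> list:
    """Return (rule name, result) pairs for failing rules.

    Makes one API call when everything passes, and N more only on failure.
    """
    summary = validity.summary(asset_id)
    if summary.failing == 0:
        return []  # healthy: skip the per-rule calls entirely

    details = []
    for rule in validity.list(asset_id):
        result = validity.check(asset_id, rule.uuid)
        if result.status == "fail":
            details.append((rule.name, result))
    return details
```

Calling `failing_rule_details(client.validity, asset_id)` keeps the common passing case fast while still producing detailed output when something breaks.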

## Common Questions

### Do I replace my dbt tests with AnomalyArmor, or run both?

Run both; they do different things. dbt tests validate model-level contracts at build time (is this column unique? are these values in this set?). AnomalyArmor monitors continuously after dbt runs, catching freshness violations, schema drift, and statistical anomalies that point-in-time tests miss. See [migrating from dbt tests](/guides/migrate-from-dbt) for the patterns.

### Does AnomalyArmor hook into `dbt run` or does it monitor tables after the fact?

Both patterns work. For post-run monitoring, there is nothing to configure: AnomalyArmor watches your warehouse independently. For pre-run gating (fail fast if source data isn't fresh enough), add a Python or shell step before `dbt run` that calls the AnomalyArmor freshness API. See "Pattern 1: Pre-run Quality Gate" above.

### Can AnomalyArmor import my existing dbt tests instead of rewriting them?

Yes. Use `armor migrate-from dbt path/to/dbt/project` (available via the AnomalyArmor CLI) to generate ODCS contract files from your dbt `schema.yml`. Tests like `not_null`, `unique`, `accepted_values`, and `relationships` map cleanly; others surface as warnings. See [migrating from dbt](/guides/migrate-from-dbt).

### Does the integration work with dbt Cloud as well as dbt Core?

Yes. dbt Cloud emits `manifest.json` on every run; configure AnomalyArmor to consume it via the lineage-upload flow, and model-to-table relationships populate automatically. See [lineage upload](/guides/lineage-upload).

## Next Steps

<CardGroup cols={2}>
  <Card title="GitHub Actions" icon="github" href="/integrations/github-actions">
    Run checks in CI/CD
  </Card>

  <Card title="Airflow Integration" icon="wind" href="/integrations/airflow">
    Orchestrate with Airflow
  </Card>

  <Card title="Validity API" icon="check-circle" href="/api/validity">
    Full validity API reference
  </Card>

  <Card title="Metrics API" icon="chart-line" href="/api/metrics">
    Track data metrics
  </Card>
</CardGroup>
