Use this file to discover all available pages before exploring further.
For LLM agents: documentation index at /llms.txt, full text at /llms-full.txt. Append .md to any page URL for plain markdown.
Integrate AnomalyArmor with dbt to add data quality gates before and after your transformations. This guide covers common patterns for ensuring data quality throughout your dbt workflow.
The shell() function is not a built-in dbt Jinja function. This pattern requires a custom macro
or package that provides shell execution capabilities. Consider using the wrapper script approach
(Pattern 4) for a simpler, more portable solution.
-- macros/armor_quality_gate.sql{% macro armor_freshness_check(table_name) %} {% set result = run_query("SELECT 1") %} {{ log("Checking freshness for " ~ table_name, info=True) }} {# Call CLI from dbt - requires custom shell() macro #} {% set check_result = shell("armor freshness check " ~ table_name ~ " 2>&1 || echo STALE") %} {% if "STALE" in check_result %} {{ exceptions.raise_compiler_error("Data quality check failed: " ~ table_name ~ " is stale") }} {% endif %}{% endmacro %}
A wrapper that handles pre-checks, dbt run, and post-validation:
#!/usr/bin/env python# dbt_with_quality_gates.py"""Run dbt with AnomalyArmor quality gates.Usage: python dbt_with_quality_gates.py run python dbt_with_quality_gates.py run --select marts.*"""import argparseimport subprocessimport sysfrom anomalyarmor import Clientfrom anomalyarmor.exceptions import StalenessError# ConfigurationASSET_ID = "your-asset-uuid"SOURCE_TABLES = [ "snowflake.raw.stripe.payments", "snowflake.raw.crm.customers",]def pre_checks(client: Client) -> bool: """Run pre-dbt quality checks.""" print("\n=== Pre-dbt Quality Checks ===\n") # Check source freshness all_fresh = True for table in SOURCE_TABLES: try: status = client.freshness.require_fresh(table) print(f"[FRESH] {table} ({status.hours_since_update:.1f}h)") except StalenessError as e: print(f"[STALE] {table} ({e.hours_since_update:.1f}h)") all_fresh = False return all_freshdef run_dbt(args: list[str]) -> int: """Execute dbt with provided arguments.""" print("\n=== Running dbt ===\n") cmd = ["dbt"] + args result = subprocess.run(cmd) return result.returncodedef post_checks(client: Client) -> bool: """Run post-dbt quality checks.""" print("\n=== Post-dbt Quality Checks ===\n") passed = True # Validity checks validity = client.validity.summary(ASSET_ID) if validity.failing == 0: print(f"[PASS] Validity: {validity.total_rules} rules") else: print(f"[FAIL] Validity: {validity.failing}/{validity.total_rules} failing") passed = False # Referential integrity ref = client.referential.summary(ASSET_ID) if ref.failing_checks == 0: print(f"[PASS] Referential: {ref.total_checks} checks") else: print(f"[FAIL] Referential: {ref.failing_checks}/{ref.total_checks} failing") passed = False # Metrics anomalies metrics = client.metrics.summary(ASSET_ID) if metrics.failing == 0: print(f"[PASS] Metrics: {metrics.active_metrics} monitored") else: print(f"[WARN] Metrics: {metrics.failing} anomalies detected") # Don't fail on metric anomalies, just warn return passeddef main(): parser = argparse.ArgumentParser(description="Run dbt with quality gates") parser.add_argument("dbt_command", help="dbt command (run, build, test)") parser.add_argument("dbt_args", nargs="*", help="Additional dbt arguments") parser.add_argument("--skip-pre", action="store_true", help="Skip pre-checks") parser.add_argument("--skip-post", action="store_true", help="Skip post-checks") args = parser.parse_args() client = Client() dbt_args = [args.dbt_command] + args.dbt_args # Pre-checks if not args.skip_pre: if not pre_checks(client): print("\nPre-checks failed. Use --skip-pre to bypass.") sys.exit(1) # Run dbt dbt_exit_code = run_dbt(dbt_args) if dbt_exit_code != 0: print(f"\ndbt exited with code {dbt_exit_code}") sys.exit(dbt_exit_code) # Post-checks if not args.skip_post: if not post_checks(client): print("\nPost-checks failed. Review quality issues.") sys.exit(1) print("\n=== All checks passed! ===")if __name__ == "__main__": main()
Run it:
# Full run with all checkspython dbt_with_quality_gates.py run# Run specific modelspython dbt_with_quality_gates.py run --select marts.orders_mart# Skip pre-checks (for development)python dbt_with_quality_gates.py run --skip-pre
Upload your dbt manifest.json to populate data lineage in AnomalyArmor. This lets you visualize the full DAG, run impact analysis, and check upstream freshness before transformations.
# Generate the manifestdbt parse# Upload to AnomalyArmorcurl -X POST \ "https://api.anomalyarmor.ai/api/v1/assets/$ASSET_ID/lineage/upload" \ -H "Authorization: Bearer $ARMOR_API_KEY" \ -F "file=@target/manifest.json"
Or add it as a post-run step in your dbt wrapper:
from anomalyarmor import Clientclient = Client()def upload_lineage(asset_id: str, manifest_path: str = "target/manifest.json"): """Upload dbt manifest to sync lineage after a dbt run.""" with open(manifest_path, "rb") as f: result = client.lineage.upload(asset_id=asset_id, file=f) print(f"Lineage synced: {result.sync_stats['nodes_created']} nodes, " f"{result.sync_stats['edges_created']} edges")
For dbt Cloud users, sync lineage directly without file uploads:
result = client.lineage.sync_dbt_cloud( asset_id=ASSET_ID, account_id="12345", api_token="dbtc_your_token_here", job_id="67890",)
See the full Lineage Upload Guide for detailed setup, CI/CD integration, and manual lineage options.
# Allow small percentage of stale data in non-critical tablesclient.freshness.require_fresh( "analytics.events", max_age_hours=48, # More lenient for analytics)
# Critical checks should fail the pipelineif not critical_checks_pass(): sys.exit(1)# Non-critical checks can warn without failingif not advisory_checks_pass(): log_warning("Advisory checks failed, continuing anyway")
For faster feedback, use summary endpoints instead of individual checks:
# Fast: single API callsummary = client.validity.summary(asset_id)print(f"Failing: {summary.failing}")# Slow: N API callsfor rule in client.validity.list(asset_id): result = client.validity.check(asset_id, rule.uuid) # Avoid in loops
Do I replace my dbt tests with AnomalyArmor, or run both?
Run both - they do different things. dbt tests validate model-level contracts at build time (is this column unique? are these values in this set?). AnomalyArmor monitors continuously after dbt runs, catching freshness violations, schema drift, and statistical anomalies that point-in-time tests miss. See migrating from dbt tests for the patterns.
Does AnomalyArmor hook into dbt run or does it monitor tables after the fact?
Both patterns work. For post-run monitoring, nothing to configure - AnomalyArmor watches your warehouse independently. For pre-run gating (fail fast if source data isn’t fresh enough), add a Python/shell step before dbt run that calls the AnomalyArmor freshness API. See the “Pre-run gating” section above.
Can AnomalyArmor import my existing dbt tests instead of rewriting them?
Yes. Use armor migrate-from dbt path/to/dbt/project (available via the AnomalyArmor CLI) to generate ODCS contract files from your dbt schema.yml. Tests like not_null, unique, accepted_values, and relationships map cleanly; others surface as warnings. See migrating from dbt.
Does the integration work with dbt Cloud as well as dbt Core?
Yes. dbt Cloud emits manifest.json on every run; configure AnomalyArmor to consume it via the lineage-upload flow, and model-to-table relationships populate automatically. See lineage upload.