Dagster Monitoring with Prometheus: dbt Assets and SQL Transformations (Part 2)

Completing the Dagster Observability Picture

In Part 1 of this series, we established a solid foundation for monitoring self-hosted Dagster with Prometheus, covering everything from system-level metrics to custom asset instrumentation using push gateways. We demonstrated how to capture execution times, success rates, and business metrics from your Python-based Dagster assets.

But we left a critical gap in our observability story: dbt assets.

If you're among the 50% of Dagster users who rely on dbt for data transformations, you know that monitoring SQL-based models presents unique challenges. Unlike Python assets where you can directly instrument code with the prometheus_client library, dbt models are pure SQL - you can't simply add metric collection calls to your transformations.

Yet understanding how your dbt models perform is absolutely critical for pipeline reliability. Model execution times, row processing counts, and transformation success rates are often the most important metrics in your entire data platform. Without visibility into these SQL transformations, you're missing the heart of your data pipeline's performance story.

In this second part, we'll bridge that observability gap by exploring how to extract detailed metrics from dbt runs and feed them into your Prometheus monitoring stack, giving you complete end-to-end visibility across your entire Dagster deployment.


Understanding dbt Execution and Artifacts

To capture dbt metrics, we first need to understand how dbt works and how it operates within Dagster. dbt (dbt-core) is essentially a command-line tool, so we have little control over its internals - but we can control what goes in and what comes out.

After each run, dbt produces a number of artifacts. One of them is the run_results.json file, which contains detailed, per-node results for the last run along with key statistics about each node's execution.

What's Inside run_results.json

This file provides comprehensive information including:

  • Model Execution Details: Status (success/failure), execution time, start and end timestamps

  • Model Metadata: Model name, resource type, materialization strategy, and where it lives in the database

  • Query Information: Rows affected, SQL file path, compiled SQL

  • Error Details: If any occurred, with detailed error messages and diagnostic information

By obtaining and parsing this file, we can get access to all internal details of the dbt run and extract meaningful metrics for Prometheus monitoring.
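Before wiring anything into Dagster, you can inspect this file directly with a few lines of Python. Here is a minimal sketch, assuming you run it from a dbt project root that already has a populated target/ directory:

import json
from pathlib import Path

# Assumed location - adjust to your dbt project's target directory
run_results_path = Path("target", "run_results.json")

with open(run_results_path) as f:
    run_results = json.load(f)

# Each entry in "results" describes one executed node (model, test, snapshot, ...)
for result in run_results.get("results", []):
    adapter_response = result.get("adapter_response") or {}
    print(
        result.get("unique_id"),       # e.g. model.my_project.my_model
        result.get("status"),          # success / error / skipped
        result.get("execution_time"),  # seconds
        adapter_response.get("rows_affected"),
    )

These are the same fields the extraction code later in this post relies on.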


Extracting dbt Metrics in Dagster

dagster-dbt is the library Dagster uses to load and process dbt resources. If you're orchestrating dbt with Dagster, you probably already have something like this in your asset definition. The only thing we're adding is the line to extract the run_results.json artifact after the dbt run is completed.

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Invoke `dbt build` and stream asset events back to Dagster as they complete
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    # After the run finishes, retrieve the run_results.json artifact for metric extraction
    run_results_json = dbt_build_invocation.get_artifact("run_results.json")

The beauty of this approach lies in its simplicity - with a basic JSON parser and the prometheus_client library, we can extract rich metrics from every dbt run. Let's walk through a complete example that demonstrates how to capture dbt execution metrics and push them to Prometheus.


import os
import subprocess
import json

import dateutil.parser
from dagster import asset
from prometheus_client import Gauge, REGISTRY, generate_latest, Info, Counter
import requests as prom_requests

# Prometheus metric definitions
execution_time_gauge = Gauge('dbt_model_execution_time_seconds', 'Execution time for dbt models', ['model_name'])
status_gauge = Gauge('dbt_model_status', 'Status of dbt models (1=success, 0=failure)', ['model_name'])
start_time_gauge = Gauge('dbt_model_execution_started_at', 'Start timestamp for dbt models (unix epoch seconds)', ['model_name'])
end_time_gauge = Gauge('dbt_model_execution_completed_at', 'End timestamp for dbt models (unix epoch seconds)', ['model_name'])
rows_affected_gauge = Gauge('dbt_model_rows_affected', 'Rows affected by dbt model', ['model_name'])
error_info = Info('dbt_model_error', 'Error message for dbt models', ['model_name'])
run_success_counter = Counter('dbt_model_run_success_total', 'Number of successful dbt model runs', ['model_name'])
run_failure_counter = Counter('dbt_model_run_failure_total', 'Number of failed dbt model runs', ['model_name'])
failure_count_gauge = Gauge('dbt_model_failure_count', 'Current number of failures for dbt model', ['model_name'])

PROM_AGG_GATEWAY_URL = os.getenv("PROM_AGG_GATEWAY_URL") or "http://localhost/metrics/job/dbt_example"

def parse_run_results(run_results_json, manifest_json=None):
    data = json.loads(run_results_json)
    manifest = json.loads(manifest_json) if manifest_json else None
    manifest_nodes = manifest.get('nodes', {}) if manifest else {}
    results = []
    for result in data.get('results', []):
        unique_id = result.get('unique_id')
        node = manifest_nodes.get(unique_id, {}) if manifest else {}
        model_info = {
            'model_name': unique_id,
            'status': result.get('status'),
            'execution_time': result.get('execution_time'),
            'timing': result.get('timing'),
            'adapter_response': result.get('adapter_response'),
            'message': result.get('message'),
            'materialization': node.get('config', {}).get('materialized') if node else None,
            'resource_type': node.get('resource_type') if node else None,
            'database': node.get('database') if node else None,
        }
        results.append(model_info)
    return results


def push_dbt_metrics(context, run_results_path, manifest_path=None):
    """
    Extracts metrics from dbt run_results and pushes to Prometheus Aggregation Gateway.
    """
    try:
        if not os.path.exists(run_results_path):
            context.log.warning(f"{run_results_path} not found. Skipping metrics push.")
            return False
        with open(run_results_path) as f:
            run_results_json = f.read()
        manifest_json = None
        if manifest_path and os.path.exists(manifest_path):
            with open(manifest_path) as f:
                manifest_json = f.read()
        model_results = parse_run_results(run_results_json, manifest_json)
        # Only increment counters for models (not tests, snapshots, etc.), and only once per run
        counted_success = set()
        counted_failure = set()
        # Track all model names for gauge
        all_model_names = set()
        for model in model_results:
            model_name = model['model_name']
            all_model_names.add(model_name)
            execution_time = model.get('execution_time', 0) or 0
            status = 1 if model.get('status') == 'success' else 0
            execution_time_gauge.labels(model_name=model_name).set(execution_time)
            status_gauge.labels(model_name=model_name).set(status)
            # Only increment counters for resource_type == 'model'
            if model.get('resource_type') == 'model':
                if model.get('status') == 'success' and model_name not in counted_success:
                    run_success_counter.labels(model_name=model_name).inc()
                    counted_success.add(model_name)
                elif model.get('status') != 'success' and model_name not in counted_failure:
                    run_failure_counter.labels(model_name=model_name).inc()
                    counted_failure.add(model_name)
            # Export start and end timestamps if available
            timing = model.get('timing') or []
            started_at = None
            completed_at = None
            for t in timing:
                if t.get('name') == 'execute':
                    started_at = t.get('started_at')
                    completed_at = t.get('completed_at')
            if started_at:
                try:
                    ts = int(dateutil.parser.isoparse(started_at).timestamp())
                    start_time_gauge.labels(model_name=model_name).set(ts)
                except Exception as e:
                    context.log.warning(f"Could not parse start time for {model_name}: {e}")
            if completed_at:
                try:
                    ts = int(dateutil.parser.isoparse(completed_at).timestamp())
                    end_time_gauge.labels(model_name=model_name).set(ts)
                except Exception as e:
                    context.log.warning(f"Could not parse end time for {model_name}: {e}")
            # Export rows_affected if present
            rows_affected = None
            adapter_response = model.get('adapter_response')
            if adapter_response and isinstance(adapter_response, dict):
                rows_affected = adapter_response.get('rows_affected')
            if rows_affected is not None:
                try:
                    rows_affected_gauge.labels(model_name=model_name).set(rows_affected)
                except Exception as e:
                    context.log.warning(f"Could not set rows_affected for {model_name}: {e}")
            # Export error info if present
            if model.get('status') != 'success' and model.get('message'):
                try:
                    error_info.labels(model_name=model_name).info({'error': model.get('message')})
                except Exception as e:
                    context.log.warning(f"Could not set error info for {model_name}: {e}")
        # Flag per-run failures for every model: 1 if the model failed in this run, 0 otherwise
        for model_name in all_model_names:
            count = 1 if model_name in counted_failure else 0
            failure_count_gauge.labels(model_name=model_name).set(count)
        # Push metrics to Prometheus Aggregation Gateway
        try:
            metrics_payload = generate_latest(REGISTRY)
            resp = prom_requests.post(
                PROM_AGG_GATEWAY_URL,
                data=metrics_payload,
                headers={"Content-Type": "text/plain"},
                timeout=5
            )
            if resp.status_code == 202:
                context.log.info("Metrics successfully pushed to Aggregation Gateway (202 Accepted)")
            elif resp.status_code != 200:
                context.log.warning(f"Failed to push metrics to Aggregation Gateway: {resp.status_code} {resp.text}")
        except Exception as e:
            context.log.warning(f"Exception pushing metrics to Aggregation Gateway: {e}")
    except Exception as e:
        context.log.warning(f"Exception in metrics extraction/push: {e}")
    return True

@asset
def dbt_build_asset(context):
    """
    Asset that runs `dbt build`, parses run_results, and pushes metrics to Prometheus Aggregation Gateway.
    Metrics are pushed even if dbt build fails.
    """
    project_dir = "dbt_example"
    profiles_dir = os.path.join(project_dir, "config")
    run_results_path = os.path.join(project_dir, "target", "run_results.json")
    manifest_path = os.path.join(project_dir, "target", "manifest.json")
    result = subprocess.run([
        "dbt", "build", "--project-dir", project_dir, "--profiles-dir", profiles_dir
    ], capture_output=True, text=True)
    context.log.info(result.stdout)
    if result.returncode != 0:
        context.log.error(result.stderr)
    # Always try to push metrics, even if build failed
    push_dbt_metrics(context, run_results_path, manifest_path)
    if result.returncode != 0:
        raise Exception(f"dbt build failed: {result.stderr}")
    return True

After running Dagster and materializing your dbt assets, the metrics extraction process automatically kicks in, parsing the run_results.json file and pushing the collected metrics to our aggregation gateway. Let's verify that our dbt metrics are being captured correctly by checking the gateway.

curl http://localhost/metrics | grep dbt_model | grep -Ev "HELP|TYPE"

dbt_model_error_info{error="Database Error in model example_model (models/example_model.sql)\n  syntax error at or near \"from\"\n  LINE 18: from generate_series(1, 100) as t(id)\n           ^\n  compiled code at target/run/dbt_example/models/example_model.sql",job="dbt_example",model_name="model.dbt_example.example_model"} 3
dbt_model_error_info{error="Database Error in model second_model (models/second_model.sql)\n  syntax error at or near \"from\"\n  LINE 19: from \"dbt_metrics_example\".\"public\".\"example_model\"\n           ^\n  compiled code at target/run/dbt_example/models/second_model.sql",job="dbt_example",model_name="model.dbt_example.second_model"} 1
dbt_model_execution_completed_at{job="dbt_example",model_name="model.dbt_example.example_model"} 1.9297774722e+10
dbt_model_execution_completed_at{job="dbt_example",model_name="model.dbt_example.second_model"} 8.771719863e+09
dbt_model_execution_started_at{job="dbt_example",model_name="model.dbt_example.example_model"} 1.9297774721e+10
dbt_model_execution_started_at{job="dbt_example",model_name="model.dbt_example.second_model"} 8.771719863e+09
dbt_model_execution_time_seconds{job="dbt_example",model_name="model.dbt_example.example_model"} 1.3418488502502441
dbt_model_execution_time_seconds{job="dbt_example",model_name="model.dbt_example.second_model"} 0.25799131393432617
dbt_model_failure_count{job="dbt_example",model_name="model.dbt_example.example_model"} 0
dbt_model_failure_count{job="dbt_example",model_name="model.dbt_example.second_model"} 1
dbt_model_last_run_success{job="dbt_example",model_name="model.dbt_example.example_model"} 2
dbt_model_rows_affected{job="dbt_example",model_name="model.dbt_example.example_model"} 800
dbt_model_rows_affected{job="dbt_example",model_name="model.dbt_example.second_model"} 97
dbt_model_run_failure_created{job="dbt_example",model_name="model.dbt_example.example_model"} 5.263027589890698e+09
dbt_model_run_failure_created{job="dbt_example",model_name="model.dbt_example.second_model"} 1.7543443122349281e+09
dbt_model_run_failure_total{job="dbt_example",model_name="model.dbt_example.example_model"} 3
dbt_model_run_failure_total{job="dbt_example",model_name="model.dbt_example.second_model"} 1
dbt_model_run_success_created{job="dbt_example",model_name="model.dbt_example.example_model"} 1.4034747146853819e+10
dbt_model_run_success_created{job="dbt_example",model_name="model.dbt_example.second_model"} 7.017375557182798e+09
dbt_model_run_success_total{job="dbt_example",model_name="model.dbt_example.example_model"} 8
dbt_model_run_success_total{job="dbt_example",model_name="model.dbt_example.second_model"} 4
dbt_model_status{job="dbt_example",model_name="model.dbt_example.example_model"} 8
dbt_model_status{job="dbt_example",model_name="model.dbt_example.second_model"} 4

Now we can see the data flowing into our monitoring system. With dbt metrics captured in Prometheus, you can build Grafana visualisations that provide actionable insights into your SQL transformation performance - dashboards showcasing model execution times, success rates, and resource utilisation patterns.
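Before building panels, it helps to sanity-check the PromQL you plan to use. The sketch below queries Prometheus's HTTP API directly; the Prometheus URL and the query are illustrative assumptions, not part of the pipeline code:

import requests

# Assumed Prometheus address - adjust to your deployment
PROMETHEUS_URL = "http://localhost:9090"

# Example PromQL: the five slowest dbt models by reported execution time
query = "topk(5, dbt_model_execution_time_seconds)"

resp = requests.get(
    f"{PROMETHEUS_URL}/api/v1/query",
    params={"query": query},
    timeout=5,
)
resp.raise_for_status()

# Print each matching series: model name label and current value
for sample in resp.json()["data"]["result"]:
    print(sample["metric"].get("model_name"), sample["value"][1])

The same expression can be dropped straight into a Grafana panel, and a companion query such as increase(dbt_model_run_failure_total[1h]) gives you per-model failure counts over the last hour.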


Complete Dagster Observability

We've now established comprehensive monitoring coverage for your entire Dagster deployment. Starting with system-level metrics and custom Python assets in Part 1, and now extending to dbt SQL transformations in Part 2, you have end-to-end visibility into your data pipeline performance.

What We've Accomplished

This two-part series has taken you from basic infrastructure monitoring to complete pipeline observability:

  • System Monitoring: Out-of-the-box Kubernetes metrics for your Dagster services

  • Custom Asset Instrumentation: Push-gateway patterns for ephemeral Python jobs

  • dbt Model Monitoring: Automated metric extraction from SQL transformations

  • Unified Dashboards: Grafana visualisations combining all metric sources

What This Opens Up

Having your Dagster metrics in Prometheus format unlocks powerful operational capabilities:

Consistent Alerting: Set up alerts using Prometheus as a datasource with the same alerting rules and thresholds you use for your infrastructure. Pipeline failures, slow transformations, and data quality issues can trigger the same incident response workflows as your application alerts.

Monitoring Stack Integration: Feed Prometheus stats into your existing monitoring systems - whether that's Datadog, New Relic, or custom solutions. Since virtually all modern monitoring platforms speak Prometheus, your Dagster metrics integrate seamlessly with your broader observability stack.

Cross-System Correlation: Most importantly, you can now correlate stats from your data pipelines with all other systems you already monitor with Prometheus. Identify when a slow dbt model correlates with database load spikes, or when pipeline failures coincide with infrastructure issues across your entire platform.

Moving Forward

Your Dagster deployment is no longer a black box. Whether you're debugging a failing transformation at 2 AM or optimising pipeline performance during business hours, you now have the observability tools needed to operate a production-grade data platform with confidence.

The investment in comprehensive monitoring pays dividends in operational excellence, faster incident response, and deeper understanding of your data pipeline's true performance characteristics.

Complete observability isn't a luxury in production data platforms - it's a necessity. Now your entire Dagster ecosystem, from Python assets to dbt models, speaks the same monitoring language as your infrastructure.

Abhivan Chekuri
