Completing the Dagster Observability Picture
In Part 1 of this series, we established a solid foundation for monitoring self-hosted Dagster with Prometheus, covering everything from system-level metrics to custom asset instrumentation using push gateways. We demonstrated how to capture execution times, success rates, and business metrics from your Python-based Dagster assets.
But we left a critical gap in our observability story: dbt assets.
If you're among the 50% of Dagster users who rely on dbt for data transformations, you know that monitoring SQL-based models presents unique challenges. Unlike Python assets, where you can directly instrument code with the prometheus_client library, dbt models are pure SQL - you can't simply add metric collection calls to your transformations.
Yet understanding how your dbt models perform is absolutely critical for pipeline reliability. Model execution times, row processing counts, and transformation success rates are often the most important metrics in your entire data platform. Without visibility into these SQL transformations, you're missing the heart of your data pipeline's performance story.
In this second part, we'll bridge that observability gap by exploring how to extract detailed metrics from dbt runs and feed them into your Prometheus monitoring stack, giving you complete end-to-end visibility across your entire Dagster deployment.
Understanding dbt Execution and Artifacts
To capture dbt execution details, we first need to understand how dbt works and how it operates within Dagster. dbt (dbt-core) is essentially a command-line tool, so we have little influence over its internals - what we can control is what goes in and what comes out.
After each run, dbt produces a number of artifacts. One of them is the run_results.json file, which contains the detailed results of the last dbt run - every executed asset, along with important stats about its execution.
What's Inside run_results.json
This file provides comprehensive information including:
Model Execution Details: Status (success/failure), execution time, start and end timestamps
Model Metadata: Model name, resource type, how the model is materialized and where it lives in the database
Query Information: Rows affected, SQL file path, compiled SQL
Error Details: Detailed error messages and diagnostic information for any failures that occurred
By obtaining and parsing this file, we can get access to all internal details of the dbt run and extract meaningful metrics for Prometheus monitoring.
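For reference, here is a heavily trimmed sketch of what a single entry in run_results.json looks like. The field values are illustrative, and the exact schema varies slightly between dbt versions and adapters:
{
  "metadata": { "dbt_version": "1.8.0", "generated_at": "2024-01-01T00:00:00Z" },
  "elapsed_time": 2.31,
  "results": [
    {
      "unique_id": "model.dbt_example.example_model",
      "status": "success",
      "execution_time": 1.34,
      "timing": [
        { "name": "compile", "started_at": "...", "completed_at": "..." },
        { "name": "execute", "started_at": "...", "completed_at": "..." }
      ],
      "adapter_response": { "code": "INSERT", "rows_affected": 100 },
      "message": "INSERT 0 100"
    }
  ]
}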
Extracting dbt Metrics in Dagster
dagster-dbt is the library Dagster uses to load and process dbt resources. If you're orchestrating dbt with Dagster, you probably already have something like this in your asset definitions. The only addition here is the final line, which retrieves the run_results.json artifact after the dbt run completes.
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    run_results_json = dbt_build_invocation.get_artifact("run_results.json")
The beauty of this approach lies in its simplicity - with a basic JSON parser and the prometheus_client library, we can extract rich metrics from every dbt run. Let's walk through a complete example that demonstrates how to capture dbt execution metrics and push them to Prometheus.
import os
import subprocess
import json
from dagster import asset
from prometheus_client import Gauge, REGISTRY, generate_latest, Info, Counter
import requests as prom_requests
# Prometheus metric definitions
execution_time_gauge = Gauge('dbt_model_execution_time_seconds', 'Execution time for dbt models', ['model_name'])
status_gauge = Gauge('dbt_model_status', 'Status of dbt models (1=success, 0=failure)', ['model_name'])
start_time_gauge = Gauge('dbt_model_execution_started_at', 'Start timestamp for dbt models (unix epoch seconds)', ['model_name'])
end_time_gauge = Gauge('dbt_model_execution_completed_at', 'End timestamp for dbt models (unix epoch seconds)', ['model_name'])
rows_affected_gauge = Gauge('dbt_model_rows_affected', 'Rows affected by dbt model', ['model_name'])
error_info = Info('dbt_model_error', 'Error message for dbt models', ['model_name'])
run_success_counter = Counter('dbt_model_run_success_total', 'Number of successful dbt model runs', ['model_name'])
run_failure_counter = Counter('dbt_model_run_failure_total', 'Number of failed dbt model runs', ['model_name'])
failure_count_gauge = Gauge('dbt_model_failure_count', 'Current number of failures for dbt model', ['model_name'])
PROM_AGG_GATEWAY_URL = os.getenv("PROM_AGG_GATEWAY_URL") or "http://localhost/metrics/job/dbt_example"
def parse_run_results(run_results_json, manifest_json=None):
    data = json.loads(run_results_json)
    manifest = json.loads(manifest_json) if manifest_json else None
    manifest_nodes = manifest.get('nodes', {}) if manifest else {}
    results = []
    for result in data.get('results', []):
        unique_id = result.get('unique_id')
        node = manifest_nodes.get(unique_id, {}) if manifest else {}
        model_info = {
            'model_name': unique_id,
            'status': result.get('status'),
            'execution_time': result.get('execution_time'),
            'timing': result.get('timing'),
            'adapter_response': result.get('adapter_response'),
            'message': result.get('message'),
            'materialization': node.get('config', {}).get('materialized') if node else None,
            'resource_type': node.get('resource_type') if node else None,
            'database': node.get('database') if node else None,
        }
        results.append(model_info)
    return results
def push_dbt_metrics(context, run_results_path, manifest_path=None):
    """
    Extracts metrics from dbt run_results and pushes to Prometheus Aggregation Gateway.
    """
    try:
        if not os.path.exists(run_results_path):
            context.log.warning(f"{run_results_path} not found. Skipping metrics push.")
            return False
        with open(run_results_path) as f:
            run_results_json = f.read()
        manifest_json = None
        if manifest_path and os.path.exists(manifest_path):
            with open(manifest_path) as f:
                manifest_json = f.read()
        model_results = parse_run_results(run_results_json, manifest_json)

        import dateutil.parser

        # Only increment counters for models (not tests, snapshots, etc.), and only once per run
        counted_success = set()
        counted_failure = set()
        # Track all model names for the failure count gauge
        all_model_names = set()

        for model in model_results:
            model_name = model['model_name']
            all_model_names.add(model_name)
            execution_time = model.get('execution_time', 0) or 0
            status = 1 if model.get('status') == 'success' else 0
            execution_time_gauge.labels(model_name=model_name).set(execution_time)
            status_gauge.labels(model_name=model_name).set(status)

            # Only increment counters for resource_type == 'model'
            if model.get('resource_type') == 'model':
                if model.get('status') == 'success' and model_name not in counted_success:
                    run_success_counter.labels(model_name=model_name).inc()
                    counted_success.add(model_name)
                elif model.get('status') != 'success' and model_name not in counted_failure:
                    run_failure_counter.labels(model_name=model_name).inc()
                    counted_failure.add(model_name)

            # Export start and end timestamps if available
            timing = model.get('timing') or []
            started_at = None
            completed_at = None
            for t in timing:
                if t.get('name') == 'execute':
                    started_at = t.get('started_at')
                    completed_at = t.get('completed_at')
            if started_at:
                try:
                    ts = int(dateutil.parser.isoparse(started_at).timestamp())
                    start_time_gauge.labels(model_name=model_name).set(ts)
                except Exception as e:
                    context.log.warning(f"Could not parse start time for {model_name}: {e}")
            if completed_at:
                try:
                    ts = int(dateutil.parser.isoparse(completed_at).timestamp())
                    end_time_gauge.labels(model_name=model_name).set(ts)
                except Exception as e:
                    context.log.warning(f"Could not parse end time for {model_name}: {e}")

            # Export rows_affected if present
            rows_affected = None
            adapter_response = model.get('adapter_response')
            if adapter_response and isinstance(adapter_response, dict):
                rows_affected = adapter_response.get('rows_affected')
            if rows_affected is not None:
                try:
                    rows_affected_gauge.labels(model_name=model_name).set(rows_affected)
                except Exception as e:
                    context.log.warning(f"Could not set rows_affected for {model_name}: {e}")

            # Export error info if present
            if model.get('status') != 'success' and model.get('message'):
                try:
                    error_info.labels(model_name=model_name).info({'error': model.get('message')})
                except Exception as e:
                    context.log.warning(f"Could not set error info for {model_name}: {e}")

        # Set the failure count gauge for all models (including zero).
        # Counters are monotonic, so this gauge tracks the current run only:
        # 1 if the model failed in this run, 0 otherwise.
        for model_name in all_model_names:
            count = 1 if model_name in counted_failure else 0
            failure_count_gauge.labels(model_name=model_name).set(count)

        # Push metrics to Prometheus Aggregation Gateway
        try:
            metrics_payload = generate_latest(REGISTRY)
            resp = prom_requests.post(
                PROM_AGG_GATEWAY_URL,
                data=metrics_payload,
                headers={"Content-Type": "text/plain"},
                timeout=5
            )
            if resp.status_code in (200, 202):
                context.log.info(f"Metrics successfully pushed to Aggregation Gateway ({resp.status_code})")
            else:
                context.log.warning(f"Failed to push metrics to Aggregation Gateway: {resp.status_code} {resp.text}")
        except Exception as e:
            context.log.warning(f"Exception pushing metrics to Aggregation Gateway: {e}")
    except Exception as e:
        context.log.warning(f"Exception in metrics extraction/push: {e}")
    return True
@asset
def dbt_build_asset(context):
    """
    Asset that runs `dbt build`, parses run_results, and pushes metrics to Prometheus Aggregation Gateway.
    Metrics are pushed even if dbt build fails.
    """
    project_dir = "dbt_example"
    profiles_dir = os.path.join(project_dir, "config")
    run_results_path = os.path.join(project_dir, "target", "run_results.json")
    manifest_path = os.path.join(project_dir, "target", "manifest.json")

    result = subprocess.run([
        "dbt", "build", "--project-dir", project_dir, "--profiles-dir", profiles_dir
    ], capture_output=True, text=True)
    context.log.info(result.stdout)
    if result.returncode != 0:
        context.log.error(result.stderr)

    # Always try to push metrics, even if the build failed
    push_dbt_metrics(context, run_results_path, manifest_path)

    if result.returncode != 0:
        raise Exception(f"dbt build failed: {result.stderr}")
    return True
After running Dagster and materializing your dbt assets, the metrics extraction process automatically kicks in, parsing the run_results.json file and pushing the collected metrics to our aggregation gateway. Let's verify that our dbt metrics are being captured correctly by checking the gateway.
➜ curl http:
dbt_model_error_info{error="Database Error in model example_model (models/example_model.sql)\n syntax error at or near \"from\"\n LINE 18: from generate_series(1, 100) as t(id)\n ^\n compiled code at target/run/dbt_example/models/example_model.sql",job="dbt_example",model_name="model.dbt_example.example_model"} 3
dbt_model_error_info{error="Database Error in model second_model (models/second_model.sql)\n syntax error at or near \"from\"\n LINE 19: from \"dbt_metrics_example\".\"public\".\"example_model\"\n ^\n compiled code at target/run/dbt_example/models/second_model.sql",job="dbt_example",model_name="model.dbt_example.second_model"} 1
dbt_model_execution_completed_at{job="dbt_example",model_name="model.dbt_example.example_model"} 1.9297774722e+10
dbt_model_execution_completed_at{job="dbt_example",model_name="model.dbt_example.second_model"} 8.771719863e+09
dbt_model_execution_started_at{job="dbt_example",model_name="model.dbt_example.example_model"} 1.9297774721e+10
dbt_model_execution_started_at{job="dbt_example",model_name="model.dbt_example.second_model"} 8.771719863e+09
dbt_model_execution_time_seconds{job="dbt_example",model_name="model.dbt_example.example_model"} 1.3418488502502441
dbt_model_execution_time_seconds{job="dbt_example",model_name="model.dbt_example.second_model"} 0.25799131393432617
dbt_model_failure_count{job="dbt_example",model_name="model.dbt_example.example_model"} 0
dbt_model_failure_count{job="dbt_example",model_name="model.dbt_example.second_model"} 1
dbt_model_last_run_success{job="dbt_example",model_name="model.dbt_example.example_model"} 2
dbt_model_rows_affected{job="dbt_example",model_name="model.dbt_example.example_model"} 800
dbt_model_rows_affected{job="dbt_example",model_name="model.dbt_example.second_model"} 97
dbt_model_run_failure_created{job="dbt_example",model_name="model.dbt_example.example_model"} 5.263027589890698e+09
dbt_model_run_failure_created{job="dbt_example",model_name="model.dbt_example.second_model"} 1.7543443122349281e+09
dbt_model_run_failure_total{job="dbt_example",model_name="model.dbt_example.example_model"} 3
dbt_model_run_failure_total{job="dbt_example",model_name="model.dbt_example.second_model"} 1
dbt_model_run_success_created{job="dbt_example",model_name="model.dbt_example.example_model"} 1.4034747146853819e+10
dbt_model_run_success_created{job="dbt_example",model_name="model.dbt_example.second_model"} 7.017375557182798e+09
dbt_model_run_success_total{job="dbt_example",model_name="model.dbt_example.example_model"} 8
dbt_model_run_success_total{job="dbt_example",model_name="model.dbt_example.second_model"} 4
dbt_model_status{job="dbt_example",model_name="model.dbt_example.example_model"} 8
dbt_model_status{job="dbt_example",model_name="model.dbt_example.second_model"} 4
Now we can see the data flowing into our monitoring system. With dbt metrics successfully captured in Prometheus, it's time to build meaningful Grafana visualisations that provide actionable insights into your SQL transformation performance. Let's create dashboards that showcase model execution times, success rates, and resource utilisation patterns.
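As a starting point, here are a couple of PromQL queries built on the metric names defined earlier - panel layout, time ranges, and thresholds are of course up to you:
# Ten slowest models in the latest run
topk(10, dbt_model_execution_time_seconds)

# Per-model success rate over the last 24 hours
sum by (model_name) (increase(dbt_model_run_success_total[24h]))
/
(
  sum by (model_name) (increase(dbt_model_run_success_total[24h]))
  + sum by (model_name) (increase(dbt_model_run_failure_total[24h]))
)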

Complete Dagster Observability
We've now established comprehensive monitoring coverage for your entire Dagster deployment. Having started with system-level metrics and custom Python assets in Part 1, and extended to dbt SQL transformations in Part 2, you now have end-to-end visibility into your data pipeline's performance.
What We've Accomplished
This two-part series has taken you from basic infrastructure monitoring to complete pipeline observability:
System Monitoring: Out-of-the-box Kubernetes metrics for your Dagster services
Custom Asset Instrumentation: Push-gateway patterns for ephemeral Python jobs
dbt Model Monitoring: Automated metric extraction from SQL transformations
Unified Dashboards: Grafana visualisations combining all metric sources
What This Opens Up
Having your Dagster metrics in Prometheus format unlocks powerful operational capabilities:
Consistent Alerting: Set up alerts using Prometheus as a datasource with the same alerting rules and thresholds you use for your infrastructure. Pipeline failures, slow transformations, and data quality issues can trigger the same incident response workflows as your application alerts.
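For example, a minimal Prometheus alerting rule built on the counters above might look like the following sketch - the group name, alert name, window, and severity are placeholders you'd adapt to your own conventions:
groups:
  - name: dbt-model-alerts
    rules:
      - alert: DbtModelFailures
        expr: increase(dbt_model_run_failure_total[1h]) > 0
        labels:
          severity: warning
        annotations:
          summary: "dbt model {{ $labels.model_name }} reported failures in the last hour"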
Monitoring Stack Integration: Feed Prometheus stats into your existing monitoring systems - whether that's Datadog, New Relic, or custom solutions. Since virtually all modern monitoring platforms speak Prometheus, your Dagster metrics integrate seamlessly with your broader observability stack.
Cross-System Correlation: Most importantly, you can now correlate stats from your data pipelines with all other systems you already monitor with Prometheus. Identify when a slow dbt model correlates with database load spikes, or when pipeline failures coincide with infrastructure issues across your entire platform.
Moving Forward
Your Dagster deployment is no longer a black box. Whether you're debugging a failing transformation at 2 AM or optimising pipeline performance during business hours, you now have the observability tools needed to operate a production-grade data platform with confidence.
The investment in comprehensive monitoring pays dividends in operational excellence, faster incident response, and deeper understanding of your data pipeline's true performance characteristics.