
Retry a dbt task

Overview

When a dbt run fails partway through, you usually want to re-run only the failed models rather than the whole project. This guide explains how to implement dbt task retries in Airflow using Datacoves' custom datacoves_dbt decorator.

Prerequisites

  • Datacoves version 3.4 or later
  • dbt API feature enabled in your environment (contact support for further assistance)

How dbt Retries Work

The retry mechanism works by:

  1. Capturing results of a dbt run including any failures
  2. Storing these results using the dbt API
  3. Retrieving the previous run state when a retry is initiated
  4. Selectively running only the failed models and their downstream dependencies
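
To make the captured results concrete, the sketch below reads a run_results.json file and lists the nodes whose status is error, which is roughly the set that dbt's result:error selector starts from before the trailing + adds downstream nodes. The helper name errored_nodes and the file path are hypothetical. The results, unique_id, and status keys come from dbt's run results artifact. This is only an illustration, not how Datacoves or dbt implement the selection.

```python
import json
from pathlib import Path
from typing import List


def errored_nodes(run_results_path: str) -> List[str]:
    """Return the unique_ids of nodes recorded with status "error".

    Hypothetical helper for inspection only. Failed tests are recorded
    with status "fail" and are therefore not included here.
    """
    data = json.loads(Path(run_results_path).read_text())
    return [
        result["unique_id"]
        for result in data.get("results", [])
        if result.get("status") == "error"
    ]
```

Calling errored_nodes("logs/run_results.json") after a failed run (the path is an assumption) gives a quick preview of which models a retry would start from.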

Implementing dbt Retries

Step 1: Configure the datacoves_dbt Decorator

When defining your task, enable the necessary parameters for retries:

@task.datacoves_dbt(
    connection_id="your_connection",
    dbt_api_enabled=True,  # Enable dbt API functionality
    download_run_results=True,  # Allow downloading previous run results
)

Step 2: Add Conditional Logic for Retry

Implement logic in your task function to check for existing results and execute the appropriate dbt command:

@task.datacoves_dbt(
    connection_id="your_connection",
    dbt_api_enabled=True,
    download_run_results=True,
)
def dbt_build(expected_files: list = []):
    if expected_files:
        # Previous results are available: rebuild errored models and their descendants
        return "dbt build -s result:error+ --state logs"
    else:
        # No previous results: run the normal selection
        return "dbt build -s your_models+"
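
If you want to check the branching without Airflow or Datacoves installed, the same decision can be pulled into a plain function and tested on its own. This is a minimal sketch: choose_dbt_command, its selector and state_dir parameters, and their defaults are hypothetical names introduced here, and the function only mirrors the logic of the task body above.

```python
from typing import List, Optional


def choose_dbt_command(
    expected_files: Optional[List[str]] = None,
    selector: str = "your_models+",  # hypothetical default selection
    state_dir: str = "logs",  # directory assumed to hold run_results.json
) -> str:
    """Return the dbt command string for a first run or a retry."""
    if expected_files:
        # Retry: rebuild errored nodes and everything downstream of them
        return f"dbt build -s result:error+ --state {state_dir}"
    # First run: build the regular selection
    return f"dbt build -s {selector}"
```

The decorated task could then simply return choose_dbt_command(expected_files), which keeps the command-building logic easy to unit test.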

Step 3: Call the Task with Expected Files Parameter

dbt_build(expected_files=["run_results.json"])

Complete Example

Here's a complete DAG implementation:

"""
## Retry dbt Example
This DAG demonstrates how to retry a dbt task that fails during a run
"""

from airflow.decorators import dag, task
from orchestrate.utils import datacoves_utils


@dag(
    doc_md=__doc__,
    catchup=False,
    default_args=datacoves_utils.set_default_args(
        owner="Your Name",
        owner_email="your.email@example.com",
    ),
    schedule=datacoves_utils.set_schedule("0 0 1 */12 *"),
    description="Sample DAG demonstrating how to re-run only the dbt models that failed",
    tags=["dbt_retry"],
)
def retry_dbt_failures():
    @task.datacoves_dbt(
        connection_id="your_connection",
        dbt_api_enabled=True,
        download_run_results=True,
    )
    def dbt_build(expected_files: list = []):
        if expected_files:
            # Previous results are available: rebuild errored models and their descendants
            return "dbt build -s result:error+ --state logs"
        else:
            # No previous results: run the normal selection
            return "dbt build -s model_a+ model_b+"

    dbt_build(expected_files=["run_results.json"])


retry_dbt_failures()