
Run Airbyte sync jobs

In our quest to simplify how the different tools of the Modern Data Stack integrate with one another, we developed the generate airflow-dags command in the dbt-coves extension.

The main idea behind this concept is to use tags defined on dbt sources to determine which data to load via different tools (e.g. Airbyte or Fivetran). Using this information, we can dynamically create Extract and Load tasks in an Airflow DAG before running dbt.

Note

Support for Fivetran tasks is coming soon. More information in Run Fivetran sync jobs.

Before you start

Ensure your Airflow environment is properly configured

Follow this guide on how to set up Airflow's environment.

Airbyte connection

Since Airflow needs to retrieve metadata from the Airbyte server, you need to set up a connection in Airflow.

A user with Airflow admin privileges must go to the Airflow Admin -> Connections menu.

(Screenshot: Admin -> Connections menu)

There they create a new connection using the following details:

(Screenshot: Airbyte connection details)

The host is built from your environment slug (3 letters + 3 digits, e.g. xyz123) followed by -airbyte-airbyte-server-svc, e.g. xyz123-airbyte-airbyte-server-svc.
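For reference, here is a minimal sketch of the connection values (the connection type and port are assumptions; confirm them for your installation):

Connection Id: airbyte_connection
Connection Type: Airbyte
Host: xyz123-airbyte-airbyte-server-svc  # <environment slug> + "-airbyte-airbyte-server-svc"
Port: 8001  # assumed default Airbyte server port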

Turn off Airbyte's scheduler

To avoid conflicts between Airflow triggering Airbyte jobs and Airbyte scheduling its own jobs at the same time, we suggest you set replication frequency to manual on each Airbyte connection that will be triggered by Airflow:

(Screenshot: Replication frequency set to Manual)

Generate DAGs from yml with dbt-coves

To connect Extract & Load with Transform in your DAG, you must configure your dbt-coves config file. We recommend placing it at transform/.dbt-coves/config.yml.

Field reference:

  • yml_path: Path, relative to the dbt project, where the yml files used to generate Python DAGs are stored
  • dags_path: Path, relative to the dbt project, where the generated Python DAGs will be stored
  • dbt_project_path: Relative path to the dbt project, used to run dbt ls and discover sources
  • airbyte_conn_id: ID of the Airflow connection that holds the information needed to connect to the Airbyte server (set up above)

Tip

We make use of environment variables that we have configured for you upon setup. For more information on these variables, please see Datacoves Environment Variables.

generate:
  ...
  airflow_dags:
    # source location for yml files
    yml_path: "/config/workspace/{{ env_var('DATACOVES__AIRFLOW_DAGS_YML_PATH') }}"

    # destination for generated python dags
    dags_path: "/config/workspace/{{ env_var('DATACOVES__AIRFLOW_DAGS_PATH') }}"

    generators_params:
      AirbyteDbtGenerator:
        # Airbyte server
        host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}"
        port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}"

        # Airflow connection
        airbyte_conn_id: airbyte_connection

        # dbt project location for dbt ls
        dbt_project_path: "{{ env_var('DATACOVES__DBT_HOME') }}"
        # Optional
        run_dbt_compile: true
        run_dbt_deps: false
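
With this file in place, run dbt-coves generate airflow-dags (e.g. from the transform directory, so the config file is picked up). The command reads the yml DAG definitions in yml_path and writes the generated Python DAGs to dags_path.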

Example YML DAG

Now you are ready to write out your DAG using yml. In the following example DAG, notice the special extract-and-load task that uses a generator instead of an operator. This allows information such as the connection_id to be pulled dynamically from Airbyte.

Tip

We make use of special generators from the dbt-coves extension. For more information, please see DAG Generators.

Field reference:

  • generator: The Airbyte Tasks Generator uses the value AirbyteDbtGenerator.
  • dbt_list_args: Arguments passed to dbt ls to retrieve the dbt project sources loaded by Airbyte. The AirbyteDbtGenerator will find the Airbyte connections to trigger using each dbt source's database, schema, and table name (see the source.yml sketch after the example DAG below).

YML version

description: "Loan Run"
schedule: "0 0 1 */12 *"
tags:
  - version_1
default_args:
  start_date: 2021-01-01
catchup: false

nodes:
  extract_and_load_airbyte:
    generator: AirbyteDbtGenerator
    type: task_group

    tooltip: "Airbyte Extract and Load"
    # The daily_run_airbyte tag must be set in the source.yml
    dbt_list_args: "--select tag:daily_run_airbyte"

  transform:
    operator: operators.datacoves.bash.DatacovesBashOperator
    type: task

    bash_command: "dbt build -s 'tag:daily_run_airbyte+'"
    dependencies: ["extract_and_load_airbyte"]
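
Note that the daily_run_airbyte tag referenced in dbt_list_args must be applied to the dbt sources loaded by Airbyte. A minimal sketch of such a source.yml, with hypothetical source and table names:

version: 2

sources:
  - name: loans  # hypothetical source name
    database: raw
    schema: loans
    tables:
      - name: personal_loans  # hypothetical table, tagged for the daily Airbyte run
        tags:
          - daily_run_airbyte

With this in place, dbt ls --select tag:daily_run_airbyte returns the tagged sources, and the generator creates a sync task for each matching Airbyte connection.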