Use Microsoft Azure Data Factory Operators

You can use Airflow in Datacoves to trigger a Microsoft Azure Data Factory pipeline. This guide will walk you through the configuration process.

Prerequisites

  • You will need to set up a Microsoft Entra Application.
  • Assign the Data Factory Contributor role to your Microsoft Entra Application. You can do this by heading into Resource Groups and following these instructions.
  • Collect the following values from your ADF account; the next section explains where to find each of them:
    • DATA_FACTORY_NAME
    • RESOURCE_GROUP_NAME
    • SUBSCRIPTION_ID
    • APPLICATION_CLIENT_ID
    • TENANT_ID
    • CLIENT_SECRET

How to get the ADF information

Step 1: Log in to your Microsoft Azure console and navigate to the Data Factories service.

Step 2: Copy the DATA_FACTORY_NAME for the factory which holds your data pipeline.

Step 3: Open the factory and copy the RESOURCE_GROUP_NAME and the SUBSCRIPTION_ID from the Overview tab.

Step 4: Navigate to the Microsoft Entra ID service and click the number next to Applications on the Overview tab. Next, click the All Applications tab and open your application (or register a new one and then open it), then copy the APPLICATION_CLIENT_ID and the Directory TENANT_ID.

Step 5: Click Certificates and Secrets, generate a new secret for your Microsoft Entra Application, and copy its Value. This is the CLIENT_SECRET.
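
Before moving on, it can help to confirm that the collected values and the Data Factory Contributor role assignment actually work. The following is a minimal, optional sketch using the azure-identity and azure-mgmt-datafactory packages (these are not required by this guide); the placeholder values are assumptions you should replace with your own.

from azure.identity import ClientSecretCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

# Placeholders -- replace with the values collected in the steps above
TENANT_ID = "<TENANT_ID>"
APPLICATION_CLIENT_ID = "<APPLICATION_CLIENT_ID>"
CLIENT_SECRET = "<CLIENT_SECRET>"
SUBSCRIPTION_ID = "<SUBSCRIPTION_ID>"
RESOURCE_GROUP_NAME = "<RESOURCE_GROUP_NAME>"
DATA_FACTORY_NAME = "<DATA_FACTORY_NAME>"

# Authenticate as the Microsoft Entra Application (service principal)
credential = ClientSecretCredential(
    tenant_id=TENANT_ID,
    client_id=APPLICATION_CLIENT_ID,
    client_secret=CLIENT_SECRET,
)

# If the credentials and role assignment are correct, this returns the factory's metadata
adf_client = DataFactoryManagementClient(credential, SUBSCRIPTION_ID)
factory = adf_client.factories.get(RESOURCE_GROUP_NAME, DATA_FACTORY_NAME)
print(factory.name, factory.location)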

Create a Microsoft Azure Data Factory Connection in Airflow

Step 1: In Datacoves, a user with the securityadmin role must go to the Airflow Admin -> Connections menu.

Airflow Connection

Step 2: Create a new connection using the following details.

  • Connection Id: azure_data_factory_default <- this name will be used in the Airflow DAG and is the default name used by the ADF Operator

  • Connection Type: Azure Data Factory

  • Client ID: Your APPLICATION_CLIENT_ID

  • Secret: Your CLIENT_SECRET

  • Tenant ID: Your TENANT_ID

  • Factory Name: Your DATA_FACTORY_NAME

  • Resource Group Name: Your RESOURCE_GROUP_NAME

  • Subscription ID: Your SUBSCRIPTION_ID

Note

The screenshot below uses sample values; replace them with the actual values you collected above.

adf connection
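
If you would like to sanity-check the new connection outside the Airflow UI, a small one-off DAG can exercise it through the provider's hook. This is an optional sketch that assumes the connection above was saved as azure_data_factory_default; the DAG id, factory name, and resource group name below are placeholders.

"""Hypothetical one-off DAG that verifies the Azure Data Factory connection."""

from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def adf_connection_check():

    @task
    def check_connection():
        # Uses the connection created above; replace the names with your own
        hook = AzureDataFactoryHook(azure_data_factory_conn_id="azure_data_factory_default")
        factory = hook.get_factory(
            resource_group_name="your-resource-name",
            factory_name="your-factory-name",
        )
        print(f"Connected to factory: {factory.name}")

    check_connection()

DAG = adf_connection_check()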

Example DAG

[!NOTE] You will need to update the pipeline_name, resource_group_name, and factory_name arguments below with the correct names.

Once you have configured your Azure Data Factory connection, you are ready to create your DAG. Head into the Transform tab to begin writing your DAG inside orchestrate/dags .

"""Example Airflow Azure Data Factory DAG."""

from datetime import datetime
from airflow.decorators import dag, task_group
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    tags=["version_1"],
    catchup=False,
    default_args={
        "azure_data_factory_conn_id": "azure_data_factory_default",
        "factory_name": "your-factory-name", 
        "resource_group_name": "your-resource-name",
    },
)
def adf_example_run():
    """Run an Azure Data Factory pipeline with async status checking."""

    @task_group(group_id="adf_pipeline_group", tooltip="ADF Pipeline Group")
    def adf_pipeline_tasks():
        run_pipeline = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline",
            pipeline_name="myTestPipeline",  # Rename to your Pipeline name
            parameters={"myParam": "value"},
            wait_for_termination=False,
        )

        # Deferrable sensor for async pipeline status checking
        pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_async_sensor",
            run_id=run_pipeline.output["run_id"],
            deferrable=True,
        )

        run_pipeline >> pipeline_run_async_sensor

    adf_pipeline_group = adf_pipeline_tasks()

DAG = adf_example_run()
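
The DAG above starts the pipeline with wait_for_termination=False and hands the waiting off to a deferrable sensor, which frees the worker slot while Azure Data Factory runs. If you do not need that behavior, a simpler (though less resource-efficient) variant is a single blocking task. The snippet below is a sketch that would replace the task group inside the same @dag-decorated function and reuses the placeholder names from above.

run_pipeline_blocking = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline_blocking",
    pipeline_name="myTestPipeline",  # Replace with your pipeline name
    parameters={"myParam": "value"},
    wait_for_termination=True,  # The task stays running until the ADF pipeline finishes
)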

Understanding the Airflow DAG