
How to Dynamically Set the Schedule Interval

By default, DAGs are created in a paused state in Airflow; you can change this per DAG with the is_paused_upon_creation option (set it to False to have the DAG start unpaused). However, you will likely not want to schedule DAGs in a development Airflow instance. The steps below describe how to avoid setting a schedule in a development Airflow instance.
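
For instance, here is a minimal sketch of a DAG that starts unpaused; the DAG name, task, and schedule below are purely illustrative:

from airflow.decorators import dag, task
from pendulum import datetime

@dag(
    start_date=datetime(2024, 1, 1),
    schedule="0 1 * * *",
    is_paused_upon_creation=False,  # start unpaused instead of Airflow's default paused state
    catchup=False,
)
def example_unpaused_dag():

    @task
    def say_hello():
        print("hello")

    say_hello()

example_unpaused_dag()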

You can dynamically set the DAG schedule based on your Datacoves environment (development or production). By using a function called get_schedule, you can ensure that the correct schedule is applied only in the production Airflow instance.

Here is how to achieve this:

Step 1: Create a get_schedule.py file inside of orchestrate/utils

Step 2: Paste the following code. Note: Find your environment slug here

# get_schedule.py
import os
from typing import Union

DEV_ENVIRONMENT_SLUG = "dev123" # Replace with your environment slug

def get_schedule(default_input: Union[str, None]) -> Union[str, None]:
    """
    Sets the application's schedule based on the current environment setting. Allows you to
    set the default for dev to None and the default for prod to the default input.

    This function checks the Datacoves slug through the 'DATACOVES__ENVIRONMENT_SLUG' environment variable
    to determine if the application is running in a specific environment (e.g., 'dev123'). If the application
    is running in the 'dev123' environment, it indicates that no schedule should be used, and
    hence returns None. For all other environments, the function returns the given 'default_input'
    as the schedule.

    Parameters:
    - default_input (Union[str, None]): The default schedule to return if the application is not
      running in the dev environment.

    Returns:
    - Union[str, None]: The default schedule if the environment is not 'dev123'; otherwise, None,
      indicating that no schedule should be used in the dev environment.
    """
    env_slug = os.environ.get("DATACOVES__ENVIRONMENT_SLUG", "").lower()
    if env_slug == DEV_ENVIRONMENT_SLUG:
        return None
    else:
        return default_input
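
As a quick sanity check, you can call the function directly. The sketch below assumes the dev slug in the file is 'dev123' and uses a made-up slug 'prd456' to stand in for any other environment:

# quick_check.py (illustrative only)
import os

from orchestrate.utils.get_schedule import get_schedule

os.environ["DATACOVES__ENVIRONMENT_SLUG"] = "dev123"   # simulate the dev environment
print(get_schedule("0 1 * * *"))  # prints: None -> no schedule in dev

os.environ["DATACOVES__ENVIRONMENT_SLUG"] = "prd456"   # simulate any other environment (made-up slug)
print(get_schedule("0 1 * * *"))  # prints: 0 1 * * * -> schedule applied everywhere else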

Step 3: In your DAG, import the get_schedule function using from orchestrate.utils.get_schedule import get_schedule and pass in your desired schedule.

For example, if your desired schedule is '0 1 * * *', then you will set schedule=get_schedule('0 1 * * *') as seen in the example below.

from airflow.decorators import dag, task
from pendulum import datetime
from orchestrate.utils.get_schedule import get_schedule

@dag(
    default_args={
        "start_date": datetime(2022, 10, 10),
        "owner": "Noel Gomez",
        "email": "gomezn@example.com",
        "email_on_failure": True,
    },
    is_paused_upon_creation=True,
    catchup=False,
    tags=["version_8"],
    description="Datacoves Sample DAG",
    schedule=get_schedule('0 1 * * *'),  # Replace with desired schedule
)
def datacoves_sample_dag():

    @task.datacoves_dbt(connection_id="main")
    def run_dbt_task():
        return "dbt debug"

    run_dbt_task()

datacoves_sample_dag()
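
With this setup, the DAG shows no schedule (None) in the development Airflow instance, while in every other environment it runs on the '0 1 * * *' schedule, i.e. daily at 1:00 AM.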