How to set up a custom environment for your Airflow workers
If you need to run tasks on Airflow on a custom environment that comes with pre-installed libraries and tools, we recommend building your own custom docker image, upload it to a docker image repository such as dockerhub and reference it in your DAG's task operator.
Using the custom image in your DAGs
Every task in an Airflow DAG can use a different docker image. Operators accept an
executor_config
argument that can be used to customize the executor context.
Given that Datacoves runs Airflow on a kubernetes execution context, you need to pass a
dict
with a
pod_override
key that will override the worker pod's configuration, as seen in the
TRANSFORM_CONFIG
dict in the example below. The variable name for the Config dict will depend on what DAG task you are requesting more resources for.
eg) When writing your yaml, if you add the config under
marketing_automation
the
CONFIG
variable will be dynamically named
MARKETING_AUTOMATION_CONFIG
. In the yml examples below, we added the config in a transform task so the
CONFIG
variable is named
TRANSFORM_CONFIG
.
Python version
import datetime
from airflow.decorators import dag
from kubernetes.client import models as k8s
from operators.datacoves.bash import DatacovesBashOperator
TRANSFORM_CONFIG = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="transform",
# Replace with your image repo and tag
image="<IMAGE REPO>:<IMAGE TAG>",
bash_command="echo SUCCESS!",
)
]
)
),
}
@dag(
default_args={
"start_date": datetime.datetime(2023, 1, 1, 0, 0),
"owner": "Noel Gomez",
"email": "gomezn@example.com",
"email_on_failure": True,
},
description="Sample DAG with custom image",
schedule_interval="0 0 1 */12 *",
tags=["version_2"],
catchup=False,
yaml_sample_dag={
"schedule_interval": "0 0 1 */12 *",
"tags": ["version_4"],
"catchup": False,
"default_args": {
"start_date": datetime.datetime(2023, 1, 1, 0, 0),
"owner": "airflow",
"email": "some_user@exanple.com",
"email_on_failure": True,
},
},
)
def custommize_worker_dag():
transform = DatacovesBashOperator(
task_id="transform", executor_config=TRANSFORM_CONFIG
)
dag = customize_worker_dag()
YAML version
In the yml dag you can configure the image.
...
# DAG Tasks
nodes:
...
transform:
operator: operators.datacoves.bash.DatacovesBashOperator
type: task
config:
# Replace with your custom docker image <IMAGE REPO>:<IMAGE TAG>
image: <IMAGE REPO>:<IMAGE TAG>
bash_command: "echo SUCCESS!"
...