Running Airflow DAG Only If Another DAG Is Successful


Using Airflow sensors to control the execution of DAGs on a different schedule


Recently, I’ve been trying to coordinate two Airflow DAGs such that one would only run — on its own hourly schedule — if the other DAG (running on a daily basis) has been successful.

In today’s tutorial I will walk you through the use case and demonstrate how to achieve the desired behaviour in three different ways; two using the ExternalTaskSensor and another one using a customised approach with PythonOperator.

Now let’s get started with our use case that involves two Airflow DAGs.

The first DAG, my_daily_dag, runs every day at 5AM UTC.

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',  # At 05:00 UTC every day
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')

The second DAG, my_hourly_dag, runs on an hourly basis, between 6AM and 8PM UTC.

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')

In our use case, we would like my_hourly_dag to run only if my_daily_dag has run successfully within the current date. If not, then my_hourly_dag should be skipped. It is important to mention here that we don’t want to trigger my_hourly_dag as soon as my_daily_dag succeeds. That would be achievable with TriggerDagRun
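
To make the requirement concrete, here is a minimal, Airflow-free sketch of the decision the hourly DAG has to make: proceed only if the daily DAG has a successful run on the current UTC date. The RunRecord class, the daily_dag_succeeded_today helper and the sample history are all hypothetical and exist only for illustration; in a real deployment the run history would come from Airflow itself, and "within the current date" is assumed here to mean "the run started on the same UTC calendar day".

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical stand-in for one entry of a DAG's run history."""
    dag_id: str
    started_at: datetime  # timezone-aware start time of the run
    state: str            # e.g. "success", "failed", "running"


def daily_dag_succeeded_today(runs, now, dag_id="my_daily_dag"):
    """Return True if `dag_id` has a successful run that started on the
    same UTC calendar date as `now` (assumed meaning of "current date")."""
    today = now.astimezone(timezone.utc).date()
    return any(
        run.dag_id == dag_id
        and run.state == "success"
        and run.started_at.astimezone(timezone.utc).date() == today
        for run in runs
    )


# Made-up history: the daily DAG succeeded on 26 July and failed on 27 July.
history = [
    RunRecord("my_daily_dag", datetime(2023, 7, 26, 5, 0, tzinfo=timezone.utc), "success"),
    RunRecord("my_daily_dag", datetime(2023, 7, 27, 5, 0, tzinfo=timezone.utc), "failed"),
]

# An hourly run at 09:00 UTC on each of the two days.
proceed_26 = daily_dag_succeeded_today(history, datetime(2023, 7, 26, 9, 0, tzinfo=timezone.utc))
proceed_27 = daily_dag_succeeded_today(history, datetime(2023, 7, 27, 9, 0, tzinfo=timezone.utc))
```

With this sample history, the 09:00 run on 26 July would proceed, while the 09:00 run on 27 July would be skipped because the daily run on that date failed.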


