I faced the same problem. There is no out-of-the-box solution, but we can write a custom operator for it.
Here is the code of a custom operator that takes python_callable and trigger_dag_id as arguments:
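# Import paths below are the Airflow 1.x locations.
from datetime import datetime

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        # Extra positional and keyword arguments passed to python_callable.
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            # Stop at the first item that is not a DagRunOrder.
            if not dro or not isinstance(dro, DagRunOrder):
                break

            # Generate a run_id when the callable did not set one.
            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()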
trigger_dag_id is the ID of the DAG that we want to run multiple times.
python_callable is a function that is called with the task context, followed by op_args and op_kwargs. It should return a list of DagRunOrder objects, one for each instance of the DAG with dag_id trigger_dag_id that should be scheduled.
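As a rough illustration of what such a python_callable could look like, here is a minimal sketch. The names generate_dag_runs and count, and the payload contents, are made up for this example. The small DagRunOrder class is only a stand-in that mirrors the two attributes the operator reads (run_id and payload), so the snippet runs without Airflow installed; in a real DAG file you would import DagRunOrder from Airflow instead.

```python
class DagRunOrder(object):
    """Stand-in for Airflow's DagRunOrder (assumption: only run_id and
    payload are needed, since those are what the operator above reads)."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def generate_dag_runs(context, count=3):
    """Hypothetical callable: yields one DagRunOrder per DAG run to create.

    `context` is the task context passed in by the operator; `count` would
    arrive through op_kwargs.
    """
    for index in range(count):
        # run_id is left as None, so the operator generates one itself.
        yield DagRunOrder(payload={"index": index})


# Quick check outside Airflow: collect the orders the operator would iterate.
orders = list(generate_dag_runs(context={}, count=3))
payloads = [order.payload for order in orders]

# Wiring it into a DAG might then look like this (hypothetical ids):
# TriggerMultiDagRunOperator(
#     task_id="trigger_many",
#     trigger_dag_id="target_dag",
#     python_callable=generate_dag_runs,
#     op_kwargs={"count": 3},
#     dag=dag,
# )
```

Because the operator simply iterates over the result, a generator works as well as a list. Each payload ends up as the conf of the created DagRun.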
Code and examples on GitHub: https://github.com/mastak/airflow_multi_dagrun
A little more description of this code: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13