I have two Airflow DAGs: scheduler and worker. The scheduler DAG runs every minute, polls for new aggregation jobs, and triggers worker runs. The code for the scheduler job is below.
However, out of more than 6,000 scheduler runs, 30 failed with an exception like this:
[2019-05-14 11:02:12,382] {models.py:1760} ERROR - (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] [parameters: ('run_query', 'worker', datetime.datetime(2019, 5, 14, 11, 2, 11, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, '', 'airflow', None, None, 'default', 1, None, None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.')]
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 198, in execute
    res = self._query(query)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 304, in _query
    db.query(q)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/connections.py", line 217, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'")
Considering that the vast majority of scheduler runs are fine, I am half inclined to say it is some kind of race condition in Airflow when using manual/external triggers.
Has anybody experienced a similar issue?
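For reference, Airflow's task_instance table uses (task_id, dag_id, execution_date) as its primary key, which is exactly the tuple in the duplicate-entry message above. A minimal sketch of the collision I suspect (hypothetical timestamps, but the same truncate-then-increment logic as my operator below):

from datetime import datetime, timedelta

import pytz

# Hypothetical: two trigger attempts less than a second apart in wall-clock time,
# e.g. a retried scheduler task or back-to-back runs whose increments overlap.
t1 = pytz.utc.localize(datetime(2019, 5, 14, 11, 2, 10, 900000)).replace(microsecond=0)
t2 = pytz.utc.localize(datetime(2019, 5, 14, 11, 2, 11, 100000)).replace(microsecond=0)

# The first attempt's second job gets t1 + 1s = 11:02:11; the second attempt's
# first job truncates to the same second -> duplicate PRIMARY key on insert.
assert t1 + timedelta(seconds=1) == t2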
import json
from datetime import datetime, timedelta

import pytz
from airflow import DAG
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator

# MetaServiceIntegration is our internal helper for polling pending jobs;
# its import is omitted here.


class SchedulerOperator(BaseOperator):

    def __init__(self, **kwargs):
        super(SchedulerOperator, self).__init__(**kwargs)

    def execute(self, context):
        current_time = pytz.utc.localize(datetime.utcnow())
        execution_time = current_time.replace(microsecond=0)
        meta_service = MetaServiceIntegration()
        jobs = meta_service.poll_for_jobs()
        for job in jobs:
            run_id = "{0}-{1}-{2}_{3}".format(
                job["cdn"], job["dist_id"], job["iso_date"],
                execution_time.strftime('%Y-%m-%dT%H:%M:%SZ'))
            self.log.info("Scheduling DAG {0}".format(run_id))
            conf = json.dumps({
                'job_id': job["job_id"],
                'src_version': job['src_version'],
                'cdn': job["cdn"],
                'dist_id': job["dist_id"],
                'iso_date': job["iso_date"]})
            self.log.info("DAG config {0}".format(conf))
            trigger_dag(
                dag_id='worker',
                run_id=run_id,
                conf=conf,
                execution_date=execution_time
            )
            # increment by 1 sec to guarantee unique execution times for consecutive jobs
            execution_time = execution_time + timedelta(seconds=1)


with DAG(
        dag_id="scheduler",
        start_date=datetime(2019, 1, 1, 0, 0, 0, 0),
        schedule_interval="* * * * *",  # runs every minute
        default_view="graph",
        orientation="LR",
        concurrency=5,
        max_active_runs=1,
        catchup=False
) as dag:
    node = SchedulerOperator(
        task_id="schedule",
        dag=dag
    )
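For completeness, one workaround I am considering (untested) is to keep the microsecond component instead of truncating it, so two trigger attempts can no longer collapse onto the same second; the error above shows the execution_date column already stores fractional seconds. A sketch of the changed execute method, assuming the same MetaServiceIntegration helper and trigger_dag API:

def execute(self, context):
    # Keep full microsecond precision instead of .replace(microsecond=0).
    execution_time = pytz.utc.localize(datetime.utcnow())
    for job in MetaServiceIntegration().poll_for_jobs():
        run_id = "{0}-{1}-{2}_{3}".format(
            job["cdn"], job["dist_id"], job["iso_date"],
            execution_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))  # include microseconds
        conf = json.dumps({key: job[key] for key in
                           ('job_id', 'src_version', 'cdn', 'dist_id', 'iso_date')})
        trigger_dag(
            dag_id='worker',
            run_id=run_id,
            conf=conf,
            execution_date=execution_time
        )
        # a microsecond step still keeps execution dates unique within one run
        execution_time = execution_time + timedelta(microseconds=1)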