Airflow: Duplicate entry mysql integrity error when triggering a DAG run

I have two Airflow DAGs: scheduler and worker. The scheduler DAG runs every minute, polls for new aggregation jobs, and triggers a worker DAG run for each one. The code for the scheduler job is below.

However, out of more than 6000 scheduler job runs, 30 failed with an exception like this:

[2019-05-14 11:02:12,382] {models.py:1760} ERROR - (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] [parameters: ('run_query', 'worker', datetime.datetime(2019, 5, 14, 11, 2, 11, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, '', 'airflow', None, None, 'default', 1, None, None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.')]
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 198, in execute
    res = self._query(query)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 304, in _query
    db.query(q)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/connections.py", line 217, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'")

Since the vast majority of scheduler runs succeed, I am inclined to think this is some kind of race condition in Airflow when using a manual/external trigger.

Has anybody experienced a similar issue?

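import json
from datetime import datetime, timedelta

import pytz
from airflow import DAG
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator

# MetaServiceIntegration is the asker's own class; its import is not shown in the question.


class SchedulerOperator(BaseOperator):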
    def __init__(self, **kwargs):
        super(SchedulerOperator, self).__init__(**kwargs)

    def execute(self, context):
        current_time = pytz.utc.localize(datetime.utcnow())
        execution_time = current_time.replace(microsecond=0)

        meta_service = MetaServiceIntegration()
        jobs = meta_service.poll_for_jobs()

        for job in jobs:
            run_id = "{0}-{1}-{2}_{3}".format(job["cdn"], job["dist_id"], job["iso_date"], execution_time.strftime(
                '%Y-%m-%dT%H:%M:%SZ'))
            self.log.info("Scheduling DAG {0}".format(run_id))
            conf = json.dumps({
                'job_id': job["job_id"],
                "src_version": job['src_version'],
                'cdn': job["cdn"],
                'dist_id': job["dist_id"],
                'iso_date': job["iso_date"]})
            self.log.info("DAG config {0}".format(conf))

            trigger_dag(
                dag_id='worker',
                run_id=run_id,
                conf=conf,
                execution_date=execution_time
            )

            # increment by 1 sec to guarantee unique execution times for the consecutive jobs
            execution_time = execution_time + timedelta(seconds=1)

with DAG(
        dag_id="scheduler",
        start_date=datetime(2019, 1, 1, 0, 0, 0, 0),
        schedule_interval="* * * * *",  # runs every minute
        default_view="graph",
        orientation="LR",
        concurrency=5,
        max_active_runs=1,
        catchup=False
) as dag:
    node = SchedulerOperator(
        task_id="schedule",
        dag=dag
    )
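
One thing that may be worth ruling out is whether two scheduler runs can hand out the same execution_date. The operator assigns one-second increments starting from the poll time, so a poll that returns more than 60 jobs reaches into the window of the next minute's poll. The sketch below only reproduces that arithmetic in plain Python. The helper names and the timings are hypothetical, and whether this is what caused the IntegrityError above is not established.

```python
from datetime import datetime, timedelta, timezone


def execution_dates(poll_time, job_count):
    """Return the execution dates the operator's loop would assign for one poll."""
    start = poll_time.replace(microsecond=0)
    return [start + timedelta(seconds=i) for i in range(job_count)]


def colliding_dates(first_poll, first_jobs, second_poll, second_jobs):
    """Return the execution dates that both polls would assign."""
    first = set(execution_dates(first_poll, first_jobs))
    second = execution_dates(second_poll, second_jobs)
    return sorted(d for d in second if d in first)


# Hypothetical timings: a poll that returns 70 jobs, then a poll one minute later.
first_poll = datetime(2019, 5, 14, 11, 1, 5, 250000, tzinfo=timezone.utc)
second_poll = first_poll + timedelta(minutes=1)
overlap = colliding_dates(first_poll, 70, second_poll, 2)
print([d.strftime("%H:%M:%S") for d in overlap])  # ['11:02:05', '11:02:06']
```

If the job counts per poll never come close to 60 and runs are never delayed, this overlap cannot occur and the cause lies elsewhere.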

Surrebutter answered 15/5, 2019 at 10:40 Comment(1)
I am getting exactly the same error. Were you able to find the root cause and/or a fix for this? - Swashbuckling

It looks like this is related to this open Airflow issue, and this user posted a fix, as an answer to this question, that involves changing the execution_date.
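
The linked fix is not reproduced here, so the following is only a minimal sketch of one way the execution_date could be changed, not the fix itself. It assumes the goal is that no execution_date is ever handed out twice for the worker DAG. The function `allocate_execution_dates` and the `last_used` value are hypothetical; in a real operator `last_used` would have to be persisted between runs, for example in an Airflow Variable.

```python
from datetime import datetime, timedelta, timezone


def allocate_execution_dates(now, last_used, count):
    """Return `count` execution dates, one second apart, all later than `last_used`."""
    start = now.replace(microsecond=0)
    # Never start at or before a second that an earlier run already used.
    if last_used is not None and start <= last_used:
        start = last_used + timedelta(seconds=1)
    return [start + timedelta(seconds=i) for i in range(count)]


# Hypothetical usage: the first poll returns 70 jobs, the next poll runs a minute later.
last_used = None
first = allocate_execution_dates(
    datetime(2019, 5, 14, 11, 1, 5, tzinfo=timezone.utc), last_used, 70)
last_used = first[-1]  # would need to be stored between scheduler runs
second = allocate_execution_dates(
    datetime(2019, 5, 14, 11, 2, 5, tzinfo=timezone.utc), last_used, 2)
```

Each value would then be passed as execution_date to trigger_dag in place of the one-second increment in the question's loop. The trade-off is that the execution_date can drift ahead of the wall clock when a poll returns many jobs.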

Trahurn answered 19/8, 2019 at 22:19 Comment(0)
