duplicate key value violates unique constraint when adding path variable in airflow dag
To set up the connections and variables in Airflow I use a DAG; we do this so that we can set everything up again quickly if we ever have to. It works, in that my connections and variables show up, but the task "fails". The error says that an sql_path variable already exists:

[2018-03-30 19:42:48,784] {{models.py:1595}} ERROR - (psycopg2.IntegrityError) duplicate key value violates unique constraint "variable_key_key"
DETAIL:  Key (key)=(sql_path) already exists.
 [SQL: 'INSERT INTO variable (key, val, is_encrypted) VALUES (%(key)s, %(val)s, %(is_encrypted)s) RETURNING variable.id'] [parameters: {'key': 'sql_path', 'val': 'gAAAAABavpM46rWjISLZRRKu4hJRD7HFKMuXMpmJ5Z3DyhFbFOQ91cD9NsQsYyFof_pdPn116d6yNoNoOAqx_LRqMahjbYKUqrhNRiYru4juPv4JEGAv2d0=', 'is_encrypted': True}] (Background on this error at: http://sqlalche.me/e/gkpj)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 507, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "variable_key_key"
DETAIL:  Key (key)=(sql_path) already exists.

However, I checked: before I run the DAG, the ad-hoc query SELECT * FROM variable returns nothing, and afterwards it returns my two variables.

I also checked whether I create the variable twice, but I don't believe I do. Here is the part of the DAG that creates the path variables:

import airflow
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.settings import Session
import logging


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'provide_context': True
}


def init_staging_airflow():
    logging.info('Creating connections, pool and sql path')

    session = Session()

    new_var = models.Variable()
    new_var.key = "sql_path"
    new_var.set_val("/usr/local/airflow/sql")
    session.add(new_var)
    session.commit()

    new_var = models.Variable()
    new_var.key = "conf_path"
    new_var.set_val("/usr/local/airflow/conf")
    session.add(new_var)
    session.commit()

    # The original snippet used new_pool without defining it; a pool
    # object has to be created first (name/slots here are placeholders)
    new_pool = models.Pool()
    new_pool.pool = "staging_pool"
    new_pool.slots = 5
    session.add(new_pool)
    session.commit()

    session.close()

dag = airflow.DAG(
    'init_staging_airflow',
    schedule_interval="@once",
    default_args=args,
    max_active_runs=1)

t1 = PythonOperator(task_id='init_staging_airflow',
                    python_callable=init_staging_airflow,
                    provide_context=False,
                    dag=dag)
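
For reference, the INSERT here is unconditional, so any re-execution of the task will hit the unique constraint. A minimal sketch of an idempotent version of the same step (same Session/models API as above; the helper name is illustrative, not from the question):

def create_variable_if_missing(session, key, val):
    # Insert only when no row with this key exists yet;
    # otherwise update the stored value in place.
    existing = session.query(models.Variable).filter_by(key=key).first()
    if existing is None:
        new_var = models.Variable()
        new_var.key = key
        new_var.set_val(val)
        session.add(new_var)
    else:
        existing.set_val(val)
    session.commit()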
Crass answered 30/3, 2018 at 21:6 Comment(0)
I ran into the same problem when trying to do Variable.set() inside a DAG. I believe the scheduler constantly polls the DagBag to pick up changes dynamically. That's why you see a ton of these when running the webserver:

[2018-04-02 11:28:41,531] [45914] {models.py:168} INFO - Filling up the DagBag from /Users/jasontang/XXX/data-server/dags

Sooner or later you'll hit the key constraint.

What I did was put all the variables I need at runtime into a global dictionary ("VARIABLE_DICT" in the example below) and let all my DAGs and sub-DAGs access it:

import os

from airflow.models import Variable

VARIABLE_DICT = {}  # module-level so every DAG/sub-DAG can import it


def initialize(dag_run_obj):
    global VARIABLE_DICT
    if dag_run_obj.external_trigger:
        VARIABLE_DICT.update(dag_run_obj.conf)
        values = (dag_run_obj.conf['client'],
                  dag_run_obj.conf['vertical'],
                  dag_run_obj.conf['frequency'],
                  dag_run_obj.conf.get('snapshot'))
        config_file = '{0}-{1}/{0}-{1}-{2}.json'.format(*values)
        path = os.path.join(Variable.get('repo_root'), 'conf', config_file)
        VARIABLE_DICT.update(read_config(path))  # read_config: the author's own helper

You can ignore the dag_run_obj part; I specifically look for any additional configuration values provided to the DAG run when it is created. In your other DAGs and sub-DAGs, just import the dictionary.
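
For example (the module and variable names below are illustrative, not from the answer):

# another_dag.py
from init_dag import VARIABLE_DICT  # wherever initialize() and the dict live

sql_path = VARIABLE_DICT.get('sql_path', '/usr/local/airflow/sql')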

Blower answered 2/4, 2018 at 15:56 Comment(2)
I'm not sure it is really related to the scheduler, because if you run only the webserver, with a database other than PostgreSQL and the LocalExecutor for example, you get the same error.Ugrian
Where did you define VARIABLE_DICT?Mulch
jasontang is correct: this happens because the scheduler executes your DAG file every time it runs (the scheduler runs frequently to check whether your DAGs have changed, whether they need to be started, etc.).
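
In other words, any statement at module level in a DAG file is re-executed on every parse. A rough sketch of the difference (my illustration, not from the answer):

from airflow.models import Variable

# At module level, this would run on every scheduler parse of the file:
# Variable.set("sql_path", "/usr/local/airflow/sql")

def set_paths():
    # Inside a task callable, it runs only when the task is executed.
    Variable.set("sql_path", "/usr/local/airflow/sql")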

Lutero answered 5/4, 2018 at 12:8 Comment(0)
I fixed this one by calling Variable.delete() before every Variable.set().
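
Roughly like this (assuming an Airflow version where Variable.delete is available):

from airflow.models import Variable

# Delete any existing row first so the INSERT behind set() cannot
# collide with the unique constraint on the key column.
Variable.delete("sql_path")
Variable.set("sql_path", "/usr/local/airflow/sql")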

Vizier answered 18/8, 2020 at 21:5 Comment(1)
I tried this too, and it is still happening.Euphoria
I had this same issue. The cause was that someone had inserted rows into the Airflow database manually. This meant the auto-increment sequence was trying to insert a row with id=8 when an id of 8 already existed. Possibly not the cause of your problem, but it could help someone. I found the issue by looking through the Postgres logs. The fix for Postgres was the command below. For MySQL, I know you can insert a row with the correct id and then delete that row.

ALTER SEQUENCE public.connection_id_seq RESTART WITH 19;
Truncheon answered 5/4 at 5:46 Comment(1)
Thinking about it, this problem likely won't present itself with MySQL.Truncheon
