To set up the connections and variables in Airflow I use a DAG. We do this so that we can quickly set Airflow up again in case we ever have to rebuild everything. It does work: my connections and variables show up, but the task "fails". The error says that there is already an sql_path variable:
[2018-03-30 19:42:48,784] {{models.py:1595}} ERROR - (psycopg2.IntegrityError) duplicate key value violates unique constraint "variable_key_key"
DETAIL: Key (key)=(sql_path) already exists.
[SQL: 'INSERT INTO variable (key, val, is_encrypted) VALUES (%(key)s, %(val)s, %(is_encrypted)s) RETURNING variable.id'] [parameters: {'key': 'sql_path', 'val': 'gAAAAABavpM46rWjISLZRRKu4hJRD7HFKMuXMpmJ5Z3DyhFbFOQ91cD9NsQsYyFof_pdPn116d6yNoNoOAqx_LRqMahjbYKUqrhNRiYru4juPv4JEGAv2d0=', 'is_encrypted': True}] (Background on this error at: http://sqlalche.me/e/gkpj)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 507, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "variable_key_key"
DETAIL: Key (key)=(sql_path) already exists.
However, I checked: before I run the DAG, the ad hoc query SELECT * FROM variable
returns nothing, and afterwards it returns my two variables.
I checked whether I create the variable twice, but I don't think I do. Here is the part of the DAG that creates the path variables:
import airflow
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.settings import Session
import logging
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'provide_context': True
}
def init_staging_airflow():
    logging.info('Creating connections, pool and sql path')
    session = Session()
    # Variable holding the directory with the SQL files
    new_var = models.Variable()
    new_var.key = "sql_path"
    new_var.set_val("/usr/local/airflow/sql")
    session.add(new_var)
    session.commit()
    # Variable holding the directory with the config files
    new_var = models.Variable()
    new_var.key = "conf_path"
    new_var.set_val("/usr/local/airflow/conf")
    session.add(new_var)
    session.commit()
    # new_pool comes from the pool setup that is omitted from this excerpt
    session.add(new_pool)
    session.commit()
    session.close()
dag = airflow.DAG(
    'init_staging_airflow',
    schedule_interval="@once",
    default_args=args,
    max_active_runs=1)
t1 = PythonOperator(task_id='init_staging_airflow',
                    python_callable=init_staging_airflow,
                    provide_context=False,
                    dag=dag)
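To make clear what the constraint violation means, here is a minimal sketch outside Airflow. It uses the standard-library sqlite3 module with a stand-in variable table; the table layout and the helpers make_db, insert_variable and set_variable are assumptions for illustration only, not Airflow's API. It shows that a plain insert fails the second time it runs with the same key, which would match my symptom if the callable somehow ran more than once (I have not confirmed that it does), while an update-or-insert helper can be run repeatedly.

```python
import sqlite3


def make_db():
    """Create an in-memory database with a stand-in for the variable table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE variable ("
        "id INTEGER PRIMARY KEY, "
        "key TEXT UNIQUE, "
        "val TEXT)"
    )
    return conn


def insert_variable(conn, key, val):
    """Plain insert: raises sqlite3.IntegrityError if the key already exists."""
    conn.execute("INSERT INTO variable (key, val) VALUES (?, ?)", (key, val))
    conn.commit()


def set_variable(conn, key, val):
    """Update the row if the key exists, otherwise insert it."""
    cur = conn.execute("UPDATE variable SET val = ? WHERE key = ?", (val, key))
    if cur.rowcount == 0:
        # No row was updated, so the key is new and can be inserted safely.
        conn.execute("INSERT INTO variable (key, val) VALUES (?, ?)", (key, val))
    conn.commit()


conn = make_db()
insert_variable(conn, "sql_path", "/usr/local/airflow/sql")
try:
    # Same key again: this is the situation the error message describes.
    insert_variable(conn, "sql_path", "/usr/local/airflow/sql")
except sqlite3.IntegrityError as exc:
    conn.rollback()
    print("second insert failed:", exc)

# The update-or-insert helper does not fail when the key is already present.
set_variable(conn, "sql_path", "/usr/local/airflow/sql")
set_variable(conn, "conf_path", "/usr/local/airflow/conf")
```

If this is what happens in my DAG, the rows would still show up after the run (the first insert succeeded) and the task would still be marked as failed (a later insert raised), which is what I observe.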