Following on from this question, when I try to create a postgresql table from a dask.dataframe with more than one partition I get the following error:
IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL: Key (typname, typnamespace)=(test1, 2200) already exists.
[SQL: '\nCREATE TABLE test1 (\n\t"A" BIGINT, \n\t"B" BIGINT, \n\t"C" BIGINT, \n\t"D" BIGINT, \n\t"E" BIGINT, \n\t"F" BIGINT, \n\t"G" BIGINT, \n\t"H" BIGINT, \n\t"I" BIGINT, \n\t"J" BIGINT, \n\tidx BIGINT\n)\n\n']
You can recreate the error with the following code:
import numpy as np
import dask.dataframe as dd
import dask
import pandas as pd
import sqlalchemy_utils as sqla_utils
import sqlalchemy as sqla
DATABASE_CONFIG = {
'driver': '',
'host': '',
'user': '',
'password': '',
'port': 5432,
}
DBNAME = 'dask'
url = '{driver}://{user}:{password}@{host}:{port}/'.format(
**DATABASE_CONFIG)
db_url = url.rstrip('/') + '/' + DBNAME
# create db if non-existent
if not sqla_utils.database_exists(db_url):
print('Creating database \'{}\''.format(DBNAME))
sqla_utils.create_database(db_url)
conn = sqla.create_engine(db_url)
# create pandas df with random numbers
df = pd.DataFrame(np.random.randint(0,40,size=(100, 10)), columns=list('ABCDEFGHIJ'))
# add index so that it can be used as primary key later on
df['idx'] = df.index
# create dask df
ddf = dd.from_pandas(df, npartitions=4)
# Write to psql
dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'test', db_url, if_exists='append', index=False, index_label='idx')
for d in ddf.to_delayed()]
dask.compute(*out)
The code doesn't produce an error if npartitions is set to 1. So I'm guessing it has to do with postgres not being able to handle parallel requests to write to a same sql table...? How can I fix this?