Postgres 9.5 upsert command in pandas or psycopg2?
Most of the examples I see are people inserting a single row into a database with the ON CONFLICT DO UPDATE syntax.

Does anyone have any examples using SQLAlchemy or pandas.to_sql?

99% of my inserts are using psycopg2 COPY command (so I save a csv or stringio and then bulk insert), and the other 1% are pd.to_sql. All of my logic to check for new rows or dimensions is done in Python.

def find_new_rows(existing, current, id_col):
    # Align dtypes so the merge keys match
    current[id_col] = current[id_col].astype(int)
    x = existing[['datetime', id_col, 'key1']]
    y = current[['datetime', id_col, 'key2']]
    # Left join current keys against existing; keep rows where the keys differ
    final = pd.merge(y, x, how='left', on=['datetime', id_col])
    final = final[~(final['key2'] == final['key1'])]
    final = final.drop(['key1'], axis=1)
    # Flag the changed rows back onto the full current frame
    current = pd.merge(current, final, how='left', on=['datetime', id_col])
    current = current.loc[current['key2_y'] == 1]
    current.drop(['key2_x', 'key2_y'], axis=1, inplace=True)
    return current
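For what it's worth, the same anti-join can also be written with merge's `indicator` flag, which may read more clearly; a minimal sketch with made-up column names:

```python
import pandas as pd

def new_rows_by_indicator(existing, current, keys):
    """Keep rows of `current` whose key combination is absent from `existing`."""
    merged = current.merge(existing[keys], how='left', on=keys, indicator=True)
    return merged[merged['_merge'] == 'left_only'].drop(columns='_merge')

existing = pd.DataFrame({'datetime': ['d1', 'd2'], 'dim_id': [1, 2]})
current = pd.DataFrame({'datetime': ['d1', 'd3'], 'dim_id': [1, 3], 'key2': [10, 30]})
new_rows = new_rows_by_indicator(existing, current, ['datetime', 'dim_id'])
# new_rows contains only the ('d3', 3) row
```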

Can someone show me an example of using the new PostgreSQL syntax for upsert with psycopg2? A common use case is checking for dimension changes (between 50k and 100k rows daily, which I compare against existing values), where ON CONFLICT DO NOTHING would add only the new rows.

Another use case is that I have fact data which changes over time. I only take the most recent value (I currently use a view to select distinct), but it would be better to UPSERT, if possible.
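For reference, the plain SQL I'm trying to generate for the DO NOTHING case looks roughly like this (table and column names here are made up):

```python
# Hypothetical table dims(datetime, dim_id, key2) with a unique
# constraint on (datetime, dim_id): conflicting rows are left untouched.
insert_sql = (
    "INSERT INTO dims (datetime, dim_id, key2) "
    "VALUES (%(datetime)s, %(dim_id)s, %(key2)s) "
    "ON CONFLICT (datetime, dim_id) DO NOTHING"
)
# With a live psycopg2 connection:
# cursor.executemany(insert_sql, df.to_dict(orient='records'))
# conn.commit()
```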

Chilung answered 9/3, 2017 at 12:26 Comment(0)
C
0

FYI, this is the solution I am using currently.

It seems to work fine for my purposes. I had to add a line to replace null (NaT) timestamps with None though, because I was getting an error when I was loading each row into the database.

def create_update_query(table):
    """This function creates an upsert query which replaces existing data based on primary key conflicts"""
    columns = ', '.join([f'{col}' for col in DATABASE_COLUMNS])
    constraint = ', '.join([f'{col}' for col in PRIMARY_KEY])
    placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
    query = f"""INSERT INTO {table} ({columns}) 
                VALUES ({placeholder}) 
                ON CONFLICT ({constraint}) 
                DO UPDATE SET {updates};"""
    # Collapse the multi-line f-string into a single line
    query = ' '.join(query.split())
    return query


def load_updates(df, table, connection):
    conn = connection.get_conn()
    cursor = conn.cursor()
    df1 = df.where((pd.notnull(df)), None)
    insert_values = df1.to_dict(orient='records')
    for row in insert_values:
        cursor.execute(create_update_query(table=table), row)
        conn.commit()
    row_count = len(insert_values)
    logging.info(f'Inserted {row_count} rows.')
    cursor.close()
    del cursor
    conn.close()
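If the per-row execute loop above ever gets slow, psycopg2.extras.execute_batch can send the same named-placeholder query in chunks; a sketch with illustrative column lists (swap in your own):

```python
DATABASE_COLUMNS = ['datetime', 'dim_id', 'key2']  # illustrative
PRIMARY_KEY = ['datetime', 'dim_id']               # illustrative

def create_update_query(table):
    columns = ', '.join(DATABASE_COLUMNS)
    constraint = ', '.join(PRIMARY_KEY)
    placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
    return (f'INSERT INTO {table} ({columns}) VALUES ({placeholder}) '
            f'ON CONFLICT ({constraint}) DO UPDATE SET {updates};')

query = create_update_query('my_table')
# With a live connection:
# from psycopg2.extras import execute_batch
# execute_batch(cursor, query, insert_values, page_size=500)
# conn.commit()
```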
Chilung answered 12/7, 2017 at 11:42 Comment(3)
May I ask what f'%({col})s' means? – Prohibitive
It uses a Python f-string with the col variable in a list comprehension. The variable columns is generated from a list of all of the columns in the table, joined with commas (column1, column2, column3, ...). In the end, the function generates a long query which specifies which columns are being replaced, which are unique, etc. – Chilung
You can try this in Python (3.6+): DATABASE_COLUMNS = ['column1', 'column2', 'column3'], columns = ', '.join([f'{col}' for col in DATABASE_COLUMNS]), query = f"""INSERT INTO {table} ({columns})""", print(query) – Chilung

Here is my code for a bulk insert & insert-on-conflict-update query for PostgreSQL from a pandas dataframe:

Let's say id is the unique key for both the PostgreSQL table and the pandas df, and you want to insert and update based on this id.

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://username:pass@host:port/dbname')
query = text(f""" 
                INSERT INTO schema.table(name, title, id)
                VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
                ON CONFLICT (id)
                DO UPDATE SET name = excluded.name,
                              title = excluded.title
         """)
with engine.begin() as conn:
    conn.execute(query)

Make sure that your df columns are in the same order as your table's columns.
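Note that interpolating dataframe values straight into the SQL string breaks on embedded single quotes; a sketch of the same statement with bound parameters instead (table and column names assumed from the answer above):

```python
import pandas as pd

df = pd.DataFrame({'name': ["O'Brien"], 'title': ['CTO'], 'id': [1]})

# One %s placeholder per column; the driver handles quoting.
upsert_sql = (
    "INSERT INTO schema.table (name, title, id) VALUES (%s, %s, %s) "
    "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, "
    "title = EXCLUDED.title"
)
rows = [tuple(r) for r in df.itertuples(index=False)]
# With a live psycopg2 connection:
# cursor.executemany(upsert_sql, rows)
# conn.commit()
```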

Reticular answered 1/12, 2020 at 8:13 Comment(0)

For my case, I wrote to a temporary table first, then merged the temp table into the actual table I wanted to upsert to. Performing the upsert this way avoids any conflicts where the strings may have single quotes in them.

def upsert_dataframe_to_table(self, table_name: str, df: pd.DataFrame, schema: str, id_col: str):
    """
    Takes the given dataframe and inserts it into the given table. The data is inserted unless the key for that
    data already exists in the table. If the key already exists, the data for that key is overwritten.

    :param table_name: The name of the table to send the data
    :param df: The dataframe with the data to send to the table
    :param schema: The name of the schema where the table exists
    :param id_col: The name of the primary key column
    :return: None
    """
    engine = create_engine(
        f'postgresql://{postgres_configs["username"]}:{postgres_configs["password"]}@{postgres_configs["host"]}'
        f':{postgres_configs["port"]}/{postgres_configs["db"]}'
    )
    # Stage the rows in a scratch table, then merge into the target
    df.to_sql('temp_table', engine, if_exists='replace', index=False)
    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in df.columns if col != id_col])
    columns = ', '.join([f'{col}' for col in df.columns])
    query = f'INSERT INTO "{schema}".{table_name} ({columns}) ' \
            f'SELECT {columns} FROM temp_table ' \
            f'ON CONFLICT ({id_col}) DO ' \
            f'UPDATE SET {updates} '

    self.cursor.execute(query)
    self.cursor.execute('DROP TABLE temp_table')
    self.conn.commit()
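To make the generated statement concrete, here is what the string comes out as for a hypothetical two-column table with id as the key:

```python
# Hypothetical inputs mirroring the method above.
schema, table_name, id_col = 'public', 'my_table', 'id'
cols = ['id', 'name']

updates = ', '.join(f'{c} = EXCLUDED.{c}' for c in cols if c != id_col)
columns = ', '.join(cols)
query = (f'INSERT INTO "{schema}".{table_name} ({columns}) '
         f'SELECT {columns} FROM temp_table '
         f'ON CONFLICT ({id_col}) DO UPDATE SET {updates}')
```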
Horner answered 6/4, 2022 at 12:49 Comment(0)
