Bulk update of rows in Postgres DB using psycopg2

We need to do bulk updates of many rows in our Postgres DB, and want to use the SQL syntax below. How do we do that using psycopg2?

UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *

Attempt 1 - Passing values

We need to pass a nested iterable to the psycopg2 query. For update_payload, I've tried passing a list of lists, a list of tuples, and a tuple of tuples. They all fail with various errors.

Attempt 2 - Writing custom class with __conform__

I've tried to write a custom class that we can use for these operations, which would return

(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))

I've coded it up as follows, based on instructions I found, but it's clear that I'm doing something wrong. For instance, with this approach I'd have to handle quoting of all values inside the table myself, which would be cumbersome and error-prone.

class ValuesTable(list):
    def __init__(self, *args, **kwargs):
        super(ValuesTable, self).__init__(*args, **kwargs)

    def __repr__(self):
        data_in_sql = ""
        for row in self:
            str_values = ", ".join([str(value) for value in row])
            data_in_sql += "({})".format(str_values)
        return "(VALUES {})".format(data_in_sql)

    def __conform__(self, proto):
        return self.__repr__()

    def getquoted(self):
        return self.__repr__()

    def __str__(self):
        return self.__repr__()

EDIT: If doing a bulk update can be done in a faster/cleaner way using another syntax than the one in my original question, then I'm all ears!
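
One alternative syntax, raised in the comments below, is to stage the new values in a temporary table and update from it with a join. The following is only a minimal sketch of that idea, not a tested solution: the function name update_via_temp_table is made up for illustration, the text column types are an assumption that has to be matched to the real table, and cur is assumed to be a psycopg2 cursor inside an open transaction (with autocommit enabled, ON COMMIT DROP would remove the table right after it is created).

```python
def update_via_temp_table(cur, rows):
    """Stage (id, msg) pairs in a temporary table, then update with one join.

    `cur` is assumed to be a DB-API cursor (e.g. from psycopg2) inside a
    transaction; `rows` is an iterable of (id, msg) pairs.
    """
    # Assumed column types: adjust them to the real table definition.
    cur.execute(
        "CREATE TEMP TABLE update_payload (id text, msg text) ON COMMIT DROP"
    )
    # Values are passed as parameters, so psycopg2 handles the quoting.
    cur.executemany(
        "INSERT INTO update_payload (id, msg) VALUES (%s, %s)", list(rows)
    )
    # A single statement updates every matching row.
    cur.execute(
        "UPDATE table_to_be_updated AS t "
        "SET msg = p.msg "
        "FROM update_payload AS p "
        "WHERE t.id = p.id "
        "RETURNING t.*"
    )
    return cur.fetchall()
```

The caller still has to commit the transaction. For large payloads the staging insert is likely to be the slow part, so it could be replaced by execute_values or COPY.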

Salvatore answered 20/9, 2019 at 17:43 Comment(3)
What errors were you getting with the first approach? How were you executing the SQL? If there are a large number of values, it may be easier to insert them into a temporary table and update from there. - Coleville
I don't quite get what you want to do, but according to the psycopg2 docs you have to pass parameters as a sequence. Does executing your query with (VALUES (%s, %s), (%s, %s), ...) and ['tbl1', 'msg1', 'tbl2', 'msg2'] as the vars argument already help? - Collaboration
I see. I've updated the question to make it clearer that I care less about doing it exactly the way I had written it up. The important thing is to do a bulk update in a clean/fast way. - Salvatore
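
The placeholder idea from the comment above can be sketched as follows: generate one (%s, %s) group per row and pass the values as a flat parameter list, so psycopg2 does all the quoting. This is an illustrative sketch only; build_values_update is a hypothetical helper name, and the SQL is the statement from the question.

```python
def build_values_update(rows):
    """Return (sql, params) for one multi-row UPDATE ... FROM (VALUES ...).

    `rows` is an iterable of (id, msg) pairs.
    """
    rows = [tuple(row) for row in rows]
    if not rows:
        raise ValueError("rows must not be empty")
    if any(len(row) != 2 for row in rows):
        raise ValueError("each row must be an (id, msg) pair")

    # One "(%s, %s)" group per row; the values never enter the SQL text.
    placeholders = ", ".join(["(%s, %s)"] * len(rows))
    sql = (
        "UPDATE table_to_be_updated "
        "SET msg = update_payload.msg "
        f"FROM (VALUES {placeholders}) AS update_payload (id, msg) "
        "WHERE table_to_be_updated.id = update_payload.id "
        "RETURNING *"
    )
    # Flatten [(id1, msg1), (id2, msg2)] into [id1, msg1, id2, msg2].
    params = [value for row in rows for value in row]
    return sql, params
```

With an open cursor this would be used as sql, params = build_values_update(rows), then cur.execute(sql, params) and cur.fetchall(). Only the placeholders are formatted into the string, never the data itself.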

Requirements:

  • Postgres table, consisting of the fields id and msg (and potentially other fields)
  • Python data containing new values for msg
  • Postgres table should be updated via psycopg2

Example Table

CREATE TABLE einstein(
   id CHAR(5) PRIMARY KEY,
   msg VARCHAR(1024) NOT NULL
);

Test data

INSERT INTO einstein VALUES ('a', 'empty');
INSERT INTO einstein VALUES ('b', 'empty');
INSERT INTO einstein VALUES ('c', 'empty');

Python Program

Hypothetical, self-contained example program with quotations of a famous physicist.

import sys
import psycopg2
from psycopg2.extras import execute_values


def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")


def update(con, einstein_quotes):
    cur = con.cursor()
    execute_values(cur, """UPDATE einstein 
                           SET msg = update_payload.msg 
                           FROM (VALUES %s) AS update_payload (id, msg) 
                           WHERE einstein.id = update_payload.id""", einstein_quotes)
    con.commit()


def main():
    con = None
    einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
                       ("b", "I have no special talent. I am only passionately curious."),
                       ("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes)
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()


if __name__ == '__main__':
    main()
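
The statement in the question ends with RETURNING *, which the program above leaves out. As far as I know, execute_values sends the rows in pages of page_size (100 by default), so with more rows the statement runs several times and a plain cur.fetchall() afterwards would only see the last page. The sketch below assumes psycopg2 2.8 or later, where execute_values accepts fetch=True and returns the rows of all pages as one list. The function name update_returning is made up for illustration.

```python
UPDATE_RETURNING_SQL = """
    UPDATE einstein
    SET msg = update_payload.msg
    FROM (VALUES %s) AS update_payload (id, msg)
    WHERE einstein.id = update_payload.id
    RETURNING einstein.id, einstein.msg
"""


def update_returning(con, einstein_quotes, page_size=100):
    """Bulk-update msg and return the updated (id, msg) rows."""
    # Imported here so the sketch can be loaded even where psycopg2 is absent.
    from psycopg2.extras import execute_values

    with con.cursor() as cur:
        # fetch=True (psycopg2 >= 2.8) collects the RETURNING rows of every page.
        updated = execute_values(
            cur,
            UPDATE_RETURNING_SQL,
            einstein_quotes,
            page_size=page_size,
            fetch=True,
        )
    con.commit()
    return updated
```

Calling update_returning(con, einstein_quotes) with the list from main() should then give back the three updated rows.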

Prepared Statements Alternative

import sys
import psycopg2
from psycopg2.extras import execute_batch


def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")


def update(con, einstein_quotes, page_size):
    cur = con.cursor()
    cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg=$1 WHERE id=$2")
    execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
    cur.execute("DEALLOCATE updateStmt")
    con.commit()


def main():
    con = None
    einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
                       {"id": "b", "msg": "I have no special talent. I am only passionately curious."},
                       {"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes, 100)  # choose some meaningful page_size here
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()


if __name__ == '__main__':
    main()

Output

Either program above would print the following to the console (id is declared as CHAR(5), so the ids come back padded with spaces):

a     empty
b     empty
c     empty
rows updated:
a     Few are those who see with their own eyes and feel with their own hearts.
b     I have no special talent. I am only passionately curious.
c     Life is like riding a bicycle. To keep your balance you must keep moving.
Priedieu answered 28/2, 2020 at 22:37 Comment(8)
It may be worth pointing out that executemany is not faster than calling execute in a loop, and it doesn't produce the SQL syntax the OP is asking for. Still, an upvote for your time and the nice quotes from an admirable man. - Collaboration
@YannicHamann Thanks. Very good hint indeed. I have updated the answer to use execute_batch, which combines at most page_size update statements into a single batch, thus reducing the number of server round trips. My focus was on the value-passing aspect of the question. It could be made even faster by using prepared statements. The FROM clause of the desired SQL statement would work on a table, but in this case the data are passed from memory as an iterable. Or maybe I'm missing something here? - Priedieu
I think the answer you provided is very readable, and I would much rather find your code in a codebase I am working with than the SQL the OP wants. However, there may be reasons to generate exactly the SQL he is asking for, but I think that is already solved by my comment under his post. - Collaboration
I agree. This is a better approach. Didn't know it was an option :) - Salvatore
Is there an even better way to do a bulk update using a prepared statement? - Salvatore
I have updated the answer to use a prepared statement so that the database is able to cache the query plan for faster execution. - Priedieu
It's strange to me that executing many statements that each update one row would be faster than a single statement updating many rows. I can't find any information on this. Do you have any that you could share? - Salvatore
I added an alternative solution. But speed often depends more on other factors, e.g. indices on large tables. Curious how it works out for you. - Priedieu

Short answer: use execute_values(curs, sql, args); see the psycopg2 docs.

For those looking for a short, straightforward answer, here is sample code to update users in bulk:

from psycopg2.extras import execute_values

sql = """
    update users u
    set
        name = t.name,
        phone_number = t.phone_number
    from (values %s) as t(id, name, phone_number)
    where u.id = t.id;
"""

rows_to_update = [
    (2, "New name 1", '+923002954332'),
    (5, "New name 2", '+923002954332'),
]
curs = conn.cursor()  # assumes conn is an already open psycopg2 connection
execute_values(curs, sql, rows_to_update)
conn.commit()  # persist the update (not needed if autocommit is enabled)

If the primary key is a uuid and you haven't registered the uuid data type in psycopg2 (i.e. you keep uuids as Python strings), the values arrive as text and the comparison needs a cast: use the condition u.id = t.id::uuid.
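
Another way to apply the same cast, sketched here under assumptions, is the template argument of execute_values, which sets the placeholder snippet used for every row. The users table and its columns are taken from the sample above; the function name update_users and the constant ROW_TEMPLATE are made up for illustration.

```python
UPDATE_USERS_SQL = """
    update users u
    set
        name = t.name,
        phone_number = t.phone_number
    from (values %s) as t(id, name, phone_number)
    where u.id = t.id
"""

# One placeholder per column; the first one is cast to uuid in every row.
ROW_TEMPLATE = "(%s::uuid, %s, %s)"


def update_users(conn, rows_to_update):
    """Bulk-update users whose ids are passed as uuid strings."""
    # Imported here so the sketch can be loaded even where psycopg2 is absent.
    from psycopg2.extras import execute_values

    with conn.cursor() as curs:
        execute_values(curs, UPDATE_USERS_SQL, rows_to_update, template=ROW_TEMPLATE)
    conn.commit()
```

With the cast in the template, the where clause can stay as u.id = t.id.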

Steen answered 4/1, 2023 at 9:26 Comment(0)
