How to do an upsert with SqlAlchemy?
G

12

138

I have a record that I want to exist in the database if it is not there, and if it is there already (primary key exists) I want the fields to be updated to the current state. This is often called an upsert.

The following incomplete code snippet demonstrates what will work, but it seems excessively clunky (especially if there were a lot more columns). What is the better/best way?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desired_default.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Is there a better or less verbose way of doing this? Something like this would be great:

sess.upsert_this(desired_default, unique_key = "name")

Although the unique_key kwarg is obviously unnecessary (the ORM should be able to figure this out easily), I added it because SQLAlchemy tends to work only with the primary key. For example, I've been looking at whether Session.merge would be applicable, but it works only on the primary key, which in this case is an autoincrementing id that is not terribly useful for this purpose.

A sample use case for this is simply starting up a server application that may have upgraded its default expected data, i.e. there are no concurrency concerns for this upsert.

Grassquit answered 23/8, 2011 at 18:46 Comment(4)
Why can't you make the name field a primary key if it is unique (merge would work in this case)? Why do you need a separate primary key? - Numerical
@abbot: I don't want to get into an id field debate, but... the short answer is "foreign keys". The longer answer is that although the name is indeed the only required unique key, there are two problems. 1) When a template record is referenced by 50 million records in another table, having that FK as a string field is nuts. An indexed integer is better, hence the seemingly pointless id column. 2) Extending on that, if the string was used as the FK, there are now two locations to update the name if/when it changes, which is annoying and rife with dead relationship issues. The id never changes. - Grassquit
you might try a new (beta) upsert library for python... it's compatible with psycopg2, sqlite3, MySQLdb - Seaworthy
see also this thread: Does SQLAlchemy have an equivalent of Django's get or create? - Siphonostele
B
81

SQLAlchemy supports PostgreSQL's ON CONFLICT clause with two methods: on_conflict_do_update() and on_conflict_do_nothing().

Copying from the documentation:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
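To try the same construct without a PostgreSQL server, here is a minimal sketch that assumes SQLAlchemy 1.4 or later, where the SQLite dialect offers an equivalent insert() with on_conflict_do_update(). The table mirrors the question's templates table, and upsert_template is a hypothetical helper name, not part of SQLAlchemy. stmt.excluded refers to the row that was proposed for insertion, which is why it appears in set_.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert  # SQLite flavour of the same construct

metadata = MetaData()
templates = Table(
    "templates", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(80), unique=True),
    Column("description", String(200)),
)


def upsert_template(conn, name, description):
    """Insert a template, or update its description if the name already exists."""
    stmt = insert(templates).values(name=name, description=description)
    stmt = stmt.on_conflict_do_update(
        index_elements=[templates.c.name],  # the unique column to match on
        set_={"description": stmt.excluded.description},  # take the incoming value
    )
    conn.execute(stmt)


engine = create_engine("sqlite://")  # in-memory database
metadata.create_all(engine)
with engine.begin() as conn:
    upsert_template(conn, "default", "first version")
    upsert_template(conn, "default", "second version")
    rows = conn.execute(
        select(templates.c.id, templates.c.name, templates.c.description)
    ).fetchall()
```

After both calls the table still holds a single row for "default", with the autoincrementing id unchanged and the description taken from the second call.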
Bambibambie answered 6/6, 2017 at 17:12 Comment(8)
MySQL is also supported with on_duplicate_key_update - Rotation
Just calling execute can't get the returning id - Tripura
This code yes, I would think (answer is 3+ years old), but maybe Michael's comment works for MySQL. Generally speaking, my (this) answer kind of jumps to the conclusion that Postgres is used as the database. It is not great because it does not really answer the generic question that was asked. But based on the upvotes I got, I gathered it was useful to some people, so I left it up. - Bambibambie
Why do we have excluded in set_? set_=dict(data=stmt.excluded.data) - Edging
Had to change set_=dict(data=stmt.excluded.data) to set_=dict(stmt.excluded.items()) to get this working as stmt.excluded has no data attribute. - Aerophone
@JamesBridgewater you misunderstood the example. set_=dict(data=stmt.excluded.data) is to update only the data column with the value of the excluded data (as data is the name of a column in this table schema). set_=dict(stmt.excluded.items()) will simply update all excluded columns - Mink
Incidentally, if you just want to update all excluded columns, stmt.excluded is a ColumnCollection which works as a mapping, so you can simply say set_=stmt.excluded - Mink
How would you get a list of columns with unique constraints? In my case my primary key is a serial, but the unique constraint exists on another column - Tearoom
M
76

SQLAlchemy does have a "save-or-update" behavior, which in recent versions has been built into session.add, but previously was the separate session.save_or_update call. This is not an "upsert" but it may be good enough for your needs.

It is good that you are asking about a class with multiple unique keys; I believe this is precisely the reason there is no single correct way to do this. The primary key is also a unique key. If there were no unique constraints, only the primary key, it would be a simple enough problem: if nothing with the given ID exists, or if ID is None, create a new record; else update all other fields in the existing record with that primary key.

However, when there are additional unique constraints, there are logical issues with that simple approach. If you want to "upsert" an object, and the primary key of your object matches an existing record, but another unique column matches a different record, then what do you do? Similarly, if the primary key matches no existing record, but another unique column does match an existing record, then what? There may be a correct answer for your particular situation, but in general I would argue there is no single correct answer.

That would be the reason there is no built in "upsert" operation. The application must define what this means in each particular case.
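As an illustration of the application defining what "upsert" means, here is a minimal sketch in which the caller names the columns to match on. upsert_by is a hypothetical helper, not a SQLAlchemy API; the sketch assumes SQLAlchemy 1.4 or later, uses an in-memory SQLite database, and does nothing to protect against concurrent writers.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Template(Base):
    __tablename__ = "templates"
    id = Column(Integer, primary_key=True)
    name = Column(String(80), unique=True)
    description = Column(String(200))


def upsert_by(session, model, match, **values):
    """Hypothetical helper: find a row by the `match` columns, then update or create it."""
    obj = session.query(model).filter_by(**match).one_or_none()
    if obj is None:
        obj = model(**match)  # no such row yet, so create one with the key values
        session.add(obj)
    for key, value in values.items():
        setattr(obj, key, value)
    return obj


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    upsert_by(session, Template, {"name": "default"}, description="old")
    upsert_by(session, Template, {"name": "default"}, description="new")
    session.commit()
    stored = [(t.name, t.description) for t in session.query(Template).all()]
```

Because the caller chooses the match columns, the ambiguity between the primary key and other unique columns is resolved explicitly at each call site.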

Mauri answered 23/8, 2011 at 19:37 Comment(2)
12 years on, and you just gifted me illumination.. thanks! - Torpor
@Torpor Reading stackoverflow on Christmas day, mad respect - Contemptuous
V
34

Nowadays, SQLAlchemy provides two helpful functions, on_conflict_do_nothing and on_conflict_do_update. They are useful but require you to switch from the ORM interface to the lower-level one, SQLAlchemy Core.

Although those two functions make upserting using SQLAlchemy's syntax not that difficult, these functions are far from providing a complete out-of-the-box solution to upserting.

My common use case is to upsert a big chunk of rows in a single SQL query/session execution. I usually encounter two problems with upserting:

For example, higher level ORM functionalities we've gotten used to are missing. You cannot use ORM objects but instead have to provide ForeignKeys at the time of insertion.

I'm using the following function, which I wrote to handle both of those issues:

from sqlalchemy import UniqueConstraint, inspect
from sqlalchemy.dialects import postgresql


def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
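The seen set in the function above drops rows that repeat a unique key within the same batch. This matters because PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE statement that would affect the same row twice. A minimal sketch of that step in isolation, in plain Python; dedupe_rows is a hypothetical name and the sample rows are made up for illustration:

```python
def dedupe_rows(rows, key_columns):
    """Keep the first row for each distinct combination of key_columns."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(row[col] for col in key_columns)
        if key in seen:
            continue  # a later duplicate of a key already in this batch
        seen.add(key)
        unique_rows.append(row)
    return unique_rows


batch = [
    {"name": "default", "template": "AABBCC"},
    {"name": "other", "template": "DDEEFF"},
    {"name": "default", "template": "112233"},  # same name as the first row
]
deduped = dedupe_rows(batch, ["name"])
```

Keeping the first occurrence matches what the function above does; keeping the last one instead would be an equally valid policy if later rows are meant to win.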
Vaccaro answered 28/7, 2018 at 2:54 Comment(3)
on_conflict is only available for backends that support native ON CONFLICT clauses; hence, only PostgreSQL - Farmhand
@Farmhand Now SQLAlchemy also supports ON DUPLICATE KEY UPDATE for MySQL. - Rotation
@Nirlze It seems to be missing: what are the 2 problems you mentioned? - He
B
14

I use a "look before you leap" approach:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

The advantage is that this is db-neutral and I think it's clear to read. The disadvantage is that there's a potential race condition in a scenario like the following:

  • we query the db for a switch_command and don't find one
  • we create a switch_command
  • another process or thread creates a switch_command with the same primary key as ours
  • we try to commit our switch_command
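One common way to narrow that window is to attempt the INSERT first and fall back to an UPDATE when the database reports a duplicate key. The following is a minimal sketch under stated assumptions, not a drop-in replacement for the code above: it assumes SQLAlchemy 1.4 or later, uses an in-memory SQLite database, and the model and the insert_or_update helper are hypothetical stand-ins. Note that the rollback discards everything pending in the session's transaction, not only the failed insert.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SwitchCommand(Base):
    __tablename__ = "switch_command"
    switch_id = Column(Integer, primary_key=True)
    command_id = Column(Integer, primary_key=True)
    output = Column(String(200))


def insert_or_update(session, switch_id, command_id, output):
    """Attempt the INSERT first; if the key is already taken, UPDATE instead."""
    try:
        session.add(SwitchCommand(switch_id=switch_id, command_id=command_id, output=output))
        session.flush()  # emits the INSERT, so a duplicate key fails here
    except IntegrityError:
        session.rollback()  # discards everything pending in this transaction
        existing = session.get(SwitchCommand, (switch_id, command_id))
        existing.output = output
    session.commit()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
for text in ("Hooray!", "Hooray again!"):
    with Session(engine) as session:  # a fresh session per call
        insert_or_update(session, 1, 2, text)
with Session(engine) as session:
    outputs = [row.output for row in session.query(SwitchCommand).all()]
```

The second call hits the duplicate-key path and updates the existing row, so only one row remains.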
Bilious answered 19/10, 2017 at 20:0 Comment(3)
This question handles the race condition with a try/catch - Bilious
The entire goal of upsert is to avoid the race condition described here. - Fortyfour
@Fortyfour I know, that's why it's really sad that SQLAlchemy makes it difficult to do cleanly and portably... I've highlighted the race condition in my answer - Bilious
L
4

There are multiple answers and here comes yet another answer (YAA). Other answers are not that readable due to the metaprogramming involved. Here is an example that:

  • Uses SQLAlchemy ORM

  • Shows how to create a row if there are zero rows using on_conflict_do_nothing

  • Shows how to update the existing row (if any) without creating a new row using on_conflict_do_update

  • Uses the table primary key as the constraint

A longer example is in the original question that this code relates to.


import datetime

import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                        backref=orm.backref("pair_state",
                                        lazy="dynamic",
                                        cascade="all, delete-orphan",
                                        single_parent=True, ), )


    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the column last_event_at for a named pair."""
        # Based on the original example of https://mcmap.net/q/56883/-programmingerror-sqlalchemy-on_conflict_do_update
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )
Laudable answered 14/6, 2021 at 11:24 Comment(2)
An interesting approach adding the functionality to the Model. I wonder if this could be moved to a base model that other models could inherit, and thus be available by default across all models. - Regelation
I was able to adapt the above to my situation, using sessions and execute. Needed from sqlalchemy.dialects.postgresql import insert. Thanks! - Messing
T
3

The code below works fine for me with a Redshift database and will also work with a composite primary key constraint.

SOURCE : this

Just a few modifications are required for creating the SQLAlchemy engine in the function start_engine().

import os

from sqlalchemy import Column, Integer, Date, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI',
                                     'postgresql://localhost:5432/upsert'))
    connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else query.statement.compile
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
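If the goal is to overwrite an existing row only when the incoming report_date is newer, that condition on the update itself is expressed with the where parameter of on_conflict_do_update; index_where, by contrast, describes a partial index used to infer the conflict target. The following is a minimal sketch of the where form, assuming SQLAlchemy 1.4 or later and using the SQLite dialect so it runs without a server. The table is a cut-down version of the one above, and upsert_if_newer and the sample values are hypothetical.

```python
import datetime

from sqlalchemy import Column, Date, Integer, MetaData, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert

metadata = MetaData()
digital_spend = Table(
    "digital_spend", metadata,
    Column("day", Date, primary_key=True),
    Column("report_date", Date, nullable=False),
    Column("impressions", Integer),
)


def upsert_if_newer(conn, row):
    """Insert the row; on a primary-key conflict, overwrite only if report_date is newer."""
    stmt = insert(digital_spend).values(row)
    stmt = stmt.on_conflict_do_update(
        index_elements=[digital_spend.c.day],
        set_={
            "report_date": stmt.excluded.report_date,
            "impressions": stmt.excluded.impressions,
        },
        # Applied to the DO UPDATE part: skip the update for stale reports.
        where=digital_spend.c.report_date < stmt.excluded.report_date,
    )
    conn.execute(stmt)


day = datetime.date(2019, 3, 1)
engine = create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as conn:
    upsert_if_newer(conn, {"day": day, "report_date": datetime.date(2019, 3, 10), "impressions": 100})
    # Older report: the WHERE clause is false, so the stored row is left alone.
    upsert_if_newer(conn, {"day": day, "report_date": datetime.date(2019, 3, 5), "impressions": 50})
    # Newer report: the stored row is overwritten.
    upsert_if_newer(conn, {"day": day, "report_date": datetime.date(2019, 3, 20), "impressions": 200})
    impressions = conn.execute(select(digital_spend.c.impressions)).scalar_one()
```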
Tory answered 26/3, 2019 at 12:41 Comment(0)
D
2

This allows access to the underlying models based on string names

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://mcmap.net/q/56884/-sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to-a-metaclass-constructor-as-far-as-i-can-see
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(session, record_dict, table):
    """
    Handles updates when there are primary key conflicts.

    The session is passed in explicitly.
    """
    try:
        session.add(table(**record_dict))
        session.flush()  # the INSERT is emitted on flush, so a conflict surfaces here
    except Exception:
        # Here we'll assume the error is caused by an integrity error.
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqlite); SQLAlchemy doesn't mask
        # them with its own code. This should be updated to have
        # explicit error handling for each new db engine.

        # <update>add explicit error handling for each db engine</update>
        session.rollback()
        # Query for the conflicting class, use the update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(table.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(table).filter(*[k==v for (k,v) in c_oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items():
            setattr(c_target_record, k, v)
Delozier answered 5/4, 2019 at 17:25 Comment(0)
F
1

This works for me with sqlite3 and postgres. However, it might fail with composite primary key constraints and will most likely fail with additional unique constraints.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)
Fortunetelling answered 8/11, 2018 at 9:1 Comment(0)
A
1

As we had problems with generated default ids and references, which led to ForeignKeyViolation errors like

update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".

we had to exclude the id from the update dict, as otherwise it will always be generated as a new default value.

In addition, the function returns the created/updated entity.

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert


def upsert(session, data, key_columns, model):

    stmt = insert(model).values(data)
    
    # Important to exclude the ID for update!
    exclude_for_update = [model.id.name, *key_columns]
    update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}

    stmt = stmt.on_conflict_do_update(
        index_elements=key_columns,
        set_=update_dict
    ).returning(model)

    orm_stmt = (
        select(model)
        .from_statement(stmt)
        .execution_options(populate_existing=True)
    )

    return session.execute(orm_stmt).scalar()

Example:


class UpsertUser(Base):
    __tablename__ = 'upsert_user'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    name: str = Column(sa.String, nullable=False)
    user_sid: str = Column(sa.String, nullable=False, unique=True)
    house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)


class UpsertHouse(Base):
    __tablename__ = 'upsert_house'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
    admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)

# Usage

upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)

Note: Only tested on postgresql but could work also for other DBs which support ON DUPLICATE KEY UPDATE e.g. MySQL

Advocate answered 29/7, 2022 at 10:24 Comment(0)
C
0

In case of sqlite, the sqlite_on_conflict='REPLACE' option can be used when defining a UniqueConstraint, and sqlite_on_conflict_unique for unique constraint on a single column. Then session.add will work in a way just like upsert. See the official documentation.
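One caveat is that SQLite's REPLACE conflict resolution deletes the conflicting row and inserts a new one instead of updating it in place, so an autoincrementing id changes. That matters if other tables reference the id. The following is a minimal sketch using the standard-library sqlite3 module; the DDL is written by hand here as an assumption about the kind of constraint the option produces, not copied from SQLAlchemy's output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE templates ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " name TEXT UNIQUE ON CONFLICT REPLACE,"
    " description TEXT)"
)
conn.execute("INSERT INTO templates (name, description) VALUES ('default', 'old')")
old_id = conn.execute("SELECT id FROM templates WHERE name = 'default'").fetchone()[0]

# The second INSERT conflicts on name: SQLite deletes the old row, then inserts the new one.
conn.execute("INSERT INTO templates (name, description) VALUES ('default', 'new')")
rows = conn.execute("SELECT id, description FROM templates").fetchall()
new_id = rows[0][0]
```

The table ends up with one row carrying the new description, but under a different id than before.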

Cathe answered 19/3, 2022 at 8:48 Comment(0)
H
0

I use this code for upserts. Before using it, you should add primary keys to the table in the database.

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.inspection import inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.dialects.postgresql import insert

def upsert(df, engine, table_name, schema=None, chunk_size = 1000):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload=True)
    
    # only use common columns between df and table.
    table_columns = {column.name for column in table.columns}
    df_columns = set(df.columns)
    intersection_columns = table_columns.intersection(df_columns)
    
    df1 = df[list(intersection_columns)]  # recent pandas versions reject a set as an indexer
    records  = df1.to_dict('records')

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]
    

    with engine.begin() as conn:  # begin() commits when the block exits without an error
        chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
        for chunk in chunks:
            stmt = insert(table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            s = stmt.on_conflict_do_update(
                index_elements= primary_keys,
                set_=update_dict)
            conn.execute(s)
Hobbs answered 9/2, 2023 at 5:38 Comment(0)
H
0

I used the following pattern (my use case is simpler):

from sqlalchemy import select
from sqlalchemy.exc import NoResultFound

is_add = False
try:
   stmt = select(Model).where(Model.id == todo.id)
   _ = session.scalars(stmt).one()
except NoResultFound:
   # no row with this id yet, so it has to be inserted
   is_add = True

if is_add:
   # insert the record
   pass
else:
   # update the record
   pass
Hazelton answered 13/3, 2024 at 10:4 Comment(0)
