Here's my solution. It is a method of a generic DatabaseHandler
class and provides a lot of flexibility when using a pd.DataFrame
as your source.
def update_data(
    self,
    table: str,
    df: pd.DataFrame,
    indexes: Optional[list] = None,
    column_map: Optional[dict] = None,
    commit: Optional[bool] = False,
) -> int:
    """Update rows of a database table from the rows of a dataframe.

    One ``UPDATE`` statement is built and executed per dataframe row.

    Args:
        table (str): the "tablename" or "namespace.tablename"
        df (pandas.DataFrame): dataframe containing the data to update
        indexes (list): the dataframe columns used in the WHERE clause of
            each update statement. If not provided, the names of the
            dataframe's index levels are used.
        column_map (dict): dictionary mapping the columns in df to the
            columns in the table. Columns in the column_map that are also
            in indexes will not be updated.
            Key = df column.
            Value = table column.
        commit (bool): if True, the transaction will be committed
            (default=False)

    Notes:
        If using a column_map, only the columns in the column_map will be
        updated or used as indexes. Order does not matter. If not using a
        column_map, all columns in df must exist in table.

        Missing values (None/NaN/NaT) are written as NULL in the SET clause
        and matched with ``IS NULL`` in the WHERE clause, because
        ``column = NULL`` never matches a row in SQL.

        SECURITY: values are escaped by the cursor's ``mogrify``, but
        ``table`` and the column names are interpolated into the statement
        verbatim. They must come from a trusted source.

    Returns:
        int : total number of rows updated across all generated statements

    Raises:
        ValueError: if ``indexes`` is not given and the dataframe index has
            unnamed levels, or if no column is left to update once the
            index columns are excluded.
        Exception: any error raised while building or executing the
            statements is re-raised after ``self.rollback()`` is called.
    """
    try:
        if not indexes:
            # Use the dataframe index levels as the key columns instead.
            indexes = list(df.index.names)
            if not all(indexes):
                raise ValueError(
                    "Dataframe contains indexes without names. Unable to determine update where clause."
                )

        # Turn index levels into ordinary columns so every key can be read
        # from the row by name.
        tdf = df.reset_index()
        if column_map:
            target_columns = [c for c in column_map if c not in indexes]
        else:
            column_map = {c: c for c in tdf.columns}
            target_columns = [c for c in df.columns if c not in indexes]
        if not target_columns:
            raise ValueError(
                "No columns left to update once the index columns are excluded."
            )

        # The SET template is identical for every row, so build it once.
        set_template = ", ".join(f"{column_map[c]} = %s" for c in target_columns)

        update_strings = []
        for _, row in tdf.iterrows():
            set_values = [
                row[c] if pd.notna(row[c]) else None for c in target_columns
            ]
            set_sql = self._cur.mogrify(set_template, set_values).decode("utf-8")

            # "col = NULL" is never true in SQL, so missing key values have
            # to be matched with IS NULL rather than a bound parameter.
            where_parts = []
            where_values = []
            for c in indexes:
                if pd.notna(row[c]):
                    where_parts.append(f"{column_map[c]} = %s")
                    where_values.append(row[c])
                else:
                    where_parts.append(f"{column_map[c]} IS NULL")
            where_sql = self._cur.mogrify(
                " AND ".join(where_parts), where_values
            ).decode("utf-8")

            update_strings.append(
                f"UPDATE {table} SET {set_sql} WHERE {where_sql};"
            )

        if update_strings:
            print("\n".join(update_strings))  # Debugging

        # Execute the statements one at a time: after a multi-statement
        # execute the cursor's rowcount only describes the last statement,
        # which would under-report the number of rows updated.
        rowcount = 0
        for statement in update_strings:
            self._cur.execute(statement)
            # rowcount may be -1 when the driver cannot determine it.
            rowcount += max(self._cur.rowcount, 0)

        if commit:
            self.commit()
        return rowcount
    except Exception:
        self.rollback()
        raise
Example usages:
>>> df = pd.DataFrame([
{'a':1,'b':'asdf','c':datetime.datetime.now()},
{'a':2,'b':'jklm','c':datetime.datetime.now()}
])
>>> cls.update_data('my_table', df, indexes = ['a'])
UPDATE my_table SET b = 'asdf', c = '2023-01-17T22:13:37.095245'::timestamp WHERE a = 1;
UPDATE my_table SET b = 'jklm', c = '2023-01-17T22:13:37.095250'::timestamp WHERE a = 2;
>>> cls.update_data('my_table', df, indexes = ['a','b'])
UPDATE my_table SET c = '2023-01-17T22:13:37.095245'::timestamp WHERE a = 1 AND b = 'asdf';
UPDATE my_table SET c = '2023-01-17T22:13:37.095250'::timestamp WHERE a = 2 AND b = 'jklm';
>>> cls.update_data('my_table', df.set_index('a'), column_map={'a':'db_a','b':'db_b','c':'db_c'} )
UPDATE my_table SET db_b = 'asdf', db_c = '2023-01-17T22:13:37.095245'::timestamp WHERE db_a = 1;
UPDATE my_table SET db_b = 'jklm', db_c = '2023-01-17T22:13:37.095250'::timestamp WHERE db_a = 2;
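Note, however, that this is not fully safe from SQL injection: the values are escaped by the cursor's mogrify, but the table name and the column names (in both the SET and WHERE clauses) are interpolated into the statement as-is, so they must come from a trusted source.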