Insert or update a Delta table from a DataFrame in PySpark

I have a PySpark DataFrame from which I initially created a Delta table using the code below:

df.write.format("delta").saveAsTable("events")

Since this DataFrame is populated with new data daily, I used the following syntax to append the new records to the Delta table:

df.write.format("delta").mode("append").saveAsTable("events")

I did all of this in Databricks on my cluster. I want to know how to write generic PySpark code in Python that creates the Delta table if it does not exist and appends records if it does. I need this because if I hand my Python package to someone else, they will not have the same Delta table in their environment, so it should be created dynamically from the code.

Greaves answered 23/2, 2021 at 20:38 Comment(1)
This is exactly the definition of append mode when writing. - Photopia

If you don't have a Delta table yet, it will be created when you use the append mode. So you don't need to write any special code to handle both the case when the table doesn't exist yet and the case when it already exists.

P.S. You only need such code if you're performing a merge into the table rather than an append. In that case the code looks like this:

if table_exists:
  do_merge
else:
  df.write....

P.S. here is a generic implementation of that pattern
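
A minimal sketch of that check-then-merge pattern, assuming delta-spark is installed, the table is registered in the metastore, and a single (illustrative) key column id; note that spark.catalog.tableExists needs Spark 3.3+:

from delta.tables import DeltaTable

def upsert(spark, df, table_name="events", key_col="id"):
    # Merge when the table already exists, otherwise create it from the DataFrame.
    if spark.catalog.tableExists(table_name):
        (DeltaTable.forName(spark, table_name).alias("t")
            .merge(df.alias("s"), f"t.{key_col} = s.{key_col}")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        df.write.format("delta").saveAsTable(table_name)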

Desrochers answered 27/2, 2021 at 9:54 Comment(0)

There are essentially two write operations available in Spark:

  • saveAsTable: creates or replaces the table, whether or not it already exists, using the current DataFrame.
  • insertInto: succeeds only if the table is already present and acts according to the mode ('overwrite' or 'append'); it requires the table to exist in the database (see the short contrast below).
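
A quick, illustrative contrast of the two (the table name events comes from the question):

# Creates the table if it doesn't exist, otherwise appends to it:
df.write.format("delta").mode("append").saveAsTable("events")

# Fails if the table doesn't exist; when it does, rows are inserted by column position:
df.write.mode("append").insertInto("events")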

The .saveAsTable("events") basically rewrites the table every time you call it. which means that, even if you have a table present earlier or not, it will replace the table with the current DataFrame value. Instead, you can perform the below operation to be in the safer side:

Step 1: Create the table if it does not already exist. The where 1=2 predicate copies only the schema, no rows; if the table already exists, the statement is a no-op. Either way, the new DataFrame records are then appended.

df.createOrReplaceTempView('df_table')

spark.sql("create table IF NOT EXISTS events using delta as select * from df_table where 1=2")

df.write.format("delta").mode("append").insertInto("events")

So, every time it runs, it checks whether the table exists: if not, the table is created first and the data is then appended; if it already exists, the data is simply appended to it.
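
For completeness, a minimal sketch wrapping the steps above into one reusable function (the function name and temporary view name are illustrative):

def create_and_append(spark, df, table_name="events"):
    # Expose the DataFrame's schema to SQL via a temporary view.
    df.createOrReplaceTempView("df_table")
    # Create an empty Delta table with the same schema if it doesn't exist yet (1=2 copies no rows).
    spark.sql(f"create table IF NOT EXISTS {table_name} using delta as select * from df_table where 1=2")
    # Append the new records; insertInto matches columns by position.
    df.write.mode("append").insertInto(table_name)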

Mitziemitzl answered 31/5, 2022 at 9:7 Comment(1)
You don’t need to explicitly create the table - just use .saveAsTable - it will be created if it doesn’t exist yet. - Desrochers

Use the following syntax:

from delta.tables import DeltaTable

# 'path' is the filesystem location of the existing Delta table.
updates = df
dest = DeltaTable.forPath(spark, path)
dest.alias("events").merge(
    updates.alias("updates"),
    "events.Id = updates.Id AND events.Type = updates.Type"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()
Punchinello answered 10/11, 2023 at 20:50 Comment(1)
There should be AND in SQL there - Jarl
