Converting PySpark dataframe to a Delta Table

I am working in an AWS Glue environment. I read the data from the Glue catalog as a DynamicFrame and convert it to a PySpark dataframe for my custom transformations. To do an upsert of the new/updated data, I intend to use Delta tables.

But I'm only finding options to read data as a Delta table from a path. I need to convert my PySpark dataframe to a Delta table to do merge operations. Is there any way to do this?
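
For context, the conversion described above, from a Glue catalog DynamicFrame to a plain PySpark DataFrame, looks roughly like this (a minimal sketch; the database and table names are placeholders):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Read from the Glue catalog as a DynamicFrame, then convert to a Spark DataFrame
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="my_database",   # placeholder database name
    table_name="my_table"     # placeholder table name
)
updates_df = dyf.toDF()       # plain PySpark DataFrame for custom transformations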

Bard answered 30/8, 2021 at 8:12

Only the destination table needs to be a Delta table; the data you're merging in (the source) doesn't have to be one. What to do depends on which API you're using:

  • If you're using the Python API, you can use the DataFrame as is (example based on the docs):
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")  # destination Delta table
updatesDF = ....  # your transformed DataFrame (the merge source)

deltaTable.alias("target").merge(
    updatesDF.alias("updates"),
    "target.col1 = updates.col1") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
  • If you're using the SQL MERGE command, you can register a temp view for your DataFrame and use it as the input to the MERGE SQL command:
updates_df.createOrReplaceTempView("updates")
merge_sql = """
      MERGE INTO target
      USING updates
      ON target.col1 = updates.col1
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """
updates_df._jdf.sparkSession().sql(merge_sql)

The only catch is that you need to use df._jdf.sparkSession().sql to execute the SQL command in the same context where you registered the temp view.
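
One more note: DeltaTable.forPath (and the target table in the SQL variant) assumes the destination already exists as a Delta table at that path. If it doesn't yet, a one-time initial write creates it; a minimal sketch, reusing the hypothetical /data/events/ path from the example above:

# One-time initialization: write an existing DataFrame out in Delta format
# so that DeltaTable.forPath (or MERGE INTO) can target it afterwards.
initial_df.write.format("delta").save("/data/events/")   # initial_df is a placeholder name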

Unteach answered 30/8, 2021 at 10:47
df.write.partitionBy("your_column").format("delta").mode("overwrite").save(your_path)

Whether to use partitionBy depends on your DataFrame's size; for a small DataFrame you can skip it.
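
Once the DataFrame has been written out like this, the same path can be opened as a Delta table for the merge the question asks about; a minimal sketch, with your_path, updates_df, and the join column as placeholders:

from delta.tables import DeltaTable

# Open the freshly written Delta table by path and merge new data into it
delta_table = DeltaTable.forPath(spark, your_path)   # your_path: same path as the write above
delta_table.alias("target").merge(
    updates_df.alias("updates"),                     # updates_df: DataFrame holding new/updated rows
    "target.id = updates.id"                         # placeholder join key
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()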

Humberto answered 24/7, 2022 at 8:50
