What is the best way to migrate delta tables created in spark streaming jobs to a new location?

I have two pyspark streaming jobs:

  1. streaming_job_a reads from kafka, writes a dataframe containing the raw data in one column and a timestamp in another column to location A in s3, and creates the unmanaged delta table table_a on top of location A.
  2. streaming_job_b reads from delta table table_a, extracts the raw data into separate columns, writes to location B in s3, and creates the unmanaged delta table table_b. (Both jobs are sketched below.)
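
To make that concrete, the two jobs roughly look like this today (a simplified sketch, not the exact code; the kafka options, parsing logic, column names, and actual s3 paths are placeholders):

from pyspark.sql.functions import col, json_tuple

# streaming_job_a (simplified): raw payload in one column, timestamp in another, written to location A
raw_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<brokers>")  # placeholder
    .option("subscribe", "<topic>")                  # placeholder
    .load()
    .select(
        col("value").cast("string").alias("raw_data"),
        col("timestamp").alias("event_timestamp"),
    )
)

(
    raw_df.writeStream.format("delta")
    .option("checkpointLocation", "A/_checkpoints")  # checkpoint for location A (placeholder path)
    .option("path", "A")                             # location A in s3 (placeholder path)
    .start()
)
# table_a is then registered as an unmanaged table on top of location A:
# spark.sql("create table table_a using delta location 'A'")

# streaming_job_b (simplified): parse raw_data into separate columns, written to location B
parsed_df = (
    spark.readStream.format("delta").table("table_a")
    .select(
        json_tuple("raw_data", "field1", "field2").alias("field1", "field2"),  # placeholder parsing
        "event_timestamp",
    )
)

(
    parsed_df.writeStream.format("delta")
    .option("checkpointLocation", "B/_checkpoints")  # checkpoint for location B (placeholder path)
    .option("path", "B")                             # location B in s3 (placeholder path)
    .start()
)
# spark.sql("create table table_b using delta location 'B'")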

If I want to change the locations and table names used by both of these jobs, how do I do so in a way that preserves the data, doesn't cause problems with the checkpoints, and takes the least amount of time? Both tables have to be preserved because other teams read from both of them. The end result would ideally look like this:

  1. streaming_job_a reads from kafka, writes to location A_new in s3, and creates delta table table_a_new
  2. streaming_job_b reads from delta table table_a_new, writes to location B_new in s3, and creates delta table table_b_new.

I know I can read from the old location and write to the new location like this:

incoming_df = spark.readStream.format("delta").table("table_a")

writer_df = (
    incoming_df
    .writeStream.format("delta")
    .option("checkpointLocation", "A_new/_checkpoints")  # fresh checkpoint under the new location
    .option("path", "A_new")                             # new location in s3
    .trigger(once=True)                                  # process everything currently in table_a, then stop
)

writer_df.start()

and then create the new table:

spark.sql("create table table_a_new using delta location 'A_new'")

and then do something similar for streaming_job_b (roughly sketched below), but with this approach I'm concerned about missing new data that gets written to location A while the migration for streaming_job_b takes place. I'm fairly new to spark streaming in general, so any advice is greatly appreciated!
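
For concreteness, I assume the analogous one-off copy for streaming_job_b would look roughly like this (table_b_new, B_new, and its checkpoint path are placeholders for whatever we end up choosing):

incoming_df_b = spark.readStream.format("delta").table("table_b")

writer_df_b = (
    incoming_df_b
    .writeStream.format("delta")
    .option("checkpointLocation", "B_new/_checkpoints")
    .option("path", "B_new")
    .trigger(once=True)  # copy whatever is currently in table_b, then stop
)

writer_df_b.start()

spark.sql("create table table_b_new using delta location 'B_new'")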

Zaragoza answered 12/9, 2022 at 17:54 Comment(2)
What kind of data transformation happens in your streaming pipelines? Do you do any aggregations, or something else that requires storing actual data in state? Also, how do you read from location A? – Nb
streaming_job_a reads from kafka and essentially writes a dataframe containing the raw data in one column and a timestamp in another column to location A. Then streaming_job_b reads from table_a and extracts the raw data into separate columns before writing it to location B. We don't read from location A explicitly; we read from table_a, an unmanaged delta table in databricks. Other teams currently read from both table_a and table_b, so we have to preserve both tables for now. – Zaragoza
