I have two PySpark streaming jobs:

streaming_job_a reads from Kafka, writes a dataframe containing the raw data in one column and a timestamp in another column to locationA in S3, and creates an unmanaged Delta table table_a using locationA.

streaming_job_b reads from the Delta table table_a, extracts the raw data into separate columns, writes to locationB in S3, and creates an unmanaged Delta table table_b.
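For reference, the two jobs look roughly like this (the Kafka options, column names, and checkpoint paths here are placeholders, simplified from the real jobs):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder.getOrCreate()

# streaming_job_a (simplified): kafka -> one raw column + one timestamp column -> locationA
raw_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("subscribe", "some_topic")                 # placeholder
    .load()
    .select(
        col("value").cast("string").alias("raw"),
        current_timestamp().alias("ingest_ts"),
    )
)
(
    raw_df.writeStream.format("delta")
    .option("checkpointLocation", "locationA/_checkpoints")
    .start("locationA")
)
# unmanaged table registered once over the same location
spark.sql("CREATE TABLE IF NOT EXISTS table_a USING DELTA LOCATION 'locationA'")

# streaming_job_b (simplified): table_a -> raw data split into separate columns -> locationB
parsed_df = (
    spark.readStream.format("delta").table("table_a")
    # .select(...)  # the actual extraction of the raw data into columns happens here
)
(
    parsed_df.writeStream.format("delta")
    .option("checkpointLocation", "locationB/_checkpoints")
    .start("locationB")
)
spark.sql("CREATE TABLE IF NOT EXISTS table_b USING DELTA LOCATION 'locationB'")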
If I want to change the locations and table names used by both of these jobs, how do I do so in a way that preserves the data, doesn't cause problems with the checkpoints, and takes the least amount of time? Both tables have to be preserved because other teams read from both of them. The end result would ideally look like this:
streaming_job_a reads from Kafka, writes to locationA_new in S3, and creates Delta table table_a_new.

streaming_job_b reads from Delta table table_a_new, writes to locationB_new in S3, and creates Delta table table_b_new.
I know I can read from the old location and write to the new location like this:
incoming_df = spark.readStream.format("delta").table("table_a")
writer_df = (
    incoming_df
    .writeStream.format("delta")
    .option("checkpointLocation", "A_new/_checkpoints")  # fresh checkpoint for the new location
    .option("path", "A_new")
    .trigger(once=True)  # one-shot copy of everything currently in table_a
)
# start() returns a StreamingQuery; awaitTermination() blocks until the copy finishes
writer_df.start().awaitTermination()
and then create the new table:
spark.sql("create table table_a_new using delta location 'A_new'")
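After that I can do a quick sanity check that the new table is registered and readable (just an ad-hoc check, nothing the jobs depend on):

# confirm the new table exists, points at the new location, and has the data
spark.sql("DESCRIBE DETAIL table_a_new").show(truncate=False)
print(spark.table("table_a_new").count())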
and then do something similar for streaming_job_b, but with this approach I'm concerned about missing new data that gets written to locationA while the migration for streaming_job_b takes place. I'm fairly new to Spark Streaming in general, so any advice is greatly appreciated!
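One idea I had for closing that gap (I'm not sure whether it's sound): since the trigger-once copy records its progress in the checkpoint, rerunning the exact same copy with the same checkpoint after streaming_job_a has been switched over to locationA_new should, as far as I understand, pick up only whatever was appended to table_a in the meantime. Something like:

# second pass with the SAME checkpoint: should only process rows appended to
# table_a after the first trigger-once run (if I understand checkpoints correctly)
catchup = (
    spark.readStream.format("delta").table("table_a")
    .writeStream.format("delta")
    .option("checkpointLocation", "A_new/_checkpoints")  # same checkpoint as before
    .option("path", "A_new")
    .trigger(once=True)
    .start()
)
catchup.awaitTermination()

Is that a reasonable way to handle the cutover, or is there a more standard pattern for this kind of migration?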