How can I write streaming Dataflow pipelines that support schema evolution?

I'm building some data streaming pipelines that read from Kafka and write to various sinks using Google Cloud Dataflow. The pipeline looks something like this (simplified).

// Example pipeline that writes to BigQuery.
Pipeline.create(options)
    .apply(KafkaIO.read().withTopic(options.topic))
    .apply(/* Convert to a Row type */)
    .setRowSchema(schemaRegistry.lookup(options.topic))
    .apply(
        BigQueryIO.write<Row>()
            .useBeamSchema()
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withProject(options.outputProject)
            .withDataset(options.outputDataset)
            .withTable(options.outputTable)
    )

I plan to run a pipeline for each of our Kafka topics, of which there are hundreds. The pipeline looks up the schema for the given topic during the planning stage. This allows BigQueryIO to create the necessary tables before starting the pipeline.

Question: How can I support evolving schemas in my Dataflow pipelines?

I've explored the option of updating an existing Dataflow job (using the --update flag). The thought is that I could automate the process of submitting an updated job whenever a schema changes. But updating a job seems to incur about 3 minutes of downtime. For some of the jobs, that much downtime won't work. I'm looking for other solutions that hopefully have no more than a few seconds of downtime.
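One way to decide when a relaunch is actually needed is to classify each schema change first. The following is a minimal sketch, not part of the original pipeline: `SchemaChange`, `Field`, and `breakingChanges` are hypothetical names, and the schema is modelled as a plain map rather than a Beam `Schema`. It assumes BigQuery's in-place schema update rules (new NULLABLE or REPEATED columns and relaxing REQUIRED to NULLABLE are allowed) and, as a simplification, treats every type change as breaking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: classifies a schema change as additive or breaking. */
public class SchemaChange {

    /** Field mode, mirroring BigQuery's NULLABLE / REQUIRED / REPEATED. */
    public enum Mode { NULLABLE, REQUIRED, REPEATED }

    /** Simplified field description: a type name plus a mode. */
    public static final class Field {
        public final String type;
        public final Mode mode;

        public Field(String type, Mode mode) {
            this.type = type;
            this.mode = mode;
        }
    }

    /** Returns the reasons a change is not additive; an empty list means it is. */
    public static List<String> breakingChanges(
            Map<String, Field> oldSchema, Map<String, Field> newSchema) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Field> entry : oldSchema.entrySet()) {
            Field before = entry.getValue();
            Field after = newSchema.get(entry.getKey());
            if (after == null) {
                problems.add("removed field: " + entry.getKey());
            } else if (!after.type.equals(before.type)) {
                // Simplification: every type change is treated as breaking.
                problems.add("type changed: " + entry.getKey());
            } else if (after.mode != before.mode
                    && !(before.mode == Mode.REQUIRED && after.mode == Mode.NULLABLE)) {
                // Only the REQUIRED -> NULLABLE relaxation is accepted.
                problems.add("mode changed: " + entry.getKey());
            }
        }
        for (Map.Entry<String, Field> entry : newSchema.entrySet()) {
            // A new column must not be REQUIRED, since existing rows have no value for it.
            if (!oldSchema.containsKey(entry.getKey())
                    && entry.getValue().mode == Mode.REQUIRED) {
                problems.add("new REQUIRED field: " + entry.getKey());
            }
        }
        return problems;
    }
}
```

An automation job could call `breakingChanges` with the previous and current registry schemas and only fall back to the `--update` relaunch when the list is non-empty.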

Pentalpha answered 2/3, 2020 at 20:32 Comment(7)
You might ask your question in [email protected]. You could get better answers there. - Pavlodar
Thanks for the suggestion. I'll ask in there as well. - Pentalpha
A possible solution would be to use a protobuf schema; you may refer to the following blog posts: part 1, part 2. - Peacemaker
@Peacemaker And part 3 here: robertsahlin.com/… - Geriatrics
We have an external JSON file that represents the schema. We watch it for changes in DF, and if there is a change we apply it to the BQ schema: bqTable.updateSchema(Schema.of(newFieldList)); - Kaminsky
@JamesHall: Hi, did you find a solution to this? If yes, I'd be glad if you could share it! Thanks - Blockade
Unfortunately no, I abandoned this project. - Pentalpha
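The "watch an external JSON file for changes" idea from the comments could be sketched roughly as follows. This is an illustration under assumptions, not Kaminsky's actual code: `SchemaWatcher` and `check` are hypothetical names, the schema is passed in as a string (how it is fetched is left out), and the callback is where a call such as `bqTable.updateSchema(...)` would go.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Consumer;

/** Hypothetical helper: runs a callback only when the schema text actually changes. */
public class SchemaWatcher {
    private final Consumer<String> onChange;
    private String lastFingerprint; // null until the first schema has been seen

    public SchemaWatcher(Consumer<String> onChange) {
        this.onChange = onChange;
    }

    /** Call periodically with the current schema JSON; returns true if the callback ran. */
    public synchronized boolean check(String schemaJson) {
        String current = fingerprint(schemaJson);
        if (current.equals(lastFingerprint)) {
            return false; // unchanged since the last poll
        }
        boolean firstObservation = (lastFingerprint == null);
        lastFingerprint = current;
        if (firstObservation) {
            return false; // the first schema only establishes the baseline
        }
        onChange.accept(schemaJson); // e.g. apply the new schema to the table here
        return true;
    }

    /** SHA-256 of the schema text, hex encoded. */
    static String fingerprint(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```

Comparing fingerprints instead of raw text keeps the stored state small; note that this sketch treats a whitespace-only edit to the file as a change.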
