Writing different values to different BigQuery tables in Apache Beam

Suppose I have a PCollection<Foo> and I want to write it to multiple BigQuery tables, choosing a potentially different table for each Foo.

How can I do this using the Apache Beam BigQueryIO API?

Invalidity answered 19/4, 2017 at 20:32 Comment(0)

This is possible using a feature recently added to BigQueryIO in Apache Beam.

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.<Foo>write()
    .to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
      @Override
      public TableDestination apply(ValueInSingleWindow<Foo> value) {
        Foo foo = value.getValue();
        // Also available: value.getWindow(), getTimestamp(), getPane()
        String tableSpec = ...;
        String tableDescription = ...;
        return new TableDestination(tableSpec, tableDescription);
      }
    })
    .withFormatFunction(new SerializableFunction<Foo, TableRow>() {
      @Override
      public TableRow apply(Foo foo) {
        return ...;
      }
    })
    .withSchema(...));

Depending on whether the input PCollection<Foo> is bounded or unbounded, under the hood this will either create multiple BigQuery import jobs (one or more per table, depending on the amount of data) or use the BigQuery streaming inserts API.
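
If you'd rather not rely on that automatic choice, later Beam 2.x releases also let you pin the method explicitly via withMethod. A minimal sketch; the no-op format function is just illustrative:

// Force batch-load jobs even where streaming inserts would be the default.
// (In a streaming pipeline, FILE_LOADS also requires withTriggeringFrequency.)
foos.apply(BigQueryIO.<Foo>write()
    .to("my-project:my_dataset.my_table")
    .withFormatFunction(foo -> new TableRow())  // illustrative no-op formatter
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS));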

The most flexible version of the API uses DynamicDestinations, which lets you write different values to different tables with different schemas, and even lets you use side inputs from the rest of the pipeline in all of these computations.
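
For illustration, here is a minimal sketch of the DynamicDestinations form. The getKind() accessor on Foo and the schemaForKind helper are hypothetical, and side inputs are omitted for brevity:

foos.apply(BigQueryIO.<Foo>write()
    .to(new DynamicDestinations<Foo, String>() {
      @Override
      public String getDestination(ValueInSingleWindow<Foo> element) {
        // Route each element by some property of it (hypothetical getKind()).
        return element.getValue().getKind();
      }

      @Override
      public TableDestination getTable(String kind) {
        return new TableDestination(
            "my-project:my_dataset.table_" + kind, "Table for kind " + kind);
      }

      @Override
      public TableSchema getSchema(String kind) {
        // Each destination can carry its own schema (hypothetical helper).
        return schemaForKind(kind);
      }
    })
    .withFormatFunction(foo -> new TableRow().set("kind", foo.getKind())));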

Additionally, BigQueryIO has been refactored into a number of reusable transforms that you can combine yourself to implement more complex use cases - see the files in the source directory.

This feature will be included in the first stable release of Apache Beam and in the next release of the Dataflow SDK (which will be based on the first stable release of Apache Beam). Right now you can use it by running your pipeline against a snapshot of Beam at HEAD from GitHub.

Invalidity answered 19/4, 2017 at 20:32 Comment(6)
Hi @jkff, is that feature available in the Python SDK? – Eu
No, only in Java for now. – Invalidity
Is this available in Java SDK 1.9.0? – Neale
No, only in 2.0 and above. – Invalidity
Is this feature available in the Python SDK now? – Bib
This is planned for the Python SDK, but with no date yet. – Goodkin

As of Beam 2.12.0, this feature is available in the Python SDK as well. It is marked as experimental, so you will have to pass --experiments use_beam_bq_sink to enable it. You'd do something like this:

import apache_beam as beam


def get_table_name(element):
  # Pick a destination table based on the element's contents.
  if meets_some_condition(element):
    return 'mytablename1'
  else:
    return 'mytablename2'


p = beam.Pipeline(...)

my_input_pcoll = p | ReadInMyPCollection()

my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(table=get_table_name)

The new sink supports a number of other options, which you can review in the pydoc.
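
For example, the usual disposition parameters can be combined with a dynamic table callable; the schema string below is just an illustration:

my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(
    table=get_table_name,
    schema='name:STRING,count:INTEGER',  # illustrative schema
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)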

Goodkin answered 30/5, 2019 at 0:46 Comment(1)
This does not seem to work if you use table decorators to try to write to a specific partition; you get "Not found" errors. – Glidden
