Monitoring WriteToBigQuery

In my pipeline I use WriteToBigQuery something like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

This returns a dict, which the documentation describes as follows:

The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.

How do I turn this dict into a PCollection and print it, or how do I just print the FAILED_ROWS?

If I do: | "print" >> beam.Map(print)

Then I get: AttributeError: 'dict' object has no attribute 'pipeline'

I must have read a hundred pipelines, but I have never seen anything chained after WriteToBigQuery.

[edit] When I finish the pipeline and store the result in a variable, I have the following:

{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}

But I do not know how to use this result later in the pipeline, like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
Dielu asked 29/11, 2019 at 9:55

Dead-letter patterns to handle invalid inputs are a common Beam/Dataflow use case and work with both the Java and Python SDKs, but there are not many examples for the latter.

Imagine that we have some dummy input data with 10 good lines and a bad row that does not conform to the table schema:

schema = "index:INTEGER,event:STRING"

data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
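
The CsvToDictFn used below is not shown in this answer (it is part of the full code linked at the end); a minimal sketch, consistent with the schema above and with the failed-row output further down, could look like this:

class CsvToDictFn(beam.DoFn):
    """Map a CSV line to a dict keyed by the schema field names."""
    def process(self, element):
        fields = ['index', 'event']
        # zip() stops at the shorter list, so a malformed line such as
        # 'this is a bad row' becomes {'index': 'this is a bad row'}
        # and gets rejected by BigQuery later on
        yield dict(zip(fields, element.split(',')))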

Then, what I do is name the write result (events in this case):

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
 )

and then access the FAILED_ROWS side output:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

This works well with the DirectRunner: the good lines are written to the BigQuery table and the bad one to a local file:

$ cat error_log.txt-00000-of-00001 
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})

If you run it with the DataflowRunner, you'll need some additional flags. If you hit TypeError: 'PDone' object has no attribute '__getitem__', add --experiments=use_beam_bq_sink to use the new BigQuery sink.
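
For reference, a minimal sketch of passing that flag programmatically (the project, bucket, and region values are placeholders; the flag can equally be passed on the command line when launching the script):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; adjust to your own project and bucket
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=PROJECT_ID',
    '--temp_location=gs://BUCKET/temp',
    '--region=us-central1',
    '--experiments=use_beam_bq_sink',
])

with beam.Pipeline(options=options) as p:
    ...  # same pipeline as above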

If you get a KeyError: 'FailedRows', it's because the new sink defaults to BigQuery load jobs for batch pipelines:

STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.

You can override this behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery.

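A minimal sketch of that override, reusing the write step from above:

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
        # force streaming inserts so the FAILED_ROWS output is available
        # even in batch mode
        method='STREAMING_INSERTS',
    )
)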

Full code for both DirectRunner and DataflowRunner here.

Mirella answered 29/11, 2019 at 23:31
