Apache Beam/Google Dataflow PubSub to BigQuery Pipeline: Handling Insert Errors and Unexpected Retry Behavior

I have pulled down a copy of the Pub/Sub to BigQuery Dataflow template from Google's GitHub repository. I am running it on my local machine using the DirectRunner.

In testing I confirmed that the template only writes failures to the "deadletter" table if an error occurs during UDF processing or conversion from JSON to TableRow.

I wish to also handle failures that occur at insert time into BigQuery more gracefully by sending them to a separate TupleTag as well, so they can also be routed to the deadletter table or another output for review and processing. Currently, when executing with the DataflowRunner, those errors are only written to the Stackdriver logs, and the inserts are retried indefinitely until the issue is resolved.

Question One: While testing locally and publishing a message whose format does not match the destination table's schema, the insert is retried 5 times and then the pipeline crashes with a RuntimeException containing the error returned in the HTTP response from Google's API. I believe this behavior is set within BigQueryServicesImpl here:

private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

However, based on Google's documentation,

"When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."

And since Beam's PubsubIO transforms

create and consume unbounded PCollections

I am under the impression that streaming mode should be enabled by default when reading from Pub/Sub. I even went as far as specifying the STREAMING_INSERTS method on my call to writeTableRows(), and it did not change this behavior:

.apply(
    "WriteSuccessfulRecords",
    BigQueryIO.writeTableRows()
        .withMethod(Method.STREAMING_INSERTS)

  1. Is this behavior somehow being influenced by which runner I am using? If not, where is the flaw in my understanding?

Question Two:

  1. Is there a difference in performance when using BigQueryIO.write vs BigQueryIO.writeTableRows?

I ask because I do not see how I can capture the insert-related errors without creating my own static class that overrides the expand method and uses a ParDo and DoFn, in which I can add my own custom logic to create separate TupleTags for successful records and failed records, similar to how this was done within JavascriptTextTransformer for FailsafeJavascriptUdf (a rough sketch of what I mean is below).
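
For context, the kind of multi-output ParDo I have in mind would look roughly like the sketch below; the tag names, the placeholder insert/validation logic, and the input collection are illustrative, not the template's actual code.

// Uses TupleTag, TupleTagList, PCollectionTuple, ParDo, and DoFn from the Beam SDK.
// Hypothetical tags for routing successes and failures to separate outputs.
static final TupleTag<TableRow> SUCCESS_TAG = new TupleTag<TableRow>() {};
static final TupleTag<TableRow> FAILURE_TAG = new TupleTag<TableRow>() {};

// "input" stands for a PCollection<TableRow> produced earlier in the pipeline.
PCollectionTuple results =
    input.apply(
        "RouteRows",
        ParDo.of(
                new DoFn<TableRow, TableRow>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    try {
                      // Custom validation/insert logic would go here.
                      c.output(c.element());              // main output = SUCCESS_TAG
                    } catch (Exception e) {
                      c.output(FAILURE_TAG, c.element()); // additional output for failures
                    }
                  }
                })
            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));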

Update:

public static PipelineResult run(DirectOptions options) {

    options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<PubsubMessage, String> coder =
        FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    PCollectionTuple transformOut =
        pipeline
            // Step #1: Read messages in from Pub/Sub
            .apply(
                "ReadPubsubMessages",
                PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

            // Step #2: Transform the PubsubMessages into TableRows
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult = null;

    try {
      writeResult =
          transformOut
              .get(TRANSFORM_OUT)
              .apply(
                  "WriteSuccessfulRecords",
                  BigQueryIO.writeTableRows()
                      .withMethod(Method.STREAMING_INSERTS)
                      .withoutValidation()
                      .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                      .to("myproject:MyDataSet.MyTable"));
    } catch (Exception e) {
        System.out.print("Cause of the Standard Insert Failure is: ");
        System.out.print(e.getCause());
    }

    try {
        writeResult
            .getFailedInserts()
            .apply(
                    "WriteFailedInsertsToDeadLetter",
                    BigQueryIO.writeTableRows()
                        .to(options.getOutputDeadletterTable())
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    } catch (Exception e) {
        System.out.print("Cause of the Error Insert Failure is: ");
        System.out.print(e.getCause());
    }

     PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
        .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

Error:

Cause of the Error Insert Failure is: null[WARNING] 
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)
Asked by Thoma on 27/8/2018 at 17:34

In the latest versions of Beam, the BigQueryIO.Write transform returns a WriteResult object, which lets you retrieve a PCollection of the TableRows that failed to be inserted into BigQuery. Using this, you can easily retrieve the failures, format them into the structure of your deadletter output, and resubmit the records to BigQuery. This eliminates the need for a separate class to manage successful and failed records.

Below is an example of what that could look like for your pipeline.

// Attempt to write the table rows to the output table.
WriteResult writeResult =
    pipeline.apply(
        "WriteRecordsToBigQuery",
        BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

/*
 * 1) Get the failed inserts.
 * 2) Transform to the deadletter table format.
 * 3) Output to the deadletter table.
 */
writeResult
    .getFailedInserts()
    .apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
    .apply(
        "WriteFailedInsertsToDeadletter",
        BigQueryIO.writeTableRows()
            .to(options.getDeadletterTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));
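
The FailedInsertFormatter referenced above is not defined in the template, so here is a minimal sketch of what it might look like; the "timestamp" and "payloadString" column names are assumptions and should be adjusted to whatever your deadletter table's schema actually uses.

// A hypothetical formatter that wraps each failed row into a deadletter-style row.
static class FailedInsertFormatter extends DoFn<TableRow, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext context) {
    TableRow failedRow = context.element();
    TableRow deadletterRow =
        new TableRow()
            .set("timestamp", org.joda.time.Instant.now().toString())
            // Store the failed row's JSON representation so it can be inspected and replayed later.
            .set("payloadString", failedRow.toString());
    context.output(deadletterRow);
  }
}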

Additionally, to answer your questions:

  1. According to the Beam docs, you must set the streaming option to true (e.g. via the --streaming pipeline option) when using the DirectRunner.
  2. There should be no performance difference. In either case, you'll need to convert the input records to TableRow objects; it makes no difference whether you do that in a ParDo beforehand or within a serializable function passed to BigQueryIO.Write.withFormatFunction (see the sketch below).
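
For illustration, the two equivalent approaches in (2) might look roughly like this; MyRecord, MyRecordToTableRowFn, and the field names are placeholders for your own domain type and conversion logic.

// Option A: convert to TableRow up front, then use writeTableRows().
records
    .apply("ConvertToTableRow", ParDo.of(new MyRecordToTableRowFn()))
    .apply(
        "WriteRecords",
        BigQueryIO.writeTableRows().to(options.getOutputTable()));

// Option B: keep the domain type and convert inside the sink with withFormatFunction.
records.apply(
    "WriteRecords",
    BigQueryIO.<MyRecord>write()
        .to(options.getOutputTable())
        .withFormatFunction(
            (MyRecord record) ->
                new TableRow()
                    .set("id", record.getId())
                    .set("value", record.getValue())));
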
Answered by Moats on 27/8/2018 at 18:56

Comments:
Thanks, I didn't notice the mention of the streaming option in the DirectRunner documentation. I just added that argument, but it didn't change anything; the exception still crashed the pipeline after the fifth retry. Giving your other suggestions for capturing the failures a try now. — Thoma

I've updated my question to show the changes applied to my pipeline in an attempt to capture the WriteResult based on your suggestion. I am now getting the following error: "java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null." — Thoma

Adding .withJsonSchema to the writeTableRows() call resolved the error message, and I'm now seeing the failed record attempting to be inserted into the deadletter table following the fifth failure. At this point, I think I just need to implement the FailedInsertFormatter to massage the data into something the deadletter table can accept. Lastly, it seems that .getFailedInserts() only returns the TableRows. How can I also grab the associated error returned in the HTTP response from the initial insert attempt? — Thoma
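
For reference, based on the last comment above, providing an explicit JSON schema on the deadletter write might look roughly like this; the field list in the schema string is only a placeholder for the actual deadletter table schema.

writeResult
    .getFailedInserts()
    .apply(
        "WriteFailedInsertsToDeadLetter",
        BigQueryIO.writeTableRows()
            .to(options.getOutputDeadletterTable())
            // An explicit schema is needed so the table can be created if it does not exist.
            .withJsonSchema(
                "{\"fields\": [{\"name\": \"payloadString\", \"type\": \"STRING\"}]}")
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));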
