I have pulled down a copy of the Pub/Sub to BigQuery Dataflow template from Google's GitHub repository. I am running it on my local machine using the DirectRunner.
In testing I confirmed that the template only writes failures to the "deadletter" table if an error occurs during UDF processing or conversion from JSON to TableRow.
I would also like to handle failures that occur at the time of insert into BigQuery more gracefully, by sending them to a separate TupleTag as well so they can be routed to the deadletter table or another output for review and processing. Currently, when executing with the dataflow-runner, those errors are only written to the Stackdriver logs and the inserts continue to be retried indefinitely until the issue is resolved.
Question One: While testing locally and publishing a message whose format does not match the destination table's schema, the insert is retried 5 times and then the pipeline crashes with a RuntimeException, along with the error returned in the HTTP response from Google's API. I believe this behavior is set within BigQueryServicesImpl here:
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
    FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
However, based on Google's documentation,
"When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."
and since Beam's PubsubIO connectors
create and consume unbounded PCollections
I am under the impression that streaming mode should be enabled by default when reading from Pub/Sub. I even went as far as explicitly setting the STREAMING_INSERTS method on my call to writeTableRows(), and it did not impact this behavior:
.apply(
    "WriteSuccessfulRecords",
    BigQueryIO.writeTableRows()
        .withMethod(Method.STREAMING_INSERTS)
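The only other streaming-related knob I am aware of is the streaming flag on the pipeline options themselves. Below is a minimal sketch of what I mean by forcing it explicitly (this assumes the DirectRunner honors StreamingOptions the same way the Dataflow runner does; the class name and argument handling here are placeholders, not part of the template):

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class StreamingModeSketch {
  public static void main(String[] args) {
    // Parse the same options the template's run() method receives.
    DirectOptions options = PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
    options.setRunner(DirectRunner.class);

    // Explicitly flip the streaming flag. Any PipelineOptions proxy can be
    // viewed as StreamingOptions, even if DirectOptions does not extend it.
    options.as(StreamingOptions.class).setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the rest of the template's graph here ...
    pipeline.run();
  }
}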
- Is this behavior somehow being influenced by which runner I am using? If not, where is the flaw in my understanding?
Question Two:
- Is there a difference in performance between BigQueryIO.write() and BigQueryIO.writeTableRows()?
I ask because I do not see how I can capture insert-related errors without creating my own static class that overrides the expand method and uses a ParDo and DoFn, where I can add custom logic to create separate TupleTags for successful and failed records, similar to how this was done for FailsafeJavascriptUdf within JavascriptTextTransformer.
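For context on what I mean: as far as I can tell from the Beam source, writeTableRows() is simply write() fixed to TableRow with an identity format function, so I would expect any performance difference to come from where the conversion to TableRow happens rather than from the sink itself. A rough sketch of the two shapes (the MyEvent type and its fields are made up purely for illustration):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class WriteVariantsSketch {

  // Hypothetical event type, just for illustration.
  public static class MyEvent {
    String id;
    long timestamp;
  }

  // Option A: keep the elements typed and let the sink convert them;
  // write() accepts any T plus a format function that produces a TableRow.
  static BigQueryIO.Write<MyEvent> typedWrite() {
    return BigQueryIO.<MyEvent>write()
        .withFormatFunction(
            e -> new TableRow().set("id", e.id).set("timestamp", e.timestamp))
        .to("myproject:MyDataSet.MyTable");
  }

  // Option B: convert to TableRow upstream and hand the sink finished rows.
  static BigQueryIO.Write<TableRow> rowWrite() {
    return BigQueryIO.writeTableRows().to("myproject:MyDataSet.MyTable");
  }
}

Either way, applying the write yields a WriteResult, which is where the getFailedInserts() call in the update below comes from.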
Update:
public static PipelineResult run(DirectOptions options) {

  options.setRunner(DirectRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  // Register the coder for pipeline
  FailsafeElementCoder<PubsubMessage, String> coder =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  CoderRegistry coderRegistry = pipeline.getCoderRegistry();
  coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

  PCollectionTuple transformOut =
      pipeline
          // Step #1: Read messages in from Pub/Sub
          .apply(
              "ReadPubsubMessages",
              PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

          // Step #2: Transform the PubsubMessages into TableRows
          .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

  WriteResult writeResult = null;

  try {
    writeResult =
        transformOut
            .get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withMethod(Method.STREAMING_INSERTS)
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to("myproject:MyDataSet.MyTable"));
  } catch (Exception e) {
    System.out.print("Cause of the Standard Insert Failure is: ");
    System.out.print(e.getCause());
  }

  try {
    writeResult
        .getFailedInserts()
        .apply(
            "WriteFailedInsertsToDeadLetter",
            BigQueryIO.writeTableRows()
                .to(options.getOutputDeadletterTable())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
  } catch (Exception e) {
    System.out.print("Cause of the Error Insert Failure is: ");
    System.out.print(e.getCause());
  }

  PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
      .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
      .apply("Flatten", Flatten.pCollections())
      .apply(
          "WriteFailedRecords",
          WritePubsubMessageErrors.newBuilder()
              .setErrorRecordsTable(
                  maybeUseDefaultDeadletterTable(
                      options.getOutputDeadletterTable(),
                      options.getOutputTableSpec(),
                      DEFAULT_DEADLETTER_TABLE_SUFFIX))
              .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
              .build());

  return pipeline.run();
}
Error:
Cause of the Error Insert Failure is: null[WARNING]
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)