When using unbounded PCollection from TextIO to BigQuery, data is stuck in Reshuffle/GroupByKey inside of BigQueryIO
I'm using TextIO to read from Cloud Storage. Since I want the job to run continuously, I use watchForNewFiles.

For completeness: the same data works fine if I use bounded PCollections (no watchForNewFiles, and BigQueryIO in batch mode), so there is no data issue.

I have p.run().waitUntilFinish(); in my code, so the pipeline does run, and it does not report any error.

Apache Beam version is 2.8.0.

PCollection<String> stream =
        p.apply("Read File", TextIO
                .read()
                .from(options.getInput())
                .watchForNewFiles(
                        Duration.standardMinutes(1),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
                )
                .withCompression(Compression.AUTO));

This works perfectly fine and reads files as soon as they become available. The PCollection is unbounded and contains the lines of text from these files.

After doing some transformations

PCollection<List<String>> lines = stream.apply("Parse CSV",
        ParDo.of(new ParseCSV())
);

PCollection<TableRow> rows = lines.apply("Convert to BQ",
        ParDo.of(new BigQueryConverter(schema))
);

The ParseCSV step attaches event timestamps to its output elements via outputWithTimestamp.
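For illustration, the timestamp extraction inside a step like ParseCSV might look like this plain-Java sketch. The field index, the ISO-8601 timestamp format, and the helper names are my assumptions, not code from the question; the real DoFn would hand the parsed instant to outputWithTimestamp.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

class ParseCsvSketch {
    // Hypothetical: column 0 holds an ISO-8601 event timestamp.
    static final int TIMESTAMP_FIELD = 0;

    // Split one CSV line into fields (naive split, no quoted-field handling).
    static List<String> parseLine(String line) {
        return Arrays.asList(line.split(",", -1));
    }

    // The event time a DoFn would emit via outputWithTimestamp.
    static long eventTimeMillis(List<String> fields) {
        return Instant.parse(fields.get(TIMESTAMP_FIELD)).toEpochMilli();
    }

    public static void main(String[] args) {
        List<String> fields = parseLine("2018-11-12T16:50:00Z,foo,bar");
        System.out.println(fields);                  // [2018-11-12T16:50:00Z, foo, bar]
        System.out.println(eventTimeMillis(fields)); // 1542041400000
    }
}
```

If elements carry no usable event timestamp, they keep the source's timestamp, which also affects how downstream windows fire.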

I end up with a PCollection of TableRows ready to stream to BigQuery. For that, I use

WriteResult result = rows.apply("WriteToBigQuery",
        BigQueryIO.<TableRow>write()
                .withFormatFunction(input -> input)
                .withSchema(bqSchema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo()
                .to(options.getOutput())

);

This never writes data to BigQuery. If I look at the UI, I see that BigQueryIO does

  • ShardTableWrites
  • TagWithUniqueId
  • Reshuffle
    • Window.into
    • GroupByKey

Data enters and leaves the first two steps, but never the Reshuffle: it reads data but never passes it on. The step inside Reshuffle that blocks is the GroupByKey.

As the collection is unbounded, I tried to configure the window with

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
);

which should force anything doing a GroupByKey to emit a window's pane after 10 seconds. But it does not.

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
        .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(0))
        .discardingFiredPanes()
);

Adding an explicit processing-time trigger did not help either. Any clue? Thanks in advance!
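For reference, FixedWindows.of(Duration.standardSeconds(10)) assigns each element to the 10-second interval containing its event timestamp. A plain-Java sketch of that assignment (my illustration of the windowing arithmetic, not Beam's implementation):

```java
class FixedWindowSketch {
    static final long WINDOW_MILLIS = 10_000; // 10-second fixed windows

    // Start of the fixed window containing the given event timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMillis) {
        return windowStart(timestampMillis) + WINDOW_MILLIS;
    }

    public static void main(String[] args) {
        long ts = 1_542_041_405_123L; // some event time mid-window
        System.out.println(windowStart(ts)); // 1542041400000
        System.out.println(windowEnd(ts));   // 1542041410000
    }
}
```

A window's default trigger only fires once the watermark passes windowEnd, which is why a stalled watermark can hold back a GroupByKey even with small windows.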

Slacks answered 12/11, 2018 at 16:50 Comment(3)
This seems like a bug; as you have already opened an issue in the Public Issue Tracker (issuetracker.google.com/119886479), I will follow up on it. – Ham
(Also a problem for an (unbounded) PubSub source.) – Snooze
Interestingly, if you read multiple files, small ones get through even if the big ones are already stuck. – Photozincography
One workaround, which worked for me, is to assign a new key to every element and force Dataflow to decouple the transformations with a Reshuffle or a GroupByKey.

streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
       .apply(Reshuffle.of())
       .apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
           @Override
           public String apply(KV<Integer, String> input) {
               return input.getValue();
           }
       }))
       .apply("convertToTableRow", ...)
       .apply("WriteToBigQuery", ...)

The key can be a constant, as in the example, or random. If you choose a random key, keep the range small enough that the per-key state fits in JVM memory, e.g. ThreadLocalRandom.current().nextInt(0, 5000).
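A tiny sketch of that key choice (the 5000 bound is just the example value from above; any bounded range works, trading shuffle fan-out against per-key overhead):

```java
import java.util.concurrent.ThreadLocalRandom;

class ShuffleKey {
    static final int NUM_KEYS = 5000; // example bound from the answer

    // Pick a random shuffle key in [0, NUM_KEYS) for WithKeys.of(...).
    static int nextKey() {
        return ThreadLocalRandom.current().nextInt(0, NUM_KEYS);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            int k = nextKey();
            if (k < 0 || k >= NUM_KEYS) throw new AssertionError("key out of range");
        }
        System.out.println("all keys in range");
    }
}
```

The key only exists to drive the shuffle; the MapElements step in the snippet above throws it away again immediately.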

Amidships answered 14/6, 2019 at 12:21 Comment(1)
Per this link, Reshuffle will be deprecated? – Zoogloea

© 2022 - 2024 — McMap. All rights reserved.