Side output in ParDo | Apache Beam Python SDK

As the documentation is only available for Java, I could not really understand what it means.

It states - "While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo will return all of the output PCollections (including the main output) bundled together. For example, in Java, the output PCollections are bundled in a type-safe PCollectionTuple."

I understand what bundled together means, but if I am yielding a tagged output in my DoFn, does it yield a bundle right away with all the other outputs empty, and then yield the other outputs as they are encountered in the code? Or does it wait until all the yields for an input are ready and then output them all together in one bundle?

There isn't much clarity around this in the documentation. I think it doesn't wait and just yields when encountered, but I still need to understand what is actually happening.

Swee answered 14/9, 2018 at 20:9 Comment(0)

The best way to answer this is with an example. This example is available in Beam.

Suppose that you want to run a word count pipeline (e.g. count the number of times each word appears in a document). For this you need to split the lines in a file into individual words. Suppose that you also want to count word lengths individually. Your splitting transform would look like this:

import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    short_words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
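
As a hedged aside, not part of the original example: the two collections could then be consumed further down the pipeline, still inside the same with block. The counting steps, step labels, and output paths below are illustrative, and WriteToText would need to be imported from apache_beam.io.

    # Illustrative continuation: count word occurrences from the main output
    # and sum the per-line character counts from the tagged output.
    word_counts = (short_words
                   | beam.Map(lambda w: (w, 1))
                   | beam.CombinePerKey(sum))

    total_characters = (character_count
                        | beam.CombineGlobally(sum))

    word_counts | 'WriteWordCounts' >> WriteToText('word_counts')
    total_characters | 'WriteCharCount' >> WriteToText('char_count')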

In this case, each of these is a different PCollection, with the right elements. The DoFn is in charge of splitting its output, and it does so by tagging elements. See:

import re

import apache_beam as beam
from apache_beam import pvalue


class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # yield word to add it to the main collection.
      yield word

As you can see, for the main output, you do not need to tag the elements, but for the other outputs you do.
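
A quick way to convince yourself that each tagged output is a separate PCollection is a small direct-runner test. This is my own sketch (not from the original answer), using Beam's testing utilities together with the SplitLinesToWordsFn defined above:

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
  results = (p
             | beam.Create(['some input text'])
             | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                 SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                 main='words'))

  # Each tagged output can be asserted on independently.
  assert_that(results['words'],
              equal_to(['some', 'input', 'text']),
              label='CheckWords')
  assert_that(results[SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT],
              equal_to([len('some input text')]),
              label='CheckCharCount')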

Cousteau answered 14/9, 2018 at 23:18 Comment(3)
Well, I understand that. Maybe I did not ask the question right; what I want to know is, in the case of the example you provided, do the main output and side output yields happen at the same time? I mean, after the DoFn is called for one element of the PCollection, do all the yields wait for each other to be ready and return their values bundled together? I know this might sound silly, but the documentation explains it in a very weird way.Swee
I see. Each yield should be propagated independently. If you return an iterable (like a list), of course that iterable object needs to be fully created first, before any element goes down the stream; but if you are yielding elements one by one, then each one should go downstream through the stage by itself (see the sketch after these comments). Do you have a specific use case that has you confused?Cousteau
Umm, not really, I was just worried looking at the documentation. I understood the bundled part though, that it means the outputs are bundled only when you access them; I just wanted to ask someone to be sure of it. Thanks a lotSwee
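
To illustrate the comment above about yields propagating independently, here is a minimal sketch of my own (these hypothetical DoFns are not from the thread). In both cases the runner routes each element to the PCollection matching its tag; with yield, each element is handed over as soon as it is produced, while a returned list is built fully first:

import apache_beam as beam
from apache_beam import pvalue


class YieldAsYouGoFn(beam.DoFn):
  # Hypothetical DoFn: each yield is handed to the runner as soon as it is
  # produced; the tagged output and the main output are not held back to be
  # emitted together.
  def process(self, element):
    yield pvalue.TaggedOutput('lengths', len(element))  # to the 'lengths' output now
    yield element.upper()                                # to the main output now


class ReturnListFn(beam.DoFn):
  # Hypothetical DoFn: the whole list is built first, but each element still
  # ends up in the PCollection for its tag.
  def process(self, element):
    return [pvalue.TaggedOutput('lengths', len(element)), element.upper()]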
