Collecting output from Apache Beam pipeline and displaying it to console
C

6

18

I have been working with Apache Beam for a couple of days. I want to iterate quickly on the application I am working on and make sure the pipeline I am building is error free. In Spark we can use sc.parallelize, and when we apply an action we get back a value that we can inspect.

Similarly, when reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))
# The with block runs the pipeline when it exits, so no explicit pipeline.run() is needed.

I actually wanted to print the result to console. But I couldn't find any documentation around it.

Is there a way to print the result to console instead of saving it to a file each time?

Catamnesis answered 25/9, 2017 at 13:26 Comment(1)
I have the same question as this post. I'm working with Java and don't know how to print intermediate values to the console. I would appreciate it if anybody could help me out. - Pacifa
C
10

After exploring further and learning how to write test cases for my application, I figured out a way to print the result to the console. Please note that I am currently running everything on a single-node machine while trying to understand the functionality Apache Beam provides and how I can adopt it without compromising industry best practices.

So, here is my solution. At the very last stage of the pipeline we can add a map function that either prints each result to the console or accumulates the results in a variable, which we can print afterwards to inspect the values:

import apache_beam as beam

# let's have some sample strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.ParDo(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# let's collect our result into the output list with a map transformation
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# run the pipeline
result = pipeline.run()

# wait until the result is available
result.wait_until_finish()

# print the output
print(output)
Catamnesis answered 26/9, 2017 at 15:25 Comment(3)
Nice idea, but this won't work if your pipeline is executed in a distributed manner, for instance on Apache YARN (Hadoop) or within Google Dataflow. There must be another way to collect the results, but I'm still searching for it. - Counselor
When I use pipeline.run() I get this error: 'PBegin' object has no attribute 'windowing' - Civet
This is great for unit tests in a DirectRunner. - Loathe
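
For the unit-test use case mentioned in the comments, Beam's testing helpers can check the result without a shared list. This is only a sketch: it assumes the `TestPipeline`, `assert_that`, and `equal_to` utilities from `apache_beam.testing`, and the names `split_words` and `check_word_count` are hypothetical, chosen for illustration.

```python
def split_words(line):
    """Split a line into words on single spaces, as in the answer above."""
    return line.split(" ")


def check_word_count():
    # Beam is imported here so split_words stays importable without it.
    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    data = ["this is sample data", "this is yet another sample data"]
    expected = [("this", 2), ("is", 2), ("sample", 2), ("data", 2),
                ("yet", 1), ("another", 1)]

    with TestPipeline() as pipeline:
        counts = (pipeline
                  | "create" >> beam.Create(data)
                  | "split" >> beam.FlatMap(split_words)
                  | "pair" >> beam.Map(lambda w: (w, 1))
                  | "count" >> beam.CombinePerKey(sum))
        # assert_that is itself a pipeline step; it fails the run on a mismatch.
        assert_that(counts, equal_to(expected))
```

Calling `check_word_count()` should raise if the counts differ from `expected`; `equal_to` does not depend on element order.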
L
15

You don't need the temporary list. In Python 2.7 the following should be sufficient:

def print_row(row):
    print row

(pipeline 
    | ...
    | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()

In Python 3.x, print is a function, so the following is sufficient:

(pipeline 
    | ...
    | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()
Lolitaloll answered 2/4, 2018 at 18:54 Comment(1)
Note that if you add this in the middle of your pipeline, you may get the error TypeError: 'NoneType' object is not subscriptable. This is because print returns None, which gets passed along to the following steps. In that case you need slightly different code that prints the value and then returns it. - Arezzo
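
The "print the value and then return it" variant from the comment above could look like the sketch below. The names `print_and_pass` and `run_demo` are hypothetical, and the sample data is made up for illustration.

```python
def print_and_pass(element):
    """Print an element, then return it so downstream steps still receive it."""
    print(element)
    return element


def run_demo():
    # Beam is imported here so print_and_pass can be tested without it.
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        (pipeline
         | "create" >> beam.Create([("a", 1), ("b", 2)])
         | "peek" >> beam.Map(print_and_pass)  # prints, then forwards the tuple
         | "double" >> beam.Map(lambda kv: (kv[0], kv[1] * 2))
         | "print" >> beam.Map(print))  # last step, so returning None is harmless
```

Because `print_and_pass` returns its argument, the "double" step receives real tuples rather than None.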
M
4

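Maybe use logging.info instead of print? Note that Python's root logger defaults to the WARNING level, so INFO messages only appear once the level is lowered:

import logging

logging.getLogger().setLevel(logging.INFO)

def _logging(elem):
    logging.info(elem)
    return elem  # pass the element through so later steps still receive it

# P is an existing PCollection
P | "logging info" >> beam.Map(_logging)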
Midweek answered 9/12, 2021 at 20:22 Comment(0)
H
0

Following an example from PyCharm Edu:

import apache_beam as beam

class LogElements(beam.PTransform):
    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            print(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        return input | beam.ParDo(self._LoggingFn(self.prefix))

class MultiplyByTenDoFn(beam.DoFn):

    def process(self, element):
        yield element * 10

p = beam.Pipeline()

(p | beam.Create([1, 2, 3, 4, 5])
   | beam.ParDo(MultiplyByTenDoFn())
   | LogElements())

p.run()

Output

10
20
30
40
50
Out[10]: <apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7ff41418a210>
Heartfelt answered 28/6, 2019 at 15:55 Comment(0)
C
0
import apache_beam as beam

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda o: (o[0], str(sum(o[1])))))
    word_count | beam.ParDo(lambda x: print(x))
# Leaving the with block runs the pipeline; calling pipeline.run() as well would run it twice.

This works like a charm!

Output (element order may vary): ('this', '2') ('is', '2') ('test', '2') ('another', '1')

Casillas answered 9/9, 2023 at 18:37 Comment(0)
P
-2

I know it isn't what you asked for but why don't you store it to a text file? It's always better than printing it via stdout and it isn't volatile
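
A minimal sketch of that approach, assuming Beam's `beam.io.WriteToText` sink; the names `format_count` and `write_counts` and the `word_counts` prefix are hypothetical, chosen for illustration.

```python
def format_count(pair):
    """Turn a (word, count) tuple into one line of text, e.g. 'this: 2'."""
    word, count = pair
    return "{}: {}".format(word, count)


def write_counts(output_prefix="word_counts"):
    # Beam is imported here so format_count stays usable without it.
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        (pipeline
         | "create" >> beam.Create(["this is test", "this is another test"])
         | "split" >> beam.FlatMap(lambda line: line.split(" "))
         | "pair" >> beam.Map(lambda w: (w, 1))
         | "count" >> beam.CombinePerKey(sum)
         | "format" >> beam.Map(format_count)
         # Writes sharded text files whose names start with output_prefix.
         | "write" >> beam.io.WriteToText(output_prefix, file_name_suffix=".txt"))
```

Calling `write_counts()` should leave one or more files beginning with `word_counts` in the working directory, one formatted pair per line.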

Photic answered 26/9, 2017 at 16:45 Comment(2)
In the more general case of not printing, but having the value available at runtime, I do have a use case (although I might be using it wrong). In the context of TensorFlow and TensorFlow Transform, which I am dealing with, I wanted to count during the transform context, which uses Beam, and then use this value in operations during training. So keeping the count in memory is handier than saving it to a file and loading it again. But as said, this is not printing. - Langille
This is more of a comment than an answer. - Odeen
