How to combine two results and pipe it to next step in apache-beam pipeline

See the code snippet below. I want ["metric1", "metric2"] to be the input for RunTask.process. However, RunTask ran twice, once with "metric1" and once with "metric2".

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metric1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metric2"]

  # I want ["metric1", "metric2"] to be the input for RunTask.process,
  # but RunTask runs twice: once with "metric1" and once with "metric2".
  metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask())

  

Ungotten answered 23/7, 2020 at 8:38 Comment(0)

As I understand it, you want to join two PCollections so that the result has the form ['element1', 'element2']. To achieve that, you can use CoGroupByKey() instead of Flatten().

Considering your code snippet, the syntax would be as follows, provided that metric1 and metric2 contain (key, value) pairs that share a key:

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metric1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metric2"]

  metric3 = (
      (metric1, metric2)
      | beam.CoGroupByKey()
      | beam.ParDo(RunTask())
  )

I would like to point out the difference between Flatten() and CoGroupByKey().

1) Flatten() receives two or more PCollections that store the same data type and merges them into one logical PCollection. For example,

import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

# Merge both PCollections into one and print every element.
result = (
    (street, city)
    | beam.Flatten()
    | beam.Map(print)
)

p.run().wait_until_finish()

And the output:

('leo', 'George St. 32')
('ralph', 'Pyrmont St. 30')
('mary', '10th Av.')
('carly', 'Marina Bay 1')
('leo', 'Sydney')
('ralph', 'Sydney')
('mary', 'NYC')
('carly', 'Brisbane')

Notice that both PCollections are in the output, but their elements are simply merged into one PCollection rather than joined. Beam does not guarantee the order of the elements.

2) CoGroupByKey() performs a relational join between two or more key-value PCollections that have the same key type. With this transform the elements are joined by key, not merged as in Flatten(). Below is an example,

import apache_beam as beam

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateStreets' >> beam.Create(address_list)
city = p | 'CreateCities' >> beam.Create(city_list)

# Join both PCollections on their key and print every joined element.
results = (
    (street, city)
    | beam.CoGroupByKey()
    | beam.Map(print)
)

p.run().wait_until_finish()

And the output:

('leo', (['George St. 32'], ['Sydney']))
('ralph', (['Pyrmont St. 30'], ['Sydney']))
('mary', (['10th Av.'], ['NYC']))
('carly', (['Marina Bay 1'], ['Brisbane']))

Notice that the PCollections need a common key in order to be joined. This output has the shape you expect in your case: one element per key that holds the values from both inputs.

Hyperbole answered 24/7, 2020 at 9:25 Comment(0)

Alternatively, use a side input:

metric3 = metric1 | beam.ParDo(RunTask(), metric2=beam.pvalue.AsIter(metric2))

In RunTask.process():

def process(self, element_from_metric1, metric2):
  ...
Ungotten answered 30/7, 2020 at 20:27 Comment(0)
