Using ToList output as input for AsSingleton or AsList in Apache Beam (python)

I'm getting an unexpected error when I try to use the output of beam.combiners.ToList as the input of beam.pvalue.AsSingleton or beam.pvalue.AsList while experimenting with side inputs. I was able to use single numbers (e.g., the mean of a list) as a side input, but for lists and dictionaries I get exceptions. For beam.pvalue.AsSingleton, I get:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-0c1df7400a03> in <module>
     15 chain_total = chain_1 | chain_2
     16 
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
     18 
     19 chain_total | beam.Map(print)

~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
    297     self.pvalue = pcoll
    298     self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299         pcoll.windowing.windowfn)
    300 
    301   def _view_options(self):

AttributeError: '_ChainedPTransform' object has no attribute 'windowing'

For beam.pvalue.AsList, I get:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-7-0c1df7400a03> in <module>
     15 chain_total = chain_1 | chain_2
     16 
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
     18 
     19 chain_total | beam.Map(print)

~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
    297     self.pvalue = pcoll
    298     self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299         pcoll.windowing.windowfn)
    300 
    301   def _view_options(self):

AttributeError: '_ChainedPTransform' object has no attribute 'windowing'

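This is the code I'm running:

import apache_beam as beam


def m(x, u):
    # x: main-input element, u: side-input value
    print(u)
    return x


p = beam.Pipeline()

data_beam = beam.Create(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'])

# (element, count) pairs
chain_1 = p | data_beam | beam.combiners.Count.PerElement()
# keep only the elements, then gather them into a single list
chain_2 = beam.Map(lambda x: x[0]) | beam.combiners.ToList()

chain_total = chain_1 | chain_2

# use the list as a side input (this is the line that raises the AttributeError)
chain_1 | beam.Map(m, beam.pvalue.AsSingleton(chain_2))

chain_total | beam.Map(print)

p.run()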

Replace beam.pvalue.AsSingleton with beam.pvalue.AsList to get the other error. I'm using the Apache Beam Python SDK, version 2.11.0.

Vivavivace, 14 April 2019 at 14:29

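PCollections are the nouns in Beam, and PTransforms are the verbs. When you begin a pipeline, p = beam.Pipeline() is your only noun (strictly speaking it is a Pipeline rather than a PCollection, but it plays the role of the starting noun). (N.B. Even Create is a verb.)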

By applying verbs to this noun, you create other nouns according to the following rule:

  • new_noun = existing_noun | verb

The confusion seems to arise because you can also chain verbs together:

  • fancy_verb = verb1 | verb2

Although the syntax looks almost identical in these two examples, the results have different types: the first gives you a PCollection, the second only another PTransform.

The key point is that only nouns (PCollections) can be used as side inputs.

In your example, chain_2 is a verb created by chaining two verbs. The error message confirms this: _ChainedPTransform (a kind of PTransform, as the name suggests) has no windowing attribute, so it cannot be passed to AsSingleton, AsList, or any other side-input wrapper. Pass chain_total, the PCollection you get from chain_1 | chain_2, instead.

Aleece, 16 April 2019 at 21:25
Bravin: Wow, that's an analogy I hadn't seen for Apache Beam before. It helped clarify a few things for me, thanks!
