Error while running a Beam streaming pipeline (Python) with Pub/Sub I/O on the embedded FlinkRunner (apache_beam[gcp])
I get the following error when running a streaming pipeline (Python) in Apache Beam on the FlinkRunner. The pipeline reads from a GCP Pub/Sub source and writes to a Pub/Sub sink.

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.

ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "23 Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/local/lib64/python3.6/site-packages/apache_beam/pipeline.py", line 586, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib64/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 599, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-swarna0kpaul-0712135603-763999c_45da372e-757d-4690-8e25-1a5ed0a5cc84 failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "23 Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced

I am trying to run the following Python code with two Pub/Sub topics I created in my GCP account ({input topic}, {output topic}). The topics are in the format projects/{project name}/topics/{topic name}.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Fully qualified topic names: projects/<project name>/topics/<topic name>
input_topic = "projects/<project name>/topics/<input topic>"
output_topic = "projects/<project name>/topics/<output topic>"

options = PipelineOptions(["--runner=FlinkRunner", "--checkpointing_interval=1000", "--streaming"])
with beam.Pipeline(options=options) as pipeline:
    # Unbounded source: raw message payloads as bytes.
    input1 = pipeline | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic).with_output_types(bytes)
    output = (input1
              # Group elements into 5-second fixed windows.
              | beam.WindowInto(beam.transforms.window.FixedWindows(5))
              | "Write to Pub/Sub" >> beam.io.WriteToPubSub(topic=output_topic, with_attributes=False).with_input_types(bytes))
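The topic strings in the code above must be fully qualified names, not bare topic names. A minimal stdlib-only sketch for building and checking such names; topic_path and parse_topic_path are hypothetical helpers (not part of Beam), and my-project/beam-in are placeholder values. If google-cloud-pubsub is installed, PublisherClient.topic_path covers the building half.

```python
import re
from typing import Tuple

# Fully qualified Pub/Sub topic names have the form
# projects/<project name>/topics/<topic name>.
_TOPIC_RE = re.compile(r"^projects/(?P<project>[^/]+)/topics/(?P<topic>[^/]+)$")


def topic_path(project: str, topic: str) -> str:
    """Build a fully qualified topic name from its two components."""
    return f"projects/{project}/topics/{topic}"


def parse_topic_path(path: str) -> Tuple[str, str]:
    """Split a fully qualified topic name into (project, topic).

    Raises ValueError if the string does not match the expected format.
    """
    match = _TOPIC_RE.match(path)
    if match is None:
        raise ValueError(f"not a fully qualified topic name: {path!r}")
    return match.group("project"), match.group("topic")


if __name__ == "__main__":
    # "my-project" and "beam-in" are hypothetical placeholder names.
    path = topic_path("my-project", "beam-in")
    print(path)                    # projects/my-project/topics/beam-in
    print(parse_topic_path(path))  # ('my-project', 'beam-in')
```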

The following software versions are installed on the system:

Python 3.6.8
apache_beam [gcp]==2.30.0
java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)

I also tried running this on a Flink cluster and with the portable Flink runner, following the instructions on this page, but got the same error.

The same code runs fine when I use the following options:

options = PipelineOptions(["--streaming"])
Nikolas answered 12/7, 2021 at 4:58 Comment(10)
Is it possible to provide the full code? Could you also list all the steps you've taken? - Choplogic
Apologies, there was an error in the earlier code. I have updated it; it uses two Pub/Sub topics I created in my GCP account (<input topic>, <output topic>), in the format projects/<project name>/topics/<topic name>. Steps I performed: created a CentOS 8 GCP Compute Engine instance; installed Java; installed the Python libraries Cython, apache_beam[gcp]==2.31.0, google-cloud-pubsub==1.7.0, grpcio==1.34.1, pandas, flask, flask_cors and psycopg2; ran the code above. - Nikolas
So, if I understand correctly, when you changed options = PipelineOptions(["--runner=FlinkRunner", "--checkpointing_interval=1000","--streaming"]) to options = PipelineOptions(["--streaming"]) it worked fine? If so, please post an answer with this solution, which might be useful for other users. If you still have the issue, please provide more details. - Choplogic
Running with options = PipelineOptions(["--streaming"]) works fine, but I still get the same error with the FlinkRunner. I need it to work with the FlinkRunner because the end goal is to run it on a Flink cluster. - Nikolas
Were you able to solve this issue? We're having trouble running the FlinkRunner locally with a Pub/Sub emulator. - Interlope
To clarify, @Choplogic: the DirectRunner works just fine. Only the FlinkRunner has the issue. - Interlope
@Interlope Not yet! I tried running on a Flink cluster earlier, but got the same issue there too. - Nikolas
How did you eventually solve it? I encountered exactly the same error with the SparkRunner: #72152950 - Iritis
Not able to solve it yet. ☹️ - Nikolas
Did this work for anyone? - Blagoveshchensk
The apache_beam.io.ReadFromPubSub transform only works with the DirectRunner and the Dataflow runner, which is presumably why the Flink runner reports the output of the Pub/Sub read as "consumed but never produced". There is, however, an external (cross-language) transform you can try instead: apache_beam.io.external.gcp.pubsub.ReadFromPubSub, see: https://github.com/apache/beam/blob/release-2.39.0/sdks/python/apache_beam/io/external/gcp/pubsub.py#L39

Trestlework answered 20/5, 2022 at 14:36 Comment(0)
