Apache Beam Python SDK ReadFromKafka does not receive data
I'm trying out a simple example of reading data off a Kafka topic into Apache Beam. Here's the relevant snippet:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from Kafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:29092'},
          topics=['test'])
      | 'Print' >> beam.Map(print))
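
For completeness, pipeline_options is constructed roughly as below; this is a minimal sketch assuming the portable Flink runner with a local job server (the endpoint and environment values are from my setup, adjust as needed):

from apache_beam.options.pipeline_options import PipelineOptions

# Minimal options sketch: portable runner pointed at a local Flink job
# server, in streaming mode since Kafka is an unbounded source.
pipeline_options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',  # assumed local job server port
    '--environment_type=LOOPBACK',
    '--streaming',
])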

With the pipeline above, I don't see any messages coming in. Kafka is running locally in a Docker container, and I'm able to use kafkacat from the host (outside the container) to publish and subscribe to messages, so I don't think there are any issues on that front.

It appears that Beam is able to connect to Kafka and get notified of new messages, as I see the offset changes in the Beam logs as I publish data from kafkacat:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

This is how I'm publishing data using kafkacat:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

and I can confirm that it's being received, again using kafkacat:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar
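
The same round trip can also be reproduced from plain Python, e.g. with something along these lines (a sketch assuming the kafka-python package, which is not part of my actual setup):

# Mirror of the kafkacat commands above using kafka-python.
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:29092')
producer.send('test', key=b'1', value=b'foo')
producer.flush()

consumer = KafkaConsumer('test',
                         bootstrap_servers='localhost:29092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)  # stop after 5s idle
for msg in consumer:
    print(f'Key: {msg.key} Value: {msg.value}')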

But despite this, I don't see the actual messages printed by Beam as I expected. Any pointers to what's missing here are appreciated. I suspect this could be a decoding issue on the Beam pipeline side, but I could be wrong.
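
For context, with the default deserializers ReadFromKafka emits (key, value) pairs of raw bytes, so explicit decoding would look roughly like this (a sketch, in case records were arriving but simply weren't human-readable):

# Decode the raw (key, value) byte pairs that ReadFromKafka emits with
# its default deserializers. Keys may be None for unkeyed records.
def decode_record(record):
    key, value = record
    return (key.decode('utf-8') if key is not None else None,
            value.decode('utf-8'))

# ... | 'Read from Kafka' >> ReadFromKafka(...)
#     | 'Decode' >> beam.Map(decode_record)
#     | 'Print' >> beam.Map(print)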

Edit (17 Mar 2021):

After following up on this with the Beam Python Kafka connector developers, the underlying reason the Python ReadFromKafka does not behave as expected is that the portable Flink runner cannot execute unbounded Splittable DoFns (SDFs), as it only supports self-checkpoints: portable streaming Flink does not issue checkpoint requests to the SDK regularly, so all Kafka records get buffered in the first ReadFromKafka stage and are never emitted downstream. The Jira tracking this issue is https://issues.apache.org/jira/browse/BEAM-11991, and there is another Jira tracking the feature request to support it: https://issues.apache.org/jira/browse/BEAM-11998. Hope this helps!
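
In the meantime, one way to at least see records locally is to make the read bounded via ReadFromKafka's max_num_records parameter (intended for testing), which sidesteps the unbounded-SDF checkpointing problem entirely; a sketch:

# Bounded read for local testing: with max_num_records set, ReadFromKafka
# runs as a bounded source, so records are emitted once the limit is
# reached instead of being buffered indefinitely by the SDF.
_ = (
    pipeline
    | 'Read from Kafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': 'localhost:29092',
                         'auto.offset.reset': 'earliest'},
        topics=['test'],
        max_num_records=10)
    | 'Print' >> beam.Map(print))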

Vasoinhibitor answered 11/2, 2021 at 9:23 Comment(3)
LocalRunner also suffers from this issue. – Wimble
Thank you for the update. I was having similar issues; this saved me a lot of time. – Premedical
How did you resolve this issue? I'm facing the same problem: I'm publishing data to Kafka and trying to read it from Beam using the portable Flink runner, but it isn't consuming the data. – Gault
