I'm trying out a simple example of reading data off a Kafka topic into Apache Beam. Here's the relevant snippet:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))
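For reference, pipeline_options is constructed roughly along these lines; the exact runner flags below (Flink master address, environment type, streaming mode) are illustrative of my setup rather than exact values:

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative: a portable Flink runner in streaming mode.
# The flink_master address and environment_type are assumptions for this sketch.
pipeline_options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',
    '--environment_type=LOOPBACK',
    '--streaming',
])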
Using the above Beam pipeline snippet, I don't see any messages coming in. Kafka is running locally in a Docker container, and I'm able to use kafkacat from the host (outside the container) to publish and subscribe to messages, so I don't think there are any issues on that front.

It appears that Beam is able to connect to Kafka and is notified of new messages, as I can see the offsets change in the Beam logs when I publish data from kafkacat:
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
This is how I'm publishing data using kafkacat:
$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar
and I can confirm that it's being received, again using kafkacat:
$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar
But despite this, I don't see the actual messages being printed by Beam as I expected. Any pointers to what's missing here are appreciated. I suspect this could be a decoding issue on the Beam pipeline side, but I could be wrong.
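For what it's worth, this is the kind of decoding step I had in mind, in case the raw bytes are the problem. This is purely a sketch; the UTF-8 encoding and the decode_kv helper are my assumptions:

import apache_beam as beam

def decode_kv(record):
    # ReadFromKafka yields (key, value) pairs of bytes by default;
    # decode both to strings for printing. UTF-8 is assumed here.
    key, value = record
    return (key.decode('utf-8') if key is not None else None,
            value.decode('utf-8'))

# ... inserted between the read and the print steps:
# | 'Decode' >> beam.Map(decode_kv)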
Edit (17 Mar 2021):
After following up on this issue with the Beam Python Kafka connector developers, the underlying reason why the Python ReadFromKafka transform is not behaving as expected is that the portable Flink runner cannot execute unbounded Splittable DoFns (SDFs), as it only supports self-checkpoints: portable streaming Flink doesn't regularly issue checkpoint requests to the SDK harness, so all Kafka records stay buffered in the first ReadFromKafka stage. The Jira tracking this issue is https://issues.apache.org/jira/browse/BEAM-11991, and there is another Jira tracking the feature request to support this: https://issues.apache.org/jira/browse/BEAM-11998. Hope this helps!
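For anyone else hitting this, one stopgap that is sometimes suggested is to force a bounded read via ReadFromKafka's max_num_records (or max_read_time) parameter, which avoids relying on unbounded SDF support. A rough sketch below; the record count and the auto.offset.reset setting are arbitrary choices of mine, and I haven't verified this on every runner:

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka (bounded)' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092',
                             'auto.offset.reset': 'earliest'},
            topics=['test'],
            max_num_records=10)  # arbitrary cap; makes the read bounded
        | 'Print' >> beam.Map(print))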