I'm using confluent-kafka-client. I have one producer writing to a topic with a single partition, and one consumer in a single consumer group. First, I create a producer with the default configuration for the topic (if the topic doesn't exist, I create it with that name):
self.producer = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers})
Then I create a consumer with the default configuration (auto.offset.reset="latest") and subscribe it to the topic:
self.consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": self.bootstrap_servers, "group.id": self.group_id},
    logger=logger,
)
self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
self.consumer.poll(0) # first call
I realized that the first self.consumer.poll(0) call doesn't register the consumer with the topic, apparently because there is no data on the topic yet. Afterwards, the producer produces a record. Then I call
consumer.poll(0) # second call
expecting to get data. However, it returns None. In fact, it is this second poll(0) call, made after the data was produced, that registers the consumer. I only get the data when I call poll a third time.
How can I register a consumer with a topic if there is no data on that topic yet?
References: Kafka consumer.poll returns no records
consumer.poll(0) doesn't guarantee that a partition is assigned. – Cytosine
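Following the comment above, one possible approach is to keep polling until the consumer reports a non-empty assignment, and only then let the producer send. The sketch below is a minimal illustration, not a definitive fix: the helper name wait_for_assignment and its parameters timeout_s and poll_interval_s are hypothetical, and it assumes that `consumer` is a confluent_kafka.Consumer that has already called subscribe(), and that assignment() returns the assigned partitions once a poll() call has served the rebalance.

```python
import time


def wait_for_assignment(consumer, timeout_s=10.0, poll_interval_s=0.1):
    """Poll until the consumer has at least one partition assigned.

    Hypothetical helper, not part of confluent-kafka. `consumer` is assumed
    to expose poll(timeout) and assignment(), as confluent_kafka.Consumer does.

    Returns (assignment, buffered), where `buffered` holds anything poll()
    returned while waiting, so that nothing is silently dropped.
    Raises TimeoutError if no partition is assigned within timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    buffered = []
    while True:
        assignment = consumer.assignment()
        if assignment:
            return assignment, buffered
        if time.monotonic() >= deadline:
            raise TimeoutError("no partition assigned within the timeout")
        # poll() drives the group join and serves the on_assign callback.
        msg = consumer.poll(poll_interval_s)
        if msg is not None:
            buffered.append(msg)
```

With the code from the question, this would be called right after self.consumer.subscribe(...), in place of the first poll(0), and before the producer produces the record. Using a non-zero poll timeout matters here, because joining the group takes some time and poll(0) returns immediately.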