Kafka Consumer first poll(0) returns no data

I'm using the confluent-kafka Python client. I have one producer writing to a topic with a single partition, and one consumer in a single consumer group. First, I create a producer with the default configuration for the topic (if the topic doesn't exist, I create it first):

self.producer = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers})

Then I create a consumer with the default configuration (so auto.offset.reset="latest") and subscribe it to the topic:

self.consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": self.bootstrap_servers, "group.id": self.group_id},
    logger=logger,
)
self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
self.consumer.poll(0)  # first call
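The question doesn't show print_assignment. In the confluent-kafka Python client, an on_assign callback is called with the consumer and the list of assigned TopicPartition objects, so a minimal, hypothetical version that only logs the assignment might look like this (the body is an assumption, not the asker's code):

```python
def print_assignment(consumer, partitions):
    """Hypothetical on_assign callback: log which partitions were assigned.

    confluent-kafka calls it as on_assign(consumer, partitions), where
    partitions is a list of TopicPartition objects.
    """
    assigned = ", ".join(f"{p.topic}[{p.partition}]" for p in partitions)
    print(f"Assigned: {assigned or '(none)'}")
```

The rebalance callbacks are served from inside poll(), so this line is only printed once some poll() call has actually processed the assignment, which makes it a simple way to see when the consumer is ready.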

I noticed that this first self.consumer.poll(0) doesn't register the consumer with the topic, since there is no data on the topic yet. Next, the producer produces a record. Then I call consumer.poll(0) a second time, expecting to get data, but it returns None. It looks as if this second call, made after the data was produced, is what actually registers the consumer: I only get the data on a third call. How can I register a consumer with a topic when there is no data on that topic yet?

References: Kafka consumer.poll returns no records

Cytosine answered 25/4 at 2:14 Comment(3)
@Gremi64, I followed your post. Can you please guide me? - Cytosine
Can you please elaborate on "register a consumer to a topic"? I'm not sure I answered your real question. If you want to "commit an offset", I didn't answer that, but I can. - Braunschweig
@Gremi64, I mean the rebalancing process and partition assignment. It takes time to assign a partition to a consumer. A single call of consumer.poll(0) doesn't guarantee that a partition is assigned. - Cytosine

I discovered that once a consumer subscribes to a topic using consumer.subscribe(topic), the first calls to poll(0) may return no data because no partition has been assigned yet. Joining the group, rebalancing and partition assignment take some time before the consumer is ready to fetch data. To work around the empty result, I repeatedly call poll(0) until a partition is assigned to the consumer. In the Python confluent-kafka client this looks like:

self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
while len(self.consumer.assignment()) == 0:
    self.consumer.poll(0)

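After this loop, a partition is assigned, so subsequent poll(0) calls can return new records from the topic (an individual poll(0) may still return None if the fetch hasn't completed yet). This matters mostly when auto.offset.reset is "latest": with no committed offset, the consumer starts at the end of the partition once it is assigned, so records produced before that point are not delivered to it.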
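The loop above spins on poll(0) with no upper bound, and it ignores the return value, so any message a poll() call happened to return while waiting would be lost. A slightly safer variant, sketched below, waits with a deadline and hands back whatever it received. The name wait_for_assignment and its parameters are hypothetical (not part of confluent-kafka), and it assumes short blocking polls are acceptable during setup only.

```python
import time


def wait_for_assignment(consumer, timeout=30.0, poll_interval=0.1):
    """Poll until the consumer has at least one assigned partition.

    Hypothetical helper. `consumer` is a confluent_kafka.Consumer (or any
    object with poll() and assignment()); timeouts are in seconds, like poll().
    Returns the messages received while waiting so that none are dropped.
    Raises TimeoutError if nothing is assigned before the deadline.
    """
    received = []
    deadline = time.monotonic() + timeout
    while not consumer.assignment():  # empty list -> nothing assigned yet
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no partition assigned within {timeout:.1f} s")
        # Rebalance callbacks (e.g. on_assign) are served inside poll().
        msg = consumer.poll(min(poll_interval, remaining))
        if msg is not None and not msg.error():
            received.append(msg)  # keep real records; error events are skipped
    return received
```

Usage: call pending = wait_for_assignment(self.consumer) right after subscribe(), process pending first, then continue with the existing poll(0) calls. Using a small non-zero timeout instead of poll(0) avoids burning a CPU core while the group rebalances.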

Cytosine answered 30/4 at 14:54 Comment(0)

I'm not familiar with the Python Kafka API, so I'll answer based on how it works in Java.

  1. I'll explain your code.
  2. I'll explain why it doesn't work.
  3. I'll explain what I think you should change.

1st

You create a consumer.

You subscribe to a topic.

You try to get messages from the subscribed topic.

2nd

Your consumer creation looks fine.

The subscription looks fine too.

The poll here is not in an infinite loop and waits 0 ms, so IMHO it doesn't have enough time to even complete the partition assignment, and the cluster will eventually forget about the consumer.

3rd

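The poll(...) should be in an infinite loop, and you probably want to increase the timeout from 0 to 1000 ms. (In the Python client, poll() takes its timeout in seconds, so that would be poll(1.0).)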

Why an infinite loop: otherwise, after the poll the consumer stops talking to the cluster, and the cluster considers it dead after a timeout defined in the consumer properties (for example max.poll.interval.ms).

Why 1000: you want to wait more than 0 ms if you expect to read any messages (the value is arbitrary; you can go higher or lower, but not too low).

Be aware that the first time your consumer tries to consume messages, it can take 3 to 10 s (depending on your cluster size, RAM, CPU, ...).

If you keep auto.offset.reset="latest", you have to start your consumer (and let it get its partition assigned) before you produce your first message.
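Translated to the Python client, the loop this answer describes might look roughly like the sketch below. It assumes an already subscribed confluent_kafka.Consumer; run_poll_loop, handle and should_stop are hypothetical names, and the timeout is 1.0 because confluent-kafka's poll() takes seconds rather than milliseconds.

```python
def run_poll_loop(consumer, handle, should_stop=lambda: False, timeout=1.0):
    """Keep polling so the consumer stays in its group and receives data.

    Hypothetical sketch: `consumer` is a subscribed confluent_kafka.Consumer,
    `handle` processes one message, and `should_stop` lets the caller end the
    loop (the answer's "infinite loop" is should_stop always returning False).
    """
    try:
        while not should_stop():
            msg = consumer.poll(timeout)  # seconds, not milliseconds
            if msg is None:
                continue  # nothing arrived within the timeout; poll again
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            handle(msg)
    finally:
        # Leave the group cleanly; close() also commits offsets when
        # enable.auto.commit is on (the default).
        consumer.close()
```

Polling continuously is what keeps the consumer's group membership alive and lets the rebalance finish; the try/finally makes sure the consumer leaves the group instead of waiting to be timed out.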

How to register a consumer to a topic if there is no data on that topic yet?

You CAN register a consumer with a topic even if it has no data, but it has to stay alive (i.e. keep polling in an infinite loop).

Braunschweig answered 26/4 at 9:58 Comment(2)
Thank you for your helpful response. I can't increase the timeout or put poll(0) in an infinite loop, as that's not compatible with the existing code. I found that consumer.assignment() is empty (no partition is assigned to the consumer) when I try to read data. I solved it by calling poll until a partition is assigned: while len(consumer.assignment()) == 0: consumer.poll(0) - Cytosine
Well, you just put poll(0) in a loop... not an infinite one, but you get the idea. If my answer helped you, mark it as accepted and/or upvote it. - Braunschweig

© 2022 - 2024 — McMap. All rights reserved.