Kafka producer.send never sends the message

I am using Kafka 2.12 and the kafka-python module as the Kafka client. I am trying to test a simple producer:

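import json
from multiprocessing import Process
from kafka import KafkaProducer

class Producer(Process):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='kafka:9092')
        print("Sending messages...")
        # `message` is a JSON-serializable object defined elsewhere
        producer.send('topic', json.dumps(message).encode('utf-8'))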

When I run this process, the consumer never receives the message.

If I flush the producer and change the linger_ms parameter (making it sync), the message is sent and read by the consumer:

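import json
from multiprocessing import Process
from kafka import KafkaProducer

class Producer(Process):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
        print("Sending messages...")
        producer.send('topic', json.dumps(message).encode('utf-8'))
        # Block until the buffered message has actually been sent
        producer.flush()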

In previous Kafka versions, there was the parameter queue.buffering.max.ms to specify how long the producer waits before sending the messages in the queue, but it is not present in the latest version (kafka-python 1.3.3). How can I specify this in newer Kafka versions to keep my communication asynchronous?

Thanks!

Floccose answered 14/7, 2017 at 0:33

As you observed, messages are queued up for asynchronous sending, and there is no guarantee that they will be sent immediately. So if you want to force the message to be sent to the broker, you need to explicitly call producer.flush(), which blocks until the message is sent (although flush() does not guarantee acks).

Note: Because flush() is a blocking call, it's typically only recommended for low-throughput systems or at application close. The throughput cost of sync sends compared to async sends usually isn't acceptable for high-volume systems. My experience has been that the producer generally sends quite quickly without needing to call flush(), except in test suites or during development, where you need it to happen immediately.
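A minimal sketch of the "flush at application close" idea, assuming only that the producer exposes flush(timeout=...) and close() the way kafka-python's KafkaProducer does. The helper name `flushed` is hypothetical, not part of kafka-python; the point is that the flush runs even if the body raises, which matters in a short-lived process like the one in the question.

```python
from contextlib import contextmanager


@contextmanager
def flushed(producer, timeout=None):
    """Yield `producer`, then flush buffered records and close it on exit.

    Hypothetical helper. Assumes `producer` has flush(timeout=...) and
    close(), as kafka-python's KafkaProducer does.
    """
    try:
        yield producer
    finally:
        try:
            # Block until everything queued by send() has been sent
            # (or until `timeout` seconds pass, if one is given).
            producer.flush(timeout=timeout)
        finally:
            # Always release the producer, even if flush() raised.
            producer.close()
```

In the question's run() method this would read `with flushed(KafkaProducer(bootstrap_servers='kafka:9092')) as producer:` followed by the same send() call. close() may drain pending records on its own; the explicit flush() mainly makes the intent obvious and lets you pass a timeout.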

I'm fairly sure the param queue.buffering.max.ms was replaced by linger_ms: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer

So you're already using that param in your working example.
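To make the linger_ms trade-off concrete, here is a toy, deterministic model of batching. It is not kafka-python's actual accumulator (which batches per partition and by byte size via batch_size); the function and its `max_records` parameter are hypothetical. A batch opens with its first record and is dispatched when it is full or when linger_ms has elapsed since it opened, whichever comes first. In this model a larger linger_ms only delays and groups sends; it never makes send() synchronous.

```python
from typing import List, Tuple


def simulate_batches(send_times_ms: List[float], linger_ms: float,
                     max_records: int) -> List[Tuple[float, List[float]]]:
    """Return (dispatch_time_ms, [send times in batch]) for each batch.

    Toy model only: a batch opens with its first record and is dispatched
    when it reaches max_records or when linger_ms has passed since it opened.
    """
    batches: List[Tuple[float, List[float]]] = []
    current: List[float] = []
    opened_at = 0.0
    for t in sorted(send_times_ms):
        # The open batch's linger deadline passed before this record arrived.
        if current and t > opened_at + linger_ms:
            batches.append((opened_at + linger_ms, current))
            current = []
        if not current:
            opened_at = t
        current.append(t)
        if len(current) == max_records:
            # Batch is full: dispatch it right away at this record's time.
            batches.append((t, current))
            current = []
    if current:
        # Whatever is left goes out when its linger deadline expires.
        batches.append((opened_at + linger_ms, current))
    return batches
```

For example, `simulate_batches([0, 1, 2, 50], linger_ms=10, max_records=100)` gives `[(10, [0, 1, 2]), (60, [50])]`, while `linger_ms=0` dispatches each record at its own send time.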

Kremer answered 13/10, 2017 at 18:43
Wow, thanks. It's pretty irritating that kafka-python.readthedocs.io/en/master/usage.html has a dozen different examples of sending, but only one .flush() call at the bottom, so it's easy to miss. – Devito
True, that can be confusing for someone new to Kafka. It's open source, so I'm sure they'd welcome a PR improving their docs. Also, async send + flush() is the most common API model across the Kafka client library ecosystem in other languages, so folks who know Kafka but not Python are probably less surprised by it. – Kremer
You're right, it's open source, so I can't complain. I had been reading the Confluent documentation earlier in the day and forgot which one I was referring to. The flush paradigm is common outside of Kafka as well, but I couldn't find it mentioned in the docs or the code (except at the bottom). – Devito
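from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='kafkaIp:kafkaPort')  # your broker's host:port
producer.send("topic_name", b'Your string here')
producer.flush()  # block until the buffered message has actually been sent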

Call send() and then flush(), so the buffered message is actually sent before your program moves on or exits.
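Building on the send-then-flush pattern, a sketch that also confirms delivery. It assumes kafka-python's behavior that send() returns a future whose get(timeout=...) returns the record metadata or raises the send error; as far as I know, flush() only logs failed sends rather than raising, so checking the future is how you find out. The helper name `send_and_confirm` is hypothetical.

```python
def send_and_confirm(producer, topic, value, timeout=10.0):
    """Send one record, flush, and wait for the broker's acknowledgement.

    Hypothetical helper. Assumes kafka-python semantics: send() returns a
    future whose get(timeout=...) returns metadata or raises the send error.
    """
    future = producer.send(topic, value)
    # Push the buffered record out now instead of waiting for linger_ms.
    producer.flush(timeout=timeout)
    # A failed send surfaces here, not from flush().
    return future.get(timeout=timeout)
```

Usage: `metadata = send_and_confirm(producer, "topic_name", b'Your string here')`, after which `metadata.partition` and `metadata.offset` tell you where the record landed.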

Emeric answered 30/12, 2019 at 12:37

We wanted to ensure that our messages were sent quickly, so we added a separate thread running a while loop that did nothing but call producer.flush(timeout=0.1) and sleep* for 100 ms.

We didn't want to eliminate all the throughput benefits of batching, but we also wanted to ensure that when traffic volume was low, messages were processed with minimal delay (milliseconds, not minutes).

* We're using gevent. The sleep might not be needed if you're using ordinary threading.
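A sketch of that background-flush approach with ordinary threading, where threading.Event.wait() stands in for the sleep and also gives a clean way to stop the loop. This is an assumption-laden variant, not the answerer's gevent code: the function name is hypothetical, and it assumes producer.flush(timeout=...) as in kafka-python, which raises KafkaTimeoutError when the buffer isn't drained in time.

```python
import threading


def start_periodic_flush(producer, interval=0.1, flush_timeout=0.1):
    """Flush `producer` every `interval` seconds on a daemon thread.

    Hypothetical helper. Returns (stop_event, thread); call stop_event.set()
    to end the loop. Assumes producer.flush(timeout=...) as in kafka-python.
    """
    stop = threading.Event()

    def loop():
        # wait() doubles as the sleep; it returns True once stop is set.
        while not stop.wait(interval):
            try:
                producer.flush(timeout=flush_timeout)
            except Exception:
                # kafka-python raises KafkaTimeoutError if the buffer isn't
                # drained within flush_timeout. This broad catch also hides
                # other errors; it just retries on the next tick.
                pass

    thread = threading.Thread(target=loop, name="periodic-flush", daemon=True)
    thread.start()
    return stop, thread
```

Keeping flush_timeout short bounds how long each tick blocks, so batching still happens between ticks while low-traffic messages wait at most about one interval.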

Kaliningrad answered 15/3, 2022 at 19:27

© 2022 - 2025 — McMap. All rights reserved.