How to add a failure callback for kafka-python kafka.KafkaProducer#send()?
Asked Answered
T

1

7

I would like to set a callback to be fired if a produced records fail. Initially, I would just like to log the failed record.

The Confluent Kafka python library provides a mechanism for adding a callback:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

How can I achieve similar behaviour with kafka-python kafka.KafkaProducer#send() without having to use the deprecated SimpleClient using kafka.SimpleClient#send_produce_request()

Thimble answered 24/9, 2017 at 8:27 Comment(0)
S
12

Although it isn't documented, this is relatively straightforward. Whenever you send a message, you immediately get a Future back. You can append callbacks/errback's to that Future:

F = producer.send(topic=topic, value=message, key=key)
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method)
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method)

Relevant source code here: https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64

We really should document this, I filed https://github.com/dpkp/kafka-python/issues/1256 to track it.

Salpingitis answered 13/10, 2017 at 18:33 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.