kafka-python: Closing the kafka producer with 0 vs inf secs timeout

I am trying to produce messages to a Kafka topic with kafka-python 2.0.1 on Python 2.7 (I can't use Python 3 because of workplace constraints).

I created the class below in a separate package, built it, and installed it into a virtual environment:

import json
from kafka import KafkaProducer


class KafkaSender(object):
    def __init__(self):
        self.producer = self.get_kafka_producer()

    def get_kafka_producer(self):
        return KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x),
            request_timeout_ms=2000,
        )

    def send(self, data):
        self.producer.send("topicname", value=data)

My driver code is something like this:

from mypackage import KafkaSender

# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)

Scenario 1:
The code runs without errors, but the message never reaches the topic. I confirmed this because neither the offset nor the lag on the topic increases, and nothing is logged on the consumer side.

Scenario 2:
I commented out the Kafka producer initialization in the __init__ method.
I then changed the sending line from self.producer.send("topicname", value=data) to self.get_kafka_producer().send("topicname", value=data), i.e. the producer is created right before each send rather than once during class initialization. With this change the code worked perfectly and the message was published to the topic.
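Scenario 1 reporting "no errors" does not by itself show that the record was delivered, because KafkaProducer.send() is asynchronous and returns a future. A minimal sketch of checking the outcome by blocking on that future follows; the helper name send_and_confirm and the timeout_s parameter are hypothetical, chosen only for illustration.

```python
def send_and_confirm(producer, topic, data, timeout_s=10):
    """Send one record and block until it is acknowledged, or raise.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future whose get() yields the record metadata.
    """
    future = producer.send(topic, value=data)
    # get() re-raises a delivery error (or a timeout) instead of failing silently.
    metadata = future.get(timeout=timeout_s)
    return metadata.topic, metadata.partition, metadata.offset
```

Blocking on every send costs throughput, so this is better suited to debugging a silent failure than to a high-volume path.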

My intention with scenario 1 is to create the Kafka producer once and reuse it, rather than creating a new producer every time I want to send a message. With scenario 2, I might end up creating millions of Kafka producer objects if I need to send millions of messages.
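One way to keep a single long-lived producer is to flush and close it explicitly before the script ends, rather than relying on interpreter shutdown. The sketch below assumes that is acceptable here; the producer and topic constructor parameters, the _make_producer helper, and the context-manager methods are hypothetical additions, while send(), flush() and close() are the KafkaProducer methods.

```python
import json


class KafkaSender(object):
    """Reuses one producer for many sends and closes it explicitly."""

    def __init__(self, producer=None, topic="topicname"):
        # `producer` is injectable (an assumption of this sketch) so the class
        # can be exercised without a running broker.
        self.topic = topic
        self.producer = producer if producer is not None else self._make_producer()

    @staticmethod
    def _make_producer():
        from kafka import KafkaProducer  # lazy import; needs kafka-python installed
        return KafkaProducer(
            bootstrap_servers=["localhost:9092"],
            value_serializer=lambda x: json.dumps(x).encode("utf-8"),
            request_timeout_ms=2000,
        )

    def send(self, data):
        # send() only queues the record and returns a future immediately.
        return self.producer.send(self.topic, value=data)

    def close(self):
        self.producer.flush()  # block until queued records have been sent
        self.producer.close()  # close without timeout=0, i.e. not a forced close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
```

The driver could then wrap its work in `with KafkaSender() as kafka_sender:` and call `kafka_sender.send(data)` as often as needed; the producer is created once and closed once when the block exits.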

Can you please help me understand why the Kafka producer behaves this way?

NOTE: If I put the Kafka code and the driver code in the same file, it works fine. It fails only when the Kafka code lives in a separate package that I build and import into another project.

LOGS: https://www.diffchecker.com/dTtm3u2a

Update 1: 9th May 2020, 17:20:
I removed the INFO logs from the question description, enabled the DEBUG level, and diffed the debug logs of the first and second scenarios:

https://www.diffchecker.com/dTtm3u2a

Update 2: 9th May 2020, 21:28:
Upon further debugging and reading the kafka-python source code, I deduced that in scenario 1 the Kafka sender is force-closed, while in scenario 2 it is closed gracefully.

    def initiate_close(self):
        """Start closing the sender (won't complete until all data is sent)."""
        self._running = False
        self._accumulator.close()
        self.wakeup()

    def force_close(self):
        """Closes the sender without sending out any pending messages."""
        self._force_close = True
        self.initiate_close()

Which one happens depends on whether the Kafka producer's close() method is called with a timeout of 0 (forced close of the sender) or without a timeout (the timeout then takes the value float('inf') and the sender is closed gracefully).

The producer's close() method is called from its __del__ method, which runs at garbage collection. close(0), by contrast, is called from a function registered with atexit, which runs when the interpreter terminates.
So the question is: why is the interpreter terminating in scenario 1?
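One possible reading, which the logs above do not confirm, is that the difference lies in object lifetime rather than in the interpreter itself. A temporary producer loses its last reference right after send(), so __del__ can run immediately, whereas a producer stored on a long-lived object is still referenced when the atexit hooks fire. The sketch below uses only a hypothetical stand-in class (FakeProducer is not part of kafka-python) and calls close(timeout=0) by hand where the atexit hook would run.

```python
import gc

events = []  # (producer name, close mode) in the order the closes happen


class FakeProducer(object):
    """Hypothetical stand-in; mimics only the close()/__del__ shape described above."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self, timeout=None):
        if self.closed:
            return
        self.closed = True
        # timeout=0 models the forced close; no timeout models the graceful one.
        events.append((self.name, "forced" if timeout == 0 else "graceful"))

    def __del__(self):
        self.close()


class Sender(object):
    def __init__(self):
        # Scenario 1: the producer is kept for the lifetime of the sender.
        self.producer = FakeProducer("held")

    def send_with_temporary(self):
        # Scenario 2: no reference to the producer survives this statement.
        FakeProducer("temporary")
        gc.collect()  # only matters on interpreters without reference counting


sender = Sender()
sender.send_with_temporary()
# At interpreter exit, the atexit hook would reach the still-referenced producer first:
sender.producer.close(timeout=0)
print(events)
```

On CPython this prints `[('temporary', 'graceful'), ('held', 'forced')]`: the temporary object is finalized through __del__ long before exit, while the held one is only closed by the simulated exit hook.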

Effortful answered 8/5, 2020 at 9:45 Comment(1)
Hey, it's been a long time, but have you found an answer? (Bespectacled)
