I am trying to gracefully shutdown a kafka consumer, but the script blocks with Stopping HeartBeat thread. How can i gracefully close the consumer on a SIGTERM with kafka-python. This is what i have done
import logger as logging
import time
import sys
from kafka import KafkaConsumer
import numpy as np
import signal
log = logging.getLogger(__name__)
class Cons:
def __init__(self):
signal.signal(signal.SIGINT, self.sigterm_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
self.consumer = KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1'])
def sigterm_handler(self, signum, frame):
log.info("Sigterm handler")
self.consumer.close(autocommit=False)
sys.exit(0)
def consume(self):
try:
while True:
records = self.consumer.poll(timeout_ms=500, max_records=500)
for topic_partition, consumer_records in records.items():
for record in consumer_records:
log.info("Got Record - {}".format(record))
#code to manually commit
except ValueError as e:
log.exception("exception")
if __name__ == '__main__':
c=Cons()
c.consume()
With debug logs enabled, this is the output i get and the code gets blocked on this.
^C2020-04-28 07:18:33,050 - MainThread - __main__ - INFO - Sigterm handler
2020-04-28 07:18:33,050 - MainThread - kafka.consumer.group - DEBUG - Closing the KafkaConsumer.
2020-04-28 07:18:33,051 - MainThread - kafka.coordinator - INFO - Stopping heartbeat thread
What is the reason behind this? and what is right way to close a consumer on SIGTERM or SIGINT?