Python-Kafka: Keep polling topic infinitely
Asked Answered
L

3

5

I am using python-kafka to listen to a kafka topic and use that the records. I want to keep it polling infinitely without any exit. This is my code below:

def test():
    consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)

This code just reads the data and exits directly. Is there a way to keep listening to topics even if message is not pushed to it?

Any relevant example where the topic is continuously monitored is also great for me.

Lilalilac answered 8/2, 2021 at 12:15 Comment(0)
P
7

Using confluent_kafka

import time
from confluent_kafka import Consumer


consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

while True:
    try: 
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # Sleep for 2 minutes

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
    except:
        # Handle any exception here
        ...
    finally:
        consumer.close()
        print("Goodbye")

Using kafka-python

import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])

while True:
    try: 
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # Sleep for 2 minutes

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
    except:
        # Handle any exception here
        ...
    finally:
        consumer.close()
        print("Goodbye")
  
Polarity answered 8/2, 2021 at 12:27 Comment(4)
can you please why we need to do consume.poll here? what does that 10 mean?Lilalilac
@Lilalilac It's the timeout: kafka-python.readthedocs.io/en/master/apidoc/…Polarity
does the message returned from consumer.poll when using kafka-python have error() or value()? I got an AttributeError - 'dict' object has no attribute 'value' exception when calling message.value() or message.error(). I use kafka-python 2.0.2 and KafkaConsumer.poll() returns dictionary as a response.Headpin
same thing, poll result doesn't have value() or error()Caseose
B
4

for kafka-python following solution worked

from kafka import KafkaConsumer


    def consume_message(topic_name):
        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest'
        )
        consumer.subscribe(topic_name)
        while True:
            try:
                records = consumer.poll(timeout_ms=1000)
    
                for topic_data, consumer_records in records.items():
                    for consumer_record in consumer_records:
                        print("Received message: " + str(consumer_record.value.decode('utf-8')))
                continue
            except Exception as e:
                print(e)
                continue
Blowy answered 4/7, 2022 at 14:50 Comment(0)
H
0

An alternative option is to use aiokafka, a client built upon kafka-python that provides coroutine-based analogues; since this keeps the asyncio event loop hydrated with tasks, it's possible to run a coroutine using the high-level asyncio.run() API that polls a topic and stays alive until the program is terminated:

import asyncio
import logging
import os
from aiokafka import AIOKafkaConsumer

consumer_group_id = os.environ["APPLICATION_CONSUMER_GROUP_ID"]
logger = logging.getLogger(consumer_group_id)

logging.basicConfig(level=logging.INFO, force=True)

async def consume():
    consumer = AIOKafkaConsumer(
        os.environ["APPLICATION_CONSUMER_KAFKA_TOPIC"],
        group_id=consumer_group_id,
        bootstrap_servers=["kafka:29092"],
        value_deserializer=
    )

    await consumer.start()

    try:
        async for message in consumer:
            logger.info(message.value)
    finally:
        await consumer.stop()


asyncio.run(consume())
Hesperian answered 5/12, 2023 at 22:4 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.