kafka-python - How do I commit a partition?

Using kafka-python-1.0.2.

If I have a topic with 10 partitions, how do I commit a particular partition while looping through the various partitions and messages? I just can't seem to find an example of this anywhere, in the docs or otherwise.

From the docs, I want to use:

consumer.commit(offsets=offsets)

Specifically, how do I build the dictionary of partitions to OffsetAndMetadata that the docs require for offsets (dict, optional) – {TopicPartition: OffsetAndMetadata}?

I was hoping the function call would just be something like:

consumer.commit(partition, offset)

but this does not seem to be the case.

Thanks in advance.

Christiansen asked 12/4, 2016 at 17:08

It looks like I may have figured it out (funny how that happens when you write down your questions). This seems to work:

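from kafka.structs import OffsetAndMetadata, TopicPartition

# The key must be a TopicPartition; the value is the offset of the *next* message to read.
# Metadata is an opaque application string, so '' is fine (partition info is not needed).
tp = TopicPartition(message.topic, message.partition)
options = {tp: OffsetAndMetadata(message.offset + 1, '')}
consumer.commit(options)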

More testing is needed, but I will update this if anything changes.
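To commit a particular partition while looping over messages from several partitions, one approach is to remember which messages have been processed and commit, per partition, the highest processed offset plus one (the position of the next message to read). The sketch below assumes the two-field OffsetAndMetadata(offset, metadata) of the kafka-python 1.x/2.0.x releases discussed here; newer releases may add a leader_epoch field, so check the signature in your installed version. The helper names next_offsets and commit_processed are hypothetical, not part of kafka-python.

```python
def next_offsets(messages):
    """Return {(topic, partition): offset_to_commit} for already-processed messages.

    The offset to commit is the offset of the *next* message to read,
    i.e. the highest processed offset in each partition plus one.
    """
    result = {}
    for msg in messages:
        key = (msg.topic, msg.partition)
        result[key] = max(result.get(key, 0), msg.offset + 1)
    return result


def commit_processed(consumer, messages, metadata=""):
    """Commit the next offsets for every partition seen in `messages`, in one commit() call.

    Hypothetical helper; assumes a kafka-python KafkaConsumer created with a group_id.
    """
    # Imported here so next_offsets() can be used without kafka-python installed.
    from kafka.structs import OffsetAndMetadata, TopicPartition

    offsets = {
        TopicPartition(topic, partition): OffsetAndMetadata(offset, metadata)
        for (topic, partition), offset in next_offsets(messages).items()
    }
    if offsets:  # nothing to commit for an empty list
        consumer.commit(offsets=offsets)
    return offsets
```

Inside a `for message in consumer:` loop you could append each processed message to a list and pass that list to commit_processed(consumer, processed) whenever you want to checkpoint; only the partitions that appear in the list are committed. The consumer needs a group_id for commit() to work, and usually enable_auto_commit=False so that auto-commit does not overwrite your choices.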

Christiansen answered 12/4, 2016 at 17:43 Comment(3)
This is the way to do it. I reached out to the kafka-python maintainers on GitHub. Response: "the metadata is really just an opaque string. You can also pass None. Nothing uses metadata internally, it is there as a way for you to store application-specific data if needed. But very few folks actually use that functionality, so beware if you go down that path." – Christiansen
Here is the link to that thread: github.com/dpkp/kafka-python/issues/645 – Christiansen
If this works, then you should probably accept your answer. – Autobiography

It is not necessary to use the metadata. Look at this example:

from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()  # your own factory (not part of kafka-python); commit() requires a group_id
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
    tp: OffsetAndMetadata(offset, None)
})

Hope this helps.

Gapeworm answered 14/10, 2016 at 12:00

from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "test_topic"
PARTITION = 0

consumer = KafkaConsumer(
    group_id=TOPIC,
    auto_offset_reset="earliest",
    bootstrap_servers="localhost:9092",
    request_timeout_ms=100000,
    session_timeout_ms=99000,
    max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)  # format: topic, partition
consumer.assign([topic_partition])
# format: TopicPartition, offset; 1660000 is the offset to start reading from
consumer.seek(topic_partition, 1660000)
for message in consumer:
    # do something with each message
    print(message.offset, message.value)
  1. This only assigns one partition and sets the offset for that partition. If there is more than one partition, you need to assign each of them and then set the offset for each one.
  2. aalmeida88's answer works for me in some situations but not in others. It also gave me the idea of using seek, which seems to be a useful method as well.
  3. Another thing to note: when you assign partitions yourself, Kafka Manager does not seem to be able to get the consumer info. This might be because manually assigned partitions are recorded in Kafka instead of ZooKeeper, so Kafka Manager may not get that info. Hope it helps!

---edit-----

I found a better way to do it:

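# Inside the `for message in consumer:` loop; offset_value is the offset to resume from.
topic_partition = TopicPartition(TOPIC,
                                 message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()  # with no arguments, commits the current position of every assigned partition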

This takes the partition from the message received from Kafka, so you don't have to assign partitions manually. That is convenient when the program needs to set offsets for more than one partition, which is not uncommon.

PS: to make sure each partition is set only once, keep a flag suited to your application.
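The "flag" can be as simple as a set of partitions that have already been repositioned. Below is a minimal sketch of that idea, assuming a subscribed kafka-python consumer (so message.partition is already assigned) and an application-chosen mapping start_offsets of {partition: offset}; the names pending_seek, seek_and_commit_once and start_offsets are hypothetical.

```python
def pending_seek(message, start_offsets, done):
    """Return the offset to seek message's partition to, or None if no seek is needed.

    start_offsets: {partition: offset} chosen by the application (hypothetical input).
    done: set of partitions that have already been repositioned (the "flag").
    """
    partition = message.partition
    if partition in done or partition not in start_offsets:
        return None
    return start_offsets[partition]


def seek_and_commit_once(consumer, message, start_offsets, done):
    """Seek and commit message's partition the first time it is seen; return True if it did."""
    offset = pending_seek(message, start_offsets, done)
    if offset is None:
        return False
    # Imported here so pending_seek() can be used without kafka-python installed.
    from kafka.structs import TopicPartition

    consumer.seek(TopicPartition(message.topic, message.partition), offset)
    consumer.commit()  # commits current positions, including the one just set by seek()
    done.add(message.partition)
    return True
```

Because the message that triggered the seek was read from the old position, a caller would probably skip processing it (for example with `continue`) when seek_and_commit_once returns True.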

Pyrrho answered 18/5, 2017 at 1:29

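You just need to call consumer.commit(). With no arguments, it commits the current position of every partition assigned to the consumer: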

from kafka import KafkaConsumer, TopicPartition

KAFKA_TOPIC_NAME = 'KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP = 'KAFKA_CONSUMER_GROUP'
consumer = KafkaConsumer(
    KAFKA_TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id=KAFKA_CONSUMER_GROUP
)
for message in consumer:
    print(message.value)
    consumer.commit()    # <--- This is what we need
    # Optional: check that the commit went through
    print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
Ultramundane answered 9/7, 2019 at 1:38

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer("topic_name", enable_auto_commit=False,
                         bootstrap_servers=["128.0.0.1:9092"], group_id="group_name")
msg = next(consumer)  # waits for the next message
# Commit the offset of the next message to read, for this message's partition only
consumer.commit({TopicPartition("topic_name", msg.partition): OffsetAndMetadata(msg.offset + 1, '')})
Fenske answered 25/3, 2021 at 5:30

from kafka import OffsetAndMetadata

# consumer: a KafkaConsumer created with a group_id (and usually enable_auto_commit=False)
# topic_config: the list of topic names to subscribe to
consumer.subscribe(topic_config)
while True:
    try:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}
        raw_messages = consumer.poll(timeout_ms=1000)
        for tp, messages in raw_messages.items():
            for message in messages:
                print("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
                                                      message.offset, message.key,
                                                      message.value))
                # tp is already a TopicPartition; commit the offset of the next message to read.
                # Metadata is an opaque string, so '' is used instead of partition info.
                options = {tp: OffsetAndMetadata(message.offset + 1, '')}
                consumer.commit(offsets=options)
    except Exception as e:
        print(e)

Reference: https://github.com/dpkp/kafka-python/issues/645
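In kafka-python, commit() blocks until the broker responds, so committing inside the per-message loop costs one round-trip per message. A possible variation is to commit once per poll() batch, using the last offset plus one for each partition in that batch. This is a minimal sketch under the same assumptions as above (two-field OffsetAndMetadata, consumer with a group_id); batch_next_offsets and commit_batch are hypothetical helper names.

```python
def batch_next_offsets(batch):
    """For a poll() result {TopicPartition: [records]}, return {TopicPartition: highest offset + 1}."""
    return {
        tp: max(record.offset for record in records) + 1
        for tp, records in batch.items()
        if records  # skip partitions that returned no records
    }


def commit_batch(consumer, batch, metadata=""):
    """Commit every partition in one poll() batch with a single commit() call."""
    # Imported here so batch_next_offsets() can be used without kafka-python installed.
    from kafka.structs import OffsetAndMetadata

    offsets = {
        tp: OffsetAndMetadata(next_offset, metadata)
        for tp, next_offset in batch_next_offsets(batch).items()
    }
    if offsets:
        consumer.commit(offsets=offsets)
    return offsets


# Sketch of use (not executed here):
# while True:
#     batch = consumer.poll(timeout_ms=1000)
#     for tp, records in batch.items():
#         for record in records:
#             ...  # process record
#     commit_batch(consumer, batch)
```

The trade-off is that if the process dies mid-batch, the records processed since the last commit are delivered again after a restart.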

Resor answered 5/6, 2023 at 8:46
