```python
from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "test_topic"
PARTITION = 0

consumer = KafkaConsumer(
    group_id=TOPIC,
    auto_offset_reset="earliest",
    bootstrap_servers="localhost:9092",
    request_timeout_ms=100000,
    session_timeout_ms=99000,
    max_poll_records=100,
)
# format: TopicPartition(topic, partition)
topic_partition = TopicPartition(TOPIC, PARTITION)
consumer.assign([topic_partition])
# format: seek(TopicPartition, offset); 1660000 is the offset to start reading from
consumer.seek(topic_partition, 1660000)
for message in consumer:
    # do something with the message
    print(message.offset, message.value)
```
- This only assigns one partition and sets the offset for that partition. If there is more than one partition, you need to assign each of them and then set the offset for each one.
- aalmeida88's answer works for me sometimes, but in some situations it does not. Still, aalmeida88 gave me the idea to use seek, which also seems to be a useful method.
- Another thing to notice: when you assign partitions yourself, Kafka Manager does not seem to be able to get the consumer info. This might be because a consumer that uses assign() does not join the consumer group through Kafka's group management, so there is no group membership for Kafka Manager to display.
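For the multi-partition case in the first point, here is a minimal sketch, assuming kafka-python and a broker at localhost:9092. The helper name `seek_each`, the `main` function, and the use of one shared `START_OFFSET` for every partition are hypothetical choices for illustration. It puts all partitions into a single `assign()` call, because each call replaces the previous assignment instead of adding to it, and only then seeks each partition.

```python
TOPIC = "test_topic"
START_OFFSET = 1660000  # hypothetical: the same start offset for every partition


def seek_each(consumer, offsets):
    """Manually assign every partition in `offsets`, then seek each one.

    `offsets` maps a TopicPartition to the offset to start reading from.
    """
    # One assign() call must list all partitions: each call replaces the
    # previous assignment rather than adding to it.
    consumer.assign(list(offsets))
    # seek() only works on partitions that are currently assigned.
    for tp, offset in offsets.items():
        consumer.seek(tp, offset)


def main():
    # kafka-python is imported here so seek_each() can be tried without
    # the library or a running broker.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        group_id=TOPIC,
        auto_offset_reset="earliest",
        bootstrap_servers="localhost:9092",
    )
    # partitions_for_topic() returns a set of partition ids, or None if unknown.
    partition_ids = consumer.partitions_for_topic(TOPIC) or set()
    seek_each(consumer, {TopicPartition(TOPIC, p): START_OFFSET for p in partition_ids})
    for message in consumer:
        print(message.partition, message.offset, message.value)
```

Call `main()` with the broker running. In practice each partition usually needs its own offset, so you would build the `offsets` dict from your own per-partition values; if an offset is out of range for a partition, `auto_offset_reset` decides where reading starts.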
Hope it helps!
---edit-----
I found a better way to do it:
```python
# inside the `for message in consumer:` loop
topic_partition = TopicPartition(TOPIC, message.partition)
consumer.seek(topic_partition, offset_value)  # offset_value: the offset you want
consumer.commit()
```
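This takes the partition from the message returned by Kafka, so you can skip assigning partitions manually (the consumer subscribes to the topic instead, and seek() works because the partition it just read from is already assigned to it). That is convenient when you need to set the offsets of more than one partition in your program, which is not uncommon.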
P.S. To make sure each partition's offset is set only once, keep a flag suited to your application; otherwise every message read from that partition triggers the seek again.
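Putting the edit and the P.S. together, here is a minimal sketch, assuming kafka-python and a broker at localhost:9092. The `SeekOnce` class is one hypothetical way to implement the flag (a set of partitions that have already been reset), and `main(offset_value)` is likewise illustrative. Skipping the message that triggered the seek is also an assumption: that message was read from the old position, so it is not the one you asked for.

```python
TOPIC = "test_topic"


class SeekOnce:
    """Hypothetical per-partition flag: should_seek() returns True only the
    first time a partition is seen, so its offset is reset exactly once."""

    def __init__(self):
        self._done = set()

    def should_seek(self, partition):
        if partition in self._done:
            return False
        self._done.add(partition)
        return True


def main(offset_value):
    # Imported here so SeekOnce can be used without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # Passing the topic to the constructor subscribes to it; the group
    # coordinator assigns partitions, so no manual assign() is needed.
    consumer = KafkaConsumer(
        TOPIC,
        group_id=TOPIC,
        auto_offset_reset="earliest",
        bootstrap_servers="localhost:9092",
    )
    flags = SeekOnce()
    for message in consumer:
        if flags.should_seek(message.partition):
            topic_partition = TopicPartition(TOPIC, message.partition)
            consumer.seek(topic_partition, offset_value)
            consumer.commit()
            # This message came from the old position, so skip it; the
            # partition's next messages start at offset_value.
            continue
        print(message.partition, message.offset, message.value)
```

Because the flag is kept in memory, it resets when the process restarts, so a restarted consumer will seek again; persist the flag if your application needs the reset to happen only once overall.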