Here is a simple program to test how it works.
doc -> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
/**
 * Small experiment showing how the consumer position and the committed offset
 * evolve when auto-commit is enabled.
 *
 * <p>The program publishes 1000 string records to {@link #KAFKA_TOPIC_NAME}, then
 * polls the topic in a loop. Each polled record is "processed" by sleeping 200 ms,
 * and after every poll the current position and the last committed offset of
 * partition 0 are printed together with the wall-clock time.
 */
public class KafkaTest {
    public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
    public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Formatter for the timestamp printed after each poll; built once and reused. */
    private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("HH:mm:ss");

    /**
     * Produces the test records, then consumes them until the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the producer once all records were handed to send(),
        // so it is not leaked for the lifetime of the consume loop below.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(getProps())) {
            for (int i = 0; i < 1000; i++) {
                kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
            }
        }

        // Only partition 0 is inspected when reporting offsets.
        final TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);

        // Key type is String to match the StringDeserializer configured in getProps().
        try (Consumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
            consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));

            while (!Thread.currentThread().isInterrupted()) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));

                // Simulate 200 ms of work per record.
                consumerRecords.forEach(record -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (InterruptedException e) {
                        // Restore the flag instead of swallowing it so the loop below can stop.
                        Thread.currentThread().interrupt();
                    }
                });
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }

                // position() is only valid for partitions currently assigned to this consumer.
                if (!consumer.assignment().contains(actualTopicPartition)) {
                    System.out.println("Poll finish.. " + actualTopicPartition + " is not assigned to this consumer");
                    continue;
                }

                // The map value is null while the group has no committed offset for the
                // partition; report "none" instead of failing with a NullPointerException.
                // (java.util.Optional is fully qualified so no extra import is needed.)
                final String committedOffset = java.util.Optional
                        .ofNullable(consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition))
                        .map(metadata -> String.valueOf(metadata.offset()))
                        .orElse("none");
                final long consumerCurrentOffset = consumer.position(actualTopicPartition);

                System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset
                        + " - committed-offset: " + committedOffset
                        + " " + LocalTime.now().format(TIME_FORMAT));
            }
        }
    }

    /**
     * Builds one configuration map that is passed to both the producer and the consumer.
     *
     * <p>It therefore contains producer settings (serializers), consumer settings
     * (deserializers, group id, offset reset, auto-commit, max poll records) and the
     * shared bootstrap servers.
     *
     * @return a new mutable map of client configuration values
     */
    private static Map<String, Object> getProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // Default: latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
        return props;
    }
}
- Each poll cycle takes about 2 sec (max.poll.records = 10 records x 200 ms of processing each)
- Auto commit interval is 5 sec
The output looks like this:
Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> auto commit performed during this poll, because the elapsed time (6 sec) > commit interval (5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit