No current assignment for partition occurs even after poll in Kafka
I have a Java 8 application working with Apache Kafka 0.10.1.0 (Scala 2.11 build). I need to use the seek feature to poll old messages from partitions. However, I keep hitting a No current assignment for partition exception every time I try to seek by offset. Here's my class, which is responsible for seeking topics to the specified timestamp:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * The main purpose of this class is to move the fetching point for each partition of the {@link KafkaConsumer}
 * to some offset which is determined either by timestamp or by offset number.
 */
public class KafkaSeeker {
    public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    private final KafkaConsumer<String, String> kafkaConsumer;
    private ConsumerRecords<String, String> polledRecords;

    public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
        this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
    }

    /**
     * For each assigned or subscribed topic, moves the fetching pointer via
     * {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
     * to the offset matching the specified {@code timestamp}.
     * If no messages were found in a partition of a topic,
     * then {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} will be called for it.
     *
     * Because {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} are lazy,
     * this method needs to execute a dummy {@link KafkaConsumer#poll(long)} call first. All {@link ConsumerRecords}
     * which were polled from the buffer are swallowed and produce warning logs.
     *
     * @param timestamp is used to find the proper offset to seek to
     * @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
     */
    public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
        this.polledRecords = kafkaConsumer.poll(0);
        Collection<TopicPartition> topicPartitions;
        if (CollectionUtils.isEmpty(topics)) {
            topicPartitions = kafkaConsumer.assignment();
        } else {
            topicPartitions = topics.stream()
                    .map(it -> {
                        List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
                                .map(PartitionInfo::partition).collect(Collectors.toList());
                        return partitions.stream().map(partition -> new TopicPartition(it, partition));
                    })
                    .flatMap(it -> it)
                    .collect(Collectors.toList());
        }

        if (topicPartitions.isEmpty()) {
            throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
        }

        Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
                .collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            if (entry.getValue() != null) {
                LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
                        topicPartition.topic(),
                        topicPartition.partition(),
                        beginningOffsets.get(topicPartition),
                        entry.getValue());
                kafkaConsumer.seek(topicPartition, entry.getValue().offset());
            } else {
                LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
                        topicPartition.topic(),
                        topicPartition.partition());
                kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
            }
        }
        return offsets;
    }

    public ConsumerRecords<String, String> getPolledRecords() {
        return polledRecords;
    }
}

Before calling the method, I have the consumer subscribed to a single topic, like this: consumer.subscribe(singletonList(kafkaTopic));. When I call kafkaConsumer.assignment() it returns zero TopicPartitions assigned. But if I specify the topic and get its partitions, then I have valid TopicPartitions, although they fail on the seek call with the error in the title. What am I missing?

Petard asked 1/2, 2019 at 13:39 Comment(0)
27

The correct way to reliably seek and check the current assignment is to wait for the onPartitionsAssigned() callback after subscribing. On a newly created (still not connected) consumer, calling poll() once does not guarantee it will immediately be connected and assigned partitions.

As a basic example, see the code below: it subscribes to a topic and, in the assignment callback, seeks to the desired position. You'll notice that the poll loop then only sees records from the seek location, not from the previously committed or reset offset.

// seek target: offset 5 of testtopic-0; singletonMap (not Map.of) keeps this compilable on Java 8
public static final Map<TopicPartition, Long> offsets =
        Collections.singletonMap(new TopicPartition("testtopic", 0), 5L);

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {

        consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned " + partitions);
                for (TopicPartition tp : partitions) {
                    OffsetAndMetadata oam = consumer.committed(tp);
                    if (oam != null) {
                        System.out.println("Current offset is " + oam.offset());
                    } else {
                        System.out.println("No committed offsets");
                    }
                    Long offset = offsets.get(tp);
                    if (offset != null) {
                        System.out.println("Seeking to " + offset);
                        consumer.seek(tp, offset);
                    }
                }
            }
        });

        for (int i = 0; i < 10; i++) {
            System.out.println("Calling poll");
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
            for (ConsumerRecord<String, String> r : records) {
                System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
            }
        }
    }
}
Ginseng answered 2/2, 2019 at 11:51 Comment(6)
Your answer seems correct, but with this approach I'm getting java.lang.IllegalStateException: org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@5b202a3a has been closed already. – Petard
This answer works in a vanilla Kafka app. Your error about Spring's ApplicationContext being already closed is an unrelated issue. – Cosmonaut
There are a ton of examples on the Internet using poll to get assignments. Finally found this answer. Wish this answer would float to the top of Google results; it would probably save a lot of people a lot of time. – Goodson
@Goodson Very well said... Almost all the docs and all references I found are complete rubbish. Wish I had stumbled upon this earlier... – Nonlegal
I think that in the provided example there might be a couple of poll calls completed before the ConsumerRebalanceListener is executed and the seek is performed. I guess some 'skip until seek completed' mechanism should be in place before consumption starts, maybe with a CompletableFuture or something of that sort (see the sketch below). – Distant
Nice one ☝️ It worked for me. – Silk
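
To illustrate Distant's comment, here is a minimal sketch of such a guard; the GuardedSeekLoop class name, the "testtopic" topic, and the single targetOffset are assumptions for illustration, not part of the answer above. A flag flips once the listener has performed the seek, and anything polled before that point is discarded:

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class GuardedSeekLoop {
    // flips to true only after the rebalance listener has performed the seek
    private static final AtomicBoolean seekDone = new AtomicBoolean(false);

    static void run(Consumer<String, String> consumer, long targetOffset) {
        consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // after a revocation, the next assignment must seek again
                seekDone.set(false);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition tp : partitions) {
                    consumer.seek(tp, targetOffset);
                }
                seekDone.set(true);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
            if (!seekDone.get()) {
                // assignment (and therefore the seek) has not happened yet,
                // so drop whatever this poll returned instead of processing it
                continue;
            }
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}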
5
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
// Get topic partitions
List<TopicPartition> partitions = consumer
                    .partitionsFor(topic)
                    .stream()
                    .map(partitionInfo ->
                            new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toList());
// Explicitly assign the partitions to our consumer
consumer.assign(partitions);
//seek, query offsets, or poll
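
For example, continuing the snippet above, the timestamp lookup from the question now works without any dummy poll. This is a sketch; the timestamp variable is assumed to be supplied by the caller, as in the question's KafkaSeeker:

// map every assigned partition to the target timestamp and look up the
// first offset at or after it; partitions with no such record map to null
Map<TopicPartition, Long> query = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(query).entrySet()) {
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    } else {
        consumer.seekToEnd(Collections.singleton(entry.getKey()));
    }
}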

Please note that this disables consumer group management and rebalancing operations. When possible, use @Mickael Maison's approach.

Merridie answered 6/3, 2021 at 1:16 Comment(0)
-2

Along similar lines to @Mickael's answer, the Kafka consumer example from the Apache Kafka repo can be used.

public class Consumer extends Thread implements ConsumerRebalanceListener {
    private final String bootstrapServers;
    private final String topic;
    private final String groupId;
    private final Optional<String> instanceId;
    private final boolean readCommitted;
    private final int numRecords;
    private final CountDownLatch latch;
    private volatile boolean closed;
    private int remainingRecords;

    public Consumer(String threadName,
                    String bootstrapServers,
                    String topic,
                    String groupId,
                    Optional<String> instanceId,
                    boolean readCommitted,
                    int numRecords,
                    CountDownLatch latch) {
        super(threadName);
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
        this.groupId = groupId;
        this.instanceId = instanceId;
        this.readCommitted = readCommitted;
        this.numRecords = numRecords;
        this.remainingRecords = numRecords;
        this.latch = latch;
    }

    @Override
    public void run() {
        // the consumer instance is NOT thread safe
        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
            // subscribes to a list of topics to get dynamically assigned partitions
            // this class implements the rebalance listener that we pass here to be notified of such events
            consumer.subscribe(singleton(topic), this);
            Utils.printOut("Subscribed to %s", topic);
            while (!closed && remainingRecords > 0) {
                try {
                    // if required, poll updates partition assignment and invokes the configured rebalance listener
                    // then tries to fetch records sequentially using the last committed offset or auto.offset.reset policy
                    // returns immediately if there are records or times out returning an empty record set
                    // the next poll must be called within session.timeout.ms to avoid group rebalance
                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<Integer, String> record : records) {
                        Utils.maybePrintRecord(numRecords, record);
                    }
                    remainingRecords -= records.count();
                } catch (AuthorizationException | UnsupportedVersionException
                         | RecordDeserializationException e) {
                    // we can't recover from these exceptions
                    Utils.printErr(e.getMessage());
                    shutdown();
                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
                    // invalid or no offset found without auto.reset.policy
                    Utils.printOut("Invalid or no offset found, using latest");
                    consumer.seekToEnd(e.partitions());
                    consumer.commitSync();
                } catch (KafkaException e) {
                    // log the exception and try to continue
                    Utils.printErr(e.getMessage());
                }
            }
        } catch (Throwable e) {
            Utils.printOut("Unhandled exception");
            e.printStackTrace();
        }
        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
        shutdown();
    }

    public void shutdown() {
        if (!closed) {
            closed = true;
            latch.countDown();
        }
    }

    public KafkaConsumer<Integer, String> createKafkaConsumer() {
        Properties props = new Properties();
        // bootstrap server config is required for consumer to connect to brokers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // client id is not required, but it's good to track the source of requests beyond just ip/port
        // by allowing a logical application name to be included in server-side request logging
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        // consumer group id is required when we use subscribe(topics) for group management
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // sets static membership to improve availability (e.g. rolling restart)
        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
        // disables auto commit when EOS is enabled, because offsets are committed with the transaction
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
        // key and value are just byte arrays, so we need to set appropriate deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (readCommitted) {
            // skips ongoing and aborted transactions
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        }
        // sets the reset offset policy in case of invalid or no offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Utils.printOut("Revoked partitions: %s", partitions);
        // this can be used to commit pending offsets when using manual commit and EOS is disabled
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Utils.printOut("Assigned partitions: %s", partitions);
        // this can be used to read the offsets from an external store or some other initialization
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        Utils.printOut("Lost partitions: %s", partitions);
        // this is called when partitions are reassigned before we had a chance to revoke them gracefully
        // we can't commit pending offsets because these partitions are probably owned by other consumers already
        // nevertheless, we may need to do some other cleanup
    }
}

Source: KafkaConsumerExample from the Apache Kafka examples module (the link may change, which is why the content of the file is pasted here).
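
To apply this template to the original problem, the seek belongs in onPartitionsAssigned. Below is a hedged sketch of that adaptation; it assumes the KafkaConsumer created in run() is stored in a consumer field and that a targetTimestamp field holds the point in time to rewind to (neither field exists in the original example):

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    Utils.printOut("Assigned partitions: %s", partitions);
    // hypothetical adaptation: find, for each newly assigned partition, the
    // earliest offset whose record timestamp is >= targetTimestamp, then seek there
    Map<TopicPartition, Long> query = partitions.stream()
            .collect(Collectors.toMap(Function.identity(), tp -> targetTimestamp));
    consumer.offsetsForTimes(query).forEach((tp, offsetAndTimestamp) -> {
        if (offsetAndTimestamp != null) {
            consumer.seek(tp, offsetAndTimestamp.offset());
        }
    });
}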

Mckim answered 28/11, 2023 at 7:24 Comment(0)
