Under what circumstances is endOffset > lastMsg.offset + 1?

3

6

Kafka returns endOffset 15 for a partition, but the last message that can be consumed has offset 13 rather than 14, which is what I would expect. I wonder why.

The Kafka docs read

In the default read_uncommitted isolation level, the end offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For read_committed consumers, the end offset is the last stable offset (LSO), which is the minimum of the high watermark and the smallest offset of any open transaction.
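To make the quoted rule concrete, here is a minimal sketch of it in plain Python. It is not Kafka code: the function name `end_offset` and its parameters are hypothetical and only restate the sentence from the docs.

```python
from typing import Iterable


def end_offset(high_watermark: int,
               open_txn_first_offsets: Iterable[int],
               isolation_level: str = "read_uncommitted") -> int:
    """Restate the documented rule; all names here are illustrative only."""
    if isolation_level == "read_uncommitted":
        # End offset is the high watermark.
        return high_watermark
    if isolation_level == "read_committed":
        # End offset is the last stable offset (LSO): the minimum of the
        # high watermark and the smallest offset of any open transaction.
        return min([high_watermark, *open_txn_first_offsets])
    raise ValueError(f"unknown isolation level: {isolation_level}")


print(end_offset(15, []))                            # no open transactions
print(end_offset(15, [12, 14], "read_committed"))    # LSO held back by an open transaction
```

Note that neither branch says the end offset equals the offset of the last consumable message plus one; it only talks about the high watermark and open transactions.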

Here's kafkacat's output. I'm using kafkacat, because it can print the message offsets:

$ kafkacat -Ce -p0 -tTK -f'offset: %o key: %k\n'
offset: 0 key: 0108
offset: 1 key: 0253
offset: 4 key: 0278
offset: 5 key: 0198
offset: 8 key: 0278
offset: 9 key: 0210
offset: 10 key: 0253
offset: 11 key: 1058
offset: 12 key: 0141
offset: 13 key: 1141
% Reached end of topic TK [0] at offset 15: exiting

What's also baffling - and it may very well be related - is that the offsets are not consecutive, although I have not set up compaction etc.
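For reference, a small Python sketch that lists which offsets were never delivered, using the offsets from the kafkacat output above and the reported end offset of 15. The helper name `missing_offsets` is made up for illustration.

```python
def missing_offsets(seen, end_offset, start=0):
    """Return the offsets in [start, end_offset) that were not delivered."""
    seen_set = set(seen)
    return [o for o in range(start, end_offset) if o not in seen_set]


# Offsets printed by kafkacat for partition 0 of topic TK.
observed = [0, 1, 4, 5, 8, 9, 10, 11, 12, 13]

print(missing_offsets(observed, 15))  # [2, 3, 6, 7, 14]
```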

Some more details:

$ kafka-topics.sh --bootstrap-server localhost:9092 --topic TK --describe
Topic: TK       PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: TK       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: TK       Partition: 1    Leader: 0       Replicas: 0     Isr: 0

Printing the keys via kafka-console-consumer.sh:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TK \
  --offset earliest --partition 0 --timeout-ms 5000 \
  --property print.key=true --property print.value=false
0108
0253
0278
0198
0278
0210
0253
1058
0141
1141
[2021-09-15 10:54:06,556] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 10 messages

N.B. I originally assumed that this topic had been produced to without any involvement of transactions. Consumption is being done in read_uncommitted mode.

Correction: processing.guarantee is actually set to exactly_once_beta, so the producing side does use transactions.


More info: It turns out I can reliably reproduce this case with my Streams app (1. wipe kafka/zookeeper data, 2. recreate topics, 3. run app), whose output is the topic that shows this problem. I've meanwhile trimmed down the Streams app to this no-op topology and can still reproduce it:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [TK1])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: TK)
      <-- KSTREAM-SOURCE-0000000000

News: Meanwhile I have replaced the locally running Kafka broker (2.5.0) with one running in a Docker container (wurstmeister/kafka:2.13-2.6.0). The problem persists.

The app is using Kafka libraries versioned 6.0.1-ccs, corresponding to 2.6.0.

Ruthy answered 15/9, 2021 at 2:8 Comment(0)
2

You should avoid doing arithmetic on offsets. Kafka only guarantees that each new offset is greater than the previous one, not that offsets are consecutive. If you need to check that you have received everything, use keys instead: track the keys you receive and verify that you have seen the expected number of them.

Kafka has many things to juggle, such as Exactly-Once Semantics, re-sending messages, and other internal bookkeeping related to the topic. The records it writes for those purposes occupy offsets but are discarded before they reach your application. You will only see your own messages, and their offsets will only go up.

These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions
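As a rough illustration of why offset arithmetic is fragile, the sketch below contrasts two "have I reached the end?" checks. Both function names are hypothetical. The value `position = 15` is an assumption: it presumes the consumer's position (the next offset to fetch) moves past the hidden marker records, which is consistent with kafkacat reporting "Reached end of topic TK [0] at offset 15" in the question.

```python
def caught_up_by_last_record(last_record_offset: int, end_offset: int) -> bool:
    # Fragile: assumes every offset below end_offset holds a visible record.
    return last_record_offset + 1 == end_offset


def caught_up_by_position(position: int, end_offset: int) -> bool:
    # Compares the next offset to fetch with the end offset, so records
    # that are never delivered to the application do not matter.
    return position >= end_offset


end = 15        # end offset reported for the partition in the question
last_seen = 13  # offset of the last record actually delivered
position = 15   # assumed consumer position after reading to the end

print(caught_up_by_last_record(last_seen, end))  # False
print(caught_up_by_position(position, end))      # True
```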

Identification answered 16/12, 2022 at 2:17 Comment(0)
1

When I remove the setting processing.guarantee: exactly_once_beta the problem goes away. In terms of this problem, it doesn't matter whether I use exactly_once_beta or exactly_once.

I still wonder why that happens with exactly_once(_beta) - after all, in my tests there is smooth sailing and no transaction rollbacks etc.

In my latest tests this rule seems to apply to all partitions with at least one item in them:

endOffset == lastMsg.offset + 3

Which is 2 more than expected.

The Kafka docs mentioned in the question say that

For read_committed consumers, the end offset is the last stable offset (LSO), which is the minimum of the high watermark and the smallest offset of any open transaction.

So is Kafka perhaps pre-allocating offsets for 2 (???) transactions per partition?

Ruthy answered 15/9, 2021 at 6:41 Comment(0)
1

This seems to be the answer: https://mcmap.net/q/1445037/-kafka-streams-does-not-increment-offset-by-1-when-producing-to-topic

each commit (or abort) of a transaction writes a commit (or abort) marker into the topic -- those transactional markers also "consume" one offset
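A toy model of that statement, in plain Python rather than real Kafka code: the `PartitionLog` class and its methods are invented for illustration, and it assumes exactly one marker per committed transaction, as the quote describes.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    # Each entry is (offset, kind, key); kind is "data" or "commit_marker".
    records: list = field(default_factory=list)

    @property
    def end_offset(self) -> int:
        # Next offset to be assigned; every entry, marker or not, uses one.
        return len(self.records)

    def append_transaction(self, keys):
        """Append the data records of one transaction, then its commit marker."""
        for key in keys:
            self.records.append((self.end_offset, "data", key))
        self.records.append((self.end_offset, "commit_marker", None))

    def consume(self):
        """Return only what an application would see: data records."""
        return [(off, key) for off, kind, key in self.records if kind == "data"]


log = PartitionLog()
log.append_transaction(["0108", "0253"])  # offsets 0, 1, marker at 2
log.append_transaction(["0278"])          # offset 3, marker at 4

visible = log.consume()
print([off for off, _ in visible])  # [0, 1, 3]
print(log.end_offset)               # 5
```

In this model the end offset is the last visible offset plus 2, because the final commit marker sits between the last data record and the end of the log.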

Therein answered 15/12, 2022 at 15:17 Comment(0)
