Kafka Streams does not increment offset by 1 when producing to topic
I have implemented a simple Kafka Dead letter record processor.

It works perfectly when using records produced from the Console producer.

However, I find that our Kafka Streams applications do not guarantee that offsets increase by 1 for each record produced to the sink topics.

Dead Letter Processor Background:

I have a scenario where records may be received before all the data required to process them is published. When records are not matched for processing by the streams app, they are moved to a Dead letter topic instead of continuing to flow downstream. When new data is published, we dump the latest messages from the Dead letter topic back into the stream application's source topic for reprocessing with the new data.

The Dead Letter processor:

  • At the start of the run, the application records the ending offsets of each partition.
  • The ending offsets mark the point at which to stop processing records for a given Dead Letter topic, to avoid an infinite loop if reprocessed records return to the Dead Letter topic.
  • The application resumes from the last offsets produced by the previous run via consumer groups.
  • The application uses transactions and KafkaProducer#sendOffsetsToTransaction to commit the last produced offsets.

To track when all records in my range have been processed for a topic's partition, my service compares the last produced offset from the producer to the consumer's saved map of ending offsets. When we reach the ending offset, the consumer pauses that partition via KafkaConsumer#pause, and when all partitions are paused (meaning they have reached the saved ending offset), the application exits.

The Kafka Consumer API states:

Offsets and Consumer Position

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.

The Kafka Producer API also implies that the next offset is always +1:

Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

But you can clearly see in my debugger that the offsets of the records consumed from a single partition are anything but incremented by 1 at a time... (screenshot omitted)

I thought maybe this was a Kafka configuration issue such as max.message.bytes, but none of the settings really made sense. Then I thought perhaps it came from joining, but I didn't see any way that would change how the producer functions.

Not sure if it is relevant or not but all of our Kafka applications are using Avro and Schema Registry...

Should the offsets always increment by 1 regardless of the method of producing, or is it possible that the Kafka Streams API does not offer the same guarantees as the normal producer and consumer clients?

Is there something that I am missing entirely?

Dioptase answered 11/2, 2019 at 18:3 Comment(0)
L
12

It is not an official API contract that message offsets are increased by one, even if the JavaDocs indicate this (it seems that the JavaDocs should be updated).

  • If you don't use transactions, you get either at-least-once semantics or no guarantees (some call this at-most-once semantics). For at-least-once, records might be written twice and thus, offsets for two consecutive messages are not really increased by one as the duplicate write "consumes" two offsets.

  • If you use transactions, each commit (or abort) of a transaction writes a commit (or abort) marker into the topic -- those transactional markers also "consume" one offset (this is what you observe).

Thus, in general, you should not rely on consecutive offsets. The only guarantee you get is that each offset is unique within a partition.
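To make the transactional case concrete, here is a minimal sketch in plain Java that simulates a partition log rather than talking to a broker. Everything in it (`OffsetGapSketch`, `Entry`, `writeTransactions`, `positionAfterDrain`) is a hypothetical name made up for illustration, and it assumes each transaction ends with exactly one marker. It shows why "last visible offset + 1" can fall short of the end offset even though every record has been read.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetGapSketch {
    /** One slot in a simulated partition log: a data record or a transaction marker. */
    static final class Entry {
        final long offset;
        final boolean marker;
        Entry(long offset, boolean marker) { this.offset = offset; this.marker = marker; }
    }

    /** Writes each batch as one transaction: its records, then one commit marker. */
    static List<Entry> writeTransactions(int... batchSizes) {
        List<Entry> log = new ArrayList<>();
        long next = 0;
        for (int size : batchSizes) {
            for (int i = 0; i < size; i++) log.add(new Entry(next++, false));
            log.add(new Entry(next++, true)); // the marker occupies an offset too
        }
        return log;
    }

    /** Offsets the application would see; markers are never handed to it. */
    static List<Long> visibleOffsets(List<Entry> log) {
        List<Long> out = new ArrayList<>();
        for (Entry e : log) if (!e.marker) out.add(e.offset);
        return out;
    }

    /** End offset: the offset the next appended entry would receive. */
    static long endOffset(List<Entry> log) { return log.size(); }

    /** Position after reading everything: one past the last entry, markers included. */
    static long positionAfterDrain(List<Entry> log) {
        return log.isEmpty() ? 0 : log.get(log.size() - 1).offset + 1;
    }

    public static void main(String[] args) {
        List<Entry> log = writeTransactions(2, 1); // two transactions: 2 records, then 1
        List<Long> visible = visibleOffsets(log);
        long lastVisible = visible.get(visible.size() - 1);
        System.out.println("visible offsets     = " + visible);
        System.out.println("end offset          = " + endOffset(log));
        System.out.println("lastVisible + 1     = " + (lastVisible + 1));
        System.out.println("position after read = " + positionAfterDrain(log));
    }
}
```

In a real application the end offset and the position would presumably come from KafkaConsumer#endOffsets and KafkaConsumer#position. Whether the position has already stepped past a trailing marker at the moment you check it is an assumption worth verifying against your client version.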

Levis answered 11/2, 2019 at 18:36 Comment(11)
In my example the end offset was 71495 when initialized and the last record in the topic had offset 41491. It seems like I should be able to know whether I've consumed every record within a range or not (last offset from the consumer group through the last record currently in the partition). But the ending offset is literally the next record's offset, and the current record when consumed has no idea what the next offset would be or by how much the offset would be incremented. So the only way to know if all records have been consumed is to wait for the next record to be produced? Unless I can know I am at the end of a partition... - Dioptase
#54544574 I noticed the same behavior. However, in my case I am using transactions with Kafka to guarantee exactly-once delivery. By any chance, are you using transactions? - Borrell
@JR ibkr Yes, I am also using transactions and commit consumer group offsets with the KafkaProducer#sendOffsetsToTransaction call. - Dioptase
To guarantee exactly-once delivery, the producer may add an additional message. In some cases, the producer may try to resend some messages. Hence, offsets will always increase, but a developer should not assume that they will necessarily increase by 1. - Borrell
In any case, producers and consumers should not be concerned with offsets. Their job is simply to push to and pull from the queue. If you want to add some metadata, then use headers: cwiki.apache.org/confluence/display/KAFKA/… - Borrell
It's a known issue: issues.apache.org/jira/browse/KAFKA-6607 -- unfortunately, it's not easy to fix. - Levis
@MatthiasJ.Sax Can you change "consumers" to "consumes" in the line "For at-least..."? I had to read the line multiple times to understand the statement. - Burgomaster
@KamalChandraprakash Done. Thanks for pointing that out. Btw: you can also edit (or suggest an edit to) an answer directly to fix typos. - Levis
By the way, as far as I can tell, "init_transaction" may increase the offset by more than 1. (This is not the behaviour for "begin_transaction" and "commit_transaction"; they increase the offset by 1.) I wonder if this is the case? - Topside
What you say sounds off -- neither initTransaction() nor beginTransaction() should increase the offset at all. For commitTransaction(), however, a commit marker would be written, occupying an offset. -- Not sure how exactly you monitor this, and how you got to your conclusion? Maybe you misinterpret what you observe. -- I could imagine that a consumer sits at offset X and, after you do some initTransaction() and some send(), it steps to X+2. However, this would just step over the commit marker of the previous transaction (and doing a beginTransaction() + send() should show the same). - Levis
Most consumer examples propose to commit offset X as X = offsetOfLastConsumedMessage + 1 -- this is not incorrect, but it is also not 100% ideal, because of commit markers... (cf issues.apache.org/jira/browse/KAFKA-6607). -- It's a little bit more difficult to commit the offset "correctly" when the input topic is transactional, and this could easily lead to wrong conclusions when observing the system from outside. - Levis
B
0

I know that knowing the offset of messages can be useful. However, Kafka only guarantees that the offset of message X is greater than the offset of the previous message (X-1). BTW, an ideal solution should not be based on offset calculations.

Under the hood, the Kafka producer may try to resend messages. Also, if a broker goes down, then re-balancing may occur. Exactly-once semantics may append an additional message. Therefore, the offsets assigned to your messages may not be what you expect if any of the above events occur.

Kafka may add additional messages to the topic for internal purposes, but Kafka's consumer API filters out those internal messages. Therefore, you only see your own messages, and their offsets might not necessarily increment by 1.
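One way to select a range without any offset arithmetic is to tag each record with an application-level sequence number and decide from that. The sketch below is plain Java with no Kafka dependency. The `seq` field, `Msg`, and `selectRange` are hypothetical names invented for illustration; Kafka does not provide such a sequence number, so the producing application would have to put it in the key or a header.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SequenceRangeSketch {
    /** A consumed message: broker-assigned offset plus a hypothetical app-level sequence number. */
    static final class Msg {
        final long offset;
        final long seq;
        Msg(long offset, long seq) { this.offset = offset; this.seq = seq; }
    }

    /** Returns the sequence numbers in [fromSeq, toSeq], in arrival order, skipping duplicates. */
    static List<Long> selectRange(List<Msg> consumed, long fromSeq, long toSeq) {
        Set<Long> seen = new HashSet<>();
        List<Long> out = new ArrayList<>();
        for (Msg m : consumed) {
            boolean inRange = m.seq >= fromSeq && m.seq <= toSeq;
            // Set#add returns false for a sequence number that was already accepted
            if (inRange && seen.add(m.seq)) out.add(m.seq);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Msg> consumed = new ArrayList<>();
        consumed.add(new Msg(0, 100));
        consumed.add(new Msg(1, 101));
        consumed.add(new Msg(3, 101)); // duplicate write, e.g. from a retry
        consumed.add(new Msg(4, 102));
        consumed.add(new Msg(6, 103)); // offsets have gaps; the selection does not care
        System.out.println(selectRange(consumed, 101, 102));
    }
}
```

The selection depends only on `seq`, so gaps and duplicates in the offsets do not affect which messages fall in the range.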

Borrell answered 11/2, 2019 at 19:59 Comment(3)
We have many use cases where we are concerned with getting records from a range of offsets. This example matters because the destination topic is upstream, and without it messages will likely cycle through this process multiple times. Our other use cases include messages potentially being lost between disparate Kafka clusters when sent over a RESTful API, where we want to 'replay' a range of records that were not received. The reason this case was different is that the last offset is being inferred from the ending offset, which is actually the offset of the next message produced, not the last in the range. - Dioptase
Well, the bottom line is: don't rely on offsets; they may not be what you expect. Use metadata or keys to mark messages and use them to your advantage. If you are using the Streams API, then you can also use the local store provided by Kafka: kafka.apache.org/20/documentation/streams/developer-guide/… - Borrell
You should use keys to identify each message that you want to send. For example, if you want to read all messages from the 100th message onward, read from offset 100 and discard any message with message.key < 100. - Borrell
