Kafka: how to know when a set of related messages has been consumed
M

2

7

Is there any way, in Kafka, to produce a message once several related messages have been consumed (without having to track this manually in application code)?

The use case would be to take a huge file, split it into several chunks, publish a message for each chunk to a topic and, once all of these messages have been consumed, produce another message on a different topic announcing the result.

We can do it with a database or Redis to track the state, but I wonder if there's a higher-level approach that relies only on the Kafka ecosystem.

Mishap answered 11/9, 2020 at 17:52 Comment(2)
What does your consumer in "and once all these messages are consumed" look like? Is it also a Kafka Streams application or something else? - Straggle
Originally it would be a Spring Boot Kotlin consumer application, but we are open to other options... - Mishap
N
4

You can use ConsumerGroupCommand to check whether a given consumer group has finished processing all messages in a particular topic:

    $ kafka-consumer-groups --bootstrap-server broker_host:port --describe --group chunk_consumer

OR

    $ kafka-run-class kafka.admin.ConsumerGroupCommand ...

Zero lag on every partition indicates that all messages have been consumed successfully and that the consumer has committed their offsets.
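The zero-lag check can also be expressed in code. The following is only a minimal sketch of the arithmetic, not a wired-up Kafka client: the class `LagCheck` and its methods are hypothetical names, and the two maps (partition number to offset) are assumed to have been fetched elsewhere, for example from `AdminClient.listConsumerGroupOffsets` for the committed offsets and `KafkaConsumer.endOffsets` for the log end offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides whether a consumer group has caught up on a topic.
class LagCheck {

    // Lag per partition = log end offset minus committed offset.
    // A partition with no committed offset is treated as committed at 0 (an assumption of this sketch).
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    // True only when every partition reports zero lag.
    static boolean allConsumed(Map<Integer, Long> endOffsets,
                               Map<Integer, Long> committedOffsets) {
        return lagPerPartition(endOffsets, committedOffsets)
                .values().stream().allMatch(l -> l == 0L);
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 10L, 1, 5L);
        Map<Integer, Long> committed = Map.of(0, 10L, 1, 3L);
        System.out.println("lag per partition: " + lagPerPartition(end, committed));
        System.out.println("all consumed: " + allConsumed(end, committed));
    }
}
```

Note that this only says the group has caught up with the topic as a whole; by itself it does not say which file the consumed chunks belonged to.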

Alternatively, you can choose to subscribe to the __consumer_offsets topic and process messages from it yourself, but using ConsumerGroupCommand seems like a more straightforward solution.

Negrete answered 11/9, 2020 at 19:17 Comment(2)
As far as I understand, consumer groups are tied to a specific application rather than created dynamically for each file. I suspect the other answer, using Kafka Streams, makes more sense for this particular use case. - Mishap
Not sure I follow - a commit of an offset by the consumer IS a confirmation that the message was consumed successfully. So if, from the producer side, you monitor offsets and ensure that all offsets are committed, you know that all your "chunks" have been consumed. Once that happens, you can publish a confirmation or do whatever else you need to do. - Negrete
P
3

One approach is as follows:

  1. After consuming each chunk, the application produces a status message (status "Consumed" plus the chunk number).
  2. A second application (a Kafka Streams one) aggregates these status messages and, once it has processed the messages for all chunks, produces a final message saying that the file has been processed.
Paleozoology answered 14/9, 2020 at 9:9 Comment(2)
It does make sense to me and sounds promising, but how would we know, in the Kafka Streams application, that all chunks were processed? (Excuse my ignorance, I have never really used Streams.) Do you have any documentation or a snippet pointing to that? - Mishap
For instance, the chunk status message can look like this: `(key: fileUniqueName, value: chunkNumber, numberOfChunks)`. In the Kafka Streams application you can use the Processor API (kafka.apache.org/10/documentation/streams/developer-guide/…) and aggregate in a custom way: using a state store, you can keep track of the number of chunks processed so far. - Worldling
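The bookkeeping described in this comment could be sketched roughly as below. This is plain Java that only illustrates the counting logic: the class `ChunkCompletionTracker` is a hypothetical name, the `HashMap` merely stands in for a Kafka Streams state store, and chunk numbers are assumed to be 0-based. In a real Processor API implementation, the same logic would run in the processor's per-record callback and forward the final message downstream.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: tracks which chunks of each file have been reported as consumed.
class ChunkCompletionTracker {

    // Stand-in for a state store: file key -> set of chunk numbers seen so far.
    private final Map<String, BitSet> seenChunks = new HashMap<>();

    // Handles one status message (key: fileUniqueName, value: chunkNumber, numberOfChunks).
    // Returns the file key when this message completes the file, otherwise empty.
    Optional<String> onChunkConsumed(String fileUniqueName, int chunkNumber, int numberOfChunks) {
        if (chunkNumber < 0 || chunkNumber >= numberOfChunks) {
            throw new IllegalArgumentException("chunkNumber out of range: " + chunkNumber);
        }
        BitSet seen = seenChunks.computeIfAbsent(fileUniqueName, k -> new BitSet(numberOfChunks));
        // Setting a bit is idempotent, so a redelivered status message is not counted twice.
        seen.set(chunkNumber);
        if (seen.cardinality() == numberOfChunks) {
            // All chunks reported: drop the state and signal completion to the caller.
            seenChunks.remove(fileUniqueName);
            return Optional.of(fileUniqueName);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ChunkCompletionTracker tracker = new ChunkCompletionTracker();
        int[] arrivals = {2, 0, 2, 1}; // out of order, with one duplicate
        for (int chunk : arrivals) {
            Optional<String> done = tracker.onChunkConsumed("bigfile.csv", chunk, 3);
            System.out.println("chunk " + chunk + " -> file complete: " + done.isPresent());
        }
    }
}
```

Tracking the set of chunk numbers rather than a bare counter is a design choice of this sketch: it keeps the result correct if a status message is delivered more than once before the file completes.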
