Kafka - Delayed Queue implementation using high level consumer
Asked Answered
W

5

30

Want to implement a delayed consumer using the high level consumer api

main idea:

  • produce messages by key (each msg contains creation timestamp) this makes sure that each partition has ordered messages by produced time.
  • auto.commit.enable=false (will explicitly commit after each message process)
  • consume a message
  • check message timestamp and check if enough time has passed
  • process message (this operation will never fail)
  • commit 1 offset

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }
    

some concerns about this implementation:

  1. commit each offset might slow ZK down
  2. can consumer.commitOffsets throw an exception? if yes i will consume the same message twice (can solve with idempotent messages)
  3. problem waiting long time without committing the offset, for example delay period is 24 hours, will get next from iterator, sleep for 24 hours, process and commit (ZK session timeout ?)
  4. how can ZK session keep-alive without commit new offsets ? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognising it)
  5. any other problems im missing?

Thanks!

Wakeless answered 2/8, 2015 at 18:10 Comment(9)
1. starting from 0.8.2 you can commit offsets to kafka (zk is still used widely though) 2. yes, and it's fundamental problem (mind exactly once processing) 3. your zk session will expire (and if you have many consumers in group message may be rebalanced away from the original consumer). Frankly speaking kafka doesn't sound like a good fit if you have 1 message per dayMasonite
i have many messages (lets say ~10k rpm), but in some cases i want to delay the message consumption (for example to have a delayed retry mechanism after some message process fails). if a rebalance occurs this will still work, the new consumer will delay the messageWakeless
in this case you will end up with message being scheduled on many consumers: A consumes message 1, schedules it to run in 24h, do not commit offset and session is expired. B kicks in, consumes the very same message, schedules it to run in 24h, ... ultimately, this will spread like a virus. If you do commit message, it may be lost in case of faulty consumer, you can pick whichever is prefered for you (personally I would go for later one, it simplifies semantics). Is it an option to have kinda busy waiting?Masonite
i dont schedule to run in 24 hours. i check the time it was submitted (its part of the message) and check the current time and see if 24 hours have passed. this way it wont "spread" like a virus and will be consumed. how can i set the session not to expire ?Wakeless
there is zookeeper.session.timeout.ms parameter, which by default set to 6 seconds, but setting it to extreme value sounds like abuse of technology (zk would not be able to track which consumers are actualy died because of that).Masonite
en.wikipedia.org/wiki/Busy_waitingPalaeolithic
If the message shouldn't be processed in such a long time as 24 hours Why not consume the message immediately after sticking it in some persistent storage? Then have a background task that (an akka actor perhaps since you are using Scala) that checks if it should process any new messages based on the time passed.Marjie
@EmilH - in trying not to introduce more complexity into the solution, DB writs can fail after consume from kafka, akka(jvm) can die with messages in mailboxes. but its a valid solutionWakeless
Kafka is not a message queue and the requirement you have is handled by message queue solutions. Search for "delivery delay" - something present in JMS 2.0.Affaire
M
25

One way to go about this would be to use a different topic where you push all messages that are to be delayed. If all delayed messages should be processed after the same time delay this will be fairly straight forward:

while(it.hasNext()) {
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

All regular messages will now be processed as soon as possible while those that need a delay gets put on another topic.

The nice thing is that we know that the message at the head of the delayed topic is the one that should be processed first since its delayTo value will be the smallest. Therefore we can set up another consumer that reads the head message, checks if the timestamp is in the past and if so processes the message and commits the offset. If not it does not commit the offset and instead just sleeps until that time:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

In case there are different delay times you could partition the topic on the delay (e.g. 24 hours, 12 hours, 6 hours). If the delay time is more dynamic than that it becomes a bit more complex. You could solve it by introducing having two delay topics. Read all messages off delay topic A and process all the messages whose delayTo value are in the past. Among the others you just find the one with the closest delayTo and then put them on topic B. Sleep until the closest one should be processed and do it all in reverse, i.e. process messages from topic B and put the once that shouldn't yet be proccessed back on topic A.

To answer your specific questions (some have been addressed in the comments to your question)

  1. Commit each offset might slow ZK down

You could consider switching to storing the offset in Kafka (a feature available from 0.8.2, check out offsets.storage property in consumer config)

  1. Can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can solve with idempotent messages)

I believe it can, if it is not able to communicate with the offset storage for instance. Using idempotent messages solves this problem though, as you say.

  1. Problem waiting long time without committing the offset, for example delay period is 24 hours, will get next from iterator, sleep for 24 hours, process and commit (ZK session timeout?)

This won't be a problem with the above outlined solution unless the processing of the message itself takes more than the session timeout.

  1. How can ZK session keep-alive without commit new offsets? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognizing it)

Again with the above you shouldn't need to set a long session timeout.

  1. Any other problems I'm missing?

There always are ;)

Marjie answered 20/8, 2015 at 9:23 Comment(5)
thanks for the detailed answer. why use it.peek().message() and not that it.next() ?Wakeless
ConsumerIterator.peek() inherited from IteratorTemplate doesn't change anything in the ConsumerIterator. It will consistently give you the same value until ConsumerIterator.next() method is called. Compare: github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/… with github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/… . In short it doesn't move the iterator forward.Marjie
I cannot find the ConsumerIterator class. Is it still present in latest releases of kafka?Subatomic
@Subatomic That specific class seem to have been removed. Latest version I could see it in was 0.7 and the release is up to 2.0 at this point with plenty of updates in between.Marjie
Putting the listener thread on sleep until the delay is elapsed is not a good idea. You can quickly exhaust all the listener threads.Abandon
P
6

Use Tibco EMS or other JMS Queue's. They have retry delay built in . Kafka may not be the right design choice for what you are doing

Pleader answered 4/4, 2017 at 14:34 Comment(1)
That's the correct answer. I'm surprised how many people believe Kafka is a general purpose message queue.Affaire
I
2

I would suggest another route in your cases.

It doesn't make sense to address the waiting time in the main thread of the consumer. This will be an anti-pattern in how the queues are used. Conceptually, you need to process the messages as fastest as possible and keep the queue at a low loading factor.

Instead, I would use a scheduler that will schedule jobs for each message you are need to delay. This way you can process the queue and create asynchronous jobs that will be triggered at predefined points in time.

The downfall of using this technique is that it is sensible to the status of the JVM that holds the scheduled jobs in memory. If that JVM fails, you loose the scheduled jobs and you don't know if the task was or was not executed.

There are scheduler implementations, though that can be configured to run in a cluster environment, thus keeping you safe from JVM crashes.

Take a look at this java scheduling framework: http://www.quartz-scheduler.org/

Irradiation answered 23/9, 2016 at 9:52 Comment(3)
" schedule jobs " is very hard to do ... this adds complexity but will work in the end. im looking for something simpleWakeless
Use Tibco EMS or other JMS Queue's. They have retry delay built in . Kafka may not be the right design choice for what you are doing .Pleader
@Nimrod007, I agree.Carver
M
2

We had the same issue during one of our tasks. Although, eventually, it was solved without using delayed queues, but when exploring the solution, the best approach we found was to use pause and resume functionality provided by the KafkaConsumer API. This approach and its motivation is perfectly described here: https://medium.com/naukri-engineering/retry-mechanism-and-delay-queues-in-apache-kafka-528a6524f722

Maxine answered 7/10, 2021 at 9:56 Comment(0)
C
0

Keyed-list on schedule or its redis alternative may be best approaches.

Carver answered 25/1, 2018 at 10:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.