"Resequencing" messages after processing them out-of-order
I'm working on what's basically a highly-available distributed message-passing system. The system receives messages from someplace over HTTP or TCP, performs various transformations on them, and then sends them to one or more destinations (also over TCP/HTTP).

The system has a requirement that all messages sent to a given destination are in-order, because some messages build on the content of previous ones. This limits us to processing the messages sequentially, which takes about 750ms per message. So if someone sends us, for example, one message every 250ms, we're forced to queue the messages behind each other. This eventually introduces intolerable delay in message processing under high load, as each message may have to wait for hundreds of other messages to be processed before it gets its turn.

In order to solve this problem, I want to be able to parallelize our message processing without breaking the requirement that we send them in-order.

We can easily scale our processing horizontally. The missing piece is a way to ensure that, even if messages are processed out-of-order, they are "resequenced" and sent to the destinations in the order in which they were received. I'm trying to find the best way to achieve that.

Apache Camel has a thing called a Resequencer that does this, and it includes a nice diagram (which I don't have enough rep to embed directly). This is exactly what I want: something that takes out-of-order messages and puts them in-order.

But, I don't want it to be written in Java, and I need the solution to be highly available (i.e. resistant to typical system failures like crashes or system restarts) which I don't think Apache Camel offers.

Our application is written in Node.js, with Redis and Postgresql for data persistence. We use the Kue library for our message queues. Although Kue offers priority queueing, the featureset is too limited for the use-case described above, so I think we need an alternative technology to work in tandem with Kue to resequence our messages.

I was trying to research this topic online, and I can't find as much information as I expected. It seems like the type of distributed architecture pattern that would have articles and implementations galore, but I don't see that many. Searching for things like "message resequencing", "out of order processing", "parallelizing message processing", etc., turns up solutions that mostly just relax the "in-order" requirement based on partitions or topics or whatnot, or that only cover parallelization on a single machine. I need a solution that:

  • Can handle processing on multiple messages simultaneously in any order.
  • Will always send messages in the order in which they arrived in the system, no matter what order they were processed in.
  • Is usable from Node.js
  • Can operate in an HA environment (i.e. multiple instances of it running on the same message queue at once without inconsistencies).

Our current plan, which makes sense to me but which I cannot find described anywhere online, is to use Redis to maintain sets of in-progress and ready-to-send messages, sorted by their arrival time. Roughly, it works like this:

  1. When a message is received, that message is put on the in-progress set.
  2. When message processing is finished, that message is put on the ready-to-send set.
  3. Whenever the same message is at the front of both the in-progress and ready-to-send sets, that message can be sent, and it will be in order.

I would write a small Node library that implements this behavior with a priority-queue-esque API using atomic Redis transactions. But this is just something I came up with myself, so I am wondering: Are there other technologies (ideally using the Node/Redis stack we're already on) that are out there for solving the problem of resequencing out-of-order messages? Or is there some other term for this problem that I can use as a keyword for research? Thanks for your help!
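To make the plan above concrete, here is a minimal in-memory sketch of the resequencing step (all names are illustrative, not an existing library's API; in the real system the sets would live in Redis and be updated with atomic transactions so multiple instances could share them):

```javascript
// Messages finish processing in any order, but `onReady` fires strictly in
// arrival order. A Map of completed-but-unsent messages stands in for the
// ready-to-send set; `nextSeq` stands in for the front of the in-progress set.
class Resequencer {
  constructor(onReady) {
    this.onReady = onReady; // called with each message, in arrival order
    this.nextSeq = 0;       // arrival sequence number of the next message to send
    this.ready = new Map(); // seq -> message, for messages processed out of order
  }

  // Call when a message finishes processing; `seq` is its arrival order.
  complete(seq, message) {
    this.ready.set(seq, message);
    // Drain: send every message whose turn has come.
    while (this.ready.has(this.nextSeq)) {
      this.onReady(this.ready.get(this.nextSeq));
      this.ready.delete(this.nextSeq);
      this.nextSeq++;
    }
  }
}
```

Completions arriving out of order are buffered until the gap in front of them closes, which is exactly the "same message at the front of both sets" condition from step 3.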

Mcginnis answered 29/8, 2016 at 18:10 Comment(2)
It's going to be challenging to reorder messages that have been arbitrarily distributed across a cluster. The problem with ordering messages in that manner is the asynchronous network model assumes unbounded message delays, so it can potentially require infinite resources to reorder a queue of messages. What you should look for is a way to partition the messages such that messages that must be ordered amongst one another go to a single machine.Allhallowtide
Thanks for the comment. We're already partitioning messages, but we're looking to improve our performance within single high-volume partitions where further partitioning is impossible, based on our contracts. Although it may be impractical to handle arbitrarily large in-progress queues, in practice this should be limited by the size of our clusters, which will not be over 10-20 machines (so far). We have a separate queueing system for storing messages that are waiting to be picked up by a processor.Mcginnis
This is a common problem, so there are surely many solutions available. This is also quite a simple problem, and a good learning opportunity in the field of distributed systems. I would suggest writing your own.

You're going to have a few problems building this, namely

2: Exactly-once delivery
1: Guaranteed order of messages
2: Exactly-once delivery

You've found number 1, and you're solving it by resequencing messages in Redis, which is an OK solution. The other one, however, is not solved.

It looks like your architecture is not geared towards fault tolerance, so currently, if a server crashes, you restart it and continue with your life. This works fine when processing all requests sequentially, because you know exactly where you crashed, based on what the last successfully completed request was.

What you need is either a strategy for finding out what requests you actually completed, and which ones failed, or a well-written apology letter to send to your customers when something crashes.

If Redis is not sharded, it is strongly consistent. It will fail and possibly lose all data if that single node crashes, but you will not have any problems with out-of-order data, or data popping in and out of existence. A single Redis node can thus hold the guarantee that if a message is inserted into the to-process-set, and then into the done-set, no node will see the message in the done-set without it also being in the to-process-set.

How I would do it

Using Redis seems like too much fuss, assuming the messages are not huge, that losing them is OK if a process crashes, and that running them more than once, or even running multiple copies of a single request at the same time, is not a problem.

I would recommend setting up a supervisor server that takes incoming requests, dispatches each to a randomly chosen slave, stores the responses, and puts them back in order before sending them on. You said you expect the processing to take 750ms. If a slave hasn't responded within, say, 2 seconds, dispatch the request again to another randomly chosen node after a random 0-1 second delay. The first one to respond is the one we're going to use. Beware of duplicate responses.

If the retry request also fails, double the maximum wait time. After 5 or so failures, each waiting up to twice as long (or any multiple greater than one) as the previous one, we probably have a permanent error, so we should ask for human intervention. This algorithm is called exponential backoff, and it prevents a sudden spike in requests from taking down the entire cluster. Without the random interval, retrying after a fixed n seconds would probably cause a DoS attack against your own cluster every n seconds until it dies, if it ever gets a big enough load spike.

There are many ways this could fail, so make sure this system is not the only place data is stored. However, it will probably work 99+% of the time, it's probably at least as good as your current system, and you can implement it in a few hundred lines of code. Just make sure your supervisor uses asynchronous requests so that you can handle retries and timeouts. JavaScript is single-threaded by nature, so this is slightly trickier than normal, but I'm confident you can do it.

Brazzaville answered 31/8, 2016 at 9:8 Comment(2)
Thank you for the well-thought-out answer. I like the idea of a supervisor server, but I'm concerned that it would be a scaling bottleneck. I will try to investigate this option more, based on your advice. If the "supervisor" can be HA or have automatic failover, that might be the best option. Also, I forgot to mention this in the question, but the messages are persisted to Postgresql as well for long-term storage.Mcginnis
You could use raft or a similar implementation to get some fault tolerance for your supervisor later on, but that's much harder to get right.Brazzaville
