Message queue architecture when messages need to access shared data
Asked Answered
G

3

7

I have to build a motion detection service. The motion detection doesn't operate on videos, but instead on just still images.

This microservice will need to be able to receive images out of order (with a timestamp) and figure out if the image differs from an image taken before it (with a timestamp earlier than it). There will need to be multiple motion detection workers.

So, key requirements seem to be:

  1. A web service that takes in images out of order, is able to group them into previous and next pairs, and then compute whether an image has motion compared to its previous image.
  2. Many image producers - throughout seems to be, on average, around 100 images a second
  3. Many motion detection consumers
  4. Prioritise latency over throughput.
  5. Tasks that aren't easily independently consumable.

I was thinking of using a single message queue. The producers push image documents onto the queue. The motion detection workers then read from this queue, and add a 'diff_percentage' field to that document, and update that record in the database.

Given a task in the queue, a worker can operate on that task alone, by fetching the image before it directly from the database, and comparing it, then updating the record in the database. Unfortunately, whilst that would work well enough, it would be horribly slow. I think I need to reduce reads from the database. Ideally I'd like this "queue" to be able to buffer until it has the images needed by a given job. Something like... when a worker reads from the queue, check if the image it needs to compare against is in the queue, if not, go to the database.

Can anyone point me in the right direction? Perhaps a queue is not what I want? Perhaps a queue, and some sort of caching bucket?

Gunthar answered 24/7, 2014 at 22:5 Comment(4)
As stated this question seems a bit too broad for useful answers. Is this a web service or a local system service? How many is "many" (100, 100K, 100M)? Do you want to prioritize throughput or latency? Reduce compute cost or storage cost? These are all factors that may decide what is and isn't a viable approach to the problem.Forbade
Fair point MooseBoys. I'll try and add some more detailGunthar
It is not clear to me what the problem is. Is your current approach too slow?Tegular
In case you want to be reassured that my answer comes from a credible or official source: my master's thesis was about network communication, and I have a serious general interest in dataflow programming.Sidekick
S
4

An image queue with multiple producers and multiple consumers does seem the right approach here. For the remainder of this answer I'll abstract away from the specifics of this queue because those depend on where the producers and consumers are (physically on which machine).

Here's what to do at the consumer end:

Keep the images temporarily in memory in a hash table. The key is the timestamp and the value is a pointer to the image content (as well as any metadata that you may want to keep). An image can be removed from the hash table as soon as it has been compared with the image of the successive timestamp.

Your consumer machine will need have enough working memory for the images. If on average 100 images are received between a given timestamp and the timestamp either directly before or directly after it, and an image is 1MB in size on average, the images will together take 100 * 2 * 1MB = 200MB of memory.

Create a second, in-memory queue to keep track of images that can't be compared yet. Workers put image pointers with their timestamps onto that queue if the image with the previous timestamp is not available from the hash table at the time of receiving the image with the current timestamp. A second set of workers takes timestamps from this queue and tests whether the image of the previous timestamp has become available in the meanwhile. If so, it compares the images, otherwise it pushes the image and timestamp back onto the queue.

The relative size of the first and second set of workers should be proportional to the relative frequency of an image arriving before its direct successor. In other words, if 60% of the time an image enters the hash table before its direct successor (so 40% of the time an image arrives after its direct successor), 60% of the workers should be in the first set and 40% should be in the second set. Alternatively, you may assign workers to a set dynamically depending on demand; this may be appropriate if out-of-order behaviour tends to fluctuate a lot, for example depending on the time of the day.

A third queue with a single consumer is responsible for updating the database. This third queue may or may not be across a network, just like the first queue. After a worker from the previous two sets has compared two successive images, it pushes the result onto this third queue. The consumer of this queue takes the contents of the queue and synchronizes them to the database. It may do this with one transaction for every few (say 10) comparisons, to minimize latency, or pool everything in a single transaction per second, to maximize throughput. Do not create a transaction for each image comparison individually, that will likely be much slower than you want.

The image comparison workers all need to read and update the hash table, so you need a mechanism to prevent race conditions. A locking mechanism is inappropriate in this case because it will probably become the bottleneck of your application. Instead, dedicate a single worker to managing the hash table and have all comparison workers send requests to the hash table manager over a read/insert queue. Since the manager has a relatively light job (storing, retrieving and removing image pointers) it should be able to stay ahead of the read/insert queue most of the time.

When a worker does a read request, it will wait for the reply of the manager (not when it does an insert request). It might pass a callback and sleep, or enter a spinlock checking for the "reply ready" value of a shared variable (depending on your programming environment this may boil down to the same thing under the hood). Of course you would rather not have your workers wait at all, but most waits will be very brief and this approach will certainly be faster than a global locking approach.

After an image is first successfully retrieved from the hash table, the manager can remove the image from the table (because the image will only ever be requested for comparison with the successive image). The manager should remove the pointer from the hash table, not delete the image itself. You can use reference counting to determine when an image should be completely purged from memory. While the reference counting needs to be locked or atomic, this will not be a bottleneck since at most two workers will be accessing an image at any given time, mostly without directly affecting the reference count.

Notes

In the above design I have not discussed when images enter the permanent database. Most likely this should happen on the producer end, before images enter the first queue. Alternatively, the database synchronizing agent from the third queue that I discussed may do it. You don't want to burder your comparison workers or your hash table manager with this responsibility.

If you think my answer is promising, I'm willing to provide additional documentation, for example (minimalist) flow diagrams, pseudocode algorithms for the workers or crude dataflow traffic profiles.

Sidekick answered 27/7, 2014 at 10:32 Comment(5)
Thanks Julian. I'll have a closer read tomorrow. This seems like a pretty good solution.Gunthar
You're welcome. FYI, I have made a slight simplification to the paragraph on deleting image pointers from the hash table: images will only be requested for comparison with the successive image, so the manager does not need to count before deleting a pointer. Images that are still to be compared with their predecessor are retained on the secondary queue.Sidekick
@DominicBou-Samra: I'm starting to think I might have misinterpreted "out of order". Did you just mean "sequences from different producers are mixed" or also "new timestamps may appear before older timestamps"? In the former case, my answer could be simplified by leaving out the second set of comparison workers, although I would still recommend using an in-memory hash table with a manager.Sidekick
Yes, I mean snapshots can come through in any order. The timestamps will always be correct (i.e a snapshot with a timestamp field of 7:30, was taken before a snapshot with timestamp field of 7:31, even if they sent out by the producer out of order).Gunthar
In that case I stand by my current answer.Sidekick
R
2

I think your problem comes from trying to use a single queue for many image producers. This causes the images to become lumped together and then a need to untangle the image sequences.

My approach would be to stream the images into timestamped directories and files. Don't mix the image producers, keep them separate. Then it's very easy to scan through the files and mark them with a diff_percentage.

For example, Image Producer #1 stores files in directory /IP1/date/time/sequence, where date is like 20140727 (2014-07-27), time is like 1542 (3:42pm) and sequence is a counter from 1 to 6000 (up to 100 frames per second, 60 seconds per minute). Copy this structure for other Image Producers. This way, the images get stored independently of the workers and don't have a bottleneck with a queue or database.

Then, run as many consumers as you want in parallel that can wake up, process a chunk of images and go to sleep when they run out of files to process. They need to work exclusively from each other by having them work on separate directories. I would have them add files to the directories with a diff_percentage for each image, and also another file when a directory (time or date) is complete. This makes it easy for them to restart and catch up in case they stop unexpectedly.

This is the old "divide and conquer" approach. By keeping the image streams separate, it is easier to divide up the work between the consumer processes.

Rummel answered 27/7, 2014 at 5:38 Comment(7)
Using the filesystem is probably not going to be any faster than accessing the database.Sidekick
Thanks Brent. Will take a closer look tomorrow. Seems like a good solution too.Gunthar
@Julian, a database is a file system with a data management layer on top of it. Why do you think a simple file system is not any faster?Rummel
Well, why do you think it would be any faster? Both depend on the speed of the storage device and both have a management layer on top, but RDBMSs have the advantage that they are specifically optimized for situations where storage and retrieval have to be fast. Plain old filesystems on the other hand are generally only used when something has to be persistent, or when there is really no other solution.Sidekick
This discussion on database vs filesystem has already been beaten to death: dba.stackexchange.com/q/23124/24357 , programmers.stackexchange.com/q/150669/144312Rummel
I forgot to mention this before, but this answer does not address how to cope with the fact that the frames of a single producer arrive out-of-order.Sidekick
This is a passive approach to handling frames out-of-order. The producer is responsible for delivering the frames to the right directory with a timestamp for each one. They may arrive in any order.Rummel
B
1
  1. I would recommend you do not store the images directly in the database. Pushing all that binary information in and out of a database can easily cause a bottleneck. Store the images on a blob store (such as Amazon s3 or Google Cloud Storage), and just store references to them in the database.
  2. I would recommend taking a look at Apache Kafka as an option for your messaging queue. It supports many concurrent readers reading all messages from a single queue.
Boris answered 27/7, 2014 at 3:56 Comment(1)
Yep. They are in s3. I meant we will have to go to the dB to fetch metadataGunthar

© 2022 - 2024 — McMap. All rights reserved.