Multithreaded approach to processing an SQS message queue

In this scenario, I have to poll AWS SQS messages from a queue; each async request can fetch up to 10 SQS items/messages. Once I poll the items, I have to process them on a Kubernetes pod. Item processing includes getting responses from a few API calls, which may take some time, and then saving the item to the DB and S3. I did some R&D and reached the following conclusions:

  1. Use a producer-consumer model: one thread polls items and another processes them, or use multi-threading for the item processing.
  2. Maintain a data structure that contains the polled SQS items ready for processing; this could be a BlockingCollection or a ConcurrentQueue.
  3. Use the Task Parallel Library for thread pooling and item processing.
  4. Channels could also be used.

My Queries

  1. What would be the best approach to achieve the best performance / increase TPS?
  2. Can/should I use TPL Dataflow?
  3. Multi-threaded, or single-threaded with async tasks?
Spitter answered 28/8, 2021 at 2:53 Comment(8)
How many messages per second are you anticipating? You could do all kinds of magic tricks, but if the rate is, say, 1 msg per second, you can make other choices.Milkfish
We want to achieve 500 TPS. I stated it incorrectly that processing through an API call takes seconds; it takes milliseconds to get a result from an API call.Spitter
Do you need to process them in order?Milkfish
No that's not requiredSpitter
Hi Peter, any comments you want to add?Spitter
Could you please share some code that shows how you imagine single-threaded processing?Dipsomaniac
I don't have code yet, but the idea is to use tasks & async requests.Spitter
@pankysharma have you considered using autoscaling based on the SQS queue?Hone

This is very dependent on the specifics of your use-case and how much effort you want to put in.

I will, however, explain the thought process I would use when making such a decision.

The naive solution to handle SQS messages would be to do it one at a time sequentially (i.e. without concurrency). It doesn't mean that you're limited to a single message at a time since you can add more pods to the cluster.

So even in that naive solution you have one concurrency point you can utilize but it has a lot of overhead. The way to reduce overhead is usually to utilize the same overhead but process more messages with it. That's why, for example, SQS allows you to get 1-10 messages in a single call and not just one. It spreads the call overhead over 10 messages. In the naive solution the overhead is the cost of starting a whole process. Using the process for more messages means concurrent processing.
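
To make that baseline concrete, here's a minimal sketch of the naive sequential loop, assuming the AWSSDK.SQS client (`ProcessAsync` is a hypothetical stand-in for the API/DB/S3 work done per item):

```csharp
// A minimal sketch, assuming the AWSSDK.SQS client; ProcessAsync is a
// hypothetical stand-in for the API/DB/S3 work done per item.
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

var sqs = new AmazonSQSClient();
var queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"; // placeholder

while (true)
{
    // Long polling: a single call returns up to 10 messages.
    var response = await sqs.ReceiveMessageAsync(new ReceiveMessageRequest
    {
        QueueUrl = queueUrl,
        MaxNumberOfMessages = 10,
        WaitTimeSeconds = 20
    });

    foreach (var message in response.Messages)
    {
        await ProcessAsync(message);                                   // one at a time
        await sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle); // delete = "ack"
    }
}

// Hypothetical placeholder for the per-item work (API calls, DB, S3).
static Task ProcessAsync(Message message) => Task.CompletedTask;
```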

I've found that for stable and flexible concurrency you want many points of concurrency, but have each of them capped at some configurable degree of parallelism (whether hardcoded or actual configuration). That way you can tweak each of them to achieve optimal output (increase when you have free CPU and memory and decrease otherwise).

So, where can the additional concurrency be introduced? This is a progression where each step utilizes resources better but requires more effort.

  • Fetch 10 messages instead of one on every SQS API call and process them concurrently. That way you have 2 points of concurrency you can control: the number of pods and the number of messages (up to 10) processed concurrently.
  • Have a few tasks, each fetching 1-10 messages and processing them concurrently. That's 3 concurrency points: pods, tasks, and messages per task. Both these solutions suffer from messages with varying processing times, meaning that a single long-running message will "hold up" the other 1-9 "slots" of work, effectively reducing the concurrency to lower than configured.
  • Set up a TPL Dataflow block to process the messages concurrently, and a task (or a few) continuously fetching messages and pumping them into the block (see the sketch after this list). Keep in mind that SQS messages need to be explicitly deleted, so the block needs to receive the message handle too so the message can be deleted after processing.
  • A TPL Dataflow "pipe" consisting of a few blocks where each has its own concurrency degree. That's useful when you have different steps of processing of the message where each step has different limitations (e.g. different APIs with different throttling configurations).
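
Here's a rough sketch of the third option, again assuming the AWSSDK.SQS client (`ProcessAsync` is a hypothetical placeholder, and the parallelism/capacity numbers are arbitrary knobs to tune):

```csharp
// A sketch, assuming AWSSDK.SQS; ProcessAsync is hypothetical, and the
// parallelism/capacity numbers are arbitrary knobs to tune.
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Amazon.SQS;
using Amazon.SQS.Model;

var sqs = new AmazonSQSClient();
var queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"; // placeholder

// The block receives the whole Message so the receipt handle is available
// for deletion after successful processing.
var processor = new ActionBlock<Message>(async message =>
    {
        await ProcessAsync(message);
        await sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle);
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 50, // the configurable concurrency cap
        BoundedCapacity = 500        // back-pressure: SendAsync waits when full
    });

// The pump: continuously fetch batches and feed the block. Running a few
// of these loops concurrently gives you another point of concurrency.
while (true)
{
    var response = await sqs.ReceiveMessageAsync(new ReceiveMessageRequest
    {
        QueueUrl = queueUrl,
        MaxNumberOfMessages = 10,
        WaitTimeSeconds = 20
    });

    foreach (var message in response.Messages)
        await processor.SendAsync(message); // honors BoundedCapacity
}

static Task ProcessAsync(Message message) => Task.CompletedTask; // placeholder
```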

I personally am very fond of, and comfortable with, the Dataflow library so I would go straight to it. But simpler solutions are also valid when performance is less of an issue.

Unwarranted answered 5/9, 2021 at 13:1 Comment(3)
Thanks. I achieved around 100 TPS with Dataflow at 50 threads (max degree of parallelism), but when I add a parallel for loop inside my action block I get more than 500 TPS, so is it ok to use Parallel.For inside Dataflow?Spitter
It's ok conceptually, but it depends on the implementation. Parallel.For isn't fit for async usage, so as long as you're not doing that, it's fine. It can be just another step in the progression with another point of concurrency.Unwarranted
Also, if the for loop is just a part of the processing the block is doing, that makes sense. But if it's all the block is doing, then you may just want to put the individual "iterations" in the block, if that makes sense, and increase its parallelism.Unwarranted

I'm not familiar with Kubernetes, but there are many things to consider when maximising throughput.

All the things you have mentioned are IO-bound, not CPU-bound, so using the TPL is overcomplicating the design for marginal benefit. See: https://learn.microsoft.com/en-us/dotnet/csharp/async#recognize-cpu-bound-and-io-bound-work

Your Kubernetes pods are likely to have network limitations. For example, an Azure Function App on a Consumption Plan is limited to 1,200 outbound connections; other services will have defined limits too: https://learn.microsoft.com/en-us/azure/azure-functions/manage-connections?tabs=csharp#connection-limit. Due to the nature of your work, it is likely that you will reach these limits before you need to process IO work on multiple threads.

You may also need to consider limits of the services which you are dependent on and ensure they are able to handle the throughput.

You may want to consider using semaphores to limit the number of active connections, to satisfy both your infrastructure and external dependency limits: https://learn.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim?view=net-5.0
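
A minimal sketch of that idea with SemaphoreSlim capping concurrent calls (the limit of 100 and `CallDependencyAsync` are hypothetical placeholders to tune/replace):

```csharp
// A minimal sketch, assuming SemaphoreSlim; the limit of 100 and
// CallDependencyAsync are hypothetical placeholders to tune/replace.
using System.Threading;
using System.Threading.Tasks;

public class ThrottledDependencyClient
{
    // Cap the number of concurrent outbound calls to the dependency.
    private static readonly SemaphoreSlim Throttle = new SemaphoreSlim(100);

    public async Task<string> CallAsync(string payload)
    {
        await Throttle.WaitAsync();
        try
        {
            return await CallDependencyAsync(payload);
        }
        finally
        {
            Throttle.Release(); // always release, even when the call throws
        }
    }

    // Stand-in for the real external API call.
    private static Task<string> CallDependencyAsync(string payload) =>
        Task.FromResult(payload);
}
```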

That being said, 500 messages per second is a realistic amount. To improve it further, you can look at having multiple processes with independent resource limitations processing the queue.

Demetricedemetris answered 31/8, 2021 at 9:16 Comment(0)

I'm not familiar with your use case, or specifically with the tech you are using, but this sounds like a very common message-handling scenario.

A few guidelines:

  • First, these are guidelines; your use case might be very different from what those commenting here are used to.
  • Whenever you want to increase your throughput you need to identify your bottlenecks and strive towards a CPU bottleneck, making sure you fully utilize it. CPU load is usually the most expensive and generally makes for a more reliable metric for autoscaling. Obviously, depending on your remote API calls and your DB, you might reach other bottlenecks. SQS queue size also makes for a good autoscaling metric, but keep in mind that autoscaling isn't guaranteed to increase your throughput if your bottleneck is DB- or API-related.
  • I would not go for a fancy solution with complex data structures; again, I'm not familiar with your use case, so I might be wrong, but keep it simple. There should be one thread that is responsible for polling the queue, and when it finds new messages it should create a Task that processes a batch. There should generally be one Task per processing batch (a sketch follows this list); let the ThreadPool handle the number of threads.
  • I'm not familiar with the .NET SQS library. However, I am familiar with other libraries for very similar solutions. Most libraries for queues out there already do it all for you, and you don't really have to worry about it. You should probably just have a callback function that is called when the highly optimized library finds new messages. Those libraries probably already create a new task for each of those batches; you just need to register their callback and make sure you await any I/O-bound code.
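
To make the "one poller, one Task per batch" shape concrete, here's a rough sketch assuming the AWSSDK.SQS client (`ProcessBatchAsync` is a hypothetical placeholder for the actual work):

```csharp
// A rough sketch, assuming AWSSDK.SQS; ProcessBatchAsync is a hypothetical
// placeholder for the actual per-batch work.
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

var sqs = new AmazonSQSClient();
var queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"; // placeholder

// A single polling loop; every received batch becomes one Task.
while (true)
{
    var response = await sqs.ReceiveMessageAsync(new ReceiveMessageRequest
    {
        QueueUrl = queueUrl,
        MaxNumberOfMessages = 10,
        WaitTimeSeconds = 20
    });

    if (response.Messages.Count == 0) continue;

    // One Task per batch; the ThreadPool decides how many threads run them.
    // In practice you would also cap the number of in-flight batches.
    _ = Task.Run(() => ProcessBatchAsync(sqs, queueUrl, response.Messages));
}

// Hypothetical batch processor: handle, then delete, each message.
static async Task ProcessBatchAsync(IAmazonSQS sqs, string queueUrl, List<Message> batch)
{
    foreach (var message in batch)
    {
        // ... the actual per-message work goes here ...
        await sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle);
    }
}
```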

Edit: The solution I am proposing does have a limitation in that a single message can block an entire batch. This is not necessarily a bad thing, but if your solution requires different processing for different messages and you don't want to create this inner batch dependency, TPL Dataflow could definitely be a good solution for your use case.

Linked answered 5/9, 2021 at 13:20 Comment(0)

Yeah, this sounds very much like a task for TPL Dataflow; it is a versatile yet powerful instrument. Your first chain link would acquire messages from the queue (not necessarily single-threadedly; you just pass some delegates in). You will also be in control of how many items are "queued" locally this way.

Then you "subscribe" your workers in any way you desire – you can even customize it so that "faulted" processings would be put back into your queue — and it woudn't even matter if your processing is IO bound or not. If it is — well, nice, TPL dataflow is asyncronous, if not — well, not a problem, TPL dataflow can also be syncronous. Or you can fire up some thread pool threads, no biggie.

Glick answered 5/9, 2021 at 14:45 Comment(0)