How to avoid concurrency issues when scaling writes horizontally?
Asked Answered
U

8

12

Assume there is a worker service that receives messages from a queue, reads the product with the specified Id from a document database, applies some manipulation logic based on the message, and finally writes the updated product back to the database (a).

horizontally scaling writes

This work can be safely done in parallel when dealing with different products, so we can scale horizontally (b). However, if more than one service instance works on the same product, we might end up with concurrency issues, or concurrency exceptions from the database, in which case we should apply some retry logic (and still the retry might fail again and so on).

Question: How do we avoid this? Is there a way I can ensure two instances are not working on the same product?

Example/Use case: An online store has a great sale on productA, productB and productC that ends in an hour and hundreds of customers are buying. For each purchase, a message is enqueued (productId, numberOfItems, price). Goal: How can we run three instances of our worker service and make sure that all messages for productA will end up in instanceA, productB to instanceB and productC to instanceC (resulting in no concurrency issues)?

Notes: My service is written in C#, hosted on Azure as a Worker Role, I use Azure Queues for messaging, and I'm thinking to use Mongo for storage. Also, the Entity IDs are GUID.

It's more about the technique/design, so if you use different tools to solve the problem I'm still interested.

Unaccountedfor answered 8/3, 2015 at 19:10 Comment(2)
+1 for @GregD's answer "Understand your data model and usage patterns", though I'd go a step further. To remove concurrency, you need to redesign both your data model and business logic so that you don't actually update the data, but rather append to it only. Append-only data models are concurrency friendly - meaning they don't block, but you'd likely need to redesign the logic that reads from the datamodel.Recreate
@UdiDahan Append-only model indeed sounds appropriate for the problem here, so an answer with few details would be highly appreciatedUnaccountedfor
E
2

Any solution attempting to divide the load upon different items in the same collection (like orders) are doomed to fail. The reason is that if you got a high rate of transactions flowing you'll have to start doing one of the following things:

  1. let nodes to talk each other (hey guys, are anyone working with this?)
  2. Divide the ID generation into segments (node a creates ID 1-1000, node B 1001-1999) etc and then just let them deal with their own segment
  3. dynamically divide a collection into segments (and let each node handle a segment.

so what's wrong with those approaches?

The first approach is simply replicating transactions in a database. Unless you can spend a large amount of time optimizing the strategy it's better to rely on transactions.

The second two options will decrease performance as you have to dynamically route messages upon ids and also change the strategy at run-time to also include newly inserted messages. It will fail eventually.

Solutions

Here are two solutions that you can also combine.

Retry automatically

Instead you have an entry point somewhere that reads from the message queue.

In it you have something like this:

while (true)
{
    var message = queue.Read();
    Process(message);
}

What you could do instead to get very simple fault tolerance is to retry upon failure:

while (true)
{
    for (i = 0; i < 3; i++)
    {
       try
       {
            var message = queue.Read();
            Process(message);
            break; //exit for loop
       }
       catch (Exception ex)
       {
           //log
           //no throw = for loop runs the next attempt
       }
    }
}

You could of course just catch db exceptions (or rather transaction failures) to just replay those messages.

Micro services

I know, Micro service is a buzz word. But in this case it's a great solution. Instead of having a monolithic core which processes all messages, divide the application in smaller parts. Or in your case just deactivate the processing of certain types of messages.

If you have five nodes running your application you can make sure that Node A receives messages related to orders, node B receives messages related to shipping etc.

By doing so you can still horizontally scale your application, you get no conflicts and it requires little effort (a few more message queues and reconfigure each node).

Enharmonic answered 9/3, 2015 at 7:16 Comment(3)
About retry: Surely, it's necessary to have some retry policy in case of db transaction/concurrency failures, because no matter how you design your app, such exceptions might eventually appear. I just wonder how to minimize the chance of these failures before letting them happen. About microservices: This is already a microservice, it only accepts productSold messages from a dedicated queue and updates products. However scaling horizontally a microservice is still a concern, thus the question.Unaccountedfor
It's really hard to give a more elaborate advice based on the given information. For instance, how many messages to you process per second (all product messages)? Why did you create a service for just one message type (productSold)? How have you scaled the DB? How many transactions per second do the DB handle for products? To me it's a bit strange that not a single service processes all product messages (in one message queue) as the DB ought to be the bottleneck and not the message processing. I'm just interested in your motivation.Enharmonic
It's a microservice dedicated only to update product sales info. The queue is extremely fast and highly available, the db can easily scale (using both sharding and replication), but the service cannot scale if we can't solve the problem we discuss here. So in periods of peak demand, the bottleneck will be the service.Unaccountedfor
B
1

For this kind of a thing I use blob leases. Basically, I create a blob with the ID of an entity in some known storage account. When worker 1 picks up the entity, it tries to acquire a lease on the blob (and create the blob itself, if it doesn't exist). If it is successful in doing both, then I allow the processing of the message to occur. Always release the lease afterwards. If I am not successfull, I dump the message back onto the queue

I follow the apporach originally described by Steve Marx here http://blog.smarx.com/posts/managing-concurrency-in-windows-azure-with-leases although tweaked to use new Storage Libraries

Edit after comments: If you have a potentially high rate of messages all talking to the same entity (as your commend implies), I would redesign your approach somewhere.. either entity structure, or messaging structure.

For example: consider CQRS design pattern and store changes from processing of every message independently. Whereby, product entity is now an aggregate of all changes done to the entity by various workers, sequentially re-applied and rehydrated into a single object

Breeding answered 8/3, 2015 at 21:55 Comment(7)
I see, like a distributed lock. +1 because indeed it solves the problem but I was wondering if we can do better. Eg. if queue has 10 subsequent items, all for the same product, and we have 10 instances, 1 will do the actual work and the other 9 will fail to acquire lock and re-enqueue items, in the 2nd iteration 1 will work and 8 will fail, in 3rd iteration 1 will work 7 will fail, and we end up to have 45 failures - wasted resources and time.Unaccountedfor
I think you mean something more like event-sourcing, not cqrs (cqrs is already what we have here, this is the command part)Unaccountedfor
I see this technique used a lot but I see two problems. One it messes the fifo character of the queue (usually no a problem) and two a lot of overhead if such cases are too often. I believe message queues (ie servicebus on azure) would be more appropriate than simple queues for such casesPlan
@mxa055 how would service bus fix the problem?Breeding
@Unaccountedfor correct, sorry bout that. BTW, what tool did you use for the arch diagram mockup?Breeding
@Breeding I'll disappoint you, I just used shapes and text boxes on keynote and took a snapshotUnaccountedfor
@Breeding a simplistic approach would be to partition your data into topics and then have worker role instances subscribe to specific topics exclusively thus avoiding concurrent work between instances. It certainly is much more complex to implement but also more efficient.Plan
P
1

If you want to always have the database up to date and always consistent with the already processed units then you have several updates on the same mutable entity.

In order to comply with this you need to serialize the updates for the same entity. Either you do this by partitioning your data at producers, either you accumulate the events for the entity on the same queue, either you lock the entity in the worker using an distributed lock or a lock at the database level.

You could use an actor model (in java/scala world using akka) that is creating a message queue for each entity or group of entities that process them serially.

UPDATED You can try an akka port to .net and here. Here you can find a nice tutorial with samples about using akka in scala. But for general principles you should search more about [actor model]. It has drawbacks nevertheless.

In the end pertains to partition your data and ability to create a unique specialized worker(that could be reused and/or restarted in case of failure) for a specific entity.

Prune answered 9/3, 2015 at 7:58 Comment(1)
actor model that is creating a message queue for each entity, this sounds interesting, could you please provide some references so I can look it up?Unaccountedfor
E
0

I assume you have a means to safely access the product queue across all worker services. Given that, one simple way to avoid conflict could be using global queues per product next to the main queue

// Queue[X] is the queue for product X
// QueueMain is the main queue 
DoWork(ProductType X)
{
  if (Queue[X].empty())
  {
    product = QueueMain().pop()
    if (product.type != X)
    {
      Queue[product.type].push(product) 
      return;
    }
  }else
  {
     product = Queue[X].pop()
  }

  //process product...
}

The access to queues need to be atomic

Episodic answered 4/4, 2015 at 11:39 Comment(0)
E
0

You should use session enabled service bus queue for ordering and concurrency.

Epoxy answered 8/12, 2022 at 2:38 Comment(0)
B
0

The problem here is that two process trying to access a common resource to perform update simultaneously. If we analyze the current situation, let's say two worker W1 and W2 trying to work on the same product, say product A.

  1. Worker 1 -> GET (T11), COMPUTE (T12), UPDATE (T13)
  2. Worker 2 -> GET (T21), COMPUTE (T22), UPDATE (T23)

If we allow both worker 1 and worker 2 run in parallel, it can lead to either of two cases (considering race conditions only)

  1. It can lead to inconsistent data. This can happen when T11 == T21, T13 < T23 or T23 < T13
  2. It can lead to failure to update the data in database if T13 == T23.

So, now let's think about multiple solutions around it

  1. Distribute task such that specific worker node processes specific product oriented message. It could be achieved simply by having different Azure Queues for different products and having different workers processing messaged from different queue. The downside with this approach is that it doesn't scale well and can lead to hotspot/overhead if a specific product is bought continuously.

  2. Utilizing distributed Locking to take lock on a product before performing any action. The worker process taking the lock, should ensure to unlock it, otherwise it can lead to deadlock situation. It should also consider situation if the system breaks down or gets crashed before unlocking. If a process doesn't get a lock it should wait until a timeout to get the lock to complete the processing. Now, busy wait, i.e., burning CPU cycle during wait is not a good idea.

Billowy answered 17/3, 2024 at 16:51 Comment(0)
Y
0

You can make use of Kafka here, with productId being the partitioning key. That way the product with same productId will end up in the same partition, and hence will be processed by the same consumer application in the order they were sent. Kafka partition gaurantees that same keys always end up in the same partition.

You can create 3 partitions, given your use-case, and make sure that 3 different consumers are attached to each partition. This way there wont be any synchronization issues.

Yser answered 21/7, 2024 at 16:13 Comment(0)
S
-1

1) Every high scale data solution that I can think of has something built in to handle precisely this sort of conflict. The details will depend on your final choice for data storage. In the case of a traditional relational database, this comes baked in without any add'l work on your part. Refer to your chosen technology's documentation for appropriate detail.

2) Understand your data model and usage patterns. Design your datastore appropriately. Don't design for scale that you won't have. Optimize for your most common usage patterns.

3) Challenge your assumptions. Do you actually have to mutate the same entity very frequently from multiple roles? Sometimes the answer is yes, but often you can simply create a new entity that's similar to reflect the update. IE, take a journaling/logging approach instead of a single-entity approach. Ultimately high volumes of updates on a single entity will never scale.

Sweetsop answered 8/3, 2015 at 20:13 Comment(2)
The conflict is indeed handled by the database, e.g. by throwing concurrency errors. The question is how can I avoid having fail-and-retry cases when I can.Unaccountedfor
The underlying technology provides the necessary protection for data consistency. I believe an application level solution to minimize or eliminate concurrency issues in complex scenarios (lots of clients accessing same data) is of the essence for efficiency and performance.Plan

© 2022 - 2025 — McMap. All rights reserved.