Both sequential and parallel processing
L

10

17

I have one producer and many consumers.

  • the producer is fast and generating a lot of results
  • tokens with the same value need to be processed sequentially
  • tokens with different values must be processed in parallel
  • creating new Runnables would be very expensive, and the production code may have to handle 100k Tokens (to create a Runnable I have to pass some complex, expensive-to-build objects to its constructor)

Can I achieve the same results with a simpler algorithm? Nesting a synchronized block with a reentrant lock seems a bit unnatural. Are there any race conditions you notice?

Update: a second solution I found uses three collections: one to cache the producer results, a blocking queue, and a list to track the tasks in progress. Again, a bit too complicated.

My version of code

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating too many Runnables would be inefficient because of this complex, non-thread-safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}
Lownecked answered 8/1, 2016 at 19:31 Comment(1)
...must be processed in parallel... It is difficult to force any two or more things to happen at the same time. Different threads are allowed to do things in parallel, but nothing in Java guarantees that things will happen in parallel.Discommend
O
6

You can pre-create a set of Runnables which pick up incoming tasks (tokens) and place them in queues according to their order value.

As pointed out in the comments, it's not guaranteed that tokens with different values will always execute in parallel (after all, you are bounded, at the very least, by the number of physical cores in your box). However, it is guaranteed that tokens with the same order value will be executed in order of arrival.

Sample code:

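import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions; // Guava

/**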
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return workers[Math.floorMod(task.getOrder().hashCode(), workers.length)];
    }
}
Oracular answered 10/1, 2016 at 21:4 Comment(3)
The producer is faster than the consumers, you will have a lot of Tasks in memory. Also the load might not spread evenly between the workers.Lownecked
@danip Regarding sustained difference in speed -- then you'll need to switch to bounded blocking queue and accept some delays due to blocking. To improve throughput, you'll need to split load between several boxes.Oracular
I believe that using several boxes would over complicate the problem. I can use the concept you present to distribute Tasks by order, but I have to make it uniform.Lownecked
G
6

By the nature of your code, the only way to guarantee that tokens with the same value are processed serially is to wait for STOP_TOKEN to arrive.

You'll need a single-producer, single-consumer setup, with the consumer collecting and sorting the tokens by their value (into a Multimap, let's say).

Only then do you know which tokens must be processed serially and which may be processed in parallel.

Anyway, I advise you to look at the LMAX Disruptor, which offers a very effective way of sharing data between threads.

Unlike Executors, it doesn't suffer from synchronization overhead, as it is lock-free (which may give you nice performance benefits, depending on how you process the data).

The solution using two Disruptors

// single thread for processing, as there will be only one consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());

// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));

inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));

// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));

inboundDisruptor.start();
outboundDisruptor.start();

// publisher code
for (int i = 0; i < 10; i++) {
    inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}

The event handler on the inbound disruptor just collects incoming tokens. When STOP token is received, it publishes the series of tokens to outbound disruptor for further processing:

public class InEventHandler implements EventHandler<InEvent> {

    private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
    private Disruptor<OutEvent> outboundDisruptor;

    public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
        this.outboundDisruptor = outboundDisruptor;
    }

    @Override
    public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (event.token == STOP_TOKEN) {
            // publish indexed tokens to outbound disruptor for parallel processing
            tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
        } else {
            tokensByValue.put(event.token.value, event.token);
        }
    }
}

Outbound event handler processes tokens of the same value sequentially:

public class OutEventHandler implements EventHandler<OutEvent> {

    private final long order;
    private final long allHandlersCount;
    private Object yourComplexDependency;

    public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
        this.order = order;
        this.allHandlersCount = allHandlersCount;
        this.yourComplexDependency = yourComplexDependency;
    }

    @Override
    public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (sequence % allHandlersCount != order ) {
            // round robin, do not consume every event to allow parallel processing
            return;
        }

        for (Token token : event.tokensToProcessSerially) {
            // do processing of the token using your complex class
        }

    }
}

The rest of the required infrastructure (purpose described in the Disruptor docs):

public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {

    public static final InEventTranslator INSTANCE = new InEventTranslator();

    @Override
    public void translateTo(InEvent event, long sequence, Token arg0) {
        event.token = arg0;
    }

}

public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {

    public static final OutEventTranslator INSTANCE = new OutEventTranslator();

    @Override
    public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
        event.tokensToProcessSerially = tokens;
    }
}


public class InEvent {

    // Note that no synchronization is used here,
    // even though the field is shared among multiple threads.
    // The memory barriers used by the Disruptor guarantee that changes are visible.
    public Token token;
}

public class OutEvent {
    // ... again, no locks.
    public Collection<Token> tokensToProcessSerially;

}

public class Token {
    String value;

}
Gracia answered 12/1, 2016 at 21:27 Comment(1)
I would prefer not to use an external library.Lownecked
I
5

If you have lots of different tokens, then the simplest solution is to create some number of single-thread executors (about 2x your number of cores), and then distribute each task to an executor determined by the hash of its token.

That way all tasks with the same token will go to the same executor and execute sequentially, because each executor only has one thread.

If you have some unstated requirements about scheduling fairness, then it is easy enough to avoid any significant imbalances by having the producer thread queue up its requests (or block) before distributing them, until there are, say, less than 10 requests per executor outstanding.
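A minimal sketch of this idea, assuming a hypothetical KeyedExecutor class (the class name, the "lane" terminology and the per-lane limit are illustrative and not part of the answer). Each lane is a single-thread executor, so tasks with the same key run one after another in submission order. A Semaphore per lane is one possible way to make the producer block once too many requests are outstanding; Math.floorMod is used here as an alternative to masking the sign bit of the hash.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: routes each task to a single-thread executor chosen by key hash. */
class KeyedExecutor {
    private final ExecutorService[] lanes;
    private final Semaphore[] slots; // caps the number of outstanding tasks per lane

    KeyedExecutor(int laneCount, int maxOutstandingPerLane) {
        lanes = new ExecutorService[laneCount];
        slots = new Semaphore[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
            slots[i] = new Semaphore(maxOutstandingPerLane);
        }
    }

    /** Blocks the caller (the producer) while the chosen lane is full. */
    public void submit(String key, Runnable task) throws InterruptedException {
        // floorMod never returns a negative index, even for negative hash codes
        int lane = Math.floorMod(key.hashCode(), lanes.length);
        slots[lane].acquire();
        try {
            lanes[lane].execute(() -> {
                try {
                    task.run();
                } finally {
                    slots[lane].release(); // free a slot once the task is done
                }
            });
        } catch (RuntimeException e) {
            slots[lane].release(); // the task was rejected, so give the permit back
            throw e;
        }
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService lane : lanes) lane.shutdown();
        for (ExecutorService lane : lanes) lane.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

The producer would call something like submit(token.getValue(), () -> process(token)), where process is whatever long-running work you need. Because the complex dependency is not thread safe, one instance per lane could be created up front and captured by the tasks for that lane.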

Incubate answered 13/1, 2016 at 2:14 Comment(5)
This is a solution I will consider. The producer distribution system might make the solution a bit more complicated than my example but maybe I can ignore that.Lownecked
I think your response is closest to the point. The algorithm would put the tokens in different Consumers and keep track of them. For each new token the next consumer by rotation will be used. If a token was already processed the same consumer will have to process it.Lownecked
In most cases like this, you can just use (token.hashCode()&0x7FFFFFFF)%number_of_consumers to decide where to put each one, and don't need to keep track of anythingIncubate
isn't there a possibility that some consumers will not get anything?Lownecked
That is extremely unlikely if you have many different tokens. It also doesn't really matter, since you have more consumers than processor cores, so all your CPU will still be busy.Incubate
C
4

The following solution will only use a single Map that is used by the producer and consumers to process orders in sequential order for each order number while processing different order numbers in parallel. Here is the code:

public class Main {

    private static final int NUMBER_OF_CONSUMER_THREADS = 10;
    private static volatile int sync = 0;

    public static void main(String[] args) {
        final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
        final AtomicBoolean done = new AtomicBoolean(false);

        // Create a Producer
        new Thread() {
            {
                this.setDaemon(true);
                this.setName("Producer");
                this.start();
            }

            public void run() {
                Random rand = new Random();

                for(int i =0 ; i < 1000 ; i++) {
                    int order = rand.nextInt(20);
                    String key = String.valueOf(order);
                    String value = String.valueOf(rand.nextInt());
                    Controller controller = queues.get(key);
                    if (controller == null) {
                        controller = new Controller();
                        queues.put(key, controller);
                    }
                    controller.add(new Token(order, value));
                    Main.sync++;
                }

                done.set(true);
            }
        };

        while (queues.size() < 10) {
            try {
                // Allow the producer to generate several entries that need to
                // be processed.
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }

        // System.out.println(queues);

        // Create the Consumers
        ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
        for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
            consumers.submit(new Runnable() {
                private Random rand = new Random();

                public void run() {
                    String name = Thread.currentThread().getName();
                    try {
                        boolean one_last_time = false;
                        while (true) {
                            for (Map.Entry<String, Controller> entry : queues.entrySet()) {
                                Controller controller = entry.getValue();
                                if (controller.lock(this)) {
                                    ConcurrentLinkedQueue<Token> list = controller.getList();
                                    Token token;
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(rand.nextInt(200));
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    int last = Main.sync;
                                    queues.remove(entry.getKey());
                                    while(done.get() == false && last == Main.sync) {
                                        // yield until the producer has added at least another entry
                                        Thread.yield();
                                    }
                                    // Purge any new entries added
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(200);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    controller.unlock(this);
                                }
                            }
                            if (one_last_time) {
                                return;
                            }
                            if (done.get()) {
                                one_last_time = true;
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
        System.out.println("Exiting.. remaining number of entries: " + queues.size());
    }

}

Note that the Main class contains a queues instance that is a Map. The map key is the order id that you want to process sequentially by the consumers. The value is a Controller class that will contain all of the orders associated with that order id.

The producer will generate the orders and add each order (Token) to its associated Controller. The consumers will iterate over the queues map values and call the Controller lock method to determine whether they can process orders for that particular order id. If lock returns false, the consumer checks the next Controller instance. If lock returns true, it processes all orders and then checks the next Controller.

Update: added the sync integer, which guarantees that when a Controller instance is removed from the queues map, all of its entries will be consumed. There was a logic error in the consumer code where the unlock method was called too soon.

The Token class is similar to the one that you've posted here.

class Token {
    private int order;
    private String value;

    Token(int order, String value) {
        this.order = order;
        this.value = value;
    }

    int getOrder() {
        return order;
    }

    String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Token [order=" + order + ", value=" + value + "]\n";
    }
}

The Controller class that follows is used to ensure that only a single thread within the thread pool will be processing the orders at a time. The lock/unlock methods are used to determine which of the threads will be allowed to process the orders.

class Controller {

    private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
    private ReentrantLock lock = new ReentrantLock();
    private Runnable current = null;

    void add(Token token) {
        tokens.add(token);
    }

    public ConcurrentLinkedQueue<Token> getList() {
        return tokens;
    }

    public void unlock(Runnable runnable) {
        lock.lock();
        try {
            if (current == runnable) {
                current = null;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean lock(Runnable runnable) {
        lock.lock();
        try {
            if (current == null) {
                current = runnable;
            }
            // read 'current' while still holding the lock to avoid a data race
            return current == runnable;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        return "Controller [tokens=" + tokens + "]";
    }

}

Additional information about the implementation: it uses a CountDownLatch to ensure that all produced orders are processed before the process exits. The done variable plays the same role as your STOP_TOKEN variable.

The implementation does contain an issue that you would need to resolve: it does not purge the controller for an order id when all of its orders have been processed. As a result, a thread in the thread pool can be assigned to a controller that contains no orders, which wastes CPU cycles that could be used for other tasks.

Contort answered 13/1, 2016 at 5:14 Comment(3)
You are keeping in memory all the produced results. This is something I need to avoid.Lownecked
The poll method will remove and not only get the entry from the linked queue. The results are not being kept in memory.Contort
The producer is much faster than all the consumers, so the results will pile up fast in the queue.Lownecked
A
4

Is all you need to ensure that tokens with the same value are not processed concurrently? Your code is too messy to understand what you mean (it does not compile, and has lots of variables, locks and maps that are created but never used). It looks like you are greatly overthinking this. All you need is one queue and one map. Something like this, I imagine:

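   class Consumer implements Runnable {
     // values that some consumer is processing right now
     ConcurrentHashMap<String, Token> inProcess;
     BlockingQueue<Token> queue;

     public void run() {
        try {
           while (true) {
              Token token = queue.take(); // blocks until a token arrives; never returns null
              if (inProcess.putIfAbsent(token.getValue(), token) != null) {
                 // the same value is being processed elsewhere: requeue and retry later
                 queue.put(token);
                 continue;
              }
              try {
                 processToken(token);
              } finally {
                 inProcess.remove(token.getValue());
              }
           }
        } catch (InterruptedException e) {
           Thread.currentThread().interrupt(); // restore the flag and exit the loop
        }
     }
   }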
And answered 16/1, 2016 at 20:48 Comment(6)
The code compiles fine with Java 8. I'm not sure what compilation problem you have. The unused variables are just to explain the complexity of the problem.Lownecked
"tokens with the same value need to be processed sequentially" so you can't take a token and put it at the end of the queue because the order will not be preserved. If you run my example you will see the correct output.Lownecked
@danip surely, the more unused variables you create, the more complex the problem looks. You should not create variables "to demonstrate complexity", each variable should have its own clear and definite purpose. Try that, and you'll see that the problem is not nearly as complex as you imagine it to be.And
@danip I am not sure what problem you see with posting back on the queue, but if by "sequentially" you mean "in order", then your whole design needs to be revamped, as this is not the right approach: make each of the consumers look at its own queue, and have the producer distribute tokens to the queues by the hash code of the value or some similar function. For the future, remember: "sequentially" means "not in parallel", not necessarily in order.And
sequential often follows a numerical or alphabetical order :)Lownecked
No, "sequential" means "in sequence", or "one-by-one". Anyway, you don't have to convince me, that was just an advice for you to speak more precisely when describing technical matters, so that people understand what you are describing. If you leave something for them to "guess" in your description, you can be sure that some guesses will be different from what you expect. If you want something done in order just say "in order", it is shorter than "sequentially", and better describes the requirement.And
D
3

tokens with the same value need to be processed sequentially

The way to ensure that any two things happen in sequence is to do them in the same thread.

I'd have a collection of however many worker threads, and I'd have a Map. Any time I get a token that I've not seen before, I'll pick a thread at random, and enter the token and the thread into the map. From then on, I'll use that same thread to execute tasks associated with that token.
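As a rough sketch of the paragraph above, assuming a hypothetical StickyDispatcher class (the name and its methods are made up for illustration). Each "thread" is a single-thread ExecutorService, and a ConcurrentHashMap remembers which one was picked at random the first time a token value was seen.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: remembers which worker owns each token value. */
class StickyDispatcher {
    private final ExecutorService[] workers;
    private final ConcurrentMap<String, ExecutorService> owners = new ConcurrentHashMap<>();

    StickyDispatcher(int workerCount) {
        workers = new ExecutorService[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = Executors.newSingleThreadExecutor(); // one thread per worker
        }
    }

    /** Runs the work on the worker that owns this token value. */
    public void dispatch(String tokenValue, Runnable work) {
        // First sighting of a value: pick a worker at random and remember the choice.
        ExecutorService owner = owners.computeIfAbsent(tokenValue,
                v -> workers[ThreadLocalRandom.current().nextInt(workers.length)]);
        owner.execute(work);
    }

    /** Stops accepting work and waits for the queued work to finish. */
    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) w.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Note that the map keeps one entry per distinct token value and never shrinks in this sketch, so with a very large number of distinct values you would want some eviction strategy, or a hash-based assignment that needs no map at all.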

creating new Runnables would be very expensive

Runnable is an interface. Creating new objects that implement Runnable is not going to be significantly more expensive than creating any other kind of object.

Discommend answered 8/1, 2016 at 20:1 Comment(3)
the code is an abstract example. In order to create my particular objects that implement Runnable I need to pass to the constructor some complex objects (maybe you are familiar with the Marshaller from JAXB). What strategy should I apply in that case?Lownecked
And secondly "I'll pick a thread at random", is there any way I can use executors for this or I have to manage Threads directly?Lownecked
@danip I don't know about your first question, but as for the executors, yes. Everywhere I said "Thread" in my answer, you could change that to "ExecutorService", and then you would want to create each one by calling Executors.newSingleThreadExecutor() .Discommend
H
3

Maybe I'm misunderstanding something. But it seems that it would be easier to filter the Tokens with same value from the ones with different values into two different queues initially.

And then use Stream with either map or foreach for the sequential. And simply use the parallel stream version for the rest.

If your Tokens in production environment are lazily generated and you only get one at a time you simply make some sort of filter which distributes them to the two different queues.

If you can implement it with Streams I suggest doing that, as they are simple, easy to use and FAST!

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

I made a brief example of what I mean. In this case the Tokens are somewhat artificially constructed, but that's beside the point. Also, both streams are started on the main thread, which is probably not ideal either.

public static void main(String args[]) {
    ArrayList<Token> sameValues = new ArrayList<Token>();
    ArrayList<Token> distinctValues = new ArrayList<Token>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int next = random.nextInt(100);
        Token n = new Token(i, String.valueOf(next));
        if (next == i) {
            sameValues.add(n);
        } else {
            distinctValues.add(n);
        }
    }       
    distinctValues.stream().parallel().forEach(token -> System.out.println("Distinct: " + token.value));
    sameValues.stream().forEach(token -> System.out.println("Same: " + token.value));       
}
Hengelo answered 12/1, 2016 at 23:30 Comment(0)
A
3

I am not entirely sure I have understood the question but I'll take a stab at an algorithm.

The actors are:

  • A queue of tasks
  • A pool of free executors
  • A set of in-process tokens currently being processed
  • A controller

Then,

  • Initially all executors are available and the set is empty

  • controller picks an available executor and goes through the queue looking for a task with a token that is not in the in-process set and when it finds it

    • adds the token to the in-process set
    • assigns the executor to process the task and
    • goes back to the beginning of the queue
  • the executor removes the token from the set when it is done processing and adds itself back to the pool
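The steps above could be sketched roughly as follows. This is only one possible reading of the algorithm, with a hypothetical InProcessScheduler class: the backlog is a LinkedList so that tasks can be removed out of order, the in-process set is a plain HashSet guarded by the object's monitor, and the capacity parameter (an assumption, not part of the description) makes a fast producer block when the backlog is full.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of the controller, backlog and in-process set. */
class InProcessScheduler {
    private static final class Task {
        final String value;
        final Runnable work;
        Task(String value, Runnable work) { this.value = value; this.work = work; }
    }

    private final LinkedList<Task> pending = new LinkedList<>(); // backlog in arrival order
    private final Set<String> inProcess = new HashSet<>();       // values running right now
    private final ExecutorService pool;
    private final int capacity;
    private int freeExecutors;

    InProcessScheduler(int executors, int capacity) {
        this.pool = Executors.newFixedThreadPool(executors);
        this.freeExecutors = executors;
        this.capacity = capacity;
    }

    /** Blocks the producer while the backlog is full. */
    public synchronized void submit(String value, Runnable work) throws InterruptedException {
        while (pending.size() >= capacity) {
            wait();
        }
        pending.addLast(new Task(value, work));
        dispatch();
    }

    // Caller must hold the monitor. Scans from the head of the backlog and starts
    // every task whose value is not in process, as long as an executor is free.
    private void dispatch() {
        Iterator<Task> it = pending.iterator();
        while (freeExecutors > 0 && it.hasNext()) {
            Task task = it.next();
            if (inProcess.add(task.value)) { // true only if the value was not running
                it.remove();
                freeExecutors--;
                pool.execute(() -> runAndRelease(task));
            }
        }
        notifyAll(); // wake a blocked producer or a caller of awaitIdle
    }

    private void runAndRelease(Task task) {
        try {
            task.work.run();
        } finally {
            synchronized (this) {
                inProcess.remove(task.value);
                freeExecutors++;
                dispatch(); // the freed executor may pick up a waiting task
            }
        }
    }

    /** Waits until the backlog is empty and nothing is running. */
    public synchronized void awaitIdle() throws InterruptedException {
        while (!pending.isEmpty() || !inProcess.isEmpty()) {
            wait();
        }
    }

    public void shutdown() {
        pool.shutdown();
    }
}
```

Because the scan always starts at the head of the backlog, the oldest pending task for a given value is the one that gets started, so tasks with the same value run one at a time in arrival order while different values can overlap.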

Arawak answered 13/1, 2016 at 0:50 Comment(2)
The only problem with this is that I need a blocking queue because the producer is very fast. The only implementation I could find is ArrayBlockingQueue but it's FIFO so the "goes through the queue" part is not easy to implement.Lownecked
Well it is not exactly a queue because you need to remove out-of-order, while adding at tail. A linked list might be a more appropriate data structure. As for blocking, for a project of this complexity, it should be well in scope to wrap one of the existing collection classes.Arawak
C
3

One way of doing this is to have one executor for sequential processing and one for parallel processing. We also need a single-threaded manager service that decides which executor each token is submitted to.

// Queue shared by the producer and the manager. Contains the tokens produced by the producer.
BlockingQueue<Token> tokenList = new ArrayBlockingQueue<>(10);

    private void startProcess() {
    ExecutorService producer = Executors.newSingleThreadExecutor();
    final ExecutorService consumerForSequence = Executors
            .newSingleThreadExecutor();
    final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
    ExecutorService manager = Executors.newSingleThreadExecutor();

    producer.submit(new Producer(tokenList));

    manager.submit(new Runnable() {

        public void run() {
            try {
                while (true) {
                    Token t = tokenList.take();
                    System.out.println("consumed- " + t.orderid
                            + " element");

                    if (t.orderid % 7 == 0) { // any condition to check for sequence processing

                        consumerForSequence.submit(new ConsumerForSequenceProcess(t));

                    } else {

                        consumerForParallel.submit(new ConsumerForParallelProcess(t));

                    }
                }
            }

            catch (InterruptedException e) { // TODO Auto-generated catch
                // block
                e.printStackTrace();
            }

        }
    });
}
Centistere answered 16/1, 2016 at 10:48 Comment(2)
Your code limits the sequence processing to a single consumer. Multiple consumers should be able to process tokens in sequence.Lownecked
Yes, but that could be handled. We need to use a blocking queue to hold the tokens that need to be processed in sequence, and then we can employ a set of consumers instead of one on that queue to pick up the tokens for processing. Correct me if you have any suggestions.Centistere
B
2

I think there is a more fundamental design issue hidden behind this task, but OK. I can't figure out from your problem description whether you want in-order execution or just want operations on tasks described by single tokens to be atomic/transactional. What I propose below feels more like a "quick fix" for this issue than a real solution.

For the real "ordered execution" case I propose a solution based on queue proxies which order the output:

  1. Define an implementation of Queue that provides a factory method generating proxy queues, which are represented to the producer side by this single queue object; the factory method should also register these proxy queue objects. Adding an element to the input queue should add it directly to one of the output queues if it matches an element already in that output queue; otherwise add it to any (ideally the shortest) output queue. (Implement this check efficiently.) Alternatively (slightly better): don't do this when the element is added, but when any of the output queues runs empty.

  2. Give each of your runnable consumers a field storing an individual Queue interface (instead of accessing a single shared object). Initialize this field via the factory method defined above.

For the transaction case I think it's easier to spawn more threads than you have cores (use statistics to calculate how many), and implement the blocking mechanism on a lower (object) level.

Bick answered 17/1, 2016 at 16:13 Comment(0)
