Running Multiple threads in queue using BlockingCollections

My program has three functions. Each function takes a list of Items and fills in certain information. For example:

class Item {
 String sku,upc,competitorName;
 double price;
}

Function F1 takes a List and fills in upc.

Function F2 takes the List output by F1 and fills in price.

Function F3 takes the List output by F2 and fills in competitorName.

F1 can process 5 items at a time, F2 can process 20 items at a time, and F3 can also process 20.

Right now I run F1 -> F2 -> F3 serially, because F2 needs the UPC code from F1 and F3 needs the price from F2.

I would like to make this more efficient by having F1 run continuously instead of waiting for F2 and F3 to complete. F1 would write its output into a queue, F2 would take 20 items at a time from that queue and process them, and F3 would follow in the same way.

How can I achieve this using BlockingCollection and Queue?

Chambers answered 8/9, 2016 at 16:26 Comment(0)

This is a typical use case for Apache Storm if you have items arriving at F1 continuously. You can implement this in Storm quickly and get a fast, parallel system. Your F1, F2 and F3 become bolts and your item producer becomes a spout.

Since you asked how to do it using BlockingCollections, here is an implementation. You'll need 3 threads in total.

ItemsProducer: produces 5 items at a time and feeds them to F1.

F2ExecutorThread: consumes up to 20 items at a time and feeds them to F2.

F3ExecutorThread: consumes up to 20 items at a time and feeds them to F3.

You also have 2 blocking queues: one transfers data from F1 to F2 and the other from F2 to F3. You can add a queue to feed data to F1 in the same fashion if required; it depends on how you obtain the items. I've used Thread.sleep to simulate the time required to execute each function.

Each function keeps looking for items in its assigned queue, regardless of what the other functions are doing, and waits until the queue has items. Once it has processed an item, it puts the item into the next function's queue, waiting for space if that queue is full.
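To make that blocking behaviour concrete, here is a minimal standalone sketch. The class and method names are made up for illustration and are not part of the implementation in this answer. It assumes a tiny queue of capacity 2, and it uses a timed offer in place of put so the demo finishes instead of blocking forever on the full queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the pipeline code
class BlockingQueueSemantics {

    /** Returns true if a timed offer gives up because the bounded queue is full. */
    static boolean fullQueueRejectsOffer() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put("a");
            queue.put("b"); // the queue is now at capacity
            // put("c") would block here until a consumer removed an element;
            // the timed offer waits 50 ms and then reports failure instead.
            return !queue.offer("c", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns the item a consumer receives after waiting in take() on an empty queue. */
    static String takeWaitsForProducer() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // the consumer is already waiting by now
                queue.put("late item");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String item = queue.take(); // blocks until the producer's put()
            producer.join();
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("offer rejected on full queue: " + fullQueueRejectsOffer());
        System.out.println("take() returned: " + takeWaitsForProducer());
    }
}
```

In the pipeline itself you would normally keep put and take, since waiting is exactly what you want there: a slow F2 makes F1 pause once f2Queue fills up, instead of letting the backlog grow without bound.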

Since all your functions run in different threads, F1 won't wait for F2 or F3 to finish. If F2 and F3 are significantly faster than F1, you can assign more threads to F1 and have them all push to the same f2Queue.
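As a rough sketch of running several F1 workers against one shared queue, something like the following could work. The class name, the use of an ExecutorService, and the String items standing in for Item are all assumptions made to keep the example short. The queue is sized to hold everything because this demo has no consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: several F1 workers feeding one shared queue
class ParallelF1 {

    /** Runs the given number of F1 workers and returns everything that reached the queue. */
    static List<String> runWorkers(int workers, int perWorker) {
        // Large enough that producers never block (there is no consumer in this demo)
        BlockingQueue<String> f2Queue = new ArrayBlockingQueue<>(workers * perWorker);
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int w = 0; w < workers; w++) {
            final int workerId = w;
            pool.execute(() -> {
                try {
                    for (int i = 0; i < perWorker; i++) {
                        Thread.sleep(10); // stand-in for the slow F1 lookup
                        f2Queue.put("upc-" + workerId + "-" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown(); // accept no new tasks, let the running workers finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        List<String> received = new ArrayList<>();
        f2Queue.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("items queued for F2: " + runWorkers(3, 4).size());
    }
}
```

BlockingQueue implementations are thread-safe, so the workers need no extra locking around put. The order in which items from different workers arrive is not deterministic, which is fine as long as F2 does not depend on it.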

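import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class App {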

    final BlockingQueue<Item> f2Queue = new ArrayBlockingQueue<>(100);
    final BlockingQueue<Item> f3Queue = new ArrayBlockingQueue<>(100);

    public static void main(String[] args) throws InterruptedException {
        App app = new App();
        app.start();
    }

    public void start() throws InterruptedException {
        Thread t1 = new ItemsProducer(f2Queue);
        Thread t2 = new F2ExecutorThread(f2Queue, f3Queue);
        Thread t3 = new F3ExecutorThread(f3Queue);

        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();
    }
}

/**
 * Thread producing 5 items at a time and feeding it to f1()
 */
class ItemsProducer extends Thread {
    private BlockingQueue<Item> f2Queue;

    private static final int F1_BATCH_SIZE = 5;

    public ItemsProducer(BlockingQueue<Item> f2Queue) {
        this.f2Queue = f2Queue;
    }

    public void run() {
        Random random = new Random();
        while (true) {
            try {
                List<Item> items = new ArrayList<>();
                for (int i = 0; i < F1_BATCH_SIZE; i++) {
                    Item item = new Item(String.valueOf(random.nextInt(100)));
                    Thread.sleep(20);
                    items.add(item);
                    System.out.println("Item produced: " + item);
                }

                // Feed items to f1
                f1(items);

            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing instead of looping on
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void f1(List<Item> items) throws InterruptedException {
        Random random = new Random();
        for (Item item : items) {
            Thread.sleep(100);
            item.upc = String.valueOf(random.nextInt(100));
            f2Queue.put(item);
        }
    }
}

/**
 * Thread consuming items produced by f1(). It takes up to 20 items at a time, but if none are
 * available it waits and starts processing as soon as one becomes available.
 */
class F2ExecutorThread extends Thread {
    static final int F2_BATCH_SIZE = 20;
    private BlockingQueue<Item> f2Queue;
    private BlockingQueue<Item> f3Queue;

    public F2ExecutorThread(BlockingQueue<Item> f2Queue, BlockingQueue<Item> f3Queue) {
        this.f2Queue = f2Queue;
        this.f3Queue = f3Queue;
    }

    public void run() {
        try {
            List<Item> items = new ArrayList<>();
            while (true) {
                items.clear();
                if (f2Queue.drainTo(items, F2_BATCH_SIZE) == 0) {
                    items.add(f2Queue.take());
                }
                f2(items);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    void f2(List<Item> items) throws InterruptedException {
        Random random = new Random();
        for (Item item : items) {
            Thread.sleep(100);
            item.price = random.nextInt(100);
            f3Queue.put(item);
        }
    }
}

/**
 * Thread consuming items produced by f2(). It takes up to 20 items at a time, but if none are
 * available it waits and starts processing as soon as one becomes available.
 */
class F3ExecutorThread extends Thread {
    static final int F3_BATCH_SIZE = 20;
    private BlockingQueue<Item> f3Queue;

    public F3ExecutorThread(BlockingQueue<Item> f3Queue) {
        this.f3Queue = f3Queue;
    }

    public void run() {
        try {
            List<Item> items = new ArrayList<>();
            while (true) {
                items.clear();
                if (f3Queue.drainTo(items, F3_BATCH_SIZE) == 0) {
                    items.add(f3Queue.take());
                }
                f3(items);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void f3(List<Item> items) throws InterruptedException {
        Random random = new Random();

        for (Item item : items) {
            Thread.sleep(100);
            item.competitorName = String.valueOf(random.nextInt(100));
            System.out.println("Item done: " + item);
        }
    }
}

class Item {
    String sku, upc, competitorName;
    double price;

    public Item(String sku) {
        this.sku = sku;
    }

    public String toString() {
        return "sku: " + sku + " upc: " + upc + " price: " + price + " compName: " + competitorName;
    }
}
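One thing the code above does not cover is shutdown: all three threads loop forever. A common way to stop such a pipeline is a "poison pill", a sentinel object that the producer puts on the queue after its last item. This is a swapped-in technique rather than part of the implementation above, and the class name, the POISON constant and the String items are assumptions for illustration. The consumer reuses the same take-then-drainTo batching idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer, one batching consumer, clean shutdown
class PoisonPillPipeline {
    // Sentinel compared by identity, so it can never collide with real data
    static final String POISON = new String("POISON");
    static final int BATCH_SIZE = 20;

    /** Pushes itemCount items through the queue and returns what the consumer processed. */
    static List<String> run(int itemCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> processed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put("sku-" + i);
                }
                queue.put(POISON); // signals that no more items will arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            try {
                boolean done = false;
                while (!done) {
                    batch.clear();
                    batch.add(queue.take());              // block for the first item
                    queue.drainTo(batch, BATCH_SIZE - 1); // then take whatever else is ready
                    for (String item : batch) {
                        if (item == POISON) {             // identity check on the sentinel
                            done = true;
                            break;
                        }
                        processed.add(item + ":priced");  // stand-in for the real f2 work
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, reading 'processed' from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run(45).size());
    }
}
```

With several consumers on the same queue, one pill per consumer would be needed. In a three-stage pipeline, each stage would forward the pill to the next queue before exiting.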

I guess you can follow the exact same approach in .NET as well. For a better understanding, I suggest going through the basic architecture described in the Storm tutorial: http://storm.apache.org/releases/current/Tutorial.html

Deutzia answered 20/9, 2016 at 14:55 Comment(2)
I tried to do the same thing in .NET and I think it is working. - Chambers
Great!! Happy to help. - Deutzia

I tried to do the same thing in .NET and I think it is working.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollectionExample
{
    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<Listing> needUPCJobs = new BlockingCollection<Listing>();
            BlockingCollection<Listing> needPricingJobs = new BlockingCollection<Listing>();

            // This will have final output
            List<Listing> output = new List<Listing>();

            // Executor 1: fills in the UPC, at most 5 listings per batch
            var executor1 = Task.Factory.StartNew(() =>
            {
                int maxSimultaneousLimit = 5;
                int gg = 0;
                while (true)
                {
                    // Process a batch whenever enough listings are queued.
                    // Note: the outer loop spins (busy-waits) while the queue is short.
                    while (needUPCJobs.Count >= maxSimultaneousLimit)
                    {
                        List<Listing> tempListings = new List<Listing>();
                        for (int i = 0; i < maxSimultaneousLimit; i++)
                        {
                            Listing listing;
                            if (needUPCJobs.TryTake(out listing))
                                tempListings.Add(listing);
                        }
                        // Simulating some delay for first executor
                        Thread.Sleep(1000);

                        foreach (var eachId in tempListings)
                        {
                            eachId.UPC = gg.ToString();
                            gg++;
                            needPricingJobs.Add(eachId);
                        }
                    }

                    // Once the producer is done, flush the remainder as one smaller batch
                    if (needUPCJobs.IsAddingCompleted)
                    {
                        if (needUPCJobs.Count == 0)
                            break;
                        else
                            maxSimultaneousLimit = needUPCJobs.Count;
                    }
                }
                // Tell executor 2 that no more listings will arrive
                needPricingJobs.CompleteAdding();
            });

            // Executor 2: fills in the price, at most 10 listings per batch
            var executor2 = Task.Factory.StartNew(() =>
            {
                int maxSimultaneousLimit = 10;
                int gg = 10;
                while (true)
                {
                    // Process a batch whenever enough listings are queued
                    while (needPricingJobs.Count >= maxSimultaneousLimit)
                    {
                        List<Listing> tempListings = new List<Listing>();
                        for (int i = 0; i < maxSimultaneousLimit; i++)
                        {
                            Listing listing;
                            if (needPricingJobs.TryTake(out listing))
                                tempListings.Add(listing);
                        }
                        // Simulating more delay for second executor
                        Thread.Sleep(10000);

                        foreach (var eachId in tempListings)
                        {
                            eachId.Price = gg;
                            gg++;
                            output.Add(eachId);
                        }
                    }

                    // Once executor 1 is done, flush the remainder as one smaller batch
                    if (needPricingJobs.IsAddingCompleted)
                    {
                        if (needPricingJobs.Count == 0)
                            break;
                        else
                            maxSimultaneousLimit = needPricingJobs.Count;
                    }
                }

            });

            // producer thread
            var producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    needUPCJobs.Add(new Listing() { ID = i });
                }
                needUPCJobs.CompleteAdding();
            });

            // wait for producer to finish producing
            producer.Wait();

            // wait for all executors to finish executing
            Task.WaitAll(executor1, executor2);

            // Report how many listings made it through both executors
            Console.WriteLine("Processed {0} listings.", output.Count);
        }
    }

    public class Listing
    {
        public int ID;
        public string UPC;
        public double Price;
        public Listing() { }
    }
}
Chambers answered 22/9, 2016 at 22:22 Comment(0)
