This is a typical use case of Apache Storm in case you've continuous items coming in to F1. You can implement this in Storm in matter of minutes and you'll have fast and perfectly parallel system in place. Your F1, F2 and F3 will become bolts and your Items producer will become spout.
Since you asked how to do it using BlockingCollections here is an implementation. You'll need 3 threads in total.
ItemsProducer: It is producing 5 items at a time and feeding it to F1.
F2ExecutorThread: It is consuming 20 items at a time and feeding it to F2.
F3ExecutorThread: It is consuming 20 items at a time and feeding it to F3.
You also have 2 blocking queues one is used to transfer data from F1->F2 and one from F2->F3. You can also have a queue to feed data to F1 in similar fashion if required. It depends upon how you are getting the items. I've used Thread.sleep to simulate the time required to execute the function.
Each function will keep looking for items in their assigned queue, irrespective of what other functions are doing and wait until the queue has items. Once they've processed the item they'll put it in another queue for another function. They'll wait until the other queue has space if it is full.
Since all your functions are running in different threads, F1 won't be waiting for F2 or F3 to finish. If your F2 and F3 are significantly faster then F1 you can assign more threads to F1 and keep pushing to same f2Queue.
public class App {
final BlockingQueue<Item> f2Queue = new ArrayBlockingQueue<>(100);
final BlockingQueue<Item> f3Queue = new ArrayBlockingQueue<>(100);
public static void main(String[] args) throws InterruptedException {
App app = new App();
app.start();
}
public void start() throws InterruptedException {
Thread t1 = new ItemsProducer(f2Queue);
Thread t2 = new F2ExecutorThread(f2Queue, f3Queue);
Thread t3 = new F3ExecutorThread(f3Queue);
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
}
}
/**
* Thread producing 5 items at a time and feeding it to f1()
*/
class ItemsProducer extends Thread {
private BlockingQueue<Item> f2Queue;
private static final int F1_BATCH_SIZE = 5;
public ItemsProducer(BlockingQueue<Item> f2Queue) {
this.f2Queue = f2Queue;
}
public void run() {
Random random = new Random();
while (true) {
try {
List<Item> items = new ArrayList<>();
for (int i = 0; i < F1_BATCH_SIZE; i++) {
Item item = new Item(String.valueOf(random.nextInt(100)));
Thread.sleep(20);
items.add(item);
System.out.println("Item produced: " + item);
}
// Feed items to f1
f1(items);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void f1(List<Item> items) throws InterruptedException {
Random random = new Random();
for (Item item : items) {
Thread.sleep(100);
item.upc = String.valueOf(random.nextInt(100));
f2Queue.put(item);
}
}
}
/**
* Thread consuming items produced by f1(). It takes 20 items at a time, but if they are not
* available it waits and starts processesing as soon as one gets available
*/
class F2ExecutorThread extends Thread {
static final int F2_BATCH_SIZE = 20;
private BlockingQueue<Item> f2Queue;
private BlockingQueue<Item> f3Queue;
public F2ExecutorThread(BlockingQueue<Item> f2Queue, BlockingQueue<Item> f3Queue) {
this.f2Queue = f2Queue;
this.f3Queue = f3Queue;
}
public void run() {
try {
List<Item> items = new ArrayList<>();
while (true) {
items.clear();
if (f2Queue.drainTo(items, F2_BATCH_SIZE) == 0) {
items.add(f2Queue.take());
}
f2(items);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
void f2(List<Item> items) throws InterruptedException {
Random random = new Random();
for (Item item : items) {
Thread.sleep(100);
item.price = random.nextInt(100);
f3Queue.put(item);
}
}
}
/**
* Thread consuming items produced by f2(). It takes 20 items at a time, but if they are not
* available it waits and starts processesing as soon as one gets available.
*/
class F3ExecutorThread extends Thread {
static final int F3_BATCH_SIZE = 20;
private BlockingQueue<Item> f3Queue;
public F3ExecutorThread(BlockingQueue<Item> f3Queue) {
this.f3Queue = f3Queue;
}
public void run() {
try {
List<Item> items = new ArrayList<>();
while (true) {
items.clear();
if (f3Queue.drainTo(items, F3_BATCH_SIZE) == 0) {
items.add(f3Queue.take());
}
f3(items);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void f3(List<Item> items) throws InterruptedException {
Random random = new Random();
for (Item item : items) {
Thread.sleep(100);
item.competitorName = String.valueOf(random.nextInt(100));
System.out.println("Item done: " + item);
}
}
}
class Item {
String sku, upc, competitorName;
double price;
public Item(String sku) {
this.sku = sku;
}
public String toString() {
return "sku: " + sku + " upc: " + upc + " price: " + price + " compName: " + competitorName;
}
}
I guess you can follow the exact same approach in .Net as well. For better understanding I suggest you to go through basic architecture of http://storm.apache.org/releases/current/Tutorial.html