BlockingCollection multiple consumer
Asked Answered
S

2

7

I have the following code with one producer thread and multiple consumer threads. Do you know if multiple consumers are thread safe. For example is there any chance that thread 1 is consuming and while do that thread 2 consume in parallel and change the value of the item that is used in thread 1?

namespace BlockingColl
{
public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    private void button1_Click(object sender, EventArgs e)
    {
        try
        {
            for (int i = 0; i < 3; i++)
            {

                ThreadPool.QueueUserWorkItem((x) =>
                   {
                       foreach (var item in bc.GetConsumingEnumerable())
                       {
                           Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - " + item + " - " + DateTime.Now.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));
                       }
                   });
            }
        }
        catch (Exception)
        {

            throw;
        }
    }

    private void button2_Click(object sender, EventArgs e)
    {
        for (int i = 0; i < 3; i++)
        {

            ThreadPool.QueueUserWorkItem((x) =>
               {
                   Cache.Consume();
               });
        }


        for (int i = 0; i < 50000; i++)
        {
            Cache.bc.TryAdd(new Client() { ClientId = i, ClientName = "Name" + i });
        }
    }
}

static class Cache
{
    public static BlockingCollection<Client> bc = new BlockingCollection<Client>();


    public static void Consume()
    {
        foreach (var item in bc.GetConsumingEnumerable())
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - " + item + " - " + DateTime.Now.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));
        }
    }
}

public class Client
{
    public int ClientId { get; set; }
    public string ClientName { get; set; }
}
}

Thanks in advance

Semifinal answered 2/11, 2012 at 12:42 Comment(0)
P
1

Once you've consumed an element it is removed from the collection, so no other thread will be able to access it (at least through the collection).

That Cache looks more like a buffer to me. What does it add on top of the blocking collection anyway? It's weird that the cache would be able to consume its own elements.

Parnas answered 2/11, 2012 at 14:11 Comment(6)
Thanks. You are right. It is a buffer. The names are irrelevant. The scenario I wanted to ask is if is there any chance that when thread 1 is consuming and while doing that thread 2 consume in parallel and change the value of the item that is used in thread 1 because as you can see I call Consumer three times in in Parallel (Button1_Click event)Semifinal
Then the answer is no, there is no chance, but because different threads cannot consume the same elements. Do you need the different threads to access all the elements in the collection? If that's the case you cannot use the GetConsumingEnumerable() method.Parnas
No. What I want is to have multiple consumers that will consume the Buffer asap. So any manipulation done in the foreach loop on Thread 1 on the 'item' object will not be modified by Thread 2? Because as you can see the Consume method is static.Semifinal
It won't be modified because the consuming enumerable will not return it for thread 1 if it already returned it for thread 2. Each thread will get some of the items, but there will not be an overlap.Parnas
It seems that you were right. Each thread is accessing its own 'instance' of itemSemifinal
Not quite. Each thread gets a different subset of the elements.Parnas
L
0

A BlockingCollection blocks only the collection itself. Not the objects in the list.

Ledeen answered 2/11, 2012 at 13:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.