How to work threading with ConcurrentQueue<T>
I am trying to figure out the best way of working with a queue. I have a process that returns a DataTable, and each DataTable, in turn, is merged with the previous one. There is one problem: too many records accumulate to hold until the final BulkCopy, and I run out of memory (OutOfMemoryException).

So, I have determined that I should process each incoming DataTable immediately. I am thinking about ConcurrentQueue&lt;T&gt;, but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

For instance:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table = new DataTable();   // placeholder: built by the extraction step

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

My first question is: aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously, will that be all I need? Second, is there something I'm missing about the way ConcurrentQueue&lt;T&gt; functions, such that it needs some form of trigger to work asynchronously with the queued objects?

Update: I have just derived a class from ConcurrentQueue&lt;T&gt; that raises a TableQueued event. Then:

public new void Enqueue(DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

Any concerns about this implementation?

Capsulate answered 29/12, 2010 at 2:38 Comment(1)
FYI to anyone coming here now: the Task Parallel Library is recommended for a wide range of multi-threaded uses if you have .NET 4.5+. For producer/consumer queues, consider Dataflow: see "How to: Implement a Producer-Consumer Dataflow Pattern", as discussed in Mathew Watson's answer here. - Wester
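For readers on modern .NET, the Dataflow approach mentioned in this comment can be sketched roughly as follows. This is a minimal, hypothetical sketch: it assumes the System.Threading.Tasks.Dataflow NuGet package and a BulkCopy step of your own. An ActionBlock is itself a queue plus a consumer, so no custom event on the queue is needed:

```csharp
using System.Data;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;   // NuGet: System.Threading.Tasks.Dataflow

public static class DataflowSketch
{
    public static async Task RunAsync()
    {
        // ActionBlock queues posted tables internally and runs the delegate
        // for each one; BoundedCapacity applies back-pressure so unwritten
        // tables cannot pile up in memory (the OutOfMemory problem above).
        var writer = new ActionBlock<DataTable>(
            table => { /* BulkCopy(table); */ },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        for (int i = 0; i < 5; i++)
            await writer.SendAsync(new DataTable("Table" + i));  // waits while the block is full

        writer.Complete();        // signal that no more tables are coming
        await writer.Completion;  // wait until the last table is written
    }
}
```

SendAsync (rather than Post) is what makes the bounded capacity useful here: the producer pauses instead of dropping or buffering tables once the block is full.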

From my understanding of the problem, you are missing a few things.

The concurrent queue is a data structure designed to let multiple threads read from and write to it without your needing to lock it explicitly. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

With that in mind, it looks like the pattern you are trying to use is "Producer/Consumer". First, you have one or more tasks producing work (adding items to the queue). Second, you have a task consuming items from the queue (dequeuing them).

So really you want two threads: one adding items and a second removing them. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously, the more contention there is on the concurrent queue, the sooner it will become the bottleneck.
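In .NET 4+, the usual way to get the "trigger" the question asks about is not an event on the queue but a blocking dequeue. A minimal sketch, where the BulkCopy call is a stand-in for the real write: BlockingCollection&lt;T&gt; wraps a ConcurrentQueue&lt;T&gt; by default and adds the blocking and signalling the plain queue lacks, so the consumer needs neither an event nor a spin-wait:

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    public static void Run()
    {
        // Bounded capacity gives back-pressure: the producer blocks when 10
        // tables are pending, so unwritten tables cannot exhaust memory.
        using (var queue = new BlockingCollection<DataTable>(boundedCapacity: 10))
        {
            // Consumer: blocks until a table is available, and exits once
            // CompleteAdding() has been called and the queue is drained.
            var consumer = Task.Run(() =>
            {
                foreach (DataTable table in queue.GetConsumingEnumerable())
                {
                    // BulkCopy(table);  // hypothetical: write each table as it arrives
                }
            });

            // Producer: the extraction loop enqueues each table as it is built.
            for (int i = 0; i < 5; i++)
                queue.Add(new DataTable("Table" + i));

            queue.CompleteAdding();   // signal "no more tables"
            consumer.Wait();          // let the consumer finish draining
        }
    }
}
```

GetConsumingEnumerable() is what replaces the OnItemQueued event from the question: the foreach simply sleeps until the next item arrives.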

Pennon answered 29/12, 2010 at 2:49 Comment(4)
I thought I had 2 threads. The main thread would basically wait for the event to trigger. The second thread begins as an asynchronous call to ExtractData(). In the async callback I will simply continue the extraction process. - Capsulate
Actually, I think I have it backwards; the main thread should be enqueuing DataTables, then begin the asynchronous writing method via the enqueued-item event trigger. - Capsulate
@Chris Smith: I don't know how to message you. You have a malicious website on your profile. Please remove it. - Goofball
@KamranBigdely Thank you for pointing that out! The domain of my blog lapsed, and a bad actor took it over. Fixed. - Pennon

I think ConcurrentQueue is useful in only a few cases. Its main advantage is that it is lock-free. However, usually the producer thread(s) have to inform the consumer thread(s) somehow that there is data available to process. This signalling between threads needs locks and negates the benefit of using ConcurrentQueue. The fastest way to synchronize threads is Monitor.Pulse(), which works only within a lock. All other synchronization tools are even slower.

Of course, the consumer can just continuously check whether there is something in the queue, which works without locks but is a huge waste of processor resources. It is a little better if the consumer waits between checks.

Raising an event on every write to the queue is a very bad idea. Using ConcurrentQueue to save maybe 1 microsecond will be completely wasted by executing the event handler, which might take 1000 times longer.

Although I can imagine one use case for ConcurrentQueue: when the producers are faster than the consumer and the whole thing stops once the queue is empty. In this case the consumer can avoid an idle wait loop.
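That use case can be sketched roughly as follows (a hypothetical sketch; the processing step is a placeholder). The producers run flat out with no signalling at all, and the consumer simply drains until both the producers and the queue are done:

```csharp
using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

public static class DrainSketch
{
    public static void Run()
    {
        var queue = new ConcurrentQueue<DataTable>();

        // Producers: enqueue as fast as they can; no locks, no signalling.
        Task producers = Task.WhenAll(
            Task.Run(() => { for (int i = 0; i < 100; i++) queue.Enqueue(new DataTable("A" + i)); }),
            Task.Run(() => { for (int i = 0; i < 100; i++) queue.Enqueue(new DataTable("B" + i)); }));

        // Consumer: keeps draining until the producers are finished AND the
        // queue is empty. Because the producers outrun the consumer, there is
        // (almost) always work available and no idle wait loop.
        while (!producers.IsCompleted || !queue.IsEmpty)
        {
            if (queue.TryDequeue(out DataTable table))
            {
                // process table (placeholder)
            }
        }
    }
}
```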

If all the processing is done in an event handler or an async call, the question is why a queue is still needed at all. Better to pass the data directly to the handler and not use a queue.

Please note that the implementation of ConcurrentQueue is rather complicated in order to allow concurrency. In most cases it is better to use a normal Queue&lt;&gt; and lock every access to the queue. Since queue access takes only microseconds, it is extremely unlikely that two threads will access the queue in the same microsecond, and there will hardly ever be any delay because of locking. Using a normal Queue&lt;&gt; with locking will often result in faster code execution than ConcurrentQueue.
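The Queue&lt;&gt; plus lock approach this answer describes can be sketched like this, using Monitor.Wait/Pulse for the signalling (class and member names are illustrative, not from the original post):

```csharp
using System.Collections.Generic;
using System.Data;
using System.Threading;

public class LockedQueue
{
    private readonly Queue<DataTable> queue = new Queue<DataTable>();
    private readonly object gate = new object();
    private bool completed;

    public void Enqueue(DataTable table)
    {
        lock (gate)
        {
            queue.Enqueue(table);
            Monitor.Pulse(gate);     // wake one waiting consumer
        }
    }

    public void CompleteAdding()
    {
        lock (gate)
        {
            completed = true;
            Monitor.PulseAll(gate);  // wake all consumers so they can exit
        }
    }

    // Blocks until an item is available; returns false once the queue is
    // drained and no more items will arrive.
    public bool TryDequeue(out DataTable table)
    {
        lock (gate)
        {
            while (queue.Count == 0 && !completed)
                Monitor.Wait(gate);  // releases the lock while waiting

            if (queue.Count > 0)
            {
                table = queue.Dequeue();
                return true;
            }

            table = null;
            return false;
        }
    }
}
```

A consumer thread just loops `while (q.TryDequeue(out var t)) { /* write t */ }`; the lock is held only for the microseconds the queue operation itself takes, which is the answer's point.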

Morello answered 18/7, 2014 at 8:22 Comment(4)
Shame about the down vote. I think it's a valid, pragmatic opinion. - Futility
"Producer thread(s) have to inform the consumer thread(s) somehow that there is data available to process": how do you typically do this? - Codd
For an overview of how threads can synchronize with each other, see Microsoft's "Overview of Synchronization Primitives". In those cases, ConcurrentQueue is not helpful, because the synchronization primitives use locking anyway. ConcurrentQueue might be useful for massively parallel problems, when several threads produce something as fast as they can and another thread collects those results and processes them. Once the problem is solved, all threads are released and no waiting is involved. - Morello
Fantastic answer, so the implementation is straightforward: stackoverflow.com/a/530228 - Microbalance

This is the complete solution I came up with:

public class TableTransporter
{
    private static int _indexer;

    private CustomQueue tableQueue = new CustomQueue();
    private Func<DataTable, String> RunPostProcess;
    private string filename;

    public TableTransporter()
    {
        RunPostProcess = new Func<DataTable, String>(SerializeTable);
        tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
    }

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
    {
        //  do something with table
        //  pass the output file name as the async state so PostComplete can
        //  report it (passing the shared 'filename' field here was a race)
        RunPostProcess.BeginInvoke(e.Table, new AsyncCallback(PostComplete), e.Table.TableName + ".xml");
    }

    public void ExtractData()
    {
        // perform data extraction
        tableQueue.Enqueue(MakeTable());
        Console.WriteLine("Table count [{0}]", tableQueue.Count);
    }

    private DataTable MakeTable()
    { return new DataTable(String.Format("Table{0}", _indexer++)); }

    private string SerializeTable(DataTable Table)
    {
        string file = Table.TableName + ".xml";

        DataSet dataSet = new DataSet(Table.TableName);

        dataSet.Tables.Add(Table);

        Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
        string xmlstream = String.Empty;

        using (MemoryStream memstream = new MemoryStream())
        {
            XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
            XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);

            xmlSerializer.Serialize(xmlWriter, dataSet);
            xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());

            byte[] fileBytes = StringToUTF8ByteArray(xmlstream);
            using (var fileStream = new FileStream(file, FileMode.Create))
                fileStream.Write(fileBytes, 0, fileBytes.Length);   // write the actual UTF-8 byte count, not string length + 2
        }
        filename = file;

        return file;
    }

    private void PostComplete(IAsyncResult iasResult)
    {
        string file = (string)iasResult.AsyncState;
        Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);

        RunPostProcess.EndInvoke(iasResult);
    }

    public static String UTF8ByteArrayToString(Byte[] ArrBytes)
    { return new UTF8Encoding().GetString(ArrBytes); }

    public static Byte[] StringToUTF8ByteArray(String XmlString)
    { return new UTF8Encoding().GetBytes(XmlString); }
}

public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;

    public CustomQueue()
    { }
    public CustomQueue(IEnumerable<DataTable> TableCollection)
        : base(TableCollection)
    { }

    public new void Enqueue(DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

public class TableQueuedEventArgs : EventArgs
{
    public TableQueuedEventArgs(DataTable Table)
    {
        this.Table = Table;
    }

    public DataTable Table { get; set; }
}

As proof of concept, it seems to work pretty well. At most I saw 4 worker threads.

Capsulate answered 29/12, 2010 at 6:3 Comment(2)
Looking through this, it's a good implementation; however, having run a quick test, when does an item get dequeued? - Alphonso
@RichardPriddy: since this was just over 5 years ago (and I have long since moved on to my third company), I can only assume this was not a complete example. Note the proof-of-concept remark at the end. ;) That said, depending on requirements, you could expose the enqueued event and let something else handle dequeueing. Otherwise, it might be logical to dequeue somewhere in the post-process function's AsyncCallback. It would be really difficult to pinpoint anything more specific at this late date. - Capsulate
