Enabling Queue<T> with concurrency

I have a previous question for which I provided my own solution; however, I don't have access to ConcurrentQueue<T> since I am on .NET 3.5. I need Queue<T> to allow concurrency. I read this question, and it seems to present a problem if an item is not in the queue when the threaded method tries to dequeue one.

My task now is to determine whether I can derive my own concurrent Queue class. This is what I came up with:

public sealed class ConcurrentQueue : Queue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;
    private ICollection que;

    new public void Enqueue(DataTable Table)
    {
        lock (que.SyncRoot)
        {
            base.Enqueue(Table);
        }

        OnTableQueued(new TableQueuedEventArgs(Dequeue()));
    }

    //  this is where I think I will have a problem...
    new public DataTable Dequeue()
    {
        DataTable table;

        lock (que.SyncRoot)
        {
            table = base.Dequeue();
        }

        return table;
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

So, when a DataTable is queued, the EventArgs will pass a dequeued table to the event subscriber. Will this implementation provide me with a thread-safe Queue?

Danelledanete answered 29/12, 2010 at 15:22 Comment(4)
que is utterly useless. You should lock on a readonly object key = new object();.Formalize
@SLaks: I implemented ICollection que and lock(que.SyncRoot) based on MSDN: msdn.microsoft.com/en-us/library/bb344892.aspxDanelledanete
You don't need it at all. SyncRoot is useful if you have disjoint pieces of code that need to lock for the same collection. In your case, que is null. You just need to lock on a single object in your methods.Formalize
I can't see why you are trying to use a queue here, as it de-queues immediately on the same thread. If you want to have worker threads handle de-queuing, you can use a producer/consumer queue pattern. There are many examples on SO. Also, I assume this throws a NullReferenceException as soon as you enqueue.Salvadorsalvadore

You're dequeueing your items as you enqueue them.
You need to raise the event using your parameter.

Whether it's actually thread-safe depends on how you use it.
If you ever check the Count or check for emptiness, it's not thread-safe and cannot easily be made thread-safe.
If you don't, you can probably use something simpler than a queue.
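
A minimal sketch of that correction, assuming the ConcurrentQueue : Queue<DataTable> class and TableQueuedEventArgs type from the question, and locking on a private object as suggested in the comments:

private readonly object syncLock = new object();   // replaces the unused ICollection que

new public void Enqueue(DataTable Table)
{
    lock (syncLock)
    {
        base.Enqueue(Table);
    }

    // Raise the event with the table that was just enqueued, not with Dequeue()
    OnTableQueued(new TableQueuedEventArgs(Table));
}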

Formalize answered 29/12, 2010 at 15:25 Comment(4)
The enqueueing is expected to be faster than the dequeueing (however, I don't have any data yet to confirm either way). Enqueueing is also performed on the main thread. When the event fires, I use an Action's BeginInvoke to run the dequeuing process asynchronously.Danelledanete
Also...I don't ever check Count and didn't see a need to do so since I am dequeuing almost immediately on a worker thread. I don't know what would be simpler than a Queue<T> unless I went to a List<T> - which I almost did, but the Queue<T> class seemed to have everything I would need except, of course, for the event when an item is queued.Danelledanete
It looks like you don't need a queue at all; you can just pass the table as a parameter to BeginInvoke.Formalize
@Danelledanete If you want to create an event to be raised when an item is queued (item arrived), you could do it by creating a custom EventHandler. There is an example using .NET 3.5: experts-exchange.com/questions/28768639/…Strachey

A quick trip to my favorite search engine revealed that my memory was correct; you can get the Task Parallel Library even on .NET 3.5. Also see the PFX team blog post on the subject, and the Reactive Extensions, which you download in order to get the desired System.Threading.dll.
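
If that route is acceptable, the backported System.Threading.dll also ships the System.Collections.Concurrent types, so usage would look roughly like this (a sketch only, assuming the backport is referenced and exposes the same API surface as .NET 4):

using System.Collections.Concurrent;   // provided by the backported System.Threading.dll
using System.Data;

class Example
{
    static void Main()
    {
        var tables = new ConcurrentQueue<DataTable>();
        tables.Enqueue(new DataTable("Example"));

        DataTable next;
        if (tables.TryDequeue(out next))
        {
            // process the dequeued table here
        }
    }
}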

Putscher answered 29/12, 2010 at 15:32 Comment(4)
Unfortunately, I am bound (by internal policy) to the basic libraries used in .Net 3.5 - basically keeping all developers using the same libraries. If I try to use TPLib, I'll be marked as a rogue. Otherwise, a good option.Danelledanete
That's an insane policy. Re-inventing the wheel is one of the worst productivity drains, and a great source of bugs.Formalize
This kind of policy is usually enforced by people who know nothing about programming...Diplomat
Actually, the policy makes sense when the CTP is no longer available and the Reactive Extensions are unsupported.Danelledanete

The fact that you need to use new to hide methods from the base class is usually an indication that you should use composition rather than inheritance...

Here's a simple synchronized queue, which doesn't use inheritance but still relies on the behavior of the standard Queue<T>:

public class ConcurrentQueue<T> : ICollection, IEnumerable<T>
{
    private readonly Queue<T> _queue;

    public ConcurrentQueue()
    {
        _queue = new Queue<T>();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (SyncRoot)
        {
            foreach (var item in _queue)
            {
                yield return item;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (SyncRoot)
        {
            ((ICollection)_queue).CopyTo(array, index);
        }
    }

    public int Count
    {
        get
        { 
            // Assumed to be atomic, so locking is unnecessary
            return _queue.Count;
        }
    }

    public object SyncRoot
    {
        get { return ((ICollection)_queue).SyncRoot; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void Enqueue(T item)
    {
        lock (SyncRoot)
        {
            _queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock(SyncRoot)
        {
            return _queue.Dequeue();
        }
    }

    public T Peek()
    {
        lock (SyncRoot)
        {
            return _queue.Peek();
        }
    }

    public void Clear()
    {
        lock (SyncRoot)
        {
            _queue.Clear();
        }
    }
}
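
For reference, a usage sketch; the producer/consumer split is an assumption based on the scenario in the question. Exposing SyncRoot lets calling code perform compound check-then-dequeue operations under the queue's own lock:

// Sketch only: one producer (main thread) and one consumer (worker thread).
var queue = new ConcurrentQueue<DataTable>();

var consumer = new Thread(() =>
{
    while (true)
    {
        DataTable table = null;
        lock (queue.SyncRoot)
        {
            // Check and dequeue under the same lock so the two steps cannot race
            if (queue.Count > 0)
            {
                table = queue.Dequeue();
            }
        }

        if (table != null)
        {
            // process the table here
        }
        else
        {
            Thread.Sleep(10);   // nothing queued; back off briefly
        }
    }
});
consumer.IsBackground = true;
consumer.Start();

queue.Enqueue(new DataTable("Example"));   // producer side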
Diplomat answered 29/12, 2010 at 15:46 Comment(8)
Why the explicit interface implementation and the cast on _queue?Thoreau
Using lock in an iterator is dangerous.Formalize
Normally, I wouldn't derive a class where I would then use new on a method. But I didn't want to rewrite a bunch of methods provided by the base. Since I knew I was only going to replace 2 of the methods, no harm. All of the base methods and properties are available without any additional effort on my part. If my thinking is wrong, then I would like an explanation as to why I shouldn't derive my own ConcurrentQueue<T> from Queue<T>.Danelledanete
@SLaks, I agree that it feels wrong, but I can't put my finger on exactly why it is "dangerous"... could you elaborate?Diplomat
@dboarman, it's OK as long as you use your class through a variable of this type. But if you use it through a variable of type Queue<T>, your new methods won't be called.Diplomat
@Thomas: The lock will only be released when you Dispose() the iterator's IEnumerator<T>. #2275164Formalize
@SLaks, that's precisely what I intended... but I realize it can be an issue. On the other hand, if you don't lock during the enumeration, another thread can modify the collection, invalidating the enumerator. I'm not sure which is worse...Diplomat
@Thomas: I decided to leave the Queue<T> class as SLaks suggested: #4568156Danelledanete

I know this is some time after the initial question (it came up as "related" to the right of another question), but I've gone with the following in similar cases. It's not as good for CPU cache use as it could be, but it's simple, lock-free, and thread-safe. Often CPU cache use isn't that important when there are large gaps between operations, and when there aren't, the closeness of allocation may reduce the impact:

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}
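
A brief usage sketch (assumption: one producer and one consumer thread; the consumer polls, because the queue offers no blocking wait or notification):

// Sketch only.
var queue = new LockFreeQueue<DataTable>();

var consumer = new Thread(() =>
{
    while (true)
    {
        DataTable table;
        if (queue.TryDequeue(out table))
        {
            // process the table here
        }
        else
        {
            Thread.Sleep(1);   // queue empty; back off briefly
        }
    }
});
consumer.IsBackground = true;
consumer.Start();

queue.Enqueue(new DataTable("Example"));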
Tagmemics answered 9/12, 2011 at 10:10 Comment(0)

In the line OnTableQueued(new TableQueuedEventArgs(Dequeue())); in your Enqueue method, use Peek instead of Dequeue.

It should be:

OnTableQueued(new TableQueuedEventArgs(base.Peek()));
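
Note that a Peek after the lock is released could still race with a consumer dequeuing on another thread. One option (a sketch, assuming the class from the question and a private lock object shared with Dequeue) is to capture the event arguments under the same lock:

new public void Enqueue(DataTable Table)
{
    TableQueuedEventArgs args;
    lock (syncLock)   // hypothetical private lock object, shared with Dequeue
    {
        base.Enqueue(Table);
        args = new TableQueuedEventArgs(base.Peek());   // Peek leaves the item in the queue
    }
    OnTableQueued(args);   // raise the event outside the lock
}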

Emotional answered 29/12, 2010 at 15:34 Comment(0)
