Thread safe limited size queue, without using lock

I'm trying to write a thread-safe queue with a limited size, but I get deadlocks and other multithreading problems. I want to use Interlocked.CompareExchange to avoid using a lock, but this code doesn't work as expected: it just wipes the entire queue. What am I doing wrong here?

public class FixedSizedQueue<T> : IEnumerable<T>
{
    readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    public int Limit { get; set; }

    public FixedSizedQueue(int limit)
    {
        Limit = limit;
    }

    public void Enqueue(T obj)
    {
        _queue.Enqueue(obj);
        if (_queue.Count <= Limit)
            return;
        int count = _queue.Count;
        if (_queue.Count != Interlocked.CompareExchange(ref count, count, _queue.Count))
        {
            T overflow;
            while (_queue.TryDequeue(out overflow))
            {

            }
        }
    }

    public T[] ToArray()
    {
        return _queue.ToArray();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _queue.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Maybe I just need another thread that will just cut the queue...

Stelu answered 7/9, 2015 at 9:29 Comment(4)
Well, your while loop clearly wipes the entire queue. That can't work. Limit that loop. - Marvismarwin
To quote Eric Lippert: "Just take the lock." - Thar
@Thar Thanks for the link, I'll follow this advice. - Stelu
Use BlockingCollection instead, the BoundedCapacity property sets the limit. It uses ConcurrentQueue under the hood to implement the collection. - Flour

Interlocked.CompareExchange is meaningless on the local variable count, because a local variable lives on the stack and is only ever accessed by a single thread. I guess you tried to use this method on _queue.Count, but that failed to compile because Count is a property, not a field or variable that can be passed by ref. So you need to define your own counter field in the class.

public class FixedSizedQueue<T> : IEnumerable<T>
{
    readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    int CountShadow = 0; // Counter for check constraints.
    public int Limit { get; set; }

    public FixedSizedQueue(int limit)
    {
        Limit = limit;
    }

    public void Enqueue(T obj)
    {
        /* Update shadow counter first for check constraints. */
        int count = CountShadow;
        while(true)
        {
             if(count >= Limit) return; // Adding element would violate constraint
             int countOld = Interlocked.CompareExchange(ref CountShadow, count + 1, count);
             if(countOld == count) break; //Successful update
             count = countOld;
        }
        _queue.Enqueue(obj); // This will update real counter.
    }
    ...
}

Also, you need to write your own setter for the Limit property that maintains the invariant CountShadow <= Limit, or simply forbid changing that property after the object is constructed.
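
A minimal sketch of how the complete class might look, assuming Limit is made read-only after construction. The answer above elides the rest of the class, so the TryDequeue method, the TryEnqueue name (returning a bool instead of silently dropping the item), and the class name FixedSizedQueueSketch are assumptions made for illustration only:

```csharp
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public class FixedSizedQueueSketch<T> : IEnumerable<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private int _countShadow; // Number of reserved slots; changed only via Interlocked.

    // Assumption: the limit is fixed at construction, so the invariant
    // _countShadow <= Limit cannot be broken by a later change of the limit.
    public int Limit { get; }

    public FixedSizedQueueSketch(int limit)
    {
        Limit = limit;
    }

    // Assumption: report rejection to the caller instead of returning void.
    public bool TryEnqueue(T obj)
    {
        int count = Volatile.Read(ref _countShadow);
        while (true)
        {
            if (count >= Limit) return false; // Queue is full.
            int countOld = Interlocked.CompareExchange(ref _countShadow, count + 1, count);
            if (countOld == count) break; // Slot reserved successfully.
            count = countOld; // Lost the race; retry with the fresh value.
        }
        _queue.Enqueue(obj);
        return true;
    }

    // Hypothetical counterpart: release the slot only after an item was removed.
    public bool TryDequeue(out T obj)
    {
        if (!_queue.TryDequeue(out obj)) return false;
        Interlocked.Decrement(ref _countShadow);
        return true;
    }

    public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

The slot is reserved before the item is enqueued and released after an item is dequeued, so the shadow counter may briefly exceed the real item count, but the real count never exceeds Limit.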

Ketchan answered 7/9, 2015 at 10:57 Comment(2)
did you mean Interlocked.CompareExchange(ref CountShadow, count + 1, count)? - Cutis
@dugas: Thanks for the note, fixed. Funny that different languages have different parameter orders for the CAS operation. - Ketchan

Tsyvarev's approach is clever and valid, but there is also another way to limit the size of the queue with the Interlocked class. Instead of spinning with the Interlocked.CompareExchange until the current thread wins the optimistic race, it is also possible to just increment the CountShadow field, and then immediately decrement it in case the maximum limit has been exceeded. Here is an implementation of this idea:

public class ConcurrentBoundedQueue<T> : IEnumerable<T>
{
    private readonly ConcurrentQueue<T> _queue;
    private readonly int _boundedCapacity;
    private volatile int _approximateCount;

    public ConcurrentBoundedQueue(int boundedCapacity)
    {
        if (boundedCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        _queue = new();
        _boundedCapacity = boundedCapacity;
        _approximateCount = 0;
    }

    public int BoundedCapacity => _boundedCapacity;
    public int Count => _queue.Count;

    public bool TryEnqueue(T item)
    {
        if (_approximateCount >= _boundedCapacity) return false;
        if (Interlocked.Increment(ref _approximateCount) > _boundedCapacity)
        {
            Interlocked.Decrement(ref _approximateCount);
            return false;
        }
        _queue.Enqueue(item);
        return true;
    }

    public bool TryDequeue(out T item)
    {
        bool success = _queue.TryDequeue(out item);
        if (success) Interlocked.Decrement(ref _approximateCount);
        return success;
    }

    public T[] ToArray() => _queue.ToArray();
    public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

According to my experiments this approach is slightly faster than Tsyvarev's approach, under conditions of intense activity and contention. Functionally the two approaches seem to be identical. They both enforce the boundedCapacity policy, and I can't see any difference in the way they behave. They are also both faster than wrapping a normal Queue<T>, and protecting it with the lock statement.

It should be noted that the functionality offered by the ConcurrentBoundedQueue<T> class is also offered out of the box by the built-in BlockingCollection<T>. The TryEnqueue method corresponds to TryAdd, and TryDequeue to TryTake. These APIs use Interlocked operations internally, in a way similar to Tsyvarev's solution. According to my experiments, using the BlockingCollection<T> for this purpose has considerable overhead, which makes it even slower than a simple lock-protected Queue<T>.
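
For comparison, here is a small sketch of the BlockingCollection<T> equivalent. The BoundedCollectionSketch class and its CountAccepted method are hypothetical helpers written for this illustration; only the constructor, TryAdd and TryTake come from the framework:

```csharp
using System.Collections.Concurrent;

public static class BoundedCollectionSketch
{
    // Offers `offered` items to a collection bounded at `capacity`
    // and returns how many of them were accepted.
    public static int CountAccepted(int capacity, int offered)
    {
        // The bound is passed to the constructor. With no explicit backing
        // store the collection uses a ConcurrentQueue<T>, so order is FIFO.
        using (var collection = new BlockingCollection<int>(capacity))
        {
            int accepted = 0;
            for (int i = 0; i < offered; i++)
            {
                // TryAdd without a timeout returns false immediately when full.
                if (collection.TryAdd(i)) accepted++;
            }

            // TryTake is the non-blocking counterpart of TryDequeue.
            while (collection.TryTake(out int _)) { }
            return accepted;
        }
    }
}
```

Calling BoundedCollectionSketch.CountAccepted(3, 5) from a single thread should accept three items and reject the other two.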

Storyteller answered 15/3, 2023 at 2:17 Comment(9)
In many use-cases, it's a problem when an increment overshoots, not maintaining an invariant like count < capacity. But it's probably fine here, since the ConcurrentQueue is thread-safe on its own and doesn't need locking or an accurate index to a position for TryDequeue. So I think no correctness problem, although the overshoot could make other threads think they've failed when they didn't need to. This results in more total actual modifications to _approximateCount, but if a failed CAS is just as expensive then perhaps less total operations. - Freehand
I've sometimes wondered about this tradeoff of overshoot and correct vs. CAS-retry loop for stuff like counted locks, but never tried benchmarking since I wasn't sure what load / contention conditions might be realistic. (And because I'm lazy). - Freehand
You could check the return value from Interlocked.Decrement and go on to .Enqueue if another thread has decremented in the meantime; perhaps a reader, or another writer correcting its own overshoot. That extra logic would probably only come into play much in high-contention scenarios, though, like an artificial benchmark where multiple threads spend all their time hammering on this. - Freehand
If you can relax the upper limit to allow _boundedCapacity + numThreads total objects in the _queue, you don't need to do any checking or fixups. IDK if there are scenarios where that has a bad effect on fairness, though. - Freehand
@PeterCordes I assume that the ConcurrentBoundedQueue<T> would be useful mainly as a pool of reusable objects, and these objects could have a significant memory footprint. In that case allowing more than boundedCapacity objects to be stored would not be desirable. I think that enforcing strictly the boundedCapacity policy is what most people would be comfortable with. - Storyteller
@PeterCordes I just tested a variant that takes into account the Interlocked.Decrement, replacing the if block with while, and inside the block: if (Interlocked.Decrement(ref _approximateCount) >= _boundedCapacity) return false;. It doesn't make much of a difference. In my test environment ~99% of the rejected TryEnqueue operations are doing at most one loop. - Storyteller
@PeterCordes I got my statistics slightly wrong. Actually it is: ~98% of the total TryEnqueue operations are doing at most one loop. Previously I was counting operations with two loops or less. The multi-loop operations are still a small minority, that doesn't have a measurable impact on performance. - Storyteller
Oh, I had been thinking my idea would save atomic RMW operations, like that if you saw Interlocked.Decrement produce a result that was below the capacity limit, you could go straight to the enqueue. But no, that would under-count, you do need to retry the increment so it's really just a matter of whether you return to the caller for it to retry, or like you say change the if to a while in this function. It might make sense to _mm_pause() (or C#'s SpinWait.SpinOnce?) if the Interlocked.Decrement return value is still at or above the limit, so an increment wouldn't succeed. - Freehand
@PeterCordes in the object-pool scenario the caller wouldn't retry. In case the TryEnqueue wasn't successful, the superfluous object would be discarded. If you want to enqueue the object no matter what, meaning that you want to block the caller until there is space available in the queue, there is the BlockingCollection<T> class that offers this functionality out of the box. - Storyteller
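
The retry variant described in the comments above, where the if block in TryEnqueue becomes a while loop, could be sketched roughly as follows. The class name ConcurrentBoundedQueueRetrySketch is hypothetical, the rest mirrors ConcurrentBoundedQueue<T>, and this is an illustration of the idea rather than the exact code that was benchmarked:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class ConcurrentBoundedQueueRetrySketch<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly int _boundedCapacity;
    private int _approximateCount; // Changed only through Interlocked.

    public ConcurrentBoundedQueueRetrySketch(int boundedCapacity)
    {
        if (boundedCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        _boundedCapacity = boundedCapacity;
    }

    public int Count => _queue.Count;

    public bool TryEnqueue(T item)
    {
        // Cheap early exit that avoids an atomic write when clearly full.
        if (Volatile.Read(ref _approximateCount) >= _boundedCapacity) return false;

        // Optimistically reserve a slot; on overshoot, undo the increment.
        while (Interlocked.Increment(ref _approximateCount) > _boundedCapacity)
        {
            // Still full after the rollback: give up. Otherwise another
            // thread freed a slot in the meantime, so try to reserve again.
            if (Interlocked.Decrement(ref _approximateCount) >= _boundedCapacity)
                return false;
        }
        _queue.Enqueue(item);
        return true;
    }

    public bool TryDequeue(out T item)
    {
        bool success = _queue.TryDequeue(out item);
        if (success) Interlocked.Decrement(ref _approximateCount);
        return success;
    }
}
```

Every accepted item still corresponds to exactly one net increment of the counter, so the capacity limit is enforced just as strictly as in the original if-based version.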
