Async threadsafe Get from MemoryCache

I have created an async cache that uses .NET MemoryCache underneath. This is the code:

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if(!_cache.Contains(key))
    {
        var data = await populator();
        lock(_cache)
        {
            if(!_cache.Contains(key)) //Check again but locked this time
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
        }
    }

    return (T)_cache.Get(key);
}

I think the only downside is that I need to do the await outside the lock, so the populator isn't thread safe, but since an await can't reside inside a lock I guess this is the best way. Are there any pitfalls that I have missed?
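For illustration (not in the original post), here is a minimal, hypothetical sketch of that pitfall: two callers miss the cache at the same time, so the populator runs twice. The class and delegate names (PopulatorRaceDemo, slowPopulator) are made up for the demo.

using System;
using System.Runtime.Caching;
using System.Threading;
using System.Threading.Tasks;

class PopulatorRaceDemo
{
    static readonly MemoryCache _cache = MemoryCache.Default;
    static int _populatorCalls;

    // Same pattern as the question, specialized to int for a self-contained demo.
    static async Task<int> GetAsync(string key, Func<Task<int>> populator, TimeSpan expire)
    {
        if (!_cache.Contains(key))
        {
            var data = await populator(); // both racing callers can reach this line
            lock (_cache)
            {
                if (!_cache.Contains(key))
                    _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
            }
        }
        return (int)_cache.Get(key);
    }

    static async Task Main()
    {
        Func<Task<int>> slowPopulator = async () =>
        {
            Interlocked.Increment(ref _populatorCalls);
            await Task.Delay(100); // simulate slow I/O
            return 42;
        };

        await Task.WhenAll(
            GetAsync("key", slowPopulator, TimeSpan.FromMinutes(1)),
            GetAsync("key", slowPopulator, TimeSpan.FromMinutes(1)));

        Console.WriteLine(_populatorCalls); // typically prints 2, not 1
    }
}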

Update: A version of Eser's answer that is also threadsafe when another thread invalidates the cache:

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = new Lazy<Task<T>>(populator, true);
    _cache.AddOrGetExisting(key, lazy, DateTimeOffset.Now.Add(expire));
    return ((Lazy<Task<T>>) _cache.Get(key)).Value;
}

It can however be slower, because it creates Lazy instances that will never be executed, and it uses Lazy in the full threadsafe mode, LazyThreadSafetyMode.ExecutionAndPublication.
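As a side note (a tiny sketch, not from the original post): the boolean true passed to the Lazy constructor in the update above is shorthand for exactly that mode.

using System;
using System.Threading;
using System.Threading.Tasks;

class LazyModeDemo
{
    static void Main()
    {
        Func<Task<int>> populator = () => Task.FromResult(42);

        // Passing `true` is equivalent to passing LazyThreadSafetyMode.ExecutionAndPublication:
        // only one thread ever runs the factory, the safest but slowest mode.
        var viaBool = new Lazy<Task<int>>(populator, true);
        var viaMode = new Lazy<Task<int>>(populator, LazyThreadSafetyMode.ExecutionAndPublication);

        Console.WriteLine(viaBool.Value.Result); // 42
        Console.WriteLine(viaMode.Value.Result); // 42
    }
}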

Update with new benchmark (Higher is better)

Lazy with lock      42535929
Lazy with GetOrAdd  41070320 (Only solution that is completely thread safe)
Semaphore           64573360
Dextrose answered 5/8, 2015 at 11:57 Comment(6)
Suppose a new thread comes in with the same key while the first one awaits the populator. The populator for the same key will be executed unnecessarily twice.Serena
you can use a SemaphoreSlim with count 1, it has asynchronous wait msdn.microsoft.com/en-us/library/hh462805(v=vs.110).aspxLamellibranch
Yeah, that's the downside I'm aware of, but it's hard to build around since await can't be locked?Dextrose
@ned, nice, will look at thatDextrose
So far as I know the MemoryCache is thread safe. Or did I miss something?Reboant
The individual methods are, but if you call Contains and then Add, those two calls together are not threadsafe; plus the async populator is not threadsafe at all, even with my above codeDextrose

A simple solution would be to use SemaphoreSlim.WaitAsync() instead of a lock; that way you get around the issue of awaiting inside a lock. Note that all other methods of MemoryCache are themselves thread-safe.

private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public async Task<T> GetAsync(
            string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if (!_cache.Contains(key))
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            if (!_cache.Contains(key))
            {
                var data = await populator();
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    return (T)_cache.Get(key);
}
Pascal answered 5/8, 2015 at 12:2 Comment(15)
Yes, but the Contains check followed by the insert into the cache is notDextrose
Nice solution, Ned was also on the same trackDextrose
But there is a bug ;) The populator execution must be inside the if block, otherwise the semaphore solution is no better than my solution aboveDextrose
What if the moment after you check !_cache.Contains(key) but before you do return (T)_cache.Get the item gets deleted from the cache due to the expiration policy? The former should be AddOrGetExisting. Actually you have the same issue with the inner _cache.Contains(key) too.Company
@OhadSchneider By former do you mean the check for Contains? It's definitely a possibility that the key would expire prior to the Get call.Pascal
Yes I mean both _cache.Contains calls. On one hand your method provides "GetOrAdd" semantics (should be renamed really), but on the other hand in some edge cases it does not. Moreover, it would return null in those cases, which would be indistinguishable from an actual value of null for that key in the cache.Company
@OhadSchneider You're right. The main focus of my answer wasn't to redefine the semantics of the call made by the OP, but to show how it's possible to use SemaphoreSlim to use in conjunction with an async call. I'll reiterate the code to tackle the expiry issues.Pascal
Thanks! BTW I was wrong in my last observation - you can't add null items to a memory cache, possibly due to the ambiguity reason I mentioned (of course, it is not documented anywhere, you just get an ArgumentNullException).Company
The nice thing about this implementation is that the expiration can be a function of the populator's result. For example if the latter gets a token from some service, you could set the cache expiration to the token's expiration.Company
This isn't a solution. Every time the cache is called it may have to await other calls. What if you are trying to cache multiple pieces of data? You might find that one thing takes 100 milliseconds, but it keeps having to wait for things that take 2-3+ seconds.Orthoepy
@ChristianFindlay I'm not sure what you mean by "this isn't a solution". It definitely is a solution, which may perform poorly if you have awaitables that take a long time to return. But, the same is true for all solutions that internally await. If you care about cache performance, don't make the populator an awaitable, make it pass the value directly.Pascal
It's not a solution because it shares a lock for all the caches. You'd at least need a semaphore for each key (i.e. keep a dictionary of keys) to make this work (see the sketch after these comments). Someone else posted a solution here: https://mcmap.net/q/169195/-stop-reentrancy-on-memorycache-callsOrthoepy
@ChristianFindlay It doesn't share a lock for all caches, since there's a single cache. It does share a lock for ALL keys, if that's what you mean. You could implement this more efficiently, yes, but that doesn't make this any less of a solution than the link you pasted here.Pascal
I meant it shares the lock for all cache entries (or keys)Orthoepy
@YuvalItzchakov I'd say that it performs suboptimally when the cache is handling 2+ entries. That's the typical use-case for a cache, but the OP's question isn't clear on whether they want to cache multiple entries. I'd not make that assumption though. In most cases, this would be a deal-breaker (it is for me). I'd put a caveat though. Just my 2c.Korten
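To make the per-key lock idea from the comments above concrete, here is a minimal, hypothetical sketch (the class name PerKeyLockCache is made up; it assumes the same kind of _cache field as the question). It trades the single semaphore for one SemaphoreSlim per key stored in a ConcurrentDictionary, and simply never disposes them:

using System;
using System.Collections.Concurrent;
using System.Runtime.Caching;
using System.Threading;
using System.Threading.Tasks;

public class PerKeyLockCache
{
    private readonly MemoryCache _cache = MemoryCache.Default;
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public async Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire)
    {
        if (!_cache.Contains(key))
        {
            // One semaphore per key, so slow populators on other keys don't block this one.
            var keyLock = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
            await keyLock.WaitAsync();
            try
            {
                if (!_cache.Contains(key))
                {
                    var data = await populator();
                    _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
                }
            }
            finally
            {
                keyLock.Release();
            }
        }

        // Same expiry edge case as the original code: the entry could expire between Add and Get.
        return (T)_cache.Get(key);
    }
}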

The current answers use the somewhat outdated System.Runtime.Caching.MemoryCache. They also contain subtle race conditions (see comments). Finally, not all of them allow the timeout to be dependent on the value to be cached.

Here's my attempt using the new Microsoft.Extensions.Caching.Memory (used by ASP.NET Core):

//Add NuGet package: Microsoft.Extensions.Caching.Memory    

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;

MemoryCache _cache = new MemoryCache(new MemoryCacheOptions());

public Task<T> GetOrAddAsync<T>(
        string key, Func<Task<T>> factory, Func<T, TimeSpan> expirationCalculator)
{    
    return _cache.GetOrCreateAsync(key, async cacheEntry => 
    {
        var cts = new CancellationTokenSource();
        cacheEntry.AddExpirationToken(new CancellationChangeToken(cts.Token));
        var value = await factory().ConfigureAwait(false);
        cts.CancelAfter(expirationCalculator(value));
        return value;
    });
}

Sample usage:

await GetOrAddAsync("foo", () => Task.Run(() => 42), i  => TimeSpan.FromMilliseconds(i)));

Note that the factory method is not guaranteed to be called only once (see https://github.com/aspnet/Caching/issues/240).
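A hedged sketch (not from the linked issue) of one common mitigation, assuming the same _cache field as above: cache a Lazy<Task<T>> with the synchronous GetOrCreate and await its value. This narrows the race window from the whole awaited factory call down to the synchronous cache update, although it still does not hard-guarantee a single invocation, and it gives up the value-dependent expiration shown above. The method name GetOrAddLazyAsync is made up.

public Task<T> GetOrAddLazyAsync<T>(string key, Func<Task<T>> factory, TimeSpan expire)
{
    var lazy = _cache.GetOrCreate(key, cacheEntry =>
    {
        cacheEntry.AbsoluteExpirationRelativeToNow = expire;
        // true = LazyThreadSafetyMode.ExecutionAndPublication, so racing callers
        // that get the same Lazy share a single factory invocation.
        return new Lazy<Task<T>>(factory, true);
    });
    return lazy.Value;
}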

Company answered 8/7, 2017 at 14:3 Comment(1)
learn.microsoft.com/en-us/dotnet/api/…Matthews

Although there is already an accepted answer, I'll post a new one with a Lazy<T> approach. The idea is to minimize the duration of the lock block: if the key doesn't exist in the cache, put a Lazy<T> into the cache. That way all threads requesting the same key at the same time will await the same Lazy<T>'s value.

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    lock (_cache)
    {
        if (!_cache.Contains(key))
        {
            var lazy = new Lazy<Task<T>>(populator, true);
            _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
        }
    }

    return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}

Version2

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = ((Lazy<Task<T>>)_cache.Get(key));
    if (lazy != null) return lazy.Value;

    lock (_cache)
    {
        if (!_cache.Contains(key))
        {
            lazy = new Lazy<Task<T>>(populator, true);
            _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
            return lazy.Value;
        }
        return ((Lazy<Task<T>>)_cache.Get(key)).Value;
    }
}

Version3

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var task = (Task<T>)_cache.Get(key);
    if (task != null) return task;

    var value = populator();
    return 
     (Task<T>)_cache.AddOrGetExisting(key, value, DateTimeOffset.Now.Add(expire)) ?? value;
}
Forgiving answered 5/8, 2015 at 12:28 Comment(14)
With your new pattern you can get rid of the lock entirely by dropping the Contains check and switching the Add call to AddOrGetExisting.Grim
With Scott's addition it's even thread safe when removing entries (which neither mine nor Yuval's is)Dextrose
@Dextrose I tried it after Scott's comment, but it requires unnecessary Lazy's (although they are not evaluated) to be created. So I didn't post it.Forgiving
True, also a Lazy set to full thread safe mode is a lot slower than Yuval's solutionDextrose
@Dextrose Also no need for async/await :)Forgiving
Did a bench, your version is slowest actuallyDextrose
@Dextrose I changed the code a little bit, and my results are not similar to yours. Can you test also the version2Forgiving
Version two is much faster than Semaphore. A version of that which only uses the thread safety of MemoryCache is this: pastebin.com/ggfxqGkp My test looks like this btw: pastebin.com/FSjayVXzDextrose
@Dextrose One more optimization can be done. Just drop all Lazy's. Task.Result (or await) is thread safe. See Version3.Forgiving
@Forgiving that's just test code actually, hmm, which makes the test bad. How can I execute the result in the manner that Web API does, for a more correct test?Dextrose
@Dextrose Yes, Sorry for disturbance.Educational
Versions 1 and 2 suffer from a race condition where the item gets cleared right before you return it using _cache.Get.Company
@OhadSchneider Your edit revision 9 of the code for Version3 has introduced a bug. The code as you have written it can return null, since AddOrGetExisting returns null in the cases when it Adds. You should rollback that change to revision 8. @Forgiving could also do the rollback. If the goal of the change was to make the code shorter, consider this: return (Task<TValue>)(_cache.AddOrGetExisting(key, populator(), DateTimeOffset.Now.Add(expire)) ?? _cache.Get(key));. (Although some will argue it is less readable that the original if statement.)Divest
@Divest Thanks, I assumed AddOrGetExisting had the same contract as GetOrAdd in ConcurrentDictionary (which makes much more sense IMO). Anyway, your proposed fix contains a race condition (imagine the value is added but is then immediately removed before you get to _cache.Get) so I fixed it slightly differently (a sketch of the AddOrGetExisting behavior follows these comments).Company
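A small hedged sketch (not part of the answer) of the AddOrGetExisting contract discussed above, which is why Version3 needs the ?? value fallback: unlike ConcurrentDictionary.GetOrAdd, it returns null when it performs the add and returns the existing value when it does not.

using System;
using System.Runtime.Caching;

class AddOrGetExistingDemo
{
    static void Main()
    {
        var cache = new MemoryCache("demo");
        var expire = DateTimeOffset.Now.AddMinutes(1);

        // First call adds "v1" and returns null.
        var first = cache.AddOrGetExisting("key", "v1", expire);
        // Second call leaves "v1" in place and returns it; "v2" is discarded.
        var second = cache.AddOrGetExisting("key", "v2", expire);

        Console.WriteLine(first ?? "(null)"); // (null)
        Console.WriteLine(second);            // v1
    }
}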

This thread is a bit old but I hit this recently and I thought I'd leave this answer hoping it helps.

With async, there are a few things to keep in mind:

  1. An "uber lock" approach is not quick as it blocks the factory operations on other keys when performing one on a key.
  2. A "lock per key" (SemaphoreSlim) has 2 things going on: a. It's disposable, so disposing it after could race. b. Live with not disposing it.

I chose to solve it using a pool of locks. It's not required to have a lock per key, just as many locks as the maximum number of threads that can be active at once. I then assign the same lock to each key through hashing. The pool size is a function of ProcessorCount. The valueFactory is executed only once. Since multiple keys map to a lock (a key always maps to the same lock), operations on keys with the same hash will get synchronized. So this loses some parallelism, and this compromise may not work for all cases; I'm OK with it. This is the approach that LazyCache and FusionCache (as one of its many approaches) use, among other things. So I'd use one of them, but it's good to know the trick as it's pretty nifty.

private readonly SemaphoreSlimPool _lockPool = new SemaphoreSlimPool(1, 1);

private async Task<TValue> GetAsync(object key, Func<ICacheEntry, Task<TValue>> valueFactory)
{
    if (_cache.TryGetValue(key, out var value))
    {
        return value;
    }

    // key-specific lock so as to not block operations on other keys
    var lockForKey = _lockPool[key];
    await lockForKey.WaitAsync().ConfigureAwait(false);
    try
    {
        if (_cache.TryGetValue(key, out value))
        {
            return value;
        }

        value = await _cache.GetOrCreateAsync(key, valueFactory).ConfigureAwait(false);
        return value;
    }
    finally
    {
        lockForKey.Release();
    }
}

// Dispose SemaphoreSlimPool

And here's the SemaphoreSlimPool impl (source, nuget).

/// <summary>
/// Provides a pool of SemaphoreSlim objects for keyed usage.
/// </summary>
public class SemaphoreSlimPool : IDisposable
{
    /// <summary>
    /// Pool of SemaphoreSlim objects.
    /// </summary>
    private readonly SemaphoreSlim[] pool;

    /// <summary>
    /// Size of the pool.
    /// <para></para>
    /// Environment.ProcessorCount is not always correct so use more slots as buffer,
    /// with a minimum of 32 slots.
    /// </summary>
    private readonly int poolSize = Math.Max(Environment.ProcessorCount << 3, 32);

    private const int NoMaximum = int.MaxValue;

    /// <summary>
    /// Ctor.
    /// </summary>
    public SemaphoreSlimPool(int initialCount)
        : this(initialCount, NoMaximum)
    { }

    /// <summary>
    /// Ctor.
    /// </summary>
    public SemaphoreSlimPool(int initialCount, int maxCount)
    {
        pool = new SemaphoreSlim[poolSize];
        for (int i = 0; i < poolSize; i++)
        {
            pool[i] = new SemaphoreSlim(initialCount, maxCount);
        }
    }

    /// <inheritdoc cref="Get(object)" />
    public SemaphoreSlim this[object key] => Get(key);

    /// <summary>
    /// Returns a <see cref="SemaphoreSlim"/> from the pool that the <paramref name="key"/> maps to.
    /// </summary>
    /// <exception cref="ArgumentNullException"></exception>
    public SemaphoreSlim Get(object key)
    {
        _ = key ?? throw new ArgumentNullException(nameof(key));
        return pool[GetIndex(key)];
    }

    private uint GetIndex(object key)
    {
        return unchecked((uint)key.GetHashCode()) % (uint)poolSize;
    }

    private bool disposed = false;

    public void Dispose()
    {
        Dispose(true);
    }

    public void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (pool != null)
                {
                    for (int i = 0; i < poolSize; i++)
                    {
                        pool[i].Dispose();
                    }
                }
            }

            disposed = true;
        }
    }
}

I've thrown quite a few threads at this with a lot of churn due to a low TTL and it's not bombing out. So far it looks good to me, but I'd like to see if anyone can find bugs.

Korten answered 3/3, 2022 at 10:3 Comment(7)
I don't like this solution. The number of stored entries in a MemoryCache is not related to the number of processors in any way. A MemoryCache may store thousands of keys, in a machine having 4 cores. What this solution does is to use the same synchronization primitive (a SemaphoreSlim) for all different keys that may have the same key.GetHashCode() % poolSize value. So the GetAsync("Light") may have to wait for the completion of the unrelated GetAsync("Heavy"), in case the "Light" and "Heavy" keys happens to have the same hashcode modulo 32.Housewifely
Also the valueFactory might not be invoked on the current context, because of the ConfigureAwait(false), with unknown consequences.Housewifely
Yup, that's a compromise that's ok for me. Cache is typically not write-heavy. All other solutions I've seen so far either have a race for valueFactory or live with not disposing, which is worse imo.Korten
The consuming code can not use ConfigureAwait(false). In my case, I need it.Korten
Hi, FusionCache creator here: just to be clear, FusionCache does not have this contention problem, since it's not using the same lock (SemaphoreSlim) per multiple keys, but it has a 1.1 approach.Pusillanimous
@JodyDonetti What do you mean by "1.1 approach"? This thread is old, and my answer is outdated. If you want, I can update it. What I meant is that FusionCache uses a pool of objects trick, not a pool of SemaphoreSlims (last I checked, it was a constant size of ~8k object locks). See github.com/ZiggyCreatures/FusionCache/blob/…. I chose, sometimes, to lock on an object from a pool of objects just to create a task, and await the task outside of the lock.Korten
In your answer (not comment) there's a pool of SemaphoreSlim, so I was commenting on that. In FusionCache there is no such pool, and the pool that is there is just for when a new SemaphoreSlim will need to be created the first time: in the end it's always one SemaphoreSlim per key (and that was the "1:1" part which I wrote wrongly as "1.1").Pusillanimous
