Caching asynchronous operations

Posted 2019-02-02 08:36

Question:

I am looking for an elegant way of caching the results of my asynchronous operations.

I first had a synchronous method like this:

public String GetStuff(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = request.GetResponse())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return sr.ReadToEnd();
}

Then I made it asynchronous:

public async Task<String> GetStuffAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

Then I decided that I should cache the results, so I do not need to query outside that often:

ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>();

public async Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, await GetStuffInternalAsync(url));
}

private async Task<String> GetStuffInternalAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

Then I read an article (or watched a video) about how caching Task<T> is better, because creating them is expensive:

ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>();

public Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, GetStuffInternalAsync(url));
}

private async Task<String> GetStuffInternalAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

And now the problem is that if the request fails (e.g. with an HTTP 401), the cache will contain a failed Task<String>, and I will have to restart the app because it will be impossible to resend the request.

Is there an elegant way of using ConcurrentDictionary<T1,T2> to cache only successful tasks and still have the atomic behavior?

Answer 1:

First of all, both your approaches are wrong, because they don't save you any requests (though the second one at least saves you time).

Your first code (the one with await) does this:

  1. Make the request.
  2. Wait for the request to complete.
  3. If there already was a result in the cache, ignore the result of the request.

Your second code removes step 2, so it's faster, but you're still making lots of unnecessary requests.

What you should do instead is to use the overload of GetOrAdd() that takes a delegate:

public Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, GetStuffInternalAsync);
}

This doesn't completely eliminate the possibility of requests that are ignored, but it does make them much less likely. (For that, you could try canceling requests that you know are being ignored, but I don't think that's worth the effort here.)


Now to your actual question. What I think you should do is to use the AddOrUpdate() method. If the value isn't there yet, add it. If it's there, replace it if it's faulted:

public Task<String> GetStuffAsync(String url)
{
    return _cache.AddOrUpdate(
        url, GetStuffInternalAsync, (u, task) =>
        {
            if (task.IsCanceled || task.IsFaulted)
                return GetStuffInternalAsync(u);
            return task;
        });
}


Answer 2:

It's actually reasonable (and, depending on your design and performance requirements, crucial) to keep those failed tasks as a negative cache. Otherwise, if a URL always fails, requesting it again and again defeats the point of using a cache altogether.

What you do need is a way to clear the cache from time to time. The simplest way is to have a timer that replaces the ConcurrentDictionary instance. A more robust solution is to build your own LruDictionary or something similar.
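A minimal sketch of the timer idea, reusing the field and method names from the question (the five-minute interval is only an example value):

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class StuffClient
{
    // Swapping out the whole dictionary drops every stale entry
    // (including cached failures) in one cheap operation.
    private volatile ConcurrentDictionary<string, Task<string>> _cache =
        new ConcurrentDictionary<string, Task<string>>();

    private readonly Timer _resetTimer;

    public StuffClient()
    {
        _resetTimer = new Timer(
            _ => _cache = new ConcurrentDictionary<string, Task<string>>(),
            null,
            TimeSpan.FromMinutes(5),
            TimeSpan.FromMinutes(5));
    }

    public Task<string> GetStuffAsync(string url)
    {
        return _cache.GetOrAdd(url, GetStuffInternalAsync);
    }

    private async Task<string> GetStuffInternalAsync(string url)
    {
        var request = System.Net.WebRequest.Create(url);
        using (var response = await request.GetResponseAsync())
        using (var sr = new System.IO.StreamReader(response.GetResponseStream()))
            return await sr.ReadToEndAsync();
    }
}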



Answer 3:

I have made a wrapper for MemoryCache that basically caches Lazy<Task<T>> objects, and it works so that all of the following problems are solved:

  • No parallel or unnecessary operations to get a value are started; multiple call sites or threads can await the same value from the cache.
  • Failed tasks are not cached (no negative caching).
  • Cache users can't get invalidated results from the cache, even if the value is invalidated during an await.

The solution is explained further on my blog, and the full working code is available on GitHub.
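The answer's code isn't reproduced here, but a minimal sketch of the same idea, assuming a hypothetical GetOrAddAsync helper built on MemoryCache (not the author's actual library), could look like this:

using System;
using System.Runtime.Caching;
using System.Threading;
using System.Threading.Tasks;

static class AsyncCache
{
    static readonly MemoryCache _cache = MemoryCache.Default;

    public static Task<T> GetOrAddAsync<T>(
        string key, Func<Task<T>> valueFactory, CacheItemPolicy policy)
    {
        // Lazy guarantees the factory runs at most once, even if two
        // threads race on AddOrGetExisting for the same key.
        var newEntry = new Lazy<Task<T>>(valueFactory,
            LazyThreadSafetyMode.ExecutionAndPublication);

        var existing = (Lazy<Task<T>>)_cache.AddOrGetExisting(key, newEntry, policy);
        var task = (existing ?? newEntry).Value;

        // Evict failed or canceled tasks so the next caller retries
        // instead of observing a cached exception.
        task.ContinueWith(t =>
        {
            if (t.IsFaulted || t.IsCanceled)
                _cache.Remove(key);
        }, TaskContinuationOptions.ExecuteSynchronously);

        return task;
    }
}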



Answer 4:

Here's a way to cache results of asynchronous operations that guarantees no cache misses.

In the accepted answer, if the same URL is requested many times in a loop (depending on the SynchronizationContext) or from multiple threads, the web request will keep getting sent out until there's a response that gets cached, at which point the cache will start getting used.

The method below creates a SemaphoreSlim object for each unique key. This will prevent the long-running async operation from running multiple times for the same key while still allowing it to run simultaneously for different keys. Obviously, there's overhead in keeping SemaphoreSlim objects around to prevent cache misses, so it may not be worth it depending on the use case. But if guaranteeing no cache misses is important, then this accomplishes that.

private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>();

public async Task<string> GetSomethingAsync(string key)
{   
    string value;
    // get the semaphore specific to this key
    var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
    await keyLock.WaitAsync();
    try
    {
        // try to get value from cache
        if (!_cache.TryGetValue(key, out value))
        {
            // if value isn't cached, get it the long way asynchronously
            value = await GetSomethingTheLongWayAsync();

            // cache value
            _cache.TryAdd(key, value);
        }
    }
    finally
    {
        keyLock.Release();
    }
    return value;
}

Edit: As @mtkachenko mentioned in the comments, an additional cache check could be performed at the beginning of this method to potentially skip the lock acquisition step.
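A hedged sketch of that early check, meant to sit at the top of GetSomethingAsync above (it reuses the _cache field from that example):

// Fast path: if the value is already cached, return it without
// ever touching the per-key semaphore.
if (_cache.TryGetValue(key, out var cachedValue))
    return cachedValue;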



Answer 5:

This works for me:

// Requires a reference to System.Runtime.Caching.
ObjectCache _cache = MemoryCache.Default;
static object _lockObject = new object();

public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class
{
    var task = (Task<T>)_cache[cacheKey];
    if (task != null) return task;
    lock (_lockObject)
    {
        // re-check inside the lock in case another thread cached the task first
        task = (Task<T>)_cache[cacheKey];
        if (task != null) return task;

        task = func();
        _cache.Set(cacheKey, task, new CacheItemPolicy
        {
            AbsoluteExpiration = cacheExpiration.HasValue
                ? DateTimeOffset.Now.Add(cacheExpiration.Value)
                : ObjectCache.InfiniteAbsoluteExpiration
        });

        // evict the entry if the task fails or is canceled, so it can be retried
        task.ContinueWith(t =>
        {
            if (t.Status != TaskStatus.RanToCompletion)
                _cache.Remove(cacheKey);
        });
    }
    return task;
}
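For illustration, a hedged usage sketch plugging this into the question's scenario (GetStuffInternalAsync is the method from the question; the five-minute expiration is just an example value):

Task<string> cachedStuff = GetAsync(
    "stuff:" + url,
    () => GetStuffInternalAsync(url),
    TimeSpan.FromMinutes(5));

string body = await cachedStuff;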


Answer 6:

Another easy way to do this is to extend Lazy<T> to be AsyncLazy<T>, like so:

// Requires: using System.Runtime.CompilerServices; for TaskAwaiter<T>.
public class AsyncLazy<T> : Lazy<Task<T>>
{
    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
    { }

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}

Then you can do this:

private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache
    = new ConcurrentDictionary<string, AsyncLazy<string>>();

public async Task<string> GetStuffAsync(string url)
{
    return await _cache.GetOrAdd(url,
        new AsyncLazy<string>(
            () => GetStuffInternalAsync(url),
            LazyThreadSafetyMode.ExecutionAndPublication));
}