I'm looking to build a parallel cache. The requirement is that there will be n data collectors that need to be fired at once. Each of these data collectors will hit a boundary layer (call this the service layer) and retrieve data. However, since this is within the same request (WCF), if 2 data collectors need to invoke the same method on the service layer, I don't want the second request to wait for the first one to complete.
This needs to be built transparently for developers building the data collectors (using Unity Interception to insert this caching aspect).
Here is what the flow would look like. Would Reactive Extensions be the correct fit for this kind of design? I haven't worked with Rx in the past, and I don't want to hit a brick wall 10 days into development. Otherwise, a combination of async, await and events might also serve well here.
EDIT: I implemented this using Rx - it works well in a multi-threaded context. The interesting bit was to TryAdd instead of TryGet. (This is a Unity Interception call handler.)
using System.Reactive.Linq;              // FirstOrDefault on IObservable<T>
using System.Reactive.Threading.Tasks;   // Task.ToObservable
using System.Threading.Tasks;
using Microsoft.Practices.Unity;
using Microsoft.Practices.Unity.InterceptionExtension;

/// <summary>
/// Intercepts calls and tries to retrieve the result from the cache
/// </summary>
class CacheCallHandler : ICallHandler
{
    [Dependency]
    public ICache RequestCache { get; set; }

    public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
    {
        IMethodReturn mesg = null;
        string cacheKey = CacheKeyGenerator.GetCacheKey(input);

        // Create the (not yet started) task that retrieves the data.
        var task = new Task<IMethodReturn>(() => getNext()(input, getNext));

        // Make it observable.
        var observableItem = task.ToObservable();

        // Try to add it to the cache. We need to do this in the order
        // Add-then-Get; otherwise multiple threads might enter the same area.
        if (RequestCache.TryAdd(cacheKey, observableItem))
        {
            // The add succeeded, which means we are responsible for starting this task.
            task.Start();
        }
        else if (RequestCache.TryGetValue(cacheKey, out observableItem))
        {
            // Do nothing; observableItem is already updated with the required reference.
        }
        else
        {
            throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong.", input);
        }

        // Observe the return value.
        if (observableItem != null)
            mesg = observableItem.FirstOrDefault();
        if (mesg == null)
            throw new CacheHandlerException("No return value found. This should not happen.", input);
        return mesg;
    }

    /// <summary>
    /// Should always be the first to execute on the boundary
    /// </summary>
    public int Order
    {
        get { return 1; }
        set { }
    }
}
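For completeness, the ICache the handler depends on only needs first-writer-wins semantics. A minimal sketch, assuming a ConcurrentDictionary backing store (the interface members are inferred from how the handler uses them; the actual ICache may differ):

```csharp
using System;
using System.Collections.Concurrent;
using Microsoft.Practices.Unity.InterceptionExtension;

public interface ICache
{
    bool TryAdd(string key, IObservable<IMethodReturn> value);
    bool TryGetValue(string key, out IObservable<IMethodReturn> value);
}

// Per-request cache; ConcurrentDictionary.TryAdd returns true for exactly
// one caller per key, so only one interception starts the underlying task.
class RequestCache : ICache
{
    private readonly ConcurrentDictionary<string, IObservable<IMethodReturn>> store =
        new ConcurrentDictionary<string, IObservable<IMethodReturn>>();

    public bool TryAdd(string key, IObservable<IMethodReturn> value)
    {
        return store.TryAdd(key, value);
    }

    public bool TryGetValue(string key, out IObservable<IMethodReturn> value)
    {
        return store.TryGetValue(key, out value);
    }
}
```

Registering this with a per-request lifetime manager keeps the cache scoped to a single WCF request, matching the requirement above.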
I would lean towards an async solution, specifically one using AsyncLazy<T> (from my blog). This is a very simplistic "cache", since it has no expiration. It can be used as such:
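The blog code itself isn't reproduced here; a sketch of the pattern being described (a Lazy<Task<T>> wrapper plus a GetOrAdd-based cache - the exact shape of the blog's types is an assumption) looks like this:

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Sketch of the AsyncLazy<T> pattern: Lazy<Task<T>> whose factory is
// forced onto the thread pool, and which can be awaited directly.
public sealed class AsyncLazy<T> : Lazy<Task<T>>
{
    public AsyncLazy(Func<Task<T>> taskFactory)
        : base(() => Task.Run(taskFactory)) { }

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}

// Simplistic cache with no expiration, as the answer describes.
public sealed class AsyncCache<TKey, TValue>
{
    private readonly Func<TKey, Task<TValue>> lookupAsync;
    private readonly ConcurrentDictionary<TKey, AsyncLazy<TValue>> cache =
        new ConcurrentDictionary<TKey, AsyncLazy<TValue>>();

    public AsyncCache(Func<TKey, Task<TValue>> lookupAsync)
    {
        this.lookupAsync = lookupAsync;
    }

    // GetOrAdd may construct an extra AsyncLazy under contention, but the
    // losing instance is never awaited, so lookupAsync runs once per key.
    public AsyncLazy<TValue> this[TKey key]
    {
        get { return cache.GetOrAdd(key, k => new AsyncLazy<TValue>(() => lookupAsync(k))); }
    }
}
```

Usage is then just `var result = await cache[key];` from any number of concurrent callers.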
In multithreading situations, GetOrAdd may create an extra AsyncLazy<TValue>, but it will never be awaited, so lookupAsync will only be called once per TKey. Also note that lookupAsync is always called from the thread pool.

P.S. If you do go async, you may find my async WCF post helpful.

Yes, Rx is an excellent fit for this.
I suggest you look at implementing the following dictionary to back your Key Cache:
Your "fetch data asynchronously" part just needs to subscribe to the subject to populate it with the results.
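The dictionary being suggested can be sketched as one AsyncSubject per key: an AsyncSubject replays its single terminal value, so late subscribers for the same key get the already-fetched result rather than triggering a second service call. The names below (KeyCache, fetchData) are illustrative, not from the answer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class KeyCache<TValue>
{
    private readonly ConcurrentDictionary<string, AsyncSubject<TValue>> subjects =
        new ConcurrentDictionary<string, AsyncSubject<TValue>>();

    public IObservable<TValue> GetOrFetch(string key, Func<IObservable<TValue>> fetchData)
    {
        var subject = new AsyncSubject<TValue>();
        if (subjects.TryAdd(key, subject))
        {
            // First caller for this key: start the fetch and populate the subject.
            fetchData().Subscribe(subject);
        }
        // Winner or loser, everyone observes the same subject for this key.
        return subjects[key].AsObservable();
    }
}
```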
https://github.com/reactiveui/ReactiveUI/blob/master/ReactiveUI/ObservableAsyncMRUCache.cs already does basically exactly what you want, specifically with respect to two requests for the same content being "debounced". From the comments: