The following is an async cache and database update inside a TransactionScope. I cannot use TransactionScopeAsyncFlowOption.Enabled, introduced in .NET Framework 4.5.1, because the Apache Ignite.NET cache I am using doesn't support it. I tried a workaround: capture the current SynchronizationContext and then explicitly call its Send method to complete the transaction. This doesn't work; I still get an error saying that the transaction scope must be disposed on the same thread it was created on.

Any suggestions on how to achieve the async update? One suggestion from Apache Ignite support is to use something like Task.WhenAll(cacheUpdate, databaseUpdate).Wait(), but that would make the async code synchronous, so it is not one of the better options.

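public async Task Update()
{
    // Capture the current synchronization context (may be null, e.g. in a console app)
    var sc = SynchronizationContext.Current;
    TransactionOptions tranOptions = new TransactionOptions();
    tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;
    // Pass the options to the scope; otherwise they are never applied
    using (var ts = new TransactionScope(TransactionScopeOption.Required, tranOptions))
    {
        // Start the cache update (UpdateCacheAsync is a placeholder for the real call)
        Task cacheUpdate = UpdateCacheAsync();
        // Start the database update (UpdateDatabaseAsync is a placeholder for the real call)
        Task databaseUpdate = UpdateDatabaseAsync();
        await Task.WhenAll(cacheUpdate, databaseUpdate);
        // Attempted workaround: complete the transaction through the captured context.
        // The scope may still be disposed on a different thread than the one that created it.
        sc.Send(new SendOrPostCallback(
            o =>
            {
                ts.Complete();
            }), sc);
    }
}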
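
For comparison, this is roughly what the option I cannot use looks like. It is only a minimal sketch, assuming a plain System.Transactions scope with no Ignite.NET involvement; the class and method names are made up for illustration, and Task.Delay stands in for the real async work. With the flow option enabled, the ambient transaction follows the async call chain, so the scope may be disposed on a different thread.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

public static class AsyncFlowSketch
{
    // Returns true if the same ambient transaction is visible after the await.
    public static async Task<bool> AmbientTransactionSurvivesAwaitAsync()
    {
        var options = new TransactionOptions
        {
            IsolationLevel = IsolationLevel.RepeatableRead
        };

        // TransactionScopeAsyncFlowOption.Enabled lets the ambient transaction
        // flow across awaits instead of being tied to the creating thread.
        using (var ts = new TransactionScope(
            TransactionScopeOption.Required,
            options,
            TransactionScopeAsyncFlowOption.Enabled))
        {
            string before = Transaction.Current.TransactionInformation.LocalIdentifier;

            // Stand-in for the real async work; the continuation may run on a pool thread.
            await Task.Delay(10).ConfigureAwait(false);

            Transaction after = Transaction.Current;
            bool same = after != null
                && after.TransactionInformation.LocalIdentifier == before;

            ts.Complete();
            return same;
        }
    }
}
```

Since the Ignite.NET cache does not support this option in my setup, it only serves as a baseline for what the workaround has to replace.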
After a fair amount of searching across blogs and articles, I found the following blog post by Stephen Toub. It shows how to make the continuation of an async method run on exactly the same thread, which avoids the TransactionScope issue. Now I don't need TransactionScopeAsyncFlowOption.Enabled to run async methods inside the TransactionScope:
https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/

void Main()
{
    // Run the async method under a single-threaded synchronization context so that
    // continuations after each await resume on exactly the same thread
    Run(async () => await DemoAsync());
    "Main Complete".Dump(); // Dump() is LINQPad's output helper
}

static async Task DemoAsync()
{
    // TransactionScope test: the scope must be disposed on the thread that created it
    using (var ts = new TransactionScope())
    {
        // Placeholders for the real cache and database async updates
        await Task.WhenAll(UpdateCacheAsync(), UpdateDatabaseAsync());
        ts.Complete();
        "Transaction Scope Complete".Dump();
    }
}

// Run installs a single-threaded synchronization context, so we control which
// thread the continuations run on after each await
public static void Run(Func<Task> func)
{
    // Remember the current synchronization context so it can be restored
    var prevCtx = SynchronizationContext.Current;
    try
    {
        // Create and install the SingleThreadSynchronizationContext
        var syncCtx = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(syncCtx);
        // Invoke the delegate to start the async work and get its task
        var t = func();
        // When the task finishes, tell the context to stop accepting work
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);
        // Pump queued continuations on this thread until the context is completed,
        // so the task and all of its continuations execute on the same thread
        syncCtx.RunOnCurrentThread();
        // Observe the result and rethrow any exception from the task
        t.GetAwaiter().GetResult();
    }
    // Restore the previous synchronization context
    finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
}

// Custom synchronization context built on a BlockingCollection producer/consumer queue.
// All posted work runs on the one thread that calls RunOnCurrentThread, so
// continuations after an await stay on a single thread
private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // Queue of pending callbacks together with their state objects
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
        m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // Post is called to schedule async continuations; it only enqueues the work
    // (Send is the synchronous counterpart)
    public override void Post(SendOrPostCallback d, object state)
    {
        m_queue.Add(
            new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // Takes callbacks from the queue and executes them on the calling thread
    // until Complete is called and the queue is drained
    public void RunOnCurrentThread()
    {
        KeyValuePair<SendOrPostCallback, object> workItem;
        while (m_queue.TryTake(out workItem, Timeout.Infinite))
            workItem.Key(workItem.Value);
    }

    // Complete the context: no more work can be posted
    public void Complete() { m_queue.CompleteAdding(); }
}
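
To check the claim that continuations resume on the same thread, something like the following can be used. It is a minimal sketch, not part of the blog post: ThreadAffinityCheck, PumpContext and ResumesOnSameThread are names made up for illustration, PumpContext is a compact copy of the context above so the snippet compiles on its own, and Task.Delay stands in for the real async update.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadAffinityCheck
{
    // Compact copy of the single-threaded pump so the sketch is self-contained.
    private sealed class PumpContext : SynchronizationContext
    {
        private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
            new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        public override void Post(SendOrPostCallback d, object state) =>
            _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));

        public void RunOnCurrentThread()
        {
            // Blocks until Complete() is called and the queue is drained.
            foreach (var item in _queue.GetConsumingEnumerable())
                item.Key(item.Value);
        }

        public void Complete() => _queue.CompleteAdding();
    }

    // Returns true if the code after the await ran on the thread that started it.
    public static bool ResumesOnSameThread()
    {
        var previous = SynchronizationContext.Current;
        var ctx = new PumpContext();
        SynchronizationContext.SetSynchronizationContext(ctx);
        try
        {
            bool same = false;
            Func<Task> body = async () =>
            {
                int before = Environment.CurrentManagedThreadId;
                await Task.Delay(10); // the delay itself completes on another thread
                same = before == Environment.CurrentManagedThreadId;
            };

            Task t = body();
            t.ContinueWith(_ => ctx.Complete(), TaskScheduler.Default);
            ctx.RunOnCurrentThread(); // pump continuations on this thread
            t.GetAwaiter().GetResult();
            return same;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }
}
```

The await captures the installed context, so the continuation is posted back to the queue and executed by the pumping thread. That thread is blocked in the pump while the async work is in flight, which is the trade-off of this approach.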