Enable Async TransactionScope without TransactionS

2019-07-13 04:29发布

Following is Async Cache and Database update using Transaction Scope. I cannot use TransactionScopeAsyncFlowOption.Enabled introduced in the v 4.5.1, since the Apache Ignite.Net Cache I am using doesn't support it. I have tried finding a workaround by capturing the current Synchronization Context and then explicitly using Synchronization Context Send method to complete the transaction, but this doesn't work as I still get an error Transaction scope must be disposed on same thread it was created

Any suggestion how to go about achieving the Async Update. one of the suggestion by Apache Ignite support is to use something like:

Task.WhenAll(cacheUpdate, databaseUpdate).Wait(), but that would make Async code Sync, therefore not one of the best option

public async Task Update()
{
    // Capture Current Synchronization Context
    var sc = SynchronizationContext.Current;

    TransactionOptions tranOptions = new TransactionOptions();
    tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;


    using (var ts = new TransactionScope())
    {
        // Do Cache Update Operation as Async
        Task cacheUpdate = // Update Cache Async

        // Do Database Update Operation as Async
        Task databaseUpdate = // Update Database Async

        await Task.WhenAll(cacheUpdate, databaseUpdate);

                sc.Send(new SendOrPostCallback(
                o =>
                {
                    ts.Complete();
                }), sc);        
    }
}

1条回答
爷的心禁止访问
2楼-- · 2019-07-13 04:42

After fair amount of search across blogs and article, I have found the following blog by Stephen Toub, helps in achieving the Continuation of Async method on exactly same thread, thus avoiding the Transaction Scope issue. Now I don't need TransactionScopeAsyncFlowOption.Enabled to get the Async methods run in the TransactionScope

https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/

void Main()
{
    // Modified Async Scheduler for Continuations to work on Exactly same thread
    // Required in the case same Thread is required for Task Continuation post await
    Run(async () => await DemoAsync());

    "Main Complete".Dump();
}

static async Task DemoAsync()
{
    // Transcation Scope test (shall dispose 
    using (var ts = new TransactionScope())
    {            
        await Cache + Database Async update
        ts.Complete();
        "Transaction Scope Complete".Dump();
    }   
}

// Run Method to utilize the Single Thread Synchronization context, thus ensuring we can
// Control the threads / Synchronization context post await, cotinuation run of specific set of threads

public static void Run(Func<Task> func)
{
    // Fetch Current Synchronization context
    var prevCtx = SynchronizationContext.Current;

    try
    {
        // Create SingleThreadSynchronizationContext
        var syncCtx = new SingleThreadSynchronizationContext();

        // Set SingleThreadSynchronizationContext
        SynchronizationContext.SetSynchronizationContext(syncCtx);

        // Execute Func<Task> to fetch the task to be executed
        var t = func();

        // On Continuation complete the SingleThreadSynchronizationContext
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);

        // Ensure that SingleThreadSynchronizationContext run on a single thread
        // Execute a Task and its continuation on same thread
        syncCtx.RunOnCurrentThread();

        // Fetch Result if any
        t.GetAwaiter().GetResult();
    }
    // Reset the Previous Synchronization Context
    finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
}

// Overriden Synchronization context, using Blocking Collection Consumer / Producer model
// Ensure that same Synchronization context / Thread / set of threads are maintained
// In this case we main a single thread for continuation post await

private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // BlockingCollection Consumer Producer Model
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
      m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // Override Post, which is called during Async continuation
    // Send is for Synchronous continuation
    public override void Post(SendOrPostCallback d, object state)
    {
        m_queue.Add(
            new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // RunOnCurrentThread, does the job if fetching object from BlockingCollection and execute it
    public void RunOnCurrentThread()
    {
        KeyValuePair<SendOrPostCallback, object> workItem;
        while (m_queue.TryTake(out workItem, Timeout.Infinite))
            workItem.Key(workItem.Value);
    }

    // Compete the SynchronizationContext
    public void Complete() { m_queue.CompleteAdding(); }
}
查看更多
登录 后发表回答