Synchronization across threads / atomic checks?

Posted 2019-07-31 15:16

Question:

I need to create a method invoker that any thread (Thread B, for example) can call, and which will execute on the main thread (Thread A) at a specific point in its execution.

Example usage would be as follows:

static Invoker Invoker = new Invoker();

static void ThreadA()
{
    new Thread(ThreadB).Start();

    Thread.Sleep(...); // Hypothetic Alpha

    Invoker.Invoke(delegate { Console.WriteLine("Action"); }, true);

    Console.WriteLine("Done");

    Console.ReadLine();
}

static void ThreadB()
{
    Thread.Sleep(...); // Hypothetic Beta

    Invoker.Execute();
}

The Invoker class looks like this:

public class Invoker
{
    private Queue<Action> Actions { get; set; }

    public Invoker()
    {
        this.Actions = new Queue<Action>();
    }

    public void Execute()
    {
        while (this.Actions.Count > 0)
        {
            this.Actions.Dequeue()();
        }
    }

    public void Invoke(Action action, bool block = true)
    {
        ManualResetEvent done = new ManualResetEvent(!block);

        this.Actions.Enqueue(delegate
        {
            action();
            if (block) done.Set();
        });

        if (block)
        {
            done.WaitOne();
        }
    }
}

This works fine in most cases, but not if, for any reason, the execution (and therefore the Set) happens before the WaitOne; in that case it just freezes (it lets the thread proceed, then blocks). This can be reproduced if Alpha >> Beta.

I can use booleans and whatnot, but I'm never getting a real atomic safety here. I tried some fixes, but they wouldn't work in the case where Beta >> Alpha.

I also thought of locking around both the Invoker.Execute and Invoker.Invoke methods, so that execution is guaranteed not to occur between enqueuing and waiting. The problem is that the lock then also encompasses the WaitOne, so it never finishes (deadlock).

How should I go about getting absolute atomic safety in this paradigm?

Note: It really is a requirement that I work with this design, from external dependencies. So changing design is not a real option.

EDIT: I did forget to mention that I want a blocking behaviour (based on bool block) until the delegate is executed on the Invoke call.

Answer 1:

Use a Semaphore(Slim) instead of the ManualResetEvent.

Create a semaphore with an initial count of 0 and a maximum count of 1, call WaitOne() (Wait() on SemaphoreSlim) in the calling thread, and call Release() in the delegate.

If you've already called Release(), WaitOne() should return immediately.

Make sure to Dispose() it when you're done, preferably in a using block.
If block is false, you shouldn't create it in the first place (although for SemaphoreSlim, that's not so bad).
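
The steps above might be sketched roughly as follows. This is a minimal sketch, not a tested drop-in replacement: the class name SemaphoreInvoker and the gate lock object are hypothetical additions. The lock guards only the queue, on the assumption that Queue<Action> is shared between threads, and it is never held while waiting, so it does not reproduce the deadlock described in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical name; a sketch of the semaphore approach described above.
public class SemaphoreInvoker
{
    private readonly Queue<Action> actions = new Queue<Action>();
    // Assumption: Queue<T> is not thread-safe, so queue access is guarded here.
    private readonly object gate = new object();

    public void Execute()
    {
        while (true)
        {
            Action next;
            lock (gate)
            {
                if (actions.Count == 0) return;
                next = actions.Dequeue();
            }
            next(); // run outside the lock, so the lock is never held during user code
        }
    }

    public void Invoke(Action action, bool block = true)
    {
        if (!block)
        {
            // No semaphore is created when the caller does not want to wait.
            lock (gate) actions.Enqueue(action);
            return;
        }

        // Initial count 0, maximum 1: Wait() blocks until Release() has been called once.
        using (var done = new SemaphoreSlim(0, 1))
        {
            lock (gate)
            {
                actions.Enqueue(() =>
                {
                    try { action(); }
                    finally { done.Release(); } // signal even if the action throws
                });
            }
            done.Wait(); // returns immediately if Release() has already happened
        }
    }
}
```

Because the semaphore keeps its count, the order of Release() and Wait() does not matter. Note that if Execute() runs before Invoke() has enqueued anything, the action stays queued and a blocking Invoke() waits until the next Execute() call.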



Answer 2:

You can use my technique:

public void BlockingInvoke(Action action)
{
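    // 'volatile' is not allowed on local variables in C#;
    // the Thread.MemoryBarrier() calls below order the reads and writes instead.
    bool isCompleted = false;
    bool isWaiting = false;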
    ManualResetEventSlim waiter = new ManualResetEventSlim();

    this.Actions.Enqueue(delegate
    {
        action();

        isCompleted = true;
        Thread.MemoryBarrier();
        if (!isWaiting) 
            waiter.Dispose();
        else
            waiter.Set();
    });

    isWaiting = true;
    Thread.MemoryBarrier();
    if (!isCompleted)
        waiter.Wait();
    waiter.Dispose();
}

Untested



Answer 3:

I'm answering only to show the implementation SLaks described and my solution to ensure proper and unique disposal with locks. It's open to improvement and criticism, but it actually works.

public class Invoker
{
    private Queue<Action> Actions { get; set; }

    public Invoker()
    {
        this.Actions = new Queue<Action>();
    }

    public void Execute()
    {
        while (this.Actions.Count > 0)
        {
            this.Actions.Dequeue()();
        }
    }

    public void Invoke(Action action, bool block = true)
    {
        if (block)
        {
            // Initial count must be 0; with an initial count of 1, Wait() would return without blocking.
            SemaphoreSlim semaphore = new SemaphoreSlim(0, 1);
            bool disposed = false;

            this.Actions.Enqueue(delegate
            {
                action();
                semaphore.Release();

                lock (semaphore)
                {
                    semaphore.Dispose();
                    disposed = true;
                }
            });

            lock (semaphore)
            {
                if (!disposed)
                {
                    semaphore.Wait();
                    semaphore.Dispose();
                }
            }
        }
        else
        {
            this.Actions.Enqueue(action);
        }
    }
}