Processing intermittent IO jobs with TPL and/or Ba

2019-09-02 03:10发布

问题:

I have a C# application that interacts with a few serial and USB devices. I am trying to find a nice way to command these devices in a parallel fashion so that many can be controlled at once. I will initiate commands to the device via user input and also scripted commands.

I am currently trying to figure out a clean way to "collect" and run commands in parallel to my UI thread. I have multiple forms, one for each device, each have a few controls that can issue commands.

The way I currently see it a user will click a button which will fire an event in the form. Another class lets call it CommandManager will hook into all of these events; each event passes the necessary information to form a command to send to a device.

When the event is handled by the CommandManager it forms the command and adds it to a BlockingCollection<Command> in a subclassed BackgroundWorker named DeviceCommandWorker which was started when the application opened. All it does is loop over a block of code containing a Task.Factory.StartNew() call.

In the StartNew block Take is called on the BlockingCollection waiting for a command to be input. Once a command is in the collection the Take returns and the Task goes off on its merry way. The BackgroundWorker loops back around and repeats the process until it is cancelled.

// Event handler running on the UI Thread
public void ProcessCommand(DeviceCommand command)
{
    // I assume I cannot add to Commands (BlockingCollection) from here?
    DeviceCommandWorker.Instance.Commands.Add(command);
}

// ....
// BackgroundWorker started upon Application startup
// ...

public class DeviceCommandWorker : BackgroundWorker
{
    public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
    private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());

    public BlockingCollection<DeviceCommand> Commands { get; set; } 

    private DeviceCommandWorker()
    {
        WorkerSupportsCancellation = true;
        Commands = new BlockingCollection<DeviceCommand>();
    }

    protected override void OnDoWork(DoWorkEventArgs e)
    {
        while (!CancellationPending)
        {
            var commandTask = Task.Factory.StartNew(() =>
            {
                // If the BackGroundWorker is cancelled, how to esacpe this blocking call?
                DeviceCommand command = commandQueue.Take(); 

                DeviceCommandResult result;
                command.Process(out result);

                if(result == DeviceCommandResult.Error)
                    ; // How would I inform the UI of this result?
            });
        }
        e.Cancel = true;
    }
}

My questions are stated in the code above but I will reiterate them.

My first issue is that I do not think I can add to the BlockingCollection from outside the running BackGroundWorker? (From reading about the TPL, shouldn't there be a sync object I can lock on when adding?)

Assuming that I can add to the collection, is there no way to escape the Take method when it is blocking, specifically if the BackgroundWorker was cancelled wouldn't it just be blocked forever? (Possibly before cancellation I could send a "custom command" that simply creates a task that does nothing, I would then be able to exit the while loop)

Finally, how would I report either success or errors in command execution back to the UI thread? I have seen this SO answer, is that the right direction to head in?

回答1:

So after some more tinkering it looks like I got the idea above to work pretty well for me initially. I am not sure how it will perform when it comes time to do some real work but I am happy so far.

So I reworked the code. I found that without the Take outside of the Task.Factory.StartNew() I was actually just making tasks as fast as possible each waiting to consume from the BlockingCollection. So I moved the statement outside of the loop. I also confirmed that to break from the blocking Take I will need to send some sort of special command so that I can stop the background worker. Finally (not shown) I plan to use Control.Invoke to return any failures to the UI thread.

public Boolean StartCommandWorker()
{
    if (DeviceCommandWorker.Instance.IsBusy)
        return false;
    else
    {
        Console.Out.WriteLine("Called start command worker!");
        DeviceCommandWorker.Instance.RunWorkerAsync();
        return DeviceCommandWorker.Instance.IsBusy;
    }
}

public void StopCommandWorker()
{
    Console.Out.WriteLine("Called stop command worker!");
    ProcessCommand("QUIT");
    DeviceCommandWorker.Instance.CancelAsync();
}

public Boolean ProcessCommand(String command)
{
    DeviceCommandWorker.Instance.Commands.Add(command);
    Console.Out.WriteLine("Enqueued command \"" + command + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);

    return true;
}

internal class DeviceCommandWorker : BackgroundWorker
{
    public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
    private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());

    public BlockingCollection<String> Commands { get; set; }

    private DeviceCommandWorker()
    {
        WorkerSupportsCancellation = true;
        Commands = new BlockingCollection<String>();
    }

    protected override void OnDoWork(DoWorkEventArgs e)
    {
        Console.Out.WriteLine("Background Worker Started ThreadID: " + Thread.CurrentThread.ManagedThreadId);

        while (!CancellationPending)
        {
            Console.Out.WriteLine("Waiting for command on ThreadID: " + Thread.CurrentThread.ManagedThreadId);
            String aString = Commands.Take();

            var commandTask = Task.Factory.StartNew(() =>
            {
                Console.Out.WriteLine("   Dequeued command \"" + aString + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                if (aString.Equals("QUIT"))
                    Console.Out.WriteLine("   Quit worker called: " + aString);
            });
        }

        Console.Out.WriteLine("Background Worker: Stopped!");
        e.Cancel = true;
    }
}

Here is some sample output. I made a small UI form that I could start, stop, and send commands with. It simply sends a random double as a command.

Called start command worker!
Background Worker Started ThreadID: 17
Waiting for command on ThreadID: 17
Enqueued command "Hello" ThreadID: 10
Waiting for command on ThreadID: 17
   Dequeued command "Hello" ThreadID: 16
Enqueued command "0.0745" ThreadID: 10
Waiting for command on ThreadID: 17
   Dequeued command "0.0745" ThreadID: 12
Enqueued command "0.7043" ThreadID: 10
   Dequeued command "0.7043" ThreadID: 16
Waiting for command on ThreadID: 17
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
   Dequeued command "QUIT" ThreadID: 12
   Quit worker called: QUIT
Background Worker: Stopped!
Enqueued command "0.2646" ThreadID: 10
Enqueued command "0.1619" ThreadID: 10
Enqueued command "0.5570" ThreadID: 10
Enqueued command "0.4129" ThreadID: 10
Called start command worker!
Background Worker Started ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
   Dequeued command "0.2646" ThreadID: 16
   Dequeued command "0.1619" ThreadID: 16
   Dequeued command "0.5570" ThreadID: 16
   Dequeued command "0.4129" ThreadID: 16
Enqueued command "0.8368" ThreadID: 10
   Dequeued command "0.8368" ThreadID: 17
Waiting for command on ThreadID: 12
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
   Dequeued command "QUIT" ThreadID: 16
   Quit worker called: QUIT
Background Worker: Stopped!