Using the Task Parallel Library on an event-based

2020-01-29 07:00发布

问题:

I'm writing a networked application.

Messages are sent over the transport as such:

Network.SendMessage (new FirstMessage() );

I can register an event handler to be called when this message type arrives, like so:

Network.RegisterMessageHandler<FirstMessage> (OnFirstMessageReceived);

And the event gets fired:

public void OnFirstMessageReceived(EventArgs<FirstMessageEventArgs> e)
{
}

I'm writing a custom authentication procedure for my networked application, which requires around five messages to complete.

Without using the Task Parallel Library, I would be forced to code the next step of each procedure in the preceding event handler, like so:

public void OnFirstMessageReceived(EventArgs<FirstMessageEventArgs> e)
{
     Network.SendMessage( new SecondMessage() );
}

public void OnSecondMessageReceived(EventArgs<SecondMessageEventArgs> e)
{
     Network.SendMessage( new ThirdMessage() );
}

public void OnThirdMessageReceived(EventArgs<ThirdMessageEventArgs> e)
{
     Network.SendMessage( new FourthMessage() );
}

public void OnFourthMessageReceived(EventArgs<FourthMessageEventArgs> e)
{
     // Authentication is complete
}

I don't like the idea of jumping around the source code to code a portion of this and a portion of that. It's hard to understand and edit.

I hear the Task Parallel Library substantially simplifies this solution.

However, many of the examples I read using the Task Parallel Library were related to starting a chain of active tasks. What I mean by 'active', is that each task could start when called explicitly, like so:

public void Drink() {}
public void Eat()   {}
public void Sleep() {}

Task.Factory.StartNew(     () => Drink() )
            .ContinueWith( () => Eat()   )
            .ContinueWith( () => Sleep() );

This is opposite from my event-based async pattern, in which each event handler method is called only when the message is received.

In other words, I can't do something like this (but I want to):

Task.Factory.StartNew(     () => OnFirstMessageReceived()  )
            .ContinueWith( () => OnSecondMessageReceived() )
            .ContinueWith( () => OnThirdMessageReceived()  )
            .ContinueWith( () => OnFourthMessageReceived() );

I've read this article, but I don't quite understand it. It seems like what I need has to do with TaskCompletionSource. If I wanted to make a task from my event-based async pattern like the code block above, what would it look like?

回答1:

You're right about TaskCompletionSource, it's the key to transforming EAP (event-based asynchronous pattern) to TPL's Task.

This is documented here: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/tpl-and-traditional-async-programming#exposing-complex-eap-operations-as-tasks

Here is the simplified code:

public static class Extensions  
{
    public static Task<XDocument> GetRssDownloadTask(
        this WebClient client, Uri rssFeedUri)
    {
        // task completion source is an object, which has some state.
        // it gives out the task, which completes, when state turns "completed"
        // or else it could be canceled or throw an exception
        var tcs = new TaskCompletionSource<XDocument>(); 

        // now we subscribe to completed event. depending on event result
        // we set TaskCompletionSource state completed, canceled, or error
        client.DownloadStringCompleted += (sender, e) => 
        {
                  if(e.Cancelled) 
                  {
                      tcs.SetCanceled();
                  }
                  else if(null != e.Error)
                  {
                      tcs.SetException(e.Error);
                  }
                  else
                  { 
                      tcs.SetResult(XDocument.Parse(e.Result));
                  }
        };

        // now we start asyncronous operation
        client.DownloadStringAsync(rssFeedUri);
        // and return the underlying task immediately
        return tcs.Task;
    }
}

Now, all you need to do, to make a chain of those operations, is just to set your continuations (which is not very comfortable at the moment, and the C# 5 await and async will help alot with it)

So, this code could be used like this:

public static void Main()
{
    var client = new WebClient();

    client.GetRssDownloadTask(
        new Uri("http://blogs.msdn.com/b/ericlippert/rss.aspx"))
        .ContinueWith( t => {
            ShowXmlInMyUI(t.Result); // show first result somewhere
            // start a new task here if you want a chain sequence
        });

    // or start it here if you want to get some rss feeds simultaneously

    // if we had await now, we would add 
    // async keyword to Main method defenition and then

    XDocument feedEric = await client.GetRssDownloadTask(
        new Uri("http://blogs.msdn.com/b/ericlippert/rss.aspx"));
    XDocument feedJon = await client.GetRssDownloadTask(
        new Uri("http://feeds.feedburner.com/JonSkeetCodingBlog?format=xml"));
    // it's chaining - one task starts executing after 
    // another, but it is still asynchronous
}


回答2:

Jeremy Likness has a blog entry title Coroutines for Asynchronous Sequential Workflows using Reactive Extensions (Rx) that might interest you. Here is the question he tries to answer:

The concept is straightforward: there are often times we want an asynchronous set of operations to perform sequentially. Perhaps you must load a list from a service, then load the selected item, then trigger an animation. This can be done either by chaining the completed events or nesting lambda expressions, but is there a cleaner way?