How to do Async in Azure WebJob function

Published 2019-05-01 10:44

Question:

I have an async method that gets api data from a server. When I run this code on my local machine, in a console app, it performs at high speed, pushing through a few hundred http calls in the async function per minute. When I put the same code to be triggered from an Azure WebJob queue message however, it seems to operate synchronously and my numbers crawl - I'm sure I am missing something simple in my approach - any assistance appreciated.

(1) .. WebJob function that listens for a message on queue and kicks off the api get process on message received:

public class Functions
    {
        // This function will get triggered/executed when a new message is written 
        // on an Azure Queue called queue.

        public static async Task ProcessQueueMessage ([QueueTrigger("myqueue")] string message, TextWriter log)
        {
            var getAPIData = new GetData();
            getAPIData.DoIt(message).Wait();
            log.WriteLine("*** done: " + message);
        }
    }

(2) .. the class that, outside Azure, runs the async calls at speed...

 class GetData
    {
        // wrapper that is called by the message function trigger
        public async Task DoIt(string MessageFile)
        {
            await CallAPI(MessageFile);
        }

        public async Task<string> CallAPI(string MessageFile)
        {
            /// create a list of sample APIs to call...
            var apiCallList = new List<string>();
            apiCallList.Add("localhost/?q=1");
            apiCallList.Add("localhost/?q=2");
            apiCallList.Add("localhost/?q=3");
            apiCallList.Add("localhost/?q=4");
            apiCallList.Add("localhost/?q=5");

            // setup httpclient
            HttpClient client =
                new HttpClient() { MaxResponseContentBufferSize = 10000000 };
            var timeout = new TimeSpan(0, 5, 0); // 5 min timeout
            client.Timeout = timeout;

            // create a list of http api get Task...
            IEnumerable<Task<string>> allResults = apiCallList.Select(str => ProcessURLPageAsync(str, client));
            // wait for them all to complete, then move on...
            await Task.WhenAll(allResults);

            return allResults.ToString();
        }

        async Task<string> ProcessURLPageAsync(string APIAddressString, HttpClient client)
        {
            string page = "";
            HttpResponseMessage resX;

            try
            {
                // set the address to call
                Uri URL = new Uri(APIAddressString);
                // execute the call
                resX = await client.GetAsync(URL);
                page = await resX.Content.ReadAsStringAsync();
                string rslt = page;
                // do something with the api response data
            }
            catch (Exception ex)
            {
                // log error
            }
            return page;
        }

    }

Answer 1:

First, because your triggered function is async, you should use await rather than .Wait(). Calling .Wait() blocks the current thread and defeats the purpose of the async method.

public static async Task ProcessQueueMessage([QueueTrigger("myqueue")] string message, TextWriter log)
{
    var getAPIData = new GetData();
    await getAPIData.DoIt(message);
    log.WriteLine("*** done: " + message);
}

In any case, you'll find useful information in the documentation:

Parallel execution

If you have multiple functions listening on different queues, the SDK will call them in parallel when messages are received simultaneously.

The same is true when multiple messages are received for a single queue. By default, the SDK gets a batch of 16 queue messages at a time and executes the function that processes them in parallel. The batch size is configurable. When the number being processed gets down to half of the batch size, the SDK gets another batch and starts processing those messages. Therefore the maximum number of concurrent messages being processed per function is one and a half times the batch size. This limit applies separately to each function that has a QueueTrigger attribute.

Here is a sample code to configure the batch size:

var config = new JobHostConfiguration();
config.Queues.BatchSize = 50;
var host = new JobHost(config);
host.RunAndBlock();

However, having too many threads running at the same time is not always a good option and can lead to poor performance.

Another option is to scale out your webjob:

Multiple instances

If your web app runs on multiple instances, a continuous WebJob runs on each machine, and each machine will wait for triggers and attempt to run functions. The WebJobs SDK queue trigger automatically prevents a function from processing a queue message multiple times; functions do not have to be written to be idempotent. However, if you want to ensure that only one instance of a function runs even when there are multiple instances of the host web app, you can use the Singleton attribute.
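Applying the attribute might look like the following (a minimal sketch assuming the WebJobs SDK `Singleton` attribute applied to the question's trigger function; the rest of the code is taken from the question with the .Wait() fix applied):

```csharp
public class Functions
{
    // [Singleton] ensures only one invocation of this function runs
    // at a time, even across multiple instances of the host web app.
    [Singleton]
    public static async Task ProcessQueueMessage(
        [QueueTrigger("myqueue")] string message, TextWriter log)
    {
        var getAPIData = new GetData();
        await getAPIData.DoIt(message);
        log.WriteLine("*** done: " + message);
    }
}
```

Note that this trades throughput for safety: with `[Singleton]`, messages for this function are processed strictly one at a time.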



Answer 2:

Have a read of the WebJobs SDK documentation - the behaviour you should expect is that your process will run and process one message at a time, but will scale up if more instances of your app service are created. If you had multiple queues, they would trigger in parallel.

In order to improve the performance, see the configurations settings section in the link I sent you, which refers to the number of messages that can be triggered in a batch.

If you want to process multiple messages in parallel, though, and don't want to rely on instance scaling, then you need to use threading instead (async isn't about multi-threaded parallelism; it's about making more efficient use of the thread you're on). Your queue trigger function should read the message from the queue, then create a thread, "fire and forget" that thread, and return from the trigger function. This marks the message as processed and allows the next message on the queue to be processed, even though in theory you're still processing the earlier one. Note that you will need your own logic for error handling and for ensuring the data won't be lost if your thread throws an exception or can't process the message (e.g. put it on a poison queue).
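A rough sketch of that fire-and-forget pattern, reusing the question's GetData class (the error-handling is deliberately minimal and the poison-queue step is left as a comment, since how you re-queue is up to you):

```csharp
public static Task ProcessQueueMessage(
    [QueueTrigger("myqueue")] string message, TextWriter log)
{
    // Start the work on a background thread and return immediately,
    // so the SDK marks this message as processed and dequeues the next one.
    Task.Run(async () =>
    {
        try
        {
            var getAPIData = new GetData();
            await getAPIData.DoIt(message);
            log.WriteLine("*** done: " + message);
        }
        catch (Exception ex)
        {
            // The trigger has already returned, so the SDK can't see this
            // failure - log it and re-queue the message yourself
            // (e.g. write it to a poison queue).
            log.WriteLine("*** failed: " + message + " - " + ex.Message);
        }
    });
    return Task.CompletedTask;
}
```

Be aware that work started this way dies silently if the WebJob host shuts down before the background task completes.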

The other option is to not use the [QueueTrigger] attribute at all, and instead use the Azure Storage Queues SDK directly to connect to the queue and process the messages per your requirements.
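A sketch of that approach using the classic WindowsAzure.Storage client (the connection string, queue name, and batch size are illustrative; newer Azure.Storage.Queues packages expose a similar but differently named API):

```csharp
// Poll the queue directly instead of relying on [QueueTrigger],
// so you control the degree of parallelism yourself.
var account = CloudStorageAccount.Parse(connectionString);
var queueClient = account.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("myqueue");

while (true)
{
    // Pull up to 32 messages, invisible to other consumers for 5 minutes.
    var messages = await queue.GetMessagesAsync(
        32, TimeSpan.FromMinutes(5), null, null);

    // Process the whole batch in parallel, deleting each message
    // only after it has been handled successfully.
    var tasks = messages.Select(async msg =>
    {
        var getAPIData = new GetData();
        await getAPIData.DoIt(msg.AsString);
        await queue.DeleteMessageAsync(msg);
    });
    await Task.WhenAll(tasks);

    // Brief pause before polling again to avoid hammering the service.
    await Task.Delay(TimeSpan.FromSeconds(1));
}
```

With this you lose the SDK's built-in retry and poison-queue handling, so you'd need to re-implement those yourself (e.g. check `msg.DequeueCount` and move repeatedly failing messages to a separate queue).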