I have an existing Function App with 2 Functions and a storage queue. F1 is triggered by a message in a service bus topic. For each msg received, F1 calculates a some sub-tasks (T1,T2,...) which have to be executed with varying amount of delay. Ex - T1 to be fired after 3 mins, T2 after 5min etc. F1 posts messages to a storage queue with appropriate visibility timeouts (to simulate the delay) and F2 is triggered whenever a message is visible in the queue. All works fine.
I now want to migrate this app to use 'Durable Functions'. F1 now only starts the orchestrator. The orchestrator code is something as follows -
public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
var pnTask = context.CallActivityAsync("PerformSubTask", value);
tasks.Add(pnTask);
}
//dont't await as we want to fire and forget. No fan-in!
//await Task.WhenAll(tasks);
}
[FunctionName("PerformSubTask")]
public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
{
TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
//will still keep the activity function running and incur costs??
await Task.Delay(actualDelay);
//perform subtask work after delay!
}
I would only like to fan-out (no fan-in to collect the results) and start the subtasks. The orchestrator starts all the tasks and avoids call 'await Task.WhenAll'. The activity function calls 'Task.Delay' to wait for the specified amount of time and then does its work.
My questions
- Does it make sense to use Durable Functions for this workflow?
- Is this the right approach to orchestrate 'Fan-out' workflow?
- I do not like the fact that the activity function is running for specified amount of time (3 or 5 mins) doing nothing. It will incurs costs,or?
- Also if a delay of more than 10 minutes is required there is no way for an activity function to succeed with this approach!
My earlier attempt to avoid this was to use 'CreateTimer' in the orchestrator and then add the activity as a continuation, but I see only timer entries in the 'History' table. The continuation does not fire! Am I violating the constraint for orchestrator code - 'Orchestrator code must never initiate any async operation' ?
foreach (var value in results) { //calculate time to start var timeToStart = ; var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value)); tasks.Add(pnTask); }
UPDATE : using approach suggested by Chris
Activity that calculates subtasks and delays
[FunctionName("CalculateTasks")] public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log) { //in reality time is obtained by calling an endpoint DateTime currentTime = DateTime.UtcNow; return new List<TaskInfo> { new TaskInfo{ DelayInSeconds = 10, Origin = currentTime }, new TaskInfo{ DelayInSeconds = 20, Origin = currentTime }, new TaskInfo{ DelayInSeconds = 30, Origin = currentTime }, }; } public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log) { var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput"); var currentTime = context.CurrentUtcDateTime; List<Task> tasks = new List<Task>(); foreach (var value in results) { TimeSpan timeDifference = currentTime - value.Origin; TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds); var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference; var timeToStart = currentTime.Add(actualDelay); Task delayedActivityCall = context .CreateTimer(timeToStart, CancellationToken.None) .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value)); tasks.Add(delayedActivityCall); } await Task.WhenAll(tasks); }
Simply scheduling tasks from within the orchestrator seems to work.In my case I am calculating the tasks and the delays in another activity (CalculateTasks) before the loop. I want the delays to be calculated using the 'current time' when the activity was run. I am using DateTime.UtcNow in the activity. This somehow does not play well when used in the orchestrator. The activities specified by 'ContinueWith' just don't run and the orchestrator is always in 'Running' state.
Can I not use the time recorded by an activity from within the orchestrator?
UPDATE 2
So the workaround suggested by Chris works!
Since I do not want to collect the results of the activities I avoid calling 'await Tasks.WhenAll(tasks)
' after scheduling all activities. I do this in order to reduce the contention on the control queue i.e. be able to start another orchestration if reqd. Nevertheless the status of the 'orchestrator' is still 'Running' till all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.
Am I right? Is there any way to free the orchestrator earlier i.e right after scheduling all activities?