Translating async-await C# code to F# with respect

2019-02-04 11:45发布

I wonder if this is too a broad question, but recently I made myself to come across a piece of code I'd like to be certain on how to translate from C# into proper F#. The journey starts from here (1) (the original problem with TPL-F# interaction), and continues here (2) (some example code I'm contemplating to translate into F#).

The example code is too long to reproduce here, but the interesting functions are ActivateAsync, RefreshHubs and AddHub. Particularly the interesting points are

  1. AddHub has a signature of private async Task AddHub(string address).
  2. RefreshHubs calls AddHub in a loop and collects a list of tasks, which it then awaits in the very end by await Task.WhenAll(tasks) and consequently the return value matches its signature of private async Task RefreshHubs(object _).
  3. RefreshHubs is called by ActivateAsync just as await RefreshHubs(null) and then in the end there's a call await base.ActivateAsync() matching the function signature public override async Task ActivateAsync().

Question:

What would be the correct translation of such function signatures to F# that still maintains the interface and functionality and respects the default, custom scheduler? And I'm not otherwise too sure of this "async/await in F#" either. As in how to do it "mechanically". :)

The reason is that in the link "here (1)" there seem to be problem (I haven't verified this) in that F# async operations do not respect a custom, cooperative scheduler set by the (Orleans) runtime. Also, it's stated here that TPL operations escape the scheduler and go to the task pool and their use is therefore prohibited.

One way I can think of dealing with this is with a F# function as follows

//Sorry for the inconvenience of shorterned code, for context see the link "here (1)"...
override this.ActivateAsync() =
    this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore

    if RoleEnvironment.IsAvailable then
        this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
    else
        this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously

    //Return value comes from here.
    base.ActivateAsync()

member private this.RefreshHubs(_) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //The return value is Task.
    //In the C# version the AddHub provided tasks are collected and then the
    //on the last line there is return await Task.WhenAll(newHubAdditionTasks) 
    newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll

member private this.AddHub(address) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //In the C# version:
    //...
    //hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
    //} 
    //so this is "void" and could perhaps be Async<void> in F#... 
    //The return value is Task.
    hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
    TaskDone.Done

The startAsPlainTask function is by Sacha Barber from here. Another interesting option could be here as

module Async =
    let AwaitTaskVoid : (Task -> Async<unit>) =
        Async.AwaitIAsyncResult >> Async.Ignore

<edit: I just noticed the Task.WhenAll would need to be awaited too. But what would be the proper way? Uh, time to sleep (a bad pun)...

<edit 2: At here (1) (the original problem with TPL-F# interaction) in Codeplex it was mentioned that F# uses synchronization contexts to push work to threads, whereas TPL does not. Now, this is a plausible explanation, I feel (although I'd still have problems in translating these snippets properly regardless of the custom scheduler). Some interesting additional information could be to had from

I think I need to mention Hopac in this context, as an interesting tangential and also mention I'm out of reach for the next 50 odd hours or so in case all my cross-postings go out of hand.

<edit 3: Daniel and svick give good advice in the comments to use a custom task builder. Daniel provides a link to a one that's already defined in FSharpx.

Looking at the the source I see the interface with the parameters are defined as

type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
    let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
    let scheduler = defaultArg scheduler TaskScheduler.Default
    let cancellationToken = defaultArg cancellationToken CancellationToken.None

If one were to use this in Orleans, it looks like the TaskScheduler ought to be TaskScheduler.Current as per documentation here

Orleans has it's own task scheduler which provides the single threaded execution model used within grains. It's important that when running tasks the Orleans scheduler is used, and not the .NET thread pool.

Should your grain code require a subtask to be created, you should use Task.Factory.StartNew:

await Task.Factory.StartNew(() =>{ /* logic */ });

This technique will use the current task scheduler, which will be the Orleans scheduler.

You should avoid using Task.Run, which always uses the .NET thread pool, and therefore will not run in the single-threaded execution model.

It looks there's a subtle difference between TaskScheduler.Current and TaskScheduler.Default. Maybe this warrants a question on in which example cases there'll be an undesired difference. As the Orleans documentation points out not to use Task.Run and instead guides to Task.Factory.StartNew, I wonder if one ought to define TaskCreationOptions.DenyAttachChild as is recommended by such authorities as Stephen Toub at Task.Run vs Task.Factory.StartNew and Stephen Cleary at StartNew is Dangerous. Hmm, it looks like the .Default will be .DenyAttachChilld unless I'm mistaken.

Moreover, as there is a problem with Task.Run viz Task.Factory.CreateNew regarding the custom scheduler, I wonder if this particular problem could be removed by using a custom TaskFactory as explained in Task Scheduler (Task.Factory) and controlling the number of threads and How to: Create a Task Scheduler That Limits Concurrency.

Hmm, this is becoming quite a long "pondering" already. I wonder how should I close this? Maybe if svick and Daniel could make their comments as answers and I'd upvote both and accept svick's?

1条回答
爷、活的狠高调
2楼-- · 2019-02-04 12:17

You can use use TaskBuilder in FSharpx and pass in TaskScheduler.Current. Here's my attempt at translating RefreshHubs. Note that Task<unit> is used in lieu of Task.

let RefreshHubs _ =
    let task = TaskBuilder(scheduler = TaskScheduler.Current)
    task {
        let addresses = 
            RoleEnvironment.Roles.["GPSTracker.Web"].Instances
            |> Seq.map (fun instance ->
                let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
                sprintf "http://%O" endpoint.IPEndpoint
            )
            |> Seq.toList

        let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
        let deadHubs = hubs.Keys |> Seq.filter (fun x -> 
            not (List.exists ((=) x) addresses))

        // remove dead hubs
        deadHubs |> Seq.iter (hubs.Remove >> ignore)

        // add new hubs
        let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
        return ()
    }
查看更多
登录 后发表回答