How does F#'s async really work?

Posted 2019-03-08 14:00

I am trying to learn how async and let! work in F#. All the docs I've read seem confusing. What's the point of running an async block with Async.RunSynchronously? Is this async or sync? It looks like a contradiction.

The documentation says that Async.StartImmediate runs in the current thread. If it runs in the same thread, it doesn't look very asynchronous to me... Or maybe asyncs are more like coroutines than threads. If so, when do they yield back and forth?

Quoting MS docs:

The line of code that uses let! starts the computation, and then the thread is suspended until the result is available, at which point execution continues.

If the thread waits for the result, why should I use it? It looks like a plain old function call.

And what does Async.Parallel do? It receives a sequence of Async<'T>. Why not a sequence of plain functions to be executed in parallel?

I think I'm missing something very basic here. I guess once I understand that, all the documentation and samples will start making sense.

7 answers
仙女界的扛把子
#2 · 2019-03-08 14:45

There are many good answers here, but I thought I'd take a different angle on the question: How does F#'s async really work?

Unlike async/await in C#, F# developers can actually implement their own version of Async. This can be a great way to learn how Async works.

(For those interested, the source code of Async can be found here: https://github.com/Microsoft/visualfsharp/blob/fsharp4/src/fsharp/FSharp.Core/control.fs)

As our fundamental building block for our DIY workflows we define:

type DIY<'T> = ('T->unit)->unit

This is a function that accepts another function (called the continuation) that is called when the result of type 'T is ready. This allows DIY<'T> to start a background task without blocking the calling thread. When the result is ready the continuation is called allowing the computation to continue.
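To make the continuation idea concrete, here is a minimal sketch of two hand-written DIY values. The names immediate and later are made up for illustration and are not part of the sample program; later uses Task.Delay so that no thread is held while waiting.

```fsharp
open System
open System.Threading.Tasks

type DIY<'T> = ('T -> unit) -> unit

// Calls the continuation right away, on the caller's thread.
let immediate : DIY<int> =
    fun continuation -> continuation 42

// Hypothetical helper: calls the continuation after roughly ms milliseconds,
// from whichever thread completes the timer task.
let later (ms : int) (v : 'T) : DIY<'T> =
    fun continuation ->
        let action = Action<Task> (fun _ -> continuation v)
        Task.Delay(ms).ContinueWith(action) |> ignore

// "Running" a DIY value just means handing it a continuation.
immediate (printfn "immediate: %d")
later 100 "hello" (printfn "later: %s")
printfn "the caller keeps going"
```

The call to later returns before its continuation runs, which is exactly the "don't block the calling thread" property described above. Note that a short-lived program may exit before the delayed continuation fires.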

The F# Async building block is a bit more complicated as it also includes cancellation and exception continuations but essentially this is it.
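The three continuations of the real Async are visible through Async.FromContinuations and Async.StartWithContinuations in FSharp.Core. The following is only a small sketch of how they can be used; divideAsync is a made-up example function.

```fsharp
open System

// Builds an Async<int> by hand. The callback receives the success,
// exception and cancellation continuations, in that order, and must
// call exactly one of them.
let divideAsync (x : int) (y : int) : Async<int> =
    Async.FromContinuations (fun (onSuccess, onError, _onCancel) ->
        if y = 0 then onError (DivideByZeroException () :> exn)
        else onSuccess (x / y))

// Runs the computation and routes the outcome to one of three callbacks.
Async.StartWithContinuations (
    divideAsync 10 2,
    (fun (v : int)  -> printfn "Result: %d" v),
    (fun (ex : exn) -> printfn "Failed: %s" ex.Message),
    (fun _          -> printfn "Cancelled"))
```

Compared with DIY<'T>, the only structural difference shown here is that the caller supplies three continuations instead of one.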

To support the F# workflow syntax we need to define a computation expression (https://msdn.microsoft.com/en-us/library/dd233182.aspx). While this is a rather advanced F# feature, it's also one of the most amazing features of F#. The two most important operations to define are return and bind, which F# uses to combine our DIY<_> building blocks into larger DIY<_> building blocks.

adaptTask adapts a Task<'T> into a DIY<'T>. startChild allows starting several simultaneous DIY<'T> workflows; note that it doesn't start new threads to do so but reuses the calling thread.

Without further ado, here's the sample program:
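As a rough illustration of what return and bind buy us, the sketch below writes the same workflow twice: once with the workflow syntax and once with explicit calls. The names addWorkflow and addDesugared are invented for this example, and the builder deliberately omits Delay to keep things short.

```fsharp
type DIY<'T> = ('T -> unit) -> unit

// return: pass the value straight to the continuation.
let returnValue (v : 'T) : DIY<'T> =
    fun k -> k v

// bind: run t, feed its result to f, then run the workflow f produced.
let bind (t : DIY<'T>) (f : 'T -> DIY<'U>) : DIY<'U> =
    fun k -> t (fun v -> f v k)

type DIYBuilder () =
    member x.Return v    = returnValue v
    member x.Bind (t, f) = bind t f

let diy = DIYBuilder ()

// Written with the workflow syntax.
let addWorkflow (a : DIY<int>) (b : DIY<int>) : DIY<int> =
    diy {
        let! x = a
        let! y = b
        return x + y
    }

// Roughly what the compiler turns it into: each let! becomes a bind whose
// second argument is "the rest of the workflow" as a function.
let addDesugared (a : DIY<int>) (b : DIY<int>) : DIY<int> =
    bind a (fun x ->
        bind b (fun y ->
            returnValue (x + y)))

addWorkflow (returnValue 2) (returnValue 3) (printfn "workflow:  %d")
addDesugared (returnValue 2) (returnValue 3) (printfn "desugared: %d")
```

Seen this way, let! is not "wait here"; it is "register everything after this line as the continuation".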

open System
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

// Our Do It Yourself Async workflow is a function accepting a continuation ('T->unit).
// The continuation is called when the result of the workflow is ready. 
// This may happen immediately or after a while; the important thing is that
//  we don't block the calling thread, which may then continue executing useful code.
type DIY<'T> = ('T->unit)->unit

// In order to support let!, do! and so on we implement a computation expression.
// The two most important operations are returnValue/bind but delay is also generally 
//  good to implement.
module DIY =

    // returnValue is called when devs use return x in a workflow.
    // returnValue passes v immediately to the continuation.
    let returnValue (v : 'T) : DIY<'T> =
        fun a ->
            a v

    // bind is called when devs use let!/do! x in a workflow.
    // bind chains two DIY workflows: the result of t is fed to fu,
    //  and the workflow fu returns receives the original continuation.
    let bind (t : DIY<'T>) (fu : 'T->DIY<'U>) : DIY<'U> =
        fun a ->
            let aa tv =
                let u = fu tv
                u a
            t aa

    let delay (ft : unit->DIY<'T>) : DIY<'T> =
        fun a ->
            let t = ft ()
            t a

    // startChild starts a DIY workflow as a subflow.
    // The workflow is executed right away, although its result
    //  may only arrive later. startChild itself should always
    //  complete immediately, so in order to have something to
    //  return it returns a DIY workflow (child).
    // postProcess checks whether the child has computed a value
    //  (rv has some value) and whether a continuation is ready
    //  to receive that value (rca has some value).
    //  If both are true it invokes ca with v.
    let startChild (t : DIY<'T>) : DIY<DIY<'T>> =
        fun a ->
            let l   = obj()
            let rv  = ref None
            let rca = ref None

            let postProcess () =
                match !rv, !rca with
                | Some v, Some ca ->
                    ca v
                    rv  := None
                    rca := None
                | _ , _ -> ()

            let receiver v =
                lock l <| fun () ->
                    rv := Some v
                    postProcess ()

            t receiver

            let child : DIY<'T> =
                fun ca ->
                    lock l <| fun () ->
                        rca := Some ca
                        postProcess ()

            a child

    let runWithContinuation (t : DIY<'T>) (f : 'T -> unit) : unit =
        t f

    // Adapts a task as a DIY workflow
    let adaptTask (t : Task<'T>) : DIY<'T> =
        fun a ->
            let action = Action<Task<'T>> (fun t -> a t.Result)
            ignore <| t.ContinueWith action

    // Because .NET generics don't allow Task<void> we need
    //  a separate function for the non-generic (unit) Task.
    let adaptUnitTask (t : Task) : DIY<unit> =
        fun a ->
            let action = Action<Task> (fun t -> a ())
            ignore <| t.ContinueWith action

    type DIYBuilder() =
        member x.Return(v)  = returnValue v
        member x.Bind(t,fu) = bind t fu
        member x.Delay(ft)  = delay ft

let diy = DIY.DIYBuilder()

open DIY

[<EntryPoint>]
let main argv = 

    let delay (ms : int) = adaptUnitTask <| Task.Delay ms

    let delayedValue ms v =
        diy {
            do! delay ms
            return v
        }

    let complete = 
        diy {
            let sw = Stopwatch ()
            sw.Start ()

            // Since we are executing these tasks concurrently 
            //  the time this takes should be roughly 700ms
            let! cd1 = startChild <| delayedValue 100 1
            let! cd2 = startChild <| delayedValue 300 2
            let! cd3 = startChild <| delayedValue 700 3

            let! d1 = cd1
            let! d2 = cd2
            let! d3 = cd3

            sw.Stop ()

            return sw.ElapsedMilliseconds,d1,d2,d3
        }

    printfn "Starting workflow"

    runWithContinuation complete (printfn "Result is: %A")

    printfn "Waiting for key"

    ignore <| Console.ReadKey ()

    0

The output of the program should be something like this:

Starting workflow
Waiting for key
Result is: (706L, 1, 2, 3)

When running the program, note that Waiting for key is printed immediately, because the console thread is not blocked by starting the workflow. After about 700ms the result is printed.

I hope this was interesting to some F# devs.
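The same building block also suggests what something like Async.RunSynchronously has to do: start the workflow, then block the caller until the continuation has fired. The sketch below is a hypothetical runSynchronously for DIY<'T>; it is not how FSharp.Core implements it, and it uses a TaskCompletionSource purely as a convenient way to wait.

```fsharp
open System
open System.Threading.Tasks

type DIY<'T> = ('T -> unit) -> unit

// Hypothetical helper, not part of the sample program: blocks the calling
// thread until the workflow passes a value to its continuation.
let runSynchronously (t : DIY<'T>) : 'T =
    let tcs = TaskCompletionSource<'T> ()
    t (fun v -> tcs.TrySetResult v |> ignore)
    tcs.Task.Result

// A workflow that completes after roughly ms milliseconds.
let delayedValue (ms : int) (v : 'T) : DIY<'T> =
    fun a ->
        let action = Action<Task> (fun _ -> a v)
        Task.Delay(ms).ContinueWith(action) |> ignore

let value = runSynchronously (delayedValue 100 42)
printfn "Got %d" value
```

The workflow itself is still non-blocking; only the outermost caller waits. That is typically what you want at the edge of a program (for example in main), where there is nothing useful left to do until the result arrives.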
