How to use TryScan in F# properly

Posted 2019-06-22 13:44

I was trying to find an example of how to use TryScan but haven't found any. Could you help me?

What I would like to do (a much simplified example): I have a MailboxProcessor that accepts two types of messages.

  • The first, GetState, returns the current state. GetState messages are sent quite frequently.

  • The other, UpdateState, is very expensive (time-consuming), e.g. it downloads something from the internet and then updates the state accordingly. UpdateState is sent only rarely.

My problem is that GetState messages are blocked and wait until all preceding UpdateState messages have been served. That's why I tried to use TryScan to process all GetState messages first, but with no luck.

My example code:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

If you run the code, you will see that GetState messages are hardly ever processed, because the sender waits for each result. UpdateState, on the other hand, is fire-and-forget, so it effectively blocks getting the state.

Edit

Current solution that works for me is this one:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // serve a queued GetState first (5 ms timeout);
                // fall through to Receive when none is waiting
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)

Reactions to comments: the idea of using another MailboxProcessor or the ThreadPool to execute UpdateState in parallel is great, but I don't need it currently. All I wanted was to process all GetState messages first and the others after that. I don't mind that the agent is blocked while it processes UpdateState.

Here is what the problem looked like in the output:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediately! (not a real example, but it illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line on, only GetState messages are issued and processed, because
// there is no UpdateState message left in the queue and none is being sent

3 answers
(account banned)
#2 · 2019-06-22 13:50

I don't think that the TryScan method will help you in this scenario. It allows you to specify a timeout to be used while waiting for messages. Once a message is received, it starts processing that message (ignoring the timeout).

For example, if you wanted to wait for some specific message, but perform some other checking every second (while waiting) you could write:

let rec loop () = async {
  // wait up to 1000 ms for an ImportantMessage
  let! res = mbox.TryScan((function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None), 1000)
  match res with
  | None -> 
       // timed out: perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
       return n
}

What can you do to avoid blocking the mailbox processor when processing the UpdateState message? The mailbox processor is (logically) single-threaded - you probably don't want to cancel the processing of UpdateState message, so the best option is to start processing it in background and wait until the processing completes. The code that processes UpdateState can then send some message back to the mailbox (e.g. UpdateStateCompleted).

Here is a sketch of how this might look:

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! loop state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        let newState = state + 1 // placeholder for the real result
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
      // keep serving messages while the background work runs
      return! loop state
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }

Now that the background task is running in parallel, you need to be careful about mutable state. Also, if you send UpdateState messages faster than you can process them, you'll be in trouble. This can be fixed, for example, by ignoring or queueing requests while a previous one is still being processed.
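As an illustration of the "ignore requests while busy" option, here is a minimal sketch. It assumes a `busy` flag carried through the loop and uses `Async.Sleep 200` as a stand-in for the expensive work; both are assumptions made for this example, not part of the code above.

```fsharp
type Msg =
    | GetState of AsyncReplyChannel<int>
    | UpdateState
    | UpdateStateCompleted of int

let agent =
    MailboxProcessor<Msg>.Start(fun inbox ->
        // `busy` is true while a background update is in flight (assumed design).
        let rec loop state busy = async {
            let! msg = inbox.Receive()
            match msg with
            | GetState reply ->
                reply.Reply state
                return! loop state busy
            | UpdateState when busy ->
                // An update is already running: drop this request.
                return! loop state busy
            | UpdateState ->
                async {
                    // Stand-in for the expensive work (assumption).
                    do! Async.Sleep 200
                    inbox.Post(UpdateStateCompleted(state + 1))
                }
                |> Async.Start
                return! loop state true
            | UpdateStateCompleted newState ->
                // The background work finished: adopt the new state.
                return! loop newState false
        }
        loop 0 false)

agent.Post UpdateState
agent.Post UpdateState                     // dropped while the first update is running
let before = agent.PostAndReply GetState   // answered without waiting for the update
System.Threading.Thread.Sleep 1000
let after = agent.PostAndReply GetState    // the single accepted update has landed
printfn "before=%d after=%d" before after
```

Dropping requests is the simplest policy. If every UpdateState has to take effect, a counter of pending updates could replace the boolean, so that the next update starts when UpdateStateCompleted arrives.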

劫难
#3 · 2019-06-22 14:08

As Tomas mentioned, MailboxProcessor is single-threaded. You will need another MailboxProcessor to run the updates on a separate thread from the state getter.

#nowarn "40"

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState

let runner_UpdateState = MailboxProcessor.Start(fun mbox ->
    let rec loop = async {
        let! state = mbox.Receive()
        printfn "U start processing %d" !state
        // something very time consuming here...
        do! Async.Sleep 100
        printfn "U done processing %d" !state
        state := !state + 1
        return! loop
    }
    loop
)

let mbox = MailboxProcessor.Start(fun mbox ->
    // we need mutable state if another thread can change it at any time
    let state = ref 0

    let rec loop = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> runner_UpdateState.Post state
        | GetState chnl -> chnl.Reply !state

        return! loop 
    }
    loop)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> ignore

output:

U start processing 0
U done processing 0
U start processing 1
U done processing 1
U start processing 2
U done processing 2
U start processing 3
U done processing 3
U start processing 4
U done processing 4
G 5
U start processing 5
G 5
U done processing 5
G 5
G 6
U start processing 6
G 6
G 6
U done processing 6
G 7
U start processing 7
G 7
G 7
U done processing 7
G 8
G U start processing 8
8
G 8
U done processing 8
G 9
G 9
U start processing 9
G 9
U done processing 9
G 9
G 10
G 10
G 10
G 10

You could also use ThreadPool.

open System.Threading

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int
    | UpdateState

let mbox = MailboxProcessor.Start(fun mbox ->
    let rec loop state = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> 
            ThreadPool.QueueUserWorkItem((fun obj -> 
                let state = obj :?> int

                printfn "U start processing %d" state
                Async.Sleep 100 |> Async.RunSynchronously
                printfn "U done processing %d" state
                mbox.Post(SetState(state + 1))

                ), state)
            |> ignore
        | GetState chnl -> 
            chnl.Reply state
        | SetState newState ->
            return! loop newState
        return! loop state
    }
    loop 0)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> ignore

我只想做你的唯一
#4 · 2019-06-22 14:13

DON'T USE TRYSCAN!!!

Unfortunately, the TryScan function in the current version of F# is broken in two ways. Firstly, the whole point is to specify a timeout but the implementation does not actually honor it. Specifically, irrelevant messages reset the timer. Secondly, as with the other Scan function, the message queue is examined under a lock that prevents any other threads from posting for the duration of the scan, which can be an arbitrarily long time. Consequently, the TryScan function itself tends to lock up concurrent systems and can even introduce deadlocks because the caller's code is evaluated inside the lock (e.g. posting from the function argument to Scan or TryScan can deadlock the agent when the code under the lock blocks waiting to acquire the lock it is already under).

I used TryScan in an early prototype of my production code and it caused no end of problems. However, I managed to architect around it and the resulting architecture was actually better. In essence, I eagerly Receive all messages and filter using my own local queue.
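A minimal sketch of that "receive eagerly, filter locally" idea, applied to the GetState/UpdateState example from the question. This is not the production code mentioned above; the `deferred` queue, the `handle` helper and the `Async.Sleep 100` stand-in for the slow update are assumptions made for illustration.

```fsharp
open System.Collections.Generic

type Msg =
    | GetState of AsyncReplyChannel<int>
    | UpdateState

let agent =
    MailboxProcessor<Msg>.Start(fun inbox ->
        // Local queue holding the expensive messages set aside for later.
        let deferred = Queue<Msg>()

        // Answer a GetState at once; park anything else.
        let handle state msg =
            match msg with
            | GetState reply -> reply.Reply state
            | other -> deferred.Enqueue other

        let rec loop state = async {
            // Block only when there is nothing at all to do.
            if deferred.Count = 0 && inbox.CurrentQueueLength = 0 then
                let! msg = inbox.Receive()
                handle state msg
            // Eagerly pull in everything that is already waiting.
            while inbox.CurrentQueueLength > 0 do
                let! msg = inbox.Receive()
                handle state msg
            // Run at most one expensive message, then look at the mailbox again.
            if deferred.Count > 0 then
                match deferred.Dequeue() with
                | UpdateState ->
                    do! Async.Sleep 100   // stand-in for the slow update (assumption)
                    return! loop (state + 1)
                | GetState reply ->
                    // Not reachable in practice: GetState is never deferred.
                    reply.Reply state
                    return! loop state
            else
                return! loop state
        }
        loop 0)

for _ in 1 .. 3 do agent.Post UpdateState
let first = agent.PostAndReply GetState   // served ahead of the queued updates
System.Threading.Thread.Sleep 1000
let final = agent.PostAndReply GetState   // all three updates have run by now
printfn "first=%d final=%d" first final
```

With plain FIFO handling, the first GetState would wait for all three updates. Here it waits for at most the one update that is already running, because the agent drains the mailbox between expensive messages. No Scan or TryScan is involved, so the locking concerns described above do not apply to this sketch.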
