Does Async.StartChild have a memory leak?

Posted 2019-04-03 22:21

When I run the following test (built with F# 2.0), I eventually get an OutOfMemoryException. It takes about 5 minutes to reach the exception on my system (i7-920, 6 GB RAM) when it runs as an x86 process, but in any case the memory growth is visible in Task Manager.

module start_child_test
    open System
    open System.Diagnostics
    open System.Threading
    open System.Threading.Tasks

    let cnt = ref 0
    let sw = Stopwatch.StartNew()
    Async.RunSynchronously(async{
        while true do
            let! x = Async.StartChild(async{
                if (Interlocked.Increment(cnt) % 100000) = 0 then
                    if sw.ElapsedMilliseconds > 0L then
                        printfn "ops per sec = %d" (100000L*1000L / sw.ElapsedMilliseconds)
                    else
                        printfn "ops per sec = INF"
                    sw.Restart()
                    GC.Collect()
            })
            do! x
    })

    printfn "done...."

I don't see anything wrong with this code, and I see no reason for the memory to grow. To check my reasoning, I wrote an alternative implementation:

module start_child_fix
    open System
    open System.Collections
    open System.Collections.Generic
    open System.Threading
    open System.Threading.Tasks


    type IAsyncCallbacks<'T> = interface
        abstract member OnSuccess: result:'T -> unit
        abstract member OnError: error:Exception -> unit
        abstract member OnCancel: error:OperationCanceledException -> unit
    end

    type internal AsyncResult<'T> = 
        | Succeeded of 'T
        | Failed of Exception
        | Canceled of OperationCanceledException

    type internal AsyncGate<'T> = 
        | Completed of AsyncResult<'T>
        | Subscribed of IAsyncCallbacks<'T>
        | Started
        | Notified

    type Async with
        static member StartChildEx (comp:Async<'TRes>) = async{
            let! ct = Async.CancellationToken

            let gate = ref AsyncGate.Started
            let CompleteWith(result:AsyncResult<'T>, callbacks:IAsyncCallbacks<'T>) =
                if Interlocked.Exchange(gate, Notified) <> Notified then
                    match result with
                        | Succeeded v -> callbacks.OnSuccess(v)
                        | Failed e -> callbacks.OnError(e)
                        | Canceled e -> callbacks.OnCancel(e)

            let ProcessResults (result:AsyncResult<'TRes>) =
                let t = Interlocked.CompareExchange<AsyncGate<'TRes>>(gate, AsyncGate.Completed(result), AsyncGate.Started)
                match t with
                | Subscribed callbacks -> 
                    CompleteWith(result, callbacks)
                | _ -> ()
            let Subscribe (success, error, cancel) = 
                let callbacks = {
                    new IAsyncCallbacks<'TRes> with
                        member this.OnSuccess v = success v
                        member this.OnError e = error e
                        member this.OnCancel e = cancel e
                }
                let t = Interlocked.CompareExchange<AsyncGate<'TRes>>(gate, AsyncGate.Subscribed(callbacks), AsyncGate.Started)
                match t with
                | AsyncGate.Completed result -> 
                    CompleteWith(result, callbacks)
                | _ -> ()

            Async.StartWithContinuations(
                computation = comp,
                continuation = (fun v -> ProcessResults(AsyncResult.Succeeded(v))),
                exceptionContinuation = (fun e -> ProcessResults(AsyncResult.Failed(e))),
                cancellationContinuation = (fun e -> ProcessResults(AsyncResult.Canceled(e))),
                cancellationToken = ct
            )
            return Async.FromContinuations( fun (success, error, cancel) ->
                Subscribe(success, error, cancel)
            )
        }

With this implementation the test runs without any considerable memory consumption. Unfortunately, I don't have much experience with F# and suspect I may be missing something. If this is a bug, how can I report it to the F# team?

1 answer
混吃等死
#2 · 2019-04-03 22:51

I think you're correct - there seems to be a memory leak in the implementation of StartChild.

I did a bit of profiling (following a fantastic tutorial by Dave Thomas), looked at the open-source F# release, and I think I even know how to fix it. If you look at the implementation of StartChild, it registers a handler with the workflow's current cancellation token:

let _reg = ct.Register(
    (fun _ -> 
        match !ctsRef with
        |   null -> ()
        |   otherwise -> otherwise.Cancel()), null)

The objects that stay alive in the heap are instances of this registered function. They could be unregistered by calling _reg.Dispose(), but that never happens in the F# source code. I tried adding _reg.Dispose() to the functions that get called when the async completes:

(fun res -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Ok res, reuseThread=true))
(fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Error err, reuseThread=true))
(fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Canceled err, reuseThread=true))

... and based on my experiments, this fixes the problem. So, if you want a workaround, you can probably copy all the required code from control.fs and add this as a fix.
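To illustrate the register-then-dispose pattern outside of control.fs, here is a minimal sketch that uses only the standard CancellationToken API. The helper runWithRegistration and the counter fired are hypothetical names made up for this example; they are not part of FSharp.Core, and this is not the actual StartChild code.

```fsharp
open System
open System.Threading

// Hypothetical helper (not part of FSharp.Core): runs `work` with a callback
// registered on `ct`, and always unregisters the callback afterwards.
let runWithRegistration (ct: CancellationToken) (onCancel: unit -> unit) (work: unit -> 'T) : 'T =
    // Register returns a CancellationTokenRegistration. Until it is disposed,
    // the token source keeps a reference to the callback.
    let reg = ct.Register(Action(onCancel))
    try
        work ()
    finally
        // The equivalent of the _reg.Dispose() calls added in the fix above.
        reg.Dispose()

let cts = new CancellationTokenSource()
let fired = ref 0

// 1000 short-lived operations, each unregistering its callback when done.
for _ in 1 .. 1000 do
    runWithRegistration cts.Token (fun () -> fired.Value <- fired.Value + 1) (fun () -> ())

// One registration that is deliberately never disposed.
let kept = cts.Token.Register(Action(fun () -> fired.Value <- fired.Value + 1))

cts.Cancel()
printfn "callbacks fired: %d" fired.Value
```

Only the registration that was never disposed should run when the source is cancelled. Without the Dispose call in the finally block, all 1000 callbacks would remain reachable from the token source, which is the kind of growth described above when StartChild is called in a loop under one long-lived token.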

I'll send a bug report to the F# team with a link to your question. If you find something else, you can contact them by sending bug reports to fsbugs at microsoft dot com.
