How to detect a timeout when using asynchronous So

2019-05-06 20:02发布

问题:

Writing an asynchronous Ping using Raw Sockets in F#, to enable parallel requests using as few threads as possible. Not using "System.Net.NetworkInformation.Ping", because it appears to allocate one thread per request. Am also interested in using F# async workflows.

The synchronous version below correctly times out when the target host does not exist/respond, but the asynchronous version hangs. Both work when the host does respond. Not sure if this is a .NET issue, or an F# one...

Any ideas?

(note: the process must run as Admin to allow Raw Socket access)

This throws a timeout:

// Synchronous ping of 192.168.33.22; 1000 is passed through as the socket send/receive timeout.
let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )

However, this hangs:

// Builds the asynchronous ping workflow for the same address and blocks the caller until it finishes.
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
             |> Async.RunSynchronously

Here's the code...

module Ping

open System
open System.Net
open System.Net.Sockets
open System.Threading

//---- ICMP Packet Classes

/// Base ICMP message: a four-byte header made of type, code and a 16-bit checksum.
/// Derived types override Bytes to append their own fields after this header.
type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP type byte supplied to the constructor.
    member this.Type
        with get() = m_type

    /// ICMP code byte (never assigned in this class, so always 0).
    member this.Code
        with get() = m_code

    /// Checksum stored by the most recent UpdateChecksum call (0 before the first call).
    member this.Checksum = m_checksum

    /// Serialized form of the message, including the currently stored checksum.
    abstract Bytes : byte array

    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)        // checksum, low byte first
                byte(m_checksum >>> 8)  // checksum, high byte
            |]

    /// One's-complement sum over Bytes as they currently are (stored checksum included).
    /// For a message whose stored checksum is valid this therefore returns 0.
    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0

        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2

        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])

        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    /// Recomputes and stores the checksum for the current contents.
    member this.UpdateChecksum() =
        // The checksum field must be zero while the sum is taken; otherwise a
        // second call would fold the previous checksum into the new one and
        // store a wrong value (0 when nothing else changed).
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()


/// ICMP message that carries an identifier and a sequence number after the base header.
/// Neither field is ever assigned in this type, so both serialize as zero.
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    member this.Identifier = m_identifier
    member this.SequenceNumber = m_sequenceNumber

    /// Base header followed by identifier and sequence number, each low byte first.
    override this.Bytes
        with get() =
            let trailer =
                [|
                    byte(m_identifier)
                    byte(m_identifier >>> 8)
                    byte(m_sequenceNumber)
                    byte(m_sequenceNumber >>> 8)
                |]
            Array.append base.Bytes trailer

/// ICMP echo request (type 8) with a payload that defaults to 32 bytes of value 32.
type EchoMessage() =
    inherit InformationMessage( 8uy )
    let mutable m_data = Array.init 32 (fun _ -> 32uy)
    // Store a checksum for the default payload as soon as the object exists.
    do base.UpdateChecksum()

    /// Payload bytes; assigning a new payload triggers UpdateChecksum.
    member this.Data
        with get() = m_data
        and set(d) =
            m_data <- d
            this.UpdateChecksum()

    /// Header, identifier and sequence number, followed by the payload.
    override this.Bytes = Array.append base.Bytes this.Data

//---- Synchronous Ping

/// Sends one ICMP echo request to `host` over a raw socket and blocks until a
/// reply arrives.
///
/// `timeout` is applied as both the send and the receive timeout of the socket.
/// Returns the whole receive buffer (request length + 20 bytes), not just the
/// bytes actually received. Raises SocketException when the send or receive
/// reports no bytes; exceptions thrown by the socket calls themselves propagate.
let Ping (host : IPAddress, timeout : int ) =
    let ep = IPEndPoint( host, 0 )
    // `use` disposes the socket on every exit path, including a failure while
    // the timeout options are being applied (previously that path leaked it).
    use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )

    let request = EchoMessage().Bytes
    if socket.SendTo( request, ep ) <= 0 then
        raise (SocketException())

    // 20 extra bytes: presumably room for the IPv4 header delivered with the reply - confirm.
    let reply = Array.create (request.Length + 20) 0uy
    let mutable epr = ep :> EndPoint
    if socket.ReceiveFrom( reply, &epr ) <= 0 then
        raise (SocketException())

    reply

//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)

type Async with
    /// FromBeginEnd for a Begin method that takes four arguments ahead of the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,callback,state)),
            endAction,
            ?cancelAction=cancelAction)

    /// FromBeginEnd for a Begin method that takes five arguments ahead of the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,callback,state)),
            endAction,
            ?cancelAction=cancelAction)

//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom

type System.Net.Sockets.Socket with

    // Wraps BeginSendTo/EndSendTo as an F# async via the five-argument FromBeginEnd overload.
    // No cancelAction is supplied.
    member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
                            this.BeginSendTo,
                            this.EndSendTo )
    // Wraps BeginReceiveFrom/EndReceiveFrom as an F# async. The same remoteEP value is
    // handed to both the Begin and the End call. No cancelAction is supplied.
    member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
                            this.BeginReceiveFrom,
                            (fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP) ) )

//---- Asynchronous Ping

// Asynchronous counterpart of Ping: sends one echo request and awaits the reply
// through AsyncSendTo / AsyncReceiveFrom, returning the receive buffer.
// NOTE(review): this is the version reported to hang when the host does not
// respond. The SendTimeout/ReceiveTimeout options set below reportedly apply to
// synchronous socket calls only - confirm against the SocketOptionName docs.
let AsyncPing (host : IPAddress, timeout : int ) =  
    async {
        let ep = IPEndPoint( host, 0 )
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )

        let packet = EchoMessage()
        let outbuffer = packet.Bytes

        try
            let! result = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, ep )
            if result <= 0 then
                raise (SocketException())

            // Ref cell so the endpoint can be passed to the Begin/End ReceiveFrom pair.
            let epr = ref (ep :> EndPoint)
            // Receive buffer is 256 bytes larger than the request.
            let inbuffer = Array.create (outbuffer.Length + 256) 0uy 
            let! result = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
            if result <= 0 then
                raise (SocketException())
            return inbuffer
        finally
            // Explicit close in addition to the disposal performed by `use`.
            socket.Close()
    }

回答1:

James, your own accepted answer has a problem I wanted to point out. You only allocate one timer, which makes the async object returned by AsyncReceiveEx a stateful one-time-use object. Here's a similar example that I trimmed down:

// Begin/End/Cancel function triple derived from Async.Sleep; used below in place of a real Begin/End pair.
let b,e,c = Async.AsBeginEnd(Async.Sleep)

type Example() =
    // No-op Close; called by the timer handler and the cancel action below.
    member this.Close() = ()
    // Returns an async that runs the sleep through b/e with a watchdog timer of timeoutMS.
    // The two flags and the timer are created once per call of this member, so
    // every run of the returned async shares (and disposes) the same timer.
    member this.AsyncReceiveEx( sleepTime, (timeoutMS:int) ) = 
        let timedOut = ref false 
        let completed = ref false 
        let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
        // On timer expiry: record the timeout and call Close unless the operation already completed.
        timer.Elapsed.Add( fun _ -> 
            lock timedOut (fun () -> 
                timedOut := true 
                if not !completed 
                then this.Close() 
                ) 
            ) 
        // Stops and disposes the timer and marks the operation as completed.
        let complete() = 
            lock timedOut (fun () -> 
                timer.Stop() 
                timer.Dispose() 
                completed := true 
                ) 
        Async.FromBeginEnd( sleepTime,
                            // Begin: start the operation, then start the timer.
                            (fun st -> 
                                let result = b(st)
                                timer.Start() 
                                result 
                            ), 
                            // End: print "err" if the timer fired, otherwise finish the operation.
                            (fun result -> 
                                complete() 
                                if !timedOut 
                                then printfn "err"; () 
                                else e(result)
                            ), 
                            // Cancel: tear down the timer and call Close.
                            (fun () -> 
                                complete() 
                                this.Close() 
                                ) 
                            ) 

// Build the async once (sleep 3000, watchdog 1000) and run the same value twice.
let ex = new Example()
let a = ex.AsyncReceiveEx(3000, 1000)
Async.RunSynchronously a
printfn "ok..."
// below throws ODE, because only allocated one Timer
Async.RunSynchronously a

Ideally you want every 'run' of the async returned by AsyncReceiveEx to behave the same, which means each run needs its own timer and set of ref flags. This is easy to fix thusly:

// Begin/End/Cancel function triple derived from Async.Sleep; used below in place of a real Begin/End pair.
let b,e,c = Async.AsBeginEnd(Async.Sleep)

type Example() =
    // No-op Close; called by the timer handler and the cancel action below.
    member this.Close() = ()
    // Returns an async that runs the sleep through b/e with a watchdog timer of timeoutMS.
    // Everything is inside async { ... }, so the flags and the timer are created
    // afresh each time the returned async is run.
    member this.AsyncReceiveEx( sleepTime, (timeoutMS:int) ) = 
        async {
        let timedOut = ref false 
        let completed = ref false 
        let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
        // On timer expiry: record the timeout and call Close unless the operation already completed.
        timer.Elapsed.Add( fun _ -> 
            lock timedOut (fun () -> 
                timedOut := true 
                if not !completed 
                then this.Close() 
                ) 
            ) 
        // Stops and disposes the timer and marks the operation as completed.
        let complete() = 
            lock timedOut (fun () -> 
                timer.Stop() 
                timer.Dispose() 
                completed := true 
                ) 
        return! Async.FromBeginEnd( sleepTime,
                            // Begin: start the operation, then start the timer.
                            (fun st -> 
                                let result = b(st)
                                timer.Start() 
                                result 
                            ), 
                            // End: print "err" if the timer fired, otherwise finish the operation.
                            (fun result -> 
                                complete() 
                                if !timedOut 
                                then printfn "err"; () 
                                else e(result)
                            ), 
                            // Cancel: tear down the timer and call Close.
                            (fun () -> 
                                complete() 
                                this.Close() 
                                ) 
                            ) 
        }
// Build the async once (sleep 3000, watchdog 1000) and run the same value twice.
let ex = new Example()
let a = ex.AsyncReceiveEx(3000, 1000)
Async.RunSynchronously a
printfn "ok..."
Async.RunSynchronously a

The only change is to put the body of AsyncReceiveEx inside async{...} and have the last line return!.



回答2:

The docs clearly state that the timeout only applies to the sync versions:

http://msdn.microsoft.com/en-us/library/system.net.sockets.socketoptionname.aspx



回答3:

After some thought, came up with the following. This code adds an AsyncReceiveEx member to Socket, which includes a timeout value. It hides the details of the watchdog timer inside the receive method... very tidy and self contained. Now THIS is what I was looking for!

See the complete async ping example, further below.

Not sure if the locks are necessary, but better safe than sorry...

type System.Net.Sockets.Socket with
    /// BeginSend/EndSend as an F# async; cancelling the workflow closes the socket.
    member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginSend,
                            this.EndSend,
                            this.Close )

    /// BeginReceive/EndReceive as an F# async; cancelling the workflow closes the socket.
    member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginReceive,
                            this.EndReceive,
                            this.Close )

    /// Receive with a watchdog timer of `timeoutMS` milliseconds.
    /// When the timer fires before the receive completes, the socket is closed,
    /// `err` is set to SocketError.TimedOut and the result is 0; otherwise the
    /// result is whatever EndReceive returns. A fresh timer and pair of flags are
    /// created on every run of the returned async.
    member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
            timer.Elapsed.Add( fun _ ->
                lock timedOut (fun () ->
                    // A tick that arrives after the receive has already finished
                    // must be ignored; flagging it would report a timeout and
                    // throw away data that was actually received.
                    if not !completed then
                        timedOut := true
                        this.Close()
                    )
                )
            let complete() =
                lock timedOut (fun () ->
                    timer.Stop()
                    timer.Dispose()
                    completed := true
                    )
            return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                (fun (b,o,s,sf,e,st,uo) ->
                                    // Arm the watchdog before starting the receive: the
                                    // completion callback may run (and dispose the timer)
                                    // before BeginReceive returns.
                                    timer.Start()
                                    try
                                        this.BeginReceive(b,o,s,sf,e,st,uo)
                                    with _ ->
                                        // The receive never started; do not leave the timer armed.
                                        complete()
                                        reraise()
                                ),
                                (fun result ->
                                    complete()
                                    if !timedOut then
                                        // The socket was closed by the watchdog; skip EndReceive.
                                        err := SocketError.TimedOut
                                        0
                                    else
                                        this.EndReceive( result, err )
                                ),
                                (fun () ->
                                    complete()
                                    this.Close()
                                    )
                                )
            }

Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.

module Ping

open System
open System.Net
open System.Net.Sockets
open System.Threading

//---- ICMP Packet Classes

/// Base ICMP message: a four-byte header made of type, code and a 16-bit checksum.
/// Derived types override Bytes to append their own fields after this header.
type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP type byte supplied to the constructor.
    member this.Type
        with get() = m_type

    /// ICMP code byte (never assigned in this class, so always 0).
    member this.Code
        with get() = m_code

    /// Checksum stored by the most recent UpdateChecksum call (0 before the first call).
    member this.Checksum = m_checksum

    /// Serialized form of the message, including the currently stored checksum.
    abstract Bytes : byte array

    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)        // checksum, low byte first
                byte(m_checksum >>> 8)  // checksum, high byte
            |]

    /// One's-complement sum over Bytes as they currently are (stored checksum included).
    /// For a message whose stored checksum is valid this therefore returns 0.
    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0

        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2

        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])

        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    /// Recomputes and stores the checksum for the current contents.
    member this.UpdateChecksum() =
        // The checksum field must be zero while the sum is taken; otherwise a
        // second call would fold the previous checksum into the new one and
        // store a wrong value (0 when nothing else changed).
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()


/// ICMP message that carries an identifier and a sequence number after the base header.
/// Neither field is ever assigned in this type, so both serialize as zero.
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    member this.Identifier = m_identifier
    member this.SequenceNumber = m_sequenceNumber

    /// Base header followed by identifier and sequence number, each low byte first.
    override this.Bytes
        with get() =
            let trailer =
                [|
                    byte(m_identifier)
                    byte(m_identifier >>> 8)
                    byte(m_sequenceNumber)
                    byte(m_sequenceNumber >>> 8)
                |]
            Array.append base.Bytes trailer

/// ICMP echo request (type 8) with a payload that defaults to 32 bytes of value 32.
type EchoMessage() =
    inherit InformationMessage( 8uy )
    let mutable m_data = Array.init 32 (fun _ -> 32uy)
    // Store a checksum for the default payload as soon as the object exists.
    do base.UpdateChecksum()

    /// Payload bytes; assigning a new payload triggers UpdateChecksum.
    member this.Data
        with get() = m_data
        and set(d) =
            m_data <- d
            this.UpdateChecksum()

    /// Header, identifier and sequence number, followed by the payload.
    override this.Bytes = Array.append base.Bytes this.Data

//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)

type Async with
    /// FromBeginEnd for a Begin method that takes four arguments ahead of the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,callback,state)),
            endAction,
            ?cancelAction=cancelAction)

    /// FromBeginEnd for a Begin method that takes five arguments ahead of the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,callback,state)),
            endAction,
            ?cancelAction=cancelAction)

//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom

type System.Net.Sockets.Socket with

    /// BeginSend/EndSend as an F# async; cancelling the workflow closes the socket.
    member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginSend,
                            this.EndSend,
                            this.Close )

    /// BeginReceive/EndReceive as an F# async; cancelling the workflow closes the socket.
    member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginReceive,
                            this.EndReceive,
                            this.Close )

    /// Receive with a watchdog timer of `timeoutMS` milliseconds.
    /// When the timer fires before the receive completes, the socket is closed,
    /// `err` is set to SocketError.TimedOut and the result is 0; otherwise the
    /// result is whatever EndReceive returns. A fresh timer and pair of flags are
    /// created on every run of the returned async.
    member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
            timer.Elapsed.Add( fun _ ->
                lock timedOut (fun () ->
                    // A tick that arrives after the receive has already finished
                    // must be ignored; flagging it would report a timeout and
                    // throw away data that was actually received.
                    if not !completed then
                        timedOut := true
                        this.Close()
                    )
                )
            let complete() =
                lock timedOut (fun () ->
                    timer.Stop()
                    timer.Dispose()
                    completed := true
                    )
            return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                (fun (b,o,s,sf,e,st,uo) ->
                                    // Arm the watchdog before starting the receive: the
                                    // completion callback may run (and dispose the timer)
                                    // before BeginReceive returns.
                                    timer.Start()
                                    try
                                        this.BeginReceive(b,o,s,sf,e,st,uo)
                                    with _ ->
                                        // The receive never started; do not leave the timer armed.
                                        complete()
                                        reraise()
                                ),
                                (fun result ->
                                    complete()
                                    if !timedOut then
                                        // The socket was closed by the watchdog; skip EndReceive.
                                        err := SocketError.TimedOut
                                        0
                                    else
                                        this.EndReceive( result, err )
                                ),
                                (fun () ->
                                    complete()
                                    this.Close()
                                    )
                                )
            }

//---- Asynchronous Ping

/// Pings `ip` once over a raw ICMP socket without blocking a thread.
///
/// Returns (ip, elapsed, isAlive). `elapsed` is the stopwatch time spent in the
/// send plus the receive; `isAlive` is true only when a reply arrived before
/// `timeout` elapsed and that reply is an ICMP echo reply (type 0, code 0).
/// Raises SocketException when the send or the receive fails for a reason other
/// than the timeout.
let AsyncPing (ip : IPAddress, timeout : int ) =  
    async {
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.Connect( IPEndPoint( ip, 0 ) )

        let pingTime = System.Diagnostics.Stopwatch()
        let packet = EchoMessage()
        let outbuffer = packet.Bytes
        let err = ref (SocketError())

        let isAlive = ref false
        try
            pingTime.Start()
            let! result = socket.AsyncSend( outbuffer, 0, outbuffer.Length, SocketFlags.None, err )
            pingTime.Stop()

            if result <= 0 then
                raise (SocketException(int(!err)))

            let inbuffer = Array.create (outbuffer.Length + 256) 0uy 

            pingTime.Start()
            let! reply = socket.AsyncReceiveEx( inbuffer, 0, inbuffer.Length, SocketFlags.None, err, timeout )
            pingTime.Stop()

            let timedOut = (!err = SocketError.TimedOut)

            // Check the receive count (`reply`), not the send count, for failure.
            if reply <= 0 && not timedOut then
                raise (SocketException(int(!err)))

            // The ICMP message follows the IPv4 header, whose length in bytes is
            // the low nibble of the first byte times four (20 without options).
            // NOTE(review): assumes the raw socket delivers the IP header with
            // the reply - confirm on the target platform.
            let icmpOffset = (int inbuffer.[0] &&& 0x0F) * 4
            let isEchoReply =
                reply >= icmpOffset + 2
                && inbuffer.[icmpOffset] = 0uy      // Type 0 = echo reply
                && inbuffer.[icmpOffset + 1] = 0uy  // Code 0 = echo reply
            isAlive := not timedOut && isEchoReply
        finally
            socket.Close()

        return (ip, pingTime.Elapsed, !isAlive )
    }

/// Scans 192.168.0.0 through 192.168.255.255, one 256-address block at a time,
/// and prints each address that answered together with its round-trip time.
let main() =
    // Lazily builds the 256 ping workflows for one third-octet value.
    let pings net =
        seq {
            for node in 0..255 do
                let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                yield Ping.AsyncPing( ip, 1000 )
            }

    for net in 0..255 do
        pings net
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Seq.filter ( fun (_,_,alive) -> alive )
        |> Seq.iter ( fun (ip, time, _) ->
                          // TotalMilliseconds is the whole duration; Milliseconds
                          // would only be the 0-999 component of it.
                          printfn "%A %dms" ip (int time.TotalMilliseconds))

// Run the scan, then wait for a key press before the process exits.
main()
System.Console.ReadKey() |> ignore


回答4:

A couple things...

First, it's possible to adapt the .NET FooAsync/FooCompleted pattern into an F# async. The FSharp.Core library does this for WebClient; I think you can use the same pattern here. Here's the WebClient code

type System.Net.WebClient with
    /// Adapts DownloadStringAsync / DownloadStringCompleted to an F# async.
    /// Cancelling the workflow calls CancelAsync on the client.
    member this.AsyncDownloadString (address:Uri) : Async<string> =
        let downloadAsync =
            Async.FromContinuations (fun (cont, econt, ccont) ->
                    // Unique token so the handler reacts only to the request it started.
                    let userToken = new obj()
                    let rec handler = 
                            System.Net.DownloadStringCompletedEventHandler (fun _ args ->
                                if userToken = args.UserState then
                                    this.DownloadStringCompleted.RemoveHandler(handler)
                                    if args.Cancelled then
                                        ccont (new OperationCanceledException()) 
                                    elif args.Error <> null then
                                        econt args.Error
                                    else
                                        cont args.Result)
                    this.DownloadStringCompleted.AddHandler(handler)
                    this.DownloadStringAsync(address, userToken)
                )
        // This block is the member's result, so it must sit at the same column as
        // `let downloadAsync`; indented further it is parsed as part of that
        // binding's body and the member no longer compiles.
        async { 
            use! _holder = Async.OnCancel(fun _ -> this.CancelAsync())
            return! downloadAsync
        }

and I think you can do the same for SendAsync/SendAsyncCancel/PingCompleted (I have not thought it through carefully).

Second, name your method AsyncPing, not PingAsync. F# async methods are named AsyncFoo, whereas methods with the event pattern are named FooAsync.

I didn't look carefully through your code to try to find where the error may lie.



回答5:

Here is a version using Async.FromContinuations.

However, this is NOT an answer to my problem, because it does not scale. The code may be useful to someone, so posting it here.

The reason this is not an answer is that System.Net.NetworkInformation.Ping appears to use one thread per Ping and quite a bit of memory (likely due to thread stack space). Attempting to ping an entire class-B network will run out of memory and use hundreds of threads, whereas the code using raw sockets uses only a few threads and under 10 MB.

type System.Net.NetworkInformation.Ping with
    /// Adapts SendAsync / PingCompleted to an F# async, using a fixed 1000 timeout
    /// argument. Cancelling the workflow calls SendAsyncCancel.
    member this.AsyncPing (address:IPAddress) : Async<PingReply> =
        let pingAsync =
            Async.FromContinuations (fun (onReply, onError, onCancelled) ->
                // Unique token so the handler reacts only to the request it started.
                let token = new obj()
                let rec completed =
                    PingCompletedEventHandler (fun _ args ->
                        if token = args.UserState then
                            this.PingCompleted.RemoveHandler(completed)
                            if args.Cancelled then
                                onCancelled (new OperationCanceledException())
                            elif args.Error <> null then
                                onError args.Error
                            else
                                onReply args.Reply)
                this.PingCompleted.AddHandler(completed)
                this.SendAsync(address, 1000, token))
        async {
            use! _holder = Async.OnCancel(fun _ -> this.SendAsyncCancel())
            return! pingAsync
        }

/// Pings every address in 192.168.0.0 - 192.168.255.255 in parallel through the
/// Ping.AsyncPing extension and prints each reply.
let AsyncPingTest() =
    let pings =
        seq {
            for net in 0..255 do
                for node in 0..255 do
                    let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                    // Create the Ping inside the workflow and dispose it when the
                    // reply (or failure) arrives; previously all 65,536 instances
                    // were created while the sequence was enumerated and none was
                    // ever disposed.
                    yield async {
                        use ping = new Ping()
                        return! ping.AsyncPing( ip )
                    }
            }
    pings
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.iter ( fun result ->
                      printfn "%A" result )


回答6:

EDIT: code changed to working version.

James, I've modified your code and it seems to work as well as your version, but it uses a MailboxProcessor as the timeout-handling engine. The code is 4x slower than your version but uses 1.5x less memory.

// Pings `host` through a MailboxProcessor that performs the send/receive and
// answers the caller's reply channel; the caller waits for that answer with
// PostAndAsyncReply and the given timeout, printing "ok" or "failed".
// NOTE(review): when the reply wait fails, nothing here stops the agent, which
// may still be inside AsyncReceiveFrom - confirm whether that is acceptable.
let AsyncPing (host: IPAddress) timeout =  
    let guard =
        MailboxProcessor<AsyncReplyChannel<Socket*byte array>>.Start(fun inbox ->
            async {
                let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
                try
                    let target = IPEndPoint( host, 0 )
                    let outbuffer = EchoMessage().Bytes
                    // Wait for the caller's reply channel before touching the network.
                    let! channel = inbox.Receive()
                    let! sent = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, target )
                    if sent <= 0 then
                        raise (SocketException())
                    let epr = ref (target :> EndPoint)
                    let inbuffer = Array.create (outbuffer.Length + 256) 0uy 
                    let! received = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
                    if received <= 0 then
                        raise (SocketException())
                    channel.Reply(socket,inbuffer)
                finally
                    socket.Close()
            })
    async {
        try
            //#1: blocks thread and as result have large memory footprint and too many threads to use
            //let socket,r = guard.PostAndReply(id,timeout=timeout)

            //#2: suggested by Dmitry Lomov
            let! socket, _ = guard.PostAndAsyncReply(id,timeout=timeout)
            printfn "%A: ok" host
            socket.Close()
        with _ ->
            printfn "%A: failed" host
        }

//test it
//timeout is ms interval
//i.e. 10000 is equal to 10s
// Runs AsyncPing in parallel against 192.168.<net>.<node> for net and node in 1..254
// and blocks until every workflow has finished.
let AsyncPingTest timeout =
    let pingNode net node =
        let address = sprintf "192.168.%d.%d" net node |> IPAddress.Parse
        AsyncPing address timeout
    seq { 1..254 }
    |> Seq.collect (fun net -> seq { 1..254 } |> Seq.map (pingNode net))
    |> Async.Parallel
    |> Async.RunSynchronously


回答7:

Possibly see also:

http://blogs.msdn.com/pfxteam/archive/2010/05/04/10007557.aspx