Observable.FromAsyncPattern with UDPClient.EndRece

Posted 2019-05-26 08:58

I'm learning about Reactive Extensions and trying to refactor some of my code.

UdpClient.EndReceive takes a ref IPEndPoint parameter, so I currently have this working:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));

What if my subscribers need access to the remote IPEndPoint? In my current incarnation I'm using events and passing back a custom class that wraps byte[] and IPEndPoint. I cannot for the life of me work out how to do this with Rx.

2 Answers
beautiful°
#2 · 2019-05-26 09:22

For anyone else looking, there's a slightly simpler and more modern way to do this using ReceiveAsync:

public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
{
    return Observable.Using(() => new UdpClient(endpoint),
        udpClient => Observable.Defer(() =>
            udpClient.ReceiveAsync().ToObservable()).Repeat());
}

You can call it with IPAddress.Any:

var stream = UdpStream(new IPEndPoint(IPAddress.Any, 514));

and then use Select to project the stream to whatever type you want.
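As a rough illustration of that projection step, here is a minimal sketch that turns each UdpReceiveResult into a "sender: text" string. The class name UdpMessages and the helpers Describe and Messages are hypothetical names made up for this example, and ASCII decoding is an assumption carried over from the question. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Text;

public static class UdpMessages
{
    // Same shape as UdpStream in this answer, repeated so the sketch compiles on its own.
    public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
    {
        return Observable.Using(() => new UdpClient(endpoint),
            udpClient => Observable.Defer(() =>
                udpClient.ReceiveAsync().ToObservable()).Repeat());
    }

    // Hypothetical helper: formats one datagram as "sender: text".
    // Assumes the payload is ASCII text, as in the question.
    public static string Describe(UdpReceiveResult result)
    {
        return result.RemoteEndPoint + ": " + Encoding.ASCII.GetString(result.Buffer);
    }

    // Hypothetical helper: projects the raw stream with Select, so subscribers
    // see both the remote endpoint and the decoded payload.
    public static IObservable<string> Messages(IPEndPoint endpoint)
    {
        return UdpStream(endpoint).Select(r => Describe(r));
    }
}
```

A subscriber could then call UdpMessages.Messages(new IPEndPoint(IPAddress.Any, 514)).Subscribe(Console.WriteLine). Because the UdpClient is created inside Observable.Using, disposing the subscription also disposes the client.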

Summer. ? 凉城
#3 · 2019-05-26 09:44

If you've already created a wrapper class for byte[] and IPEndPoint, why not return that as the sequence using Select?

private IObservable<RemoteData> GetRemoteDataAsync()
{
    return Observable.Defer(() => 
    {
        UdpClient receiverUDP = new UdpClient();
        receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, 
            SocketOptionName.ReuseAddress, true);
        receiverUDP.EnableBroadcast = true;
        receiverUDP.Client.ExclusiveAddressUse = false;
        receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

        IPEndPoint ep = null;
        return Observable.FromAsyncPattern<byte[]>(
                   receiverUDP.BeginReceive, 
                   (i) => receiverUDP.EndReceive(i, ref ep)
               )()
               .Select(bytes => new RemoteData(bytes, ep));
    });
}
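The RemoteData type itself is not shown in this thread. A minimal sketch of what such a wrapper might look like follows, assuming it only needs to pair the payload with the sender; the property names Data, RemoteEndPoint and Text are hypothetical. The Select lambda above runs only after EndReceive has assigned ep, so the captured variable holds the sender's endpoint by the time RemoteData is constructed.

```csharp
using System.Net;
using System.Text;

// Hypothetical wrapper pairing a received datagram with its sender.
// The poster's real class is not shown, so this shape is an assumption.
public sealed class RemoteData
{
    public RemoteData(byte[] data, IPEndPoint remoteEndPoint)
    {
        Data = data;
        RemoteEndPoint = remoteEndPoint;
    }

    // Raw bytes of the datagram.
    public byte[] Data { get; }

    // Endpoint the datagram was sent from.
    public IPEndPoint RemoteEndPoint { get; }

    // Convenience view of the payload, assuming it is ASCII text as in the question.
    public string Text => Encoding.ASCII.GetString(Data);
}
```

Subscribers can then read both values from one item, for example GetRemoteDataAsync().Subscribe(d => Console.WriteLine(d.RemoteEndPoint + ": " + d.Text)).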