How to Subscribe to IObservable Sequence, force co

2019-06-06 19:56发布

There is a pattern I'm having trouble with when working with observables.

I am working with a bluetooth device.

  1. I send a message to that device telling it to do something and notify me of the result or results.
  2. The device starts sending notifications (could go for 10ms or 20s)
  3. I wait for the device to finish sending notifications. sometimes this will be a specific message from the device and sometimes I just won't receive any more messages for a timeout period.
  4. I convert the messages to a single item or an IEnumerable and go on my merry way.

Example one:

  1. I enter a login command with a login message and a password
  2. The device sends back a success or failure message (usually 10ms or so)
  3. I wait for the message to come
  4. I use the message to tell if the user can continue or if they need to retry their password.

Example two:

  1. I send a command to the bluetooth device requesting all wifi networks in range
  2. The device turns on its wifi radio and sends back an unknown number of messages but stops at some point
  3. I wait for the messages to stop
  4. I present the complete list of wifi networks to the user

I think this should be done in this something close the following manner. (I've removed as much bluetooth specific code as possible to help focus on Rx):

//Create a new subject
Subject<int> subject = new Subject<int>();

//Observe the subject until some pre-determined stopping criteria
bool waiting = true;
IObservable<int> sequence = subject.TakeWhile(x => waiting);

//Subscribe to the subject so that I can trigger the stopping criteria
IDisposable subscription = sequence.Subscribe(
                onNext: result =>
                {
                     if (result > 50)
                        waiting = false;
                },
                onCompleted: () =>
                {
                     return;
                });

//fake bluetooth messages
int i = 0;
while (i < 100)
    subject.OnNext(i++);

//Gather them all up once the sequence is complete
//***application hangs here***
List<int> ints = await sequence.ToList() as List<int>;

//This line of code is never run
subscription.Dispose();

I'm hoping that an Rx guy can help me understand why this ToList() call hangs. I just wrote this code on the spot for this question so if it doesn't make sense let me know and I'll update it.

Here is the actual code that uses a third-party bluetooth library and receives items from a bluetooth device.

    private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
    {
        IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
        IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);

        //I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
        //In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way. 
        //In those situations I would call .ToList() instead of .FirstAsync()
        bool waiting = true;
        await characteristic.EnableNotifications().TakeWhile(x=>waiting);

        IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
            .WhenNotificationReceived();

        IDisposable passwordResultSubscription = passwordResultSequence 
                                                    .Subscribe(
                                                    onNext: result =>
                                                    {
                                                        waiting = false;
                                                    },
                                                    onCompleted: () =>
                                                    {
                                                        return;
                                                    });

        try
        {
            await Peripheral.RPHDevice
                    .WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
                    .Timeout(TimeSpan.FromSeconds(10));
        }
        catch (Exception)
        {
            return 0;
        }

        //In this case only one notification ever comes back and so FirstAsync would be nice
        var passwordResult = await passwordResultSequence.FirstAsync();
        await characteristic.DisableNotifications();
        passwordResultSubscription.Dispose();

        return passwordResult.Data[0];
    }

WhenNotificationsReceived:

    IObservable<CharacteristicGattResult> notifyOb;
    public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
    {
        this.AssertNotify();

        this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
        {
            var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
            {
                if (!this.Equals(args.Characteristic))
                    return;

                if (args.Error == null)
                    ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
                else
                    ob.OnError(new BleException(args.Error.Description));
            });
            this.Peripheral.UpdatedCharacterteristicValue += handler;
            return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
        })
        .Publish()
        .RefCount();

        return this.notifyOb;
    }

1条回答
SAY GOODBYE
2楼-- · 2019-06-06 20:39

You've got a bunch of problems with your code.

First up, Rx is an asynchronous programming model that you're trying to run synchronously. Calling await sequence (and similarly sequence.Wait()) will cause you grief almost all of the time.

Next, you're creating two subscriptions to the sequence observable - once with the sequence.Subscribe(...) and again with the await sequence.ToList(). They are separate subscriptions to the underlying subject and they need to be treated as separate.

And finally, you're mixing external state (bool waiting = true) into your query subject.TakeWhile(x => waiting). That's bad as it is inherently non-thread-safe and you should code as if your query is running on multiple threads.

What is happening with your code is that the await sequence.ToList() is subscribing to your query AFTER you have pumped out your subject.OnNext(i++) values so the query never ends. No value is ever pushed out of the subject AFTER the .ToList() to trigger the .TakeWhile(x => waiting) to end the observable. .ToList() just sits there waiting for the OnCompleted that never comes.

You need to move the await sequence.ToList() to before you pumped out the values - which you can't do because it would still get stuck waiting for for the OnCompleted that never comes.

This is why you need to code asynchronously.

Now the two subscriptions also causes you a race condition. The sequence.Subscribemight set waiting to false before the sequence.ToList() gets any values. This is why you should code as if your query is running on multiple threads. So to avoid this you should only have one subscription.

You need to lose the .TakeWhile(x => waiting) and push the condition inside like this: subject.TakeWhile(x => x <= 50);.

Then you write your code like this:

//Create a new subject
Subject<int> subject = new Subject<int>();

//Observe the subject until some pre-determined stopping criteria
IObservable<int> sequence = subject.TakeWhile(x => x <= 50);

sequence
    .ToList()
    .Subscribe(list =>
    {
        Console.WriteLine(String.Join(", ", list));
    });

//fake bluetooth messages
int i = 0;
while (i < 100)
    subject.OnNext(i++);

This code runs and produces 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50 on to the console.

Don't write Rx code synchronously - lose the await. Don't run multiple subscriptions that could create race conditions. Don't introduce external state into your queries.

Also, with the WhenNotificationReceived method, you're not properly completing the sequence.

You're using the dangerous .Publish().RefCount() operator pair which creates a sequence that can't be subscribed to after it completes.

Try this example:

var query =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Take(3)
        .Publish()
        .RefCount();

var s1 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

var s2 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

s1.Dispose();
s2.Dispose();

var s3 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

s3.Dispose();

This only produces:

0
1
2
2

The s3 subscription produces nothing. I don't think that this is what you're after.

查看更多
登录 后发表回答