There is a pattern I'm having trouble with when working with observables.
I am working with a bluetooth device.
- I send a message to that device telling it to do something and notify me of the result or results.
- The device starts sending notifications (could go for 10ms or 20s)
- I wait for the device to finish sending notifications. sometimes this will be a specific message from the device and sometimes I just won't receive any more messages for a timeout period.
- I convert the messages to a single item or an IEnumerable and go on my merry way.
Example one:
- I enter a login command with a login message and a password
- The device sends back a success or failure message (usually 10ms or so)
- I wait for the message to come
- I use the message to tell if the user can continue or if they need to retry their password.
Example two:
- I send a command to the bluetooth device requesting all wifi networks in range
- The device turns on its wifi radio and sends back an unknown number of messages but stops at some point
- I wait for the messages to stop
- I present the complete list of wifi networks to the user
I think this should be done in this something close the following manner. (I've removed as much bluetooth specific code as possible to help focus on Rx):
//Create a new subject
Subject<int> subject = new Subject<int>();
//Observe the subject until some pre-determined stopping criteria
bool waiting = true;
IObservable<int> sequence = subject.TakeWhile(x => waiting);
//Subscribe to the subject so that I can trigger the stopping criteria
IDisposable subscription = sequence.Subscribe(
onNext: result =>
{
if (result > 50)
waiting = false;
},
onCompleted: () =>
{
return;
});
//fake bluetooth messages
int i = 0;
while (i < 100)
subject.OnNext(i++);
//Gather them all up once the sequence is complete
//***application hangs here***
List<int> ints = await sequence.ToList() as List<int>;
//This line of code is never run
subscription.Dispose();
I'm hoping that an Rx guy can help me understand why this ToList() call hangs. I just wrote this code on the spot for this question so if it doesn't make sense let me know and I'll update it.
Here is the actual code that uses a third-party bluetooth library and receives items from a bluetooth device.
private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
{
IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);
//I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
//In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way.
//In those situations I would call .ToList() instead of .FirstAsync()
bool waiting = true;
await characteristic.EnableNotifications().TakeWhile(x=>waiting);
IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
.WhenNotificationReceived();
IDisposable passwordResultSubscription = passwordResultSequence
.Subscribe(
onNext: result =>
{
waiting = false;
},
onCompleted: () =>
{
return;
});
try
{
await Peripheral.RPHDevice
.WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
.Timeout(TimeSpan.FromSeconds(10));
}
catch (Exception)
{
return 0;
}
//In this case only one notification ever comes back and so FirstAsync would be nice
var passwordResult = await passwordResultSequence.FirstAsync();
await characteristic.DisableNotifications();
passwordResultSubscription.Dispose();
return passwordResult.Data[0];
}
WhenNotificationsReceived:
IObservable<CharacteristicGattResult> notifyOb;
public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
{
this.AssertNotify();
this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
{
var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
{
if (!this.Equals(args.Characteristic))
return;
if (args.Error == null)
ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
else
ob.OnError(new BleException(args.Error.Description));
});
this.Peripheral.UpdatedCharacterteristicValue += handler;
return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
})
.Publish()
.RefCount();
return this.notifyOb;
}
You've got a bunch of problems with your code.
First up, Rx is an asynchronous programming model that you're trying to run synchronously. Calling
await sequence
(and similarlysequence.Wait()
) will cause you grief almost all of the time.Next, you're creating two subscriptions to the
sequence
observable - once with thesequence.Subscribe(...)
and again with theawait sequence.ToList()
. They are separate subscriptions to the underlyingsubject
and they need to be treated as separate.And finally, you're mixing external state (
bool waiting = true
) into your querysubject.TakeWhile(x => waiting)
. That's bad as it is inherently non-thread-safe and you should code as if your query is running on multiple threads.What is happening with your code is that the
await sequence.ToList()
is subscribing to your query AFTER you have pumped out yoursubject.OnNext(i++)
values so the query never ends. No value is ever pushed out of the subject AFTER the.ToList()
to trigger the.TakeWhile(x => waiting)
to end the observable..ToList()
just sits there waiting for theOnCompleted
that never comes.You need to move the
await sequence.ToList()
to before you pumped out the values - which you can't do because it would still get stuck waiting for for theOnCompleted
that never comes.This is why you need to code asynchronously.
Now the two subscriptions also causes you a race condition. The
sequence.Subscribe
might setwaiting
tofalse
before thesequence.ToList()
gets any values. This is why you should code as if your query is running on multiple threads. So to avoid this you should only have one subscription.You need to lose the
.TakeWhile(x => waiting)
and push the condition inside like this:subject.TakeWhile(x => x <= 50);
.Then you write your code like this:
This code runs and produces
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50
on to the console.Don't write Rx code synchronously - lose the
await
. Don't run multiple subscriptions that could create race conditions. Don't introduce external state into your queries.Also, with the
WhenNotificationReceived
method, you're not properly completing the sequence.You're using the dangerous
.Publish().RefCount()
operator pair which creates a sequence that can't be subscribed to after it completes.Try this example:
This only produces:
The
s3
subscription produces nothing. I don't think that this is what you're after.