There is a pattern I'm having trouble with when working with observables.
I am working with a Bluetooth device.
- I send a message to that device telling it to do something and notify me of the result or results.
- The device starts sending notifications (this can last anywhere from 10 ms to 20 s).
- I wait for the device to finish sending notifications. Sometimes the end is signalled by a specific message from the device, and sometimes I simply stop receiving messages for a timeout period.
- I convert the messages to a single item or an IEnumerable and go on my merry way.
Example one:
- I enter a login command with a login message and a password
- The device sends back a success or failure message (usually 10ms or so)
- I wait for the message to come
- I use the message to tell if the user can continue or if they need to retry their password.
Example two:
- I send a command to the bluetooth device requesting all wifi networks in range
- The device turns on its wifi radio and sends back an unknown number of messages but stops at some point
- I wait for the messages to stop
- I present the complete list of wifi networks to the user
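To make the two stopping criteria concrete, here is a rough sketch of the shape I'm after, with an in-memory Subject standing in for the device. It assumes the System.Reactive package; `CollectUntilDone`, `isTerminal`, and `quiet` are names made up for illustration and are not part of any Bluetooth library. Note that it subscribes before any messages are sent, and that `TakeWhile` drops the terminal message itself.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class NotificationCollector
{
    // Hypothetical helper: gathers notifications until either a terminal
    // message arrives or no message is seen for the 'quiet' period.
    public static Task<IList<T>> CollectUntilDone<T>(
        IObservable<T> notifications, Func<T, bool> isTerminal, TimeSpan quiet)
    {
        return notifications
            .TakeWhile(x => !isTerminal(x))        // stop on the "done" message
            .Timeout(quiet, Observable.Empty<T>()) // or complete after a quiet gap
            .ToList()
            .ToTask();                             // ToTask subscribes immediately
    }
}

public static class Program
{
    public static async Task Main()
    {
        var subject = new Subject<int>();

        // Start listening first, then "send the command".
        Task<IList<int>> pending = NotificationCollector.CollectUntilDone(
            subject, x => x > 50, TimeSpan.FromSeconds(1));

        // Fake bluetooth messages
        for (int i = 0; i < 100; i++)
            subject.OnNext(i);

        IList<int> ints = await pending;
        Console.WriteLine(ints.Count); // 51 with the fake data above (0 through 50)
    }
}
```

The quiet period here also applies to the wait for the very first message, so a device that never answers would produce an empty list rather than an error; whether that is the right behaviour probably depends on the command.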
I think this should be done in something close to the following manner. (I've removed as much Bluetooth-specific code as possible to help focus on Rx.)
    //Create a new subject
    Subject<int> subject = new Subject<int>();

    //Observe the subject until some pre-determined stopping criteria
    bool waiting = true;
    IObservable<int> sequence = subject.TakeWhile(x => waiting);

    //Subscribe to the subject so that I can trigger the stopping criteria
    IDisposable subscription = sequence.Subscribe(
        onNext: result =>
        {
            if (result > 50)
                waiting = false;
        },
        onCompleted: () =>
        {
            return;
        });

    //fake bluetooth messages
    int i = 0;
    while (i < 100)
        subject.OnNext(i++);

    //Gather them all up once the sequence is complete
    //***application hangs here***
    List<int> ints = await sequence.ToList() as List<int>;

    //This line of code is never run
    subscription.Dispose();
I'm hoping that an Rx guy can help me understand why this ToList() call hangs. I wrote this code on the spot for this question, so if it doesn't make sense let me know and I'll update it.
Here is the actual code, which uses a third-party Bluetooth library and receives items from a Bluetooth device.
    private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
    {
        IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
        IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);

        //I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
        //In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way.
        //In those situations I would call .ToList() instead of .FirstAsync()
        bool waiting = true;
        await characteristic.EnableNotifications().TakeWhile(x => waiting);

        IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
            .WhenNotificationReceived();

        IDisposable passwordResultSubscription = passwordResultSequence
            .Subscribe(
                onNext: result =>
                {
                    waiting = false;
                },
                onCompleted: () =>
                {
                    return;
                });

        try
        {
            await Peripheral.RPHDevice
                .WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
                .Timeout(TimeSpan.FromSeconds(10));
        }
        catch (Exception)
        {
            return 0;
        }

        //In this case only one notification ever comes back and so FirstAsync would be nice
        var passwordResult = await passwordResultSequence.FirstAsync();

        await characteristic.DisableNotifications();
        passwordResultSubscription.Dispose();

        return passwordResult.Data[0];
    }
WhenNotificationReceived:
    IObservable<CharacteristicGattResult> notifyOb;
    public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
    {
        this.AssertNotify();

        this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
        {
            var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
            {
                if (!this.Equals(args.Characteristic))
                    return;

                if (args.Error == null)
                    ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
                else
                    ob.OnError(new BleException(args.Error.Description));
            });
            this.Peripheral.UpdatedCharacterteristicValue += handler;
            return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
        })
        .Publish()
        .RefCount();

        return this.notifyOb;
    }