I have an observable sequence of event objects and a number of observers handling specific types of events. I need to accomplish the following scenarios:
- Some event types need to be handled by the first observer matching a condition (e.g. observable.SubscribeExclusively(x=>{}) and become "unobservable" to the others.
- If there are no subscriptions, set some default handler (e.g. observable.SubscribeIfNoSubscriptions(x=>{})) so that no items get lost (this handler may for example save the item to a database so that it is processed later).
Is there a way to handle these cases with Rx?
I'm not sure I quite grok your scenario, but how does this strike you:
IObservable<Event> streamOfEvents.SelectMany(x => {
if (matchesExclusiveItem1(x)) {
x += exclusiveItem1Handler;
return Observable.Empty<Event>();
}
// Default case
return Observable.Return(x);
}).Subscribe(x => {
// Default case
x += defaultHandler;
});
I'm using "Event objects" because that's what you specified, but it'd probably be better to use IObservable<IObservable<T>>
- this selector has side effects (connecting the event), which is less than good.
The "Exclusivity" is easier - you just have everybody else subscribe to the filtered output of the exclusive observer.
"Default" is harder - RX programming is a functional programming and subscribers do not know of each other, while by definition having a "Default" subscriber means having some state shared between observers. One way to have shared state is to create a producer/consumer queue using ConcurrentBag or BufferBlock from TPL DataFlow. Another way is to attach the "processed" state to the event itself using a class like this:
public class Handled<T>
{
public bool IsHandled { get; set; }
public T Data { get; set; }
}
In any case, you would have to give the observers some time to react before you employ the "default" handler. The code below illustrates both "Exclusive" and "Default" concepts:
var source = new[] {0, 1, 2, 3, 4}.ToObservable();
var afterExclusive = source
.Where(x =>
{
if (x == 0)
{
Console.WriteLine("exclusive");
return false;
}
return true;
})
.Select(x => new Handled<int> {Data = x})
.Publish(); // publish is a must otherwise
afterExclusive // we'll get non shared objects
.Do(x => { x.IsHandled = true; })
.Subscribe();
afterExclusive
.Delay(TimeSpan.FromSeconds(1))
.Where(x => !x.IsHandled)
.Subscribe(x => Console.WriteLine("missed by all {0}", x));
afterExclusive.Connect();