I'm trying to reorder events arriving unordered on different threads.
Is it possible to create a reactive extension query that matches these marble diagrams:
s1 1 2 3 4
s2 1 3 2 4
result 1 2 3 4
and...
s1 1 2 3 4
s2 4 3 2 1
result 1234
That is: Only publish results in version number order.
The closest I have got is using a Join to open a window each time s1 ticks and only close it when s2 arrives with the same number.
Like this:
    var publishedEvents = events.Publish().RefCount();
    publishedEvents.Join(
            publishedEvents.Scan(0, (i, o) => i + 1),
            expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
            _ => Observable.Never<Unit>(),
            (@event, expectedVersion) => new {@event,expectedVersion})
        .Where(x => x.expectedVersion == x.@event.Version)
        .Select(x => x.@event)
        .Subscribe(Persist);
But that won't work with diagram no. 2: group 2 will complete as soon as s2 ticks with the number 2, and thus before group 1.
Does it make sense? Can it be done with Rx? Should it?
EDIT: I guess it's like overlapping windows, where later windows can't close before all preceding windows have closed. And the preceding windows won't close before the window number matches the event version number.
EDIT 2:
I have something like this now, but it's not really the reactive, functional, thread-safe LINQ revelation I hoped for (please ignore that my events are JObjects for now):
    var orderedEvents = Observable.Create<JObject>(observer =>
    {
        var nextVersionExpected = 1;
        var previousEvents = new List<JObject>();
        return events
            .ObserveOn(Scheduler.CurrentThread)
            .Subscribe(@event =>
            {
                previousEvents.Add(@event);
                var version = (long) @event["Version"];
                if (version != nextVersionExpected) return;
                foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
                {
                    if ((long) previousEvent["Version"] != nextVersionExpected)
                        break;
                    observer.OnNext(previousEvent);
                    previousEvents.Remove(previousEvent);
                    nextVersionExpected++;
                }
            });
    });
Introduction
The key to this problem is the sort. Any way you look at it, some form of buffering is required. Whilst no doubt some elaborate combination of operators might achieve this, I think this is a good example where Observable.Create is a good choice.
Generalizing the solution
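I've made some effort to generalize my approach to accept any type of ordering key. To do this, I expect to be given:
A key selector that extracts the ordering key from an event, of type Func<TSource,TKey>
The first key in the sequence, of type TKey
A function that produces the next key from the current one, of type Func<TKey,TKey>
A result selector that combines the paired-up events from the two source streams, of type Func<TSource,TSource,TSource>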
Since I'm just using a 1-based integer sequence for my tests these are satisfied by:
i => i
1
k => k+1
(left,right) => left
Sort
Here is my Sort attempt. It buffers events into a Dictionary and flushes them as soon as possible to the subscriber.
I have to say this is a pretty naïve implementation. In production code in the past I have elaborated on this with specific error handling, a fixed-size buffer and time-outs to prevent resource leakage. However, it will do for this example. :)
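The buffering idea described above can be sketched roughly as follows. This is an illustrative reconstruction under assumptions, not the original listing: the parameter names (keySelector, firstKey, nextKey) and the class names are hypothetical, and the sketch assumes the source delivers notifications one at a time as the Rx contract requires (events raised concurrently from several threads would need something like Synchronize() upstream).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableSortSketch
{
    // Hypothetical sketch: emits elements in key order, holding back any
    // element whose key is not yet the next one expected.
    public static IObservable<TSource> Sort<TSource, TKey>(
        this IObservable<TSource> source,
        Func<TSource, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKey)
    {
        return Observable.Create<TSource>(observer =>
        {
            // State is created per subscription, so subscribers do not share it.
            var expectedKey = firstKey;
            var buffer = new Dictionary<TKey, TSource>();
            var comparer = EqualityComparer<TKey>.Default;

            return source.Subscribe(
                item =>
                {
                    var key = keySelector(item);
                    if (!comparer.Equals(key, expectedKey))
                    {
                        // Out of turn: park it. Naive on purpose: the buffer is
                        // unbounded and a repeated key overwrites the earlier item.
                        buffer[key] = item;
                        return;
                    }

                    observer.OnNext(item);
                    expectedKey = nextKey(expectedKey);

                    // Flush every buffered element that is now contiguous.
                    while (buffer.TryGetValue(expectedKey, out var buffered))
                    {
                        buffer.Remove(expectedKey);
                        observer.OnNext(buffered);
                        expectedKey = nextKey(expectedKey);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}

public static class Program
{
    public static void Main()
    {
        // 1-based integer sequence, using the arguments listed earlier.
        new[] { 2, 1, 4, 3 }.ToObservable()
            .Sort(i => i, 1, k => k + 1)
            .Subscribe(x => Console.WriteLine(x)); // prints 1, 2, 3, 4 (one per line)
    }
}
```

Keeping the state inside the Observable.Create delegate is a deliberate choice in this sketch: each subscription gets its own expected key and buffer.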
With this sorted (sorry!), we can now look at handling multiple streams.
Combining Results
First Attempt
My first attempt at this is to produce an unordered stream of events that have been seen the required number of times. This could then be sorted. I do this by grouping elements by key, using GroupByUntil to hold each group until two elements have been captured. Each group is then a stream of results with the same key. For the simple example of integer events, I can just take the last element of each group. However, I don't like this because it's awkward for more real-world scenarios where each result stream may be contributing something useful. I include the code for the sake of interest. Note, so that the tests can be shared between this and my second attempt, I accept an unused resultSelector parameter.
Aside: You can hack on the SelectMany clause to decide how to pick results. One advantage this solution has over the second attempt is that, in scenarios with many result streams, it is easier to see how to extend it to pick, say, the first two out of three result tuples to arrive.
Second Attempt
For this approach I sort each stream independently, and then Zip the results together. Not only is this a far simpler-looking operation, it's also far easier to combine results from each stream in interesting ways. To keep the tests compatible with my first approach, I pick the resultSelector function to use the first stream's events as the results, but obviously you have flexibility to do something useful in your scenario.
Aside: It isn't too hard to see how this code could be extended to a more general case accepting any number of input streams but, as alluded to earlier, using Zip makes it quite inflexible about blocking at a given key until results from all streams are in.
Test Cases
Finally, here are my tests echoing your example scenarios. To run these, import the NuGet packages rx-testing and nunit, and put the implementations above into a static class.
Currying to avoid repetition
Final comment: because I hate repeating myself in code, here's a tweak that avoids the repetitious way I call Sort in the second approach. I've not included it in the main body to avoid confusing readers unfamiliar with currying.
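A tweak of this kind might look something like the sketch below, which fixes the three ordering arguments once in a local delegate (strictly speaking partial application) and then applies it to both streams before zipping. The names OrderedCollect, keySelector, firstKey and nextKey are hypothetical, and the compact Sort included here is only a stand-in so that the example is self-contained; it is not the original implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class OrderedCollectSketch
{
    // Compact stand-in for a key-ordered Sort operator (assumed shape):
    // park every item, then release items while the expected key is present.
    public static IObservable<T> Sort<T, TKey>(
        this IObservable<T> source,
        Func<T, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKey)
    {
        return Observable.Create<T>(observer =>
        {
            var expected = firstKey;
            var pending = new Dictionary<TKey, T>();
            return source.Subscribe(
                item =>
                {
                    pending[keySelector(item)] = item;
                    while (pending.TryGetValue(expected, out var next))
                    {
                        pending.Remove(expected);
                        observer.OnNext(next);
                        expected = nextKey(expected);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }

    // Hypothetical combinator for the Zip-based approach.
    public static IObservable<T> OrderedCollect<T, TKey>(
        this IObservable<T> left,
        IObservable<T> right,
        Func<T, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKey,
        Func<T, T, T> resultSelector)
    {
        // Capture the shared arguments once instead of repeating them per stream.
        Func<IObservable<T>, IObservable<T>> sort =
            events => events.Sort(keySelector, firstKey, nextKey);

        // Zip pairs the n-th element of each sorted stream.
        return sort(left).Zip(sort(right), resultSelector);
    }
}

public static class Program
{
    public static void Main()
    {
        // Mirrors the second marble diagram: s2 arrives in reverse order.
        var s1 = new[] { 1, 2, 3, 4 }.ToObservable();
        var s2 = new[] { 4, 3, 2, 1 }.ToObservable();

        s1.OrderedCollect(s2, i => i, 1, k => k + 1, (l, r) => l)
          .Subscribe(x => Console.WriteLine(x));
    }
}
```

The delegate keeps the two Sort calls in step: if the ordering arguments change, they change in one place.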