Given a class:
class Foo { public DateTime Timestamp { get; set; } }
...and an IObservable<Foo> with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into lists based on those Timestamps?
I.e. each IList<Foo> should hold five seconds' worth of events, or whatever. I know I can use the Buffer overload that takes a TimeSpan, but I need to take the time from the events themselves, not from the wall clock. (Unless there's a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)
If I try to use the Observable.Buffer<Foo, DateTime>(this IObservable<Foo> source, IObservable<DateTime> bufferBoundaries) overload like so:
IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
        x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks))
    .DistinctUntilChanged();
pub.Buffer(windows).Subscribe(x => x.Dump()); // LINQPad
pub.Connect();
...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.
E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]].
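To pin down the target semantics independently of Rx, here is a small offline sketch over a plain int array. It assumes, as in the example, that timestamps are whole seconds and that windows are aligned to multiples of five. The class and method names are hypothetical. Because the input is monotonic, items with the same window number are always adjacent. That means LINQ's GroupBy, which keeps groups in order of first appearance, yields exactly the consecutive chunks wanted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DesiredChunks
{
    // Hypothetical offline reference: put each timestamp into window
    // number (timestamp / windowSize). With monotonically increasing input,
    // equal window numbers are adjacent, so GroupBy yields consecutive chunks.
    public static List<List<int>> ChunkByWindow(IEnumerable<int> timestamps, int windowSize)
    {
        return timestamps
            .GroupBy(t => t / windowSize) // groups keep first-appearance order
            .Select(g => g.ToList())
            .ToList();
    }

    public static void Main()
    {
        var chunks = ChunkByWindow(new[] { 0, 1, 10, 11, 15 }, 5);
        Console.WriteLine(string.Join(", ",
            chunks.Select(c => "[" + string.Join(", ", c) + "]")));
        // prints: [0, 1], [10, 11], [15]
    }
}
```

This is only a reference for the expected output. The point of the question is producing the same chunks incrementally from an IObservable.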
Here's an idea. The group key is the "window number", and I use GroupByUntil. This gives the desired output for your example (I've used an int stream just like that example, but you can substitute whatever you need to number your windows).
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));

        xs.Publish(ps =>                                      // (1)
                ps.GroupByUntil(
                    p => p / 5,                               // (2)
                    grp => ps.Where(p => p / 5 != grp.Key))   // (3)
                .SelectMany(x => x.ToList()))                 // (4)
            .Subscribe(list => Console.WriteLine(string.Join(", ", list)));

        scheduler.Start();
    }
}
Notes
1. We publish the source stream because we will subscribe to it more than once.
2. This is a function that creates the group key; use it to generate a window number from your item type.
3. This is the group termination condition; use it to inspect the source stream for an item in another window. Note that this means a window won't close until an element outside of it arrives or the source stream terminates. This is obvious if you think about it: your desired output requires looking at the next element after a window ends. If your source bears any relation to real time, you could merge this with an Observable.Timer + Select that outputs a null/default instance of your item type, to terminate the group earlier.
4. SelectMany puts the groups into lists and flattens the stream.
This example will run nicely in LINQPad if you include the NuGet package rx-testing. New up a Tests instance and just run its Test() method.
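For note 2 with the question's Foo type, the key function only has to map a Timestamp to a window number. Below is a minimal sketch. It assumes fixed windows aligned on DateTime.Ticks, the same alignment as the question's Ticks-modulo expression. TimestampWindows.WindowNumber is a hypothetical helper, not part of Rx.

```csharp
using System;

public static class TimestampWindows
{
    // Hypothetical helper: maps a timestamp to the number of the fixed,
    // tick-aligned window it falls in; window n covers
    // [n * size, (n + 1) * size) measured in DateTime ticks.
    public static long WindowNumber(DateTime timestamp, TimeSpan size)
    {
        if (size <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(size));
        return timestamp.Ticks / size.Ticks;
    }

    public static void Main()
    {
        var size = TimeSpan.FromSeconds(5);
        var t0 = new DateTime(2020, 1, 1, 12, 0, 0); // on a 5-second boundary
        Console.WriteLine(WindowNumber(t0, size) == WindowNumber(t0.AddSeconds(4.9), size)); // True
        Console.WriteLine(WindowNumber(t0, size) == WindowNumber(t0.AddSeconds(5), size));   // False
    }
}
```

In the GroupByUntil query it would stand in for p => p / 5 and the p / 5 != grp.Key test. For example: GroupByUntil(f => TimestampWindows.WindowNumber(f.Timestamp, size), grp => ps.Where(f => TimestampWindows.WindowNumber(f.Timestamp, size) != grp.Key)), where size is a TimeSpan of your choosing.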
Window is a generalization of Buffer, and GroupJoin is a generalization of Window (and Join). When you write a Window or Buffer query and find that notifications are being incorrectly included in or excluded from the edges of the windows/lists, redefine your query in terms of GroupJoin to take control over where edge notifications arrive.
Note that in order to make the closing notifications available to newly opened windows, you must define your boundaries as windows of those notifications (the windowed data, not the boundary data). In your case, you cannot use a sequence of DateTime values as your boundaries; you must use a sequence of Foo objects instead. To accomplish this, I've replaced your Select->DistinctUntilChanged query with a Scan->Where->Select query.
var batches = foos.Publish(publishedFoos => publishedFoos
        .Scan(
            new { foo = (Foo)null, last = DateTime.MinValue, take = true },
            (acc, foo) =>
            {
                var boundary = foo.Timestamp - acc.last >= TimeSpan.FromSeconds(5);
                return new
                {
                    foo,
                    last = boundary ? foo.Timestamp : acc.last,
                    take = boundary
                };
            })
        .Where(a => a.take)
        .Select(a => a.foo)
        .Publish(boundaries => boundaries
            .Skip(1)
            .StartWith((Foo)null)
            .GroupJoin(
                publishedFoos,
                foo => foo == null ? boundaries.Skip(1) : boundaries,
                _ => Observable.Empty<Unit>(),
                (foo, window) => (foo == null ? window : window.StartWith(foo)).ToList())))
    .Merge()
    .Replay(lists => lists.SkipLast(1)
            .Select(list => list.Take(list.Count - 1))
            .Concat(lists),
        bufferSize: 1);
The Replay query at the end is only required if you expect the sequence to eventually end and you care about not dropping the last notification. Otherwise, you could simply change window.StartWith(foo) to window.StartWith(foo).SkipLast(1) to achieve the same basic results, though the last notification of the last buffer will be lost.
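The Scan->Where->Select stage decides which Foo opens a new window. Below is an offline sketch of that rule. Boundaries is a hypothetical name, and a plain IEnumerable of timestamps stands in for the observable of Foo. An item becomes a boundary when it is at least the window size later than the previous boundary. The first item always qualifies because last starts at DateTime.MinValue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BoundarySelection
{
    // Offline mirror of the Scan -> Where -> Select step: a timestamp is a
    // boundary (opens a new window) when it is at least `size` after the
    // previous boundary. `last` starts at DateTime.MinValue, as in the Rx
    // query, so the first timestamp is always a boundary.
    public static List<DateTime> Boundaries(IEnumerable<DateTime> timestamps, TimeSpan size)
    {
        var result = new List<DateTime>();
        var last = DateTime.MinValue;
        foreach (var ts in timestamps)
        {
            if (ts - last >= size)
            {
                result.Add(ts);
                last = ts;
            }
        }
        return result;
    }

    public static void Main()
    {
        var origin = new DateTime(2020, 1, 1);
        var stamps = new[] { 0, 1, 10, 11, 15 }.Select(s => origin.AddSeconds(s));
        foreach (var b in Boundaries(stamps, TimeSpan.FromSeconds(5)))
            Console.WriteLine((b - origin).TotalSeconds); // 0, then 10, then 15
    }
}
```

This rule anchors each window at its first event rather than at clock-aligned multiples of five seconds. For [0, 1, 10, 11, 15] the boundaries are 0, 10 and 15, which matches the desired output. For [3, 7, 9], however, they are 3 and 9, so 7 shares a window with 3. The modulo-based windows in the question would instead put 7 and 9 together.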
I think James World's answer is neater/more readable, but for posterity, I've found another way to do this using Buffer():
IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
        x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks))
    .DistinctUntilChanged()
    .Publish()
    .RefCount();
pub.Buffer(windows, x => windows).Subscribe(x => x.Dump()); // LINQPad
pub.Connect();
With 10 million events, James' approach is more than 2.5x as fast (20 s vs. 56 s on my machine).