How to window/buffer IObservable into chunks ba

Posted 2019-06-24 07:51

Question:

Given a class:

class Foo { public DateTime Timestamp { get; set; } }

...and an IObservable<Foo>, with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into Lists based on those Timestamps?

I.e. each IList<Foo> should contain five seconds' worth of events, or whatever interval. I know I can use Buffer with a TimeSpan overload, but I need to take the time from the events themselves, not the wall clock. (Unless there is a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)

If I try to use the Observable.Buffer<TSource, TBufferBoundary>(this IObservable<TSource> source, IObservable<TBufferBoundary> bufferBoundaries) overload like so:

IObservable<Foo> foos = /* ... */;
var pub = foos.Publish();
// Floor each timestamp to the start of its 5-second window and emit only when it changes.
var windows = pub.Select(x => new DateTime(
    x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();

pub.Buffer(windows).Subscribe(x => x.Dump());  // LINQPad
pub.Connect();

...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.

E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]]
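To pin down the intended semantics, here is a minimal sketch over an in-memory sequence rather than an IObservable. The `ChunkReference` class and `ChunkByKey` method are hypothetical names, not Rx APIs. Consecutive items that share a window number stay together, and the first item with a different window number opens the next chunk instead of closing the current one. It assumes, as in the example, integer timestamps with `t / 5` as the window number.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reference implementation on a finite list (not an IObservable).
public static class ChunkReference
{
    public static List<List<T>> ChunkByKey<T, TKey>(
        IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        var chunks = new List<List<T>>();
        var comparer = EqualityComparer<TKey>.Default;
        List<T> current = null;
        TKey currentKey = default;

        foreach (var item in source)
        {
            var key = keySelector(item);
            if (current == null || !comparer.Equals(key, currentKey))
            {
                // New window number: the triggering item starts the next chunk
                // rather than being appended to the one it closes.
                current = new List<T>();
                chunks.Add(current);
                currentKey = key;
            }
            current.Add(item);
        }
        return chunks;
    }
}
```

Calling `ChunkReference.ChunkByKey(new[] { 0, 1, 10, 11, 15 }, t => t / 5)` yields the chunks [0, 1], [10, 11] and [15], which is the output the question is after.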

Answer 1:

Here's an idea: use GroupByUntil with the "window number" as the group key. This gives the desired output for your example (I've used an int stream just like that example, but you can substitute whatever you need to number your windows).

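using System;
using System.Reactive.Linq;          // Publish, GroupByUntil, SelectMany, ToList
using Microsoft.Reactive.Testing;    // TestScheduler, ReactiveTest

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));

        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        // Print each list's contents; Console.WriteLine(list) would only print the type name.
        .Subscribe(list => Console.WriteLine(string.Join(", ", list)));

        scheduler.Start();
    }
}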

Notes

  1. We publish the source stream because we will subscribe more than once.
  2. This is a function to create a group key: use it to generate a window number from your item type.
  3. This is the group termination condition: it inspects the source stream for an item in another window. That means a window won't close until an element outside it arrives or the source stream terminates. This is obvious if you think about it: your desired output requires considering the next element after a window ends. If your source bears any relation to real time, you could merge this with an Observable.Timer + Select that outputs a null/default instance of your element type to close the window earlier.
  4. SelectMany, combined with ToList, turns each group into a list and flattens the result into a single stream.
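For the `Foo` type in the question, the key function from note 2 could be a window number derived from the timestamp. This is a minimal sketch, assuming fixed-length windows aligned to tick 0 (`DateTime.MinValue`); `WindowNumbers.WindowNumber` is a hypothetical helper, not part of Rx.

```csharp
using System;

public class Foo
{
    public DateTime Timestamp { get; set; }
}

public static class WindowNumbers
{
    // Hypothetical helper: index of the fixed-length window containing the item's
    // timestamp, counting windows from DateTime.MinValue (tick 0). Integer division
    // plays the same role as `p / 5` in the int-based test.
    public static long WindowNumber(Foo foo, TimeSpan windowLength)
    {
        if (windowLength <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(windowLength));
        return foo.Timestamp.Ticks / windowLength.Ticks;
    }
}
```

In the test query, the key selector would then become `p => WindowNumbers.WindowNumber(p, TimeSpan.FromSeconds(5))`, and the duration selector would compare the same value against `grp.Key`.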

This example runs nicely in LINQPad if you reference the rx-testing NuGet package. Create a Tests instance and call its Test() method.



Answer 2:

Window is a generalization of Buffer, and GroupJoin is a generalization of Window (and Join). When you write a Window or Buffer query and find that notifications are being incorrectly included in or excluded from the edges of the windows/lists, redefine your query in terms of GroupJoin to take control over where edge notifications arrive.

Note that in order to make the closing notifications available to newly opened windows you must define your boundaries as windows of those notifications (the windowed data, not the boundary data). In your case, you cannot use a sequence of DateTime values as your boundaries, you must use a sequence of Foo objects instead. To accomplish this, I've replaced your Select->DistinctUntilChanged query with a Scan->Where->Select query.

var batches = foos.Publish(publishedFoos => publishedFoos
  .Scan(
    new { foo = (Foo)null, last = DateTime.MinValue, take = true },
    (acc, foo) =>
    {
      var boundary = foo.Timestamp - acc.last >= TimeSpan.FromSeconds(5);

      return new
      {
        foo,
        last = boundary ? foo.Timestamp : acc.last,
        take = boundary
      };
    })
  .Where(a => a.take)
  .Select(a => a.foo)
  .Publish(boundaries => boundaries
    .Skip(1)
    .StartWith((Foo)null)
    .GroupJoin(
      publishedFoos,
      foo => foo == null ? boundaries.Skip(1) : boundaries,
      _ => Observable.Empty<Unit>(),
      (foo, window) => (foo == null ? window : window.StartWith(foo)).ToList())))
  .Merge()
  .Replay(lists => lists.SkipLast(1)
                        .Select(list => list.Take(list.Count - 1))
                        .Concat(lists),
          bufferSize: 1);

The Replay query at the end is only required if you expect the sequence to eventually end and you care about not dropping the last notification; otherwise, you could simply modify window.StartWith(foo) to window.StartWith(foo).SkipLast(1) to achieve the same basic results, though the last notification of the last buffer will be lost.
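The boundary-selection step (the Scan->Where->Select part) can be checked in isolation. Below is a minimal sketch of the same rule over an in-memory list of timestamps; `ScanBoundaries.Boundaries` is a hypothetical name, and the sketch covers only which items become window starts, not the GroupJoin/Replay plumbing.

```csharp
using System;
using System.Collections.Generic;

public static class ScanBoundaries
{
    // Same rule as the Scan accumulator: an item opens a new window when it is at
    // least `gap` after the item that opened the previous window. `last` starts at
    // DateTime.MinValue, so the first item is a boundary unless it lies within
    // `gap` of DateTime.MinValue.
    public static List<DateTime> Boundaries(IEnumerable<DateTime> timestamps, TimeSpan gap)
    {
        var boundaries = new List<DateTime>();
        var last = DateTime.MinValue;

        foreach (var ts in timestamps)
        {
            if (ts - last >= gap)
            {
                boundaries.Add(ts);
                last = ts;
            }
        }
        return boundaries;
    }
}
```

This rule measures each window from its first item rather than snapping to a fixed 5-second grid as the question's `Ticks - Ticks % ...` query does. For the question's timestamps (0, 1, 10, 11 and 15 seconds) both give the same windows, but for items at 2, 6 and 8 seconds past a grid-aligned time the grid gives [2], [6, 8], while this rule starts windows at 2 and 8, giving [2, 6], [8].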



Answer 3:

I think James World's answer is neater/more readable, but for posterity, I've found another way to do this using Buffer():

IObservable<Foo> foos = /* ... */;
var pub = foos.Publish();
// Shared stream of window-start times; each one opens a buffer, the next one closes it.
var windows = pub.Select(x => new DateTime(
        x.Timestamp.Ticks - x.Timestamp.Ticks % TimeSpan.FromSeconds(5).Ticks))
    .DistinctUntilChanged().Publish().RefCount();

pub.Buffer(windows, x => windows).Subscribe(x => x.Dump());  // LINQPad
pub.Connect();

With 10 million events, James' approach is more than 2.5x as fast (20 s vs. 56 s on my machine).