With Reactive Extensions (RX), is it possible to a

Posted 2019-04-08 14:36

Question:

I have a class which takes in a stream of events, and pushes out another stream of events.

All of the events use Reactive Extensions (RX). The incoming stream of events is pushed from an external source into an IObserver<T> using .OnNext, and the outgoing stream of events is pushed out using IObservable<T> and .Subscribe. I am using Subject<T> to manage this, behind the scenes.

I am wondering what techniques there are in RX to pause the output temporarily. This would mean that incoming events would build up in an internal queue, and when they are unpaused, the events would flow out again.

Answer 1:

Here is my solution using Buffer and Window operators:

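public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    // Convention in this answer: pauser emits 'true' to let values flow and 'false' to pause.
    // While paused, collect values; on resume, flatten the buffered list back into a stream.
    var queue = source.Buffer(pauser.Where(flowing => !flowing),
                              _ => pauser.Where(flowing => flowing))
                      .SelectMany(l => l.ToObservable());

    // While flowing, pass values straight through; the first window opens at subscription.
    return source.Window(pauser.Where(flowing => flowing).StartWith(true),
                         _ => pauser.Where(flowing => !flowing))
                 .Switch()
                 .Merge(queue);
}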

A window is opened at subscription and every time the pauser stream emits 'true'. It closes when the pauser emits 'false'.

Buffer does what it is supposed to do: it buffers the values that arrive between a 'false' and the next 'true' from the pauser. Once Buffer receives 'true', it outputs an IList of those values, which are then streamed out all at once.

DotNetFiddle link: https://dotnetfiddle.net/vGU5dJ
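
To make it easier to see which pauser value does what, here is a small usage sketch. It assumes the System.Reactive package and uses two Subject<T> instances as stand-ins for the real event source and the pause signal; WindowBufferDemo, source and gate are names made up for illustration. Pausable is repeated from above so that the block compiles on its own. Note that Pausable subscribes to source twice (once for Buffer, once for Window), so the sketch uses a hot source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PausableExtensions
{
    // Same operator as in the answer: 'true' lets values flow, 'false' pauses.
    public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
    {
        var queue = source.Buffer(pauser.Where(flowing => !flowing),
                                  _ => pauser.Where(flowing => flowing))
                          .SelectMany(l => l.ToObservable());

        return source.Window(pauser.Where(flowing => flowing).StartWith(true),
                             _ => pauser.Where(flowing => !flowing))
                     .Switch()
                     .Merge(queue);
    }
}

public static class WindowBufferDemo
{
    // Hypothetical demo: collects whatever the paused/resumed stream delivers.
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new Subject<int>();  // stand-in for the incoming events
        var gate = new Subject<bool>();   // stand-in for the pause signal

        using (source.Pausable(gate).Subscribe(x => received.Add(x)))
        {
            source.OnNext(1);    // a window is open from subscription onwards
            gate.OnNext(false);  // pause: the window closes, a buffer opens
            source.OnNext(2);    // held in the buffer
            source.OnNext(3);    // held in the buffer
            gate.OnNext(true);   // resume: the buffer is flushed, a new window opens
            source.OnNext(4);
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because a Subject<bool> has no initial value, it is the StartWith(true) on the Window openings that lets values through before the pauser has emitted anything.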



Answer 2:

Here's a reasonably simple Rx way to do what you want. I've created an extension method called Pausable that takes a source observable and a second observable of bool values that pauses (true) or resumes (false) the source.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
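            var values = new ReplaySubject<T>();
            // Feed the subject from the start; it never completes, so values.Concat(ps)
            // below would otherwise emit nothing until the first pause/resume cycle.
            paused.Disposable = ps.Subscribe(values);
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    // Paused (true): start a fresh buffer, record into it, emit nothing.
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    // Resumed (false): replay the buffer, which keeps receiving live values.
                    return values.Concat(ps);
                }
            };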

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Now, the only thing I couldn't quite work out is what you mean by "incoming stream of events is an IObserver<T>". Streams are IObservable<T>; observers aren't streams. It sounds like you're not doing something right here. Could you add to your question and explain further?
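
For what it's worth, Subject<T> implements both IObserver<T> and IObservable<T>, so the setup in the question can be read as pushing into the subject's observer side and subscribing to its observable side. A minimal sketch of that reading, assuming the System.Reactive package (SubjectDemo, input and output are illustrative names, not anything from the question):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var subject = new Subject<string>();

        IObserver<string> input = subject;     // the side an external source pushes into
        IObservable<string> output = subject;  // the side consumers subscribe to

        using (output.Subscribe(x => received.Add(x)))
        {
            input.OnNext("a");
            input.OnNext("b");
        }

        input.OnNext("c"); // the subscription is disposed, so nobody receives this
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "a, b"
    }
}
```

With that arrangement, Pausable would be applied to the observable side, for example output.Pausable(bs).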



Answer 3:

You can simulate pausing/unpausing with an Observable.

Once your pauseObservable emits a 'paused' value, buffer events until pauseObservable emits an 'unpaused' value.

Here's an example which uses the BufferUntil implementation by Dave Sexton and Observable logic by Timothy Shields (from my own question a while back):

        // Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

        // Simulate pausing from the keyboard; not actually relevant to this answer.
        // KeyPressed is assumed to be an event declared elsewhere in the program.
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) // space pauses, any other key unpauses
            .DistinctUntilChanged();

        // Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) // not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        // When paused, buffer until not paused.
        // BufferUntil is the extension method credited above, not a built-in Rx operator.
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

Room for improvement: the two subscriptions to source (pausedEvents and notPausedEvents) could perhaps be merged into one.
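
One possible way to get down to a single subscription to source is to hold the queue by hand instead of combining Buffer operators. The following is only a sketch under stated assumptions, not the approach used in any of the answers above: it assumes the System.Reactive package, treats 'true' from the pauser as "paused", starts unpaused, and flushes the queue before completing. PausableBuffered, QueuePauseExtensions and QueuePauseDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class QueuePauseExtensions
{
    // Hypothetical operator: 'true' from pauser means "paused".
    public static IObservable<T> PausableBuffered<T>(
        this IObservable<T> source,
        IObservable<bool> pauser)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();     // serializes source and pauser callbacks
            var queue = new Queue<T>();  // values held back while paused
            var paused = false;          // assumption: start unpaused

            var pauserSubscription = pauser.DistinctUntilChanged().Subscribe(p =>
            {
                lock (gate)
                {
                    paused = p;
                    // On resume, flush everything queued during the pause, in order.
                    while (!paused && queue.Count > 0)
                        observer.OnNext(queue.Dequeue());
                }
            });

            // The only subscription to source.
            var sourceSubscription = source.Subscribe(
                x =>
                {
                    lock (gate)
                    {
                        if (paused) queue.Enqueue(x);
                        else observer.OnNext(x);
                    }
                },
                ex => { lock (gate) { observer.OnError(ex); } },
                () =>
                {
                    lock (gate)
                    {
                        // Design choice: deliver queued values before completing.
                        while (queue.Count > 0) observer.OnNext(queue.Dequeue());
                        observer.OnCompleted();
                    }
                });

            return new CompositeDisposable(pauserSubscription, sourceSubscription);
        });
    }
}

public static class QueuePauseDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new Subject<int>();
        var pauser = new Subject<bool>();

        using (source.PausableBuffered(pauser).Subscribe(x => received.Add(x)))
        {
            source.OnNext(1);      // passes straight through
            pauser.OnNext(true);   // pause
            source.OnNext(2);      // queued
            source.OnNext(3);      // queued
            pauser.OnNext(false);  // resume: 2 and 3 are flushed
            source.OnNext(4);      // passes straight through
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 2, 3, 4"
    }
}
```

The trade-off is that the observer is called while the lock is held, which keeps ordering simple but means a slow subscriber delays both the source and the pauser.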