C# reactive extensions what happens when OnNext ta

2019-07-19 08:40发布

I'm new to Rx and I'm thinking what happens when IObservable is producing lot of events very quickly and OnNext take very long time. I guess that new events are queued somehow internally so I can possible run our memory. Am I right? Consider following small example:

        Subject<int> subject = new Subject<int>();
        subject.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {      Console.WriteLine("Value published: {0}", x); Thread.Sleep(100); },
                     () => Console.WriteLine("Sequence Completed."));

        for (int i = 0; i < 10; i++)
        {
            subject.OnNext(i);
        }

I publish 10 events and consumer method is very slow. All events are processed so it must be cached in memory. In case that I publish lot of events I will run out of memory, correct? Or do I miss something?

Is there a way how to limit count of pending events in reactive extensions? For example where there is more than 5 pending events I want to omit new ones.

1条回答
Root(大扎)
2楼-- · 2019-07-19 08:58

Yes you are right, slow consumers will cause queuing. The closest built-in operator for what you are asking is Observable.Sample - however this drops older events in favour of more recent ones. This is the more usual requirement as it allows your slow observer to catch up.

Let me know if Sample is sufficient, because the buffering behaviour you describe is an unusual requirement - it is achievable, but quite complex to pull off and requires fairly non-trivial code.

EDIT - If you use Sample like this, it will return the latest event after each OnNext (you need to provide a Scheduler for this to work - and NewThreadScheduler creates a thread per subscription, not per event:

void Main()
{
    var source = Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(100);

    source.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
        .Subscribe(SlowConsumer);

    Console.ReadLine();
}

private void SlowConsumer(long item)
{    
    Console.WriteLine(item + " " + Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(TimeSpan.FromSeconds(1));
}
查看更多
登录 后发表回答