Rx produce and consume on different threads

2019-07-21 12:36发布

问题:

I have tried to simplify my issue by a sample code here. I have a producer thread constantly pumping in data and I am trying to batch it with a time delay between batches so that the UI has time to render it. But the result is not as expected, the produce and consumer seems to be on the same thread.

I don't want the batch buffer to sleep on the thread that is producing. Tried SubscribeOn did not help much. What am I doing wrong here, how do I get this to print different thread Ids on producer and consumer thread.

static void Main(string[] args)
{
    var stream = new ReplaySubject<int>();

    Task.Factory.StartNew(() =>
    {
        int seed = 1;
        while (true)
        {
            Console.WriteLine("Thread {0} Producing {1}",
                Thread.CurrentThread.ManagedThreadId, seed);

            stream.OnNext(seed);
            seed++;

            Thread.Sleep(TimeSpan.FromMilliseconds(500));
         }
    });

    stream.Buffer(5).Do(x =>
    {
        Console.WriteLine("Thread {0} sleeping to create time gap between batches",
            Thread.CurrentThread.ManagedThreadId);

        Thread.Sleep(TimeSpan.FromSeconds(2));
    })
    .SubscribeOn(NewThreadScheduler.Default).Subscribe(items =>
    {
        foreach (var item in items)
        {
            Console.WriteLine("Thread {0} Consuming {1}",
                Thread.CurrentThread.ManagedThreadId, item);
        }
    });
    Console.Read();
}

回答1:

Understanding the difference between ObserveOn and SubscribeOn is key here. See - ObserveOn and SubscribeOn - where the work is being done for an in depth explanation of these.

Also, you absolutely don't want to use a Thread.Sleep in your Rx. Or anywhere. Ever. Do is almost as evil, but Thead.Sleep is almost always totally evil. Buffer has serveral overloads you want to use instead - these include a time based overload and an overload that accepts a count limit and a time-limit, returning a buffer when either of these are reached. A time-based buffering will introduce the necessary concurrency between producer and consumer - that is, deliver the buffer to it's subscriber on a separate thread from the producer.

Also see these questions and answers which have good discussions on keeping consumers responsive (in the context of WPF here, but the points are generally applicable).

  • Process lots of small tasks and keep the UI responsive
  • Buffer data from database cursor while keeping UI responsive

The last question above specifically uses the time-based buffer overload. As I said, using Buffer or ObserveOn in your call chain will allow you to add concurrency between producer and consumer. You still need to take care that the processing of a buffer is still fast enough that you don't get a queue building up on the buffer subscriber.

If queues do build up, you'll need to think about means of applying backpressure, dropping updates and/or conflating the updates. These is a big topic too broad for in depth discussion here - but basically you either:

  • Drop events. There have been many ways discussed to tackle this in Rx. I current like Ignore incoming stream updates if last callback hasn't finished yet but also see With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running and there are many other discussions of this.
  • Signal the producer out of band to tell it to slow down or send conflated updates, or
  • You introduce an operator that does in-stream conflation - like a smarter Buffer that could compress events to, for example, only include the latest price on a stock item etc. You can author operators that are sensitive to the time that OnNext invocations take to process, for example.

See if proper buffering helps first, then think about throttling/conflating events at the source as (a UI can only show so much infomation anway) - then consider smarter conflation as this can get quite complex. https://github.com/AdaptiveConsulting/ReactiveTrader is a good example of a project using some advanced conflation techniques.



回答2:

Although the other answers are correct, I'd like to identify your actual problem as perhaps a misunderstanding of the behavior of Rx. Putting the producer to sleep blocks subsequent calls to OnNext and it seems as though you're assuming Rx automatically calls OnNext concurrently, but in fact it doesn't for very good reasons. Actually, Rx has a contract that requires serialized notifications.

See §§4.2, 6.7 in the Rx Design Guidelines for details.

Ultimately, it looks as though you're trying to implement the BufferIntrospective operator from Rxx. This operator allows you to pass in a concurrency-introducing scheduler, similar to ObserveOn, to create a concurrency boundary between a producer and a consumer. BufferIntrospective is a dynamic backpressure strategy that pushes out heterogeneously-sized batches based on the changing latencies of an observer. While the observer is processing the current batch, the operator buffers all incoming concurrent notifications. To accomplish this, the operator takes advantage of the fact that OnNext is a blocking call (per the §4.2 contract) and for that reason this operator should be applied as close to the edge of the query as possible, generally immediately before you call Subscribe.

As James described, you could call it a "smart buffering" strategy itself, or see it as the baseline for implementing such a strategy; e.g., I've also defined a SampleIntrospective operator that drops all but the last notification in each batch.



回答3:

ObserveOn is probably what you want. It takes a SynchronizationContext as an argument, that should be the SynchronizationContext of your UI. If you don't know how to get it, see Using SynchronizationContext for sending events back to the UI for WinForms or WPF