What are the functional benefits of recursive sche

2019-08-26 03:11发布

问题:

I'm current reading http://www.introtorx.com/ and I'm getting really interested in stripping Subject<T> out of my reactive code. I'm starting to understand how to encapsulate sequence generation so that I can reason better about a given sequence. I read a few SO questions and ended up reading about scheduling. Of particular interest is recursive scheduling, using the Schedule(this IScheduler scheduler, Action<TState,Action<TState>>)overloads - like this one.

The book is starting to show its age in a few areas, and the biggest i see is that it never compares its techniques to alternatives that may be achieved using the Task and async/await language features. I always end up feeling like I could write less code by ignoring the book advice and using the asynchronous toys, but the back of my mind nags me about being lazy and not learning the pattern properly.

With that, here is my question. If I wanted to schedule a sequence at an interval, support cancellation and perform work on a background thread, I might do this:

    static void Main(string[] args)
    {
        var sequence = Observable.Create<object>(o =>
        {
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            DoWerk(o, cancellationTokenSource);

            return cancellationTokenSource.Cancel;
        });

        sequence.Subscribe(p => Console.Write(p));

        Console.ReadLine();
    }

    private static async void DoWerk(IObserver<object> o, CancellationTokenSource cancellationTokenSource)
    {
        string message =  "hello world!";
        for (int i = 0; i < message.Length; i++)
        {
            await Task.Delay(250, cancellationTokenSource.Token);
            o.OnNext(message[i]);

            if (cancellationTokenSource.IsCancellationRequested)
            {
                break;
            }
        }

        o.OnCompleted();
    }

Note the use of async void to create concurrency without explicitly borrowing a thread pool thread with Task.Run(). await Task.Delay() will, however, do just that but it will not lease the thread for long.

What are the limitations and pitfalls here? What are the reasons that you might prefer to use recursive scheduling?

回答1:

I personally wouldn't use await Task.Delay(250, cancellationTokenSource.Token); as a way to slow down a loop. It's better than Thread.Sleep(250), but it's still code smell to me.

I would look at it that you should use a built-in operator in preference to a roll-your-own solution like this.

The operator you need is one of the most powerful, but often overlooked. Try Observable.Generate. He's how:

static void Main(string[] args)
{
    IObservable<char> sequence = Observable.Create<char>(o =>
    {
        string message =  "hello world!";
        return
            Observable
                .Generate(
                    0,
                    n => n < message.Length,
                    n => n + 1,
                    n => message[n],
                    n => TimeSpan.FromMilliseconds(250.0))
                .Subscribe(o);
    });

    using (sequence.Subscribe(p => Console.Write(p)))
    {
        Console.ReadLine();
    }
}

This is self-cancelling (when you call .Dispose() on the subscription) and produces values every 250.0 milliseconds.

I've continued to use the Observable.Create operator to ensure that the message variable is encapsulated within the observable - otherwise it is possible for someone to change the value of message as the observable is working with it and thus break it.

As an alternative, that might not be as efficient with memory, but is self-encapsulating, try this:

IObservable<char> sequence =
    Observable
        .Generate(
            "hello world!",
            n => !String.IsNullOrEmpty(n),
            n => n.Substring(1),
            n => n[0],
            n => TimeSpan.FromMilliseconds(250.0));

And, finally, there's nothing "recursive" about the scheduling in your question. What did you mean by that?


I finally figured out what you're looking at. I missed it in the question.

Here's an example using the recursive scheduling:

IObservable<char> sequence = Observable.Create<char>(o =>
{
    string message = "hello world!";
    return Scheduler.Default.Schedule<string>(message, TimeSpan.FromMilliseconds(250.0), (state, schedule) =>
    {
        if (!String.IsNullOrEmpty(state))
        {
            o.OnNext(state[0]);
            schedule(state.Substring(1), TimeSpan.FromMilliseconds(250.0));
        }
        else
        {
            o.OnCompleted();
        }
    });
});