Why is RefCount not working after all initial subs

Posted 2020-07-23 04:10

Question:

As requested by Lee Campbell, this is a follow-up to my original question. It presents the question in the context of the use case I was trying to solve.

I have a WebApiService that wraps a raw web API and provides token management. That is, it keeps track of the authentication token, passing it through to the raw API. Here's an example of one of the public methods in the WebApiService:

public IObservable<User> UpdateUserAsync(int id, UpdateUserRequest request) =>
    this
        .EnsureAuthenticatedAsync()
        .SelectMany(
            _ =>
                this
                    .rawWebApi
                    .UpdateUserAsync(this.TokenValue, id, request));

As you can see, it simply calls EnsureAuthenticatedAsync before forwarding to the raw web API, passing in the token via this.TokenValue.

The EnsureAuthenticatedAsync method looks like this:

public IObservable<Unit> EnsureAuthenticatedAsync() =>
    this
        .Token
        .FirstAsync()
        .SelectMany(token => string.IsNullOrWhiteSpace(token) ? this.authenticate : Observable.Return(Unit.Default));

My original question was spurred by my attempts to write the authentication pipeline (this.authenticate in the above). Note that this was the first step towards replacing the entire EnsureAuthenticatedAsync method with a single observable.

For authenticate, I wanted an observable that:

  1. does nothing until someone subscribes (cold/lazy)
  2. only does its work once, even if there are multiple subscribers at once
  3. does its work again if all subscribers disconnect

To that end, I came up with something like this:

this.authenticate = Observable
    .Defer(() =>
        Observable
            .Create<Unit>(
                async (observer, cancellationToken) =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var result = await this
                            .authenticationService
                            .AuthenticateAsync(this);

                        if (result.WasSuccessful)
                        {
                            observer.OnNext(Unit.Default);
                            observer.OnCompleted();

                            return;
                        }
                    }
                }))
    .Publish()
    .RefCount();

The idea here is to allow any number of simultaneous calls to WebApiService methods to result in a single authentication loop being executed. Once authenticated, all subscribers complete, and any future subscriber means we need to re-authenticate and thus re-execute the deferred observable.

Of course, the above observable suffers from the same problem as the simplified one in my original question: once the deferred observable has completed, Publish immediately completes any future subscriber (even though the deferred observable is requested again).

As hinted above, my ultimate goal would be to replace EnsureAuthenticatedAsync altogether with a pipeline that only executes this authentication when the token is null. But that was step two, and I failed at one :)

So to bring this back to the original question: is there a way of writing a pipeline that will execute once regardless of the number of current subscribers, but execute again if all subscribers disconnect and reconnect again?

Answer 1:

Observable sequences can't complete more than once: once the subject behind Publish has seen OnCompleted, it stays completed for every later subscriber. What you want here is to remove the OnCompleted call so that authenticate never completes, and add .Take(1) to EnsureAuthenticatedAsync so that each subscription to authenticate completes after one value.
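Applied to the shape of the code in the question, that might look roughly like the sketch below. It is only an illustration under some assumptions: AuthenticateSketch, the pending subject (a stand-in for an in-flight AuthenticateAsync call) and the counters are hypothetical names that are not part of the original code, and it needs the System.Reactive package.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AuthenticateSketch
{
    // Runs the scenario; returns (authentication runs, values seen, completions seen).
    public static (int Runs, int Values, int Completions) Run()
    {
        int runs = 0, values = 0, completions = 0;

        // Hypothetical stand-in for an in-flight AuthenticateAsync call.
        Subject<Unit> pending = null;

        var authenticate = Observable
            .Create<Unit>(observer =>
            {
                runs++;
                pending = new Subject<Unit>();

                // Forward only OnNext: the shared sequence itself never completes.
                return pending.Subscribe(observer.OnNext);
            })
            .Publish()
            .RefCount();

        // Each caller completes after one value, as EnsureAuthenticatedAsync would.
        var ensureAuthenticated = authenticate.Take(1);

        void AddCaller() =>
            ensureAuthenticated.Subscribe(_ => values++, () => completions++);

        AddCaller();                    // first caller connects and starts run 1
        AddCaller();                    // second caller shares run 1
        pending.OnNext(Unit.Default);   // run 1 "succeeds"; both callers complete

        AddCaller();                    // no callers were left, so this starts run 2
        pending.OnNext(Unit.Default);   // run 2 "succeeds"

        return (runs, values, completions);
    }

    public static void Main()
    {
        var (runs, values, completions) = Run();
        Console.WriteLine($"runs={runs}, values={values}, completions={completions}");
    }
}
```

The first two callers should share one run, and the third should trigger a second one, because RefCount disconnects once the last Take(1) subscription has gone away. In the real service the Take(1) would presumably sit on this.authenticate inside EnsureAuthenticatedAsync.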

Below is a working console app. Replace references to obs1 (which has Take) with obs to reproduce your problem. In both cases you can press enter quickly to have all four subscribers get the same value.

using System;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static int value = 0;

    static void Main(string[] args)
    {
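        var obs = Observable.Create<int>(observer =>
        {
            // Printed once per connection, not once per subscriber.
            Console.WriteLine("Generating");

            // Use the value returned by Increment rather than re-reading the shared field.
            var current = Interlocked.Increment(ref value);

            return Observable.Return(current)
                .Delay(TimeSpan.FromSeconds(1))
                .Subscribe(observer);
        })
        .Publish()
        .RefCount();

        // Take(1) completes each subscriber after one value, so RefCount
        // disconnects from the source before its OnCompleted can reach Publish.
        var obs1 = obs.Take(1);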

        obs1.Subscribe(
            i => Console.WriteLine("First {0}", i), 
            () => Console.WriteLine("First complete"));
        obs1.Subscribe(
            i => Console.WriteLine("Second {0}", i), 
            () => Console.WriteLine("Second complete"));

        Console.ReadLine();

        obs1.Subscribe(
            i => Console.WriteLine("Third {0}", i), 
            () => Console.WriteLine("Third complete"));
        obs1.Subscribe(
            i => Console.WriteLine("Fourth {0}", i), 
            () => Console.WriteLine("Fourth complete"));

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }
}