As requested by Lee Campbell, this is a follow-on question to this original. It is intended to present the question in the context of the use case I was attempting to solve.
I have a WebApiService that wraps a raw web API and provides token management. That is, it keeps track of the authentication token, passing it through to the raw API. Here's an example of one of the public methods in the WebApiService:
    public IObservable<User> UpdateUserAsync(int id, UpdateUserRequest request) =>
        this
            .EnsureAuthenticatedAsync()
            .SelectMany(
                _ =>
                    this
                        .rawWebApi
                        .UpdateUserAsync(this.TokenValue, id, request));
As you can see, it simply calls EnsureAuthenticatedAsync prior to forwarding onto the raw web API, passing in the token using this.TokenValue.
The EnsureAuthenticatedAsync method looks like this:
    public IObservable<Unit> EnsureAuthenticatedAsync() =>
        this
            .Token
            .FirstAsync()
            .SelectMany(token => string.IsNullOrWhiteSpace(token) ? this.authenticate : Observable.Return(Unit.Default));
My original question was spurred by my attempts to write the authentication pipeline (this.authenticate in the above). Note that this was the first step towards replacing the entire EnsureAuthenticatedAsync method with a single observable.
For authenticate, I wanted an observable that:
- does nothing until someone subscribes (cold/lazy)
- only does its work once, even if there are multiple subscribers at once
- does its work again if all subscribers disconnect
To that end, I came up with something like this:
    this.authenticate = Observable
        .Defer(() =>
            Observable
                .Create<Unit>(
                    async (observer, cancellationToken) =>
                    {
                        while (!cancellationToken.IsCancellationRequested)
                        {
                            var result = await this
                                .authenticationService
                                .AuthenticateAsync(this);
                            if (result.WasSuccessful)
                            {
                                observer.OnNext(Unit.Default);
                                observer.OnCompleted();
                                return;
                            }
                        }
                    }))
        .Publish()
        .RefCount();
The idea here is to allow any number of simultaneous calls to WebApiService methods to result in a single authentication loop being executed. Once authenticated, all subscribers will complete, and any future subscriber would mean we need to re-authenticate and thus re-execute the deferred observable.
Of course, the above observable suffers from the same problem as the simplified one in my original question: once the deferred observable has completed, Publish will immediately complete any future subscriber (despite the deferred observable being re-requested).
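To make the problem concrete, here is a minimal sketch of a repro, assuming the System.Reactive package. The Observable.Return source is only a hypothetical stand-in for the authentication loop, and the class and variable names are made up for illustration:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class PublishRefCountRepro
{
    public static void Main()
    {
        // Hypothetical stand-in for the authentication loop:
        // emits a single value, then completes.
        var shared = Observable
            .Defer(() => Observable.Return(Unit.Default))
            .Publish()
            .RefCount();

        var firstValues = 0;
        var secondValues = 0;
        var secondCompleted = false;

        // The first subscriber connects the pipeline and runs it to completion.
        shared.Subscribe(_ => firstValues++);

        // The second subscriber arrives after the first run has completed.
        shared.Subscribe(_ => secondValues++, () => secondCompleted = true);

        Console.WriteLine($"first subscriber values: {firstValues}");
        Console.WriteLine($"second subscriber values: {secondValues}, completed: {secondCompleted}");
    }
}
```

As far as I can tell, the second subscriber is completed without ever receiving a value, because the subject that Publish created was already completed by the first run.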
As hinted above, my ultimate goal would be to replace EnsureAuthenticatedAsync altogether with a pipeline that only executes this authentication when the token is null. But that was step two, and I failed at one :)
So to bring this back to the original question: is there a way of writing a pipeline that will execute once regardless of the number of current subscribers, but execute again if all subscribers disconnect and reconnect again?