Using Reactive Extensions to handle multiple responses

Posted 2019-04-10 04:56

Question:

Situation

I have a system where one request produces two responses. The request and responses have corresponding observables:

IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;

It is guaranteed that the RequestSent event occurs before MainResponseReceived and SecondaryResponseReceived, but the two responses can arrive in either order.

What I have

Originally I wanted a single handler that handles both responses, so I zipped the observables:

_requests
    .SelectMany(async request =>
    {
        var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
        var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);

        var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
            Request = request,
            Main = m, 
            Secondary = s
        });
        return await zippedResponse.FirstAsync();
    })
    .Subscribe(OnMainAndSecondaryResponseReceived);

What I need

Now I also need to handle MainResponseReceived without waiting for SecondaryResponseReceived, and it must be guaranteed that OnMainResponseReceived completes before OnMainAndSecondaryResponseReceived is called.

How should I define the two subscriptions?

Test case 1:

  1. RequestSent occurs
  2. MainResponseReceived occurs -> OnMainResponseReceived is called
  3. SecondaryResponseReceived occurs -> OnMainAndSecondaryResponseReceived is called

Test case 2:

  1. RequestSent occurs
  2. SecondaryResponseReceived occurs
  3. MainResponseReceived occurs -> OnMainResponseReceived is called -> OnMainAndSecondaryResponseReceived is called

Answer 1:

I think you're pretty much on the right track. I would stop mucking around with all the Async stuff - that's just making things complicated.

Try this query:

var query =
    _requests
        .SelectMany(request =>
            _mainResponses.Where(m => m.Id == request.Id).Take(1)
                .Do(m => OnMainResponseReceived(m))
                .Zip(
                    _secondaryResponses.Where(s => s.Id == request.Id).Take(1),
                    (m, s) => new MainAndSecondaryResponseReceived()
                    {
                        Request = request,
                        Main = m, 
                        Secondary = s
                    }));

var subscription =
    query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));

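The .Do(...) is the important missing part in your code. Because it sits upstream of the .Zip(...), every main response passes through it before Zip can pair that response with a secondary one. That ensures OnMainResponseReceived is called before OnMainAndSecondaryResponseReceived, regardless of whether the main or the secondary response arrives first.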

I tested this with:

Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();

IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();

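// Build the query and subscribe (as shown above) before pushing any values:
// a Subject does not replay earlier values to subscribers that arrive later.
_requestsSubject.OnNext(new RequestSent() { Id = 42 });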
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });

_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });
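
The test above pushes the main response first both times, so it only covers test case 1 from the question. Below is a minimal sketch that replays either order and records which handler fires when. It assumes the System.Reactive package is referenced. The event classes are hypothetical stand-ins with only an Id property, OrderingCheck and Run are names made up for this illustration, and the two handlers are replaced by entries in a log list.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical minimal event types; only the Id property comes from the question.
public class RequestSent { public int Id { get; set; } }
public class MainResponseReceived { public int Id { get; set; } }
public class SecondResponseReceived { public int Id { get; set; } }

public static class OrderingCheck
{
    // Replays one request/response exchange and returns the handler calls in order.
    public static List<string> Run(bool secondaryFirst)
    {
        var log = new List<string>();
        var requests = new Subject<RequestSent>();
        var mains = new Subject<MainResponseReceived>();
        var secondaries = new Subject<SecondResponseReceived>();

        // Same shape as the query in the answer; log.Add stands in for the handlers.
        var query = requests.SelectMany(request =>
            mains.Where(m => m.Id == request.Id).Take(1)
                .Do(m => log.Add("main " + m.Id))      // stands in for OnMainResponseReceived
                .Zip(
                    secondaries.Where(s => s.Id == request.Id).Take(1),
                    (m, s) => "both " + request.Id));  // stands in for the combined event

        // Subscribe before pushing values, because subjects do not replay.
        using (query.Subscribe(x => log.Add(x)))
        {
            requests.OnNext(new RequestSent { Id = 42 });
            if (secondaryFirst)
            {
                // Test case 2: the secondary response arrives before the main one.
                secondaries.OnNext(new SecondResponseReceived { Id = 42 });
                mains.OnNext(new MainResponseReceived { Id = 42 });
            }
            else
            {
                // Test case 1: the main response arrives first.
                mains.OnNext(new MainResponseReceived { Id = 42 });
                secondaries.OnNext(new SecondResponseReceived { Id = 42 });
            }
        }
        return log;
    }
}
```

Both OrderingCheck.Run(false) and OrderingCheck.Run(true) should return "main 42" followed by "both 42". In the secondary-first case Zip holds the secondary value until the main response has passed through .Do(...), so the combined event is not produced until then.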