Data service in Reactive Extension

2020-07-22 17:05发布

问题:

I want to have a generic class for an in-memory cache of data handling creations, updates and deletions. Underlying model inherits from an interface with an Id of type string.

interface IModel
{
  string Id { get; }
}

Handling creations & updates is easy. For instance, if I want to subscribe to the stream and populate a Dictionary, I know that a creation is needed if the model Id is not already there, otherwise it is an update.

My question is:
How would you handle deletions without introducing another class to wrap my models? I would like to keep an IObservable<TModel>, not something like IObservable<Event<TModel>> or IObservable<Pair<string, TModel>>, but I don't see how. Is that possible?

interface IDataService<TModel>
{
  IObservable<TModel> DataStream { get; }
}

回答1:

As @Enigmativity suggested you could use nested observable sequences to solve this problem. This is touched on in the Sequences of coincidence section in IntroToRx.

How would this work?

You can think of nested sequences to be something like a 2d Array, or even more specifically a jagged array. The outer sequence is the container for the inner sequences. The arrival of an inner sequence represents a creation of a model.

interface IDataService<TModel>
{
    IObservable<IObservable<TModel>> DataStream { get; }
}

Once you have an inner sequence all values that it produces are updates (except for the first one). An inner sequence will only produce updates for a single id. When the inner sequence completes, the represent a deletion.

This pattern works well for various use-cases, as outlined in the initial paragraph in the link above.

As a marble diagram you would have something like the following. Each row represents an inner sequence.

m1  1---2----3--|
m2     a----|
m3       x----y---z--

This would result in the following logical flow:

  1. Create m1 with the state of '1'
  2. Create m2 with the state of 'a'
  3. Update m1 with value '2'
  4. Create m3 with value 'x'
  5. Delete m2
  6. Update m1 with value '3'
  7. Update m3 with value 'y'
  8. Delete m1
  9. Update m3 with value 'z'