RX - Zip outputs an unexpected result

Posted 2019-06-04 21:18

Please help me understand the following behaviour:

Why is X NOT equal to the index in the Observable's items?

Building blocks for the example:

        public class EcgSample
        {               
            public EcgSample(int y)
            {
                Y = y;   
            } 

            public int X { get; set; }
            public int Y { get; set; }  
        }

        private void Print(Tuple<EcgSample, int> s)
        {
              Debug.WriteLine("X : {0} , Y : {1} , Index : {2}", s.Item1.X, s.Item1.Y, s.Item2);
        }

        private List<EcgSample> CreateSamples()
        {
            var testSamples = new List<EcgSample>();

            for (short i = 0; i < 1400; i++)
            {
               testSamples.Add(new EcgSample(i));   
            }

            return testSamples;
        }

Example observable (which outputs the expected result):

       // (1) Create From Collection .
       IObservable<EcgSample> sampleObservable = CreateSamples().ToObservable(new EventLoopScheduler());

       // (2) Repeat 
       IObservable<EcgSample> repeated = sampleObservable.Repeat();

       // (3) Indexed 
       IObservable<Tuple<EcgSample,int>> indexed = repeated.Select((item, index) =>
       {
           item.X = index;
           return new Tuple<EcgSample, int>(item, index);
       }); 

       // (4) Buffered 
       IObservable<IList<Tuple<EcgSample, int>>> buffered = indexed.Buffer(250); 

       // (5) SelectMany and Print .
       _disposable = buffered.SelectMany(buf => buf).Subscribe(Print);

OUTPUT : This is the expected output of the Observable sequence .

       [8384] X : 0 , Y : 0 , Index : 0 
       [8384] X : 1 , Y : 1 , Index : 1 
       [8384] X : 2 , Y : 2 , Index : 2 
       [8384] X : 3 , Y : 3 , Index : 3 
       [8384] X : 4 , Y : 4 , Index : 4 

Modification (which outputs an unexpected result):

Now I want each buffer to be released once per timer interval:

     // (5) Create an Observable from a Timer. 
     IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
            observer =>
            {
                var timer = new Timer();
                timer.Interval = 250;
                timer.Elapsed += (s, e) => observer.OnNext(e);
                timer.Start();
                return Disposable.Create(() =>
                {
                    timer.Stop();
                });
            });

        // (6) Zip with the buffer observable 
        IObservable<IList<Tuple<EcgSample, int>>> zipped = timerObservable.Zip(buffered, (t, b) => b);

        // (7) SelectMany and Print .
        _disposable = zipped.SelectMany(buf => buf).Subscribe(Print);

OUTPUT : This outputs an unexpected result : notice that X is not equal to index.

   [9708] X : 187600 , Y : 0 , Index : 0 
   [9708] X : 187601 , Y : 1 , Index : 1 
   [9708] X : 187602 , Y : 2 , Index : 2 
   [9708] X : 187603 , Y : 3 , Index : 3 

Any ideas why X starts at 187600? (Needless to say, this value is different every time I run my program.)

EDIT :

I solved the issue by simply projecting at the end, but I would still like to know why the first issue occurs.

        List<EcgSample> list = CreateSamples();     

        var loop = new EventLoopScheduler();
        var sampleObservable = list.ToObservable(loop);

        IObservable<EcgSample> repeated = sampleObservable.Repeat();

        IObservable<IList<EcgSample>> buffered = repeated.Buffer(250);

        IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
            observer =>
            {
                var timer = new Timer();
                timer.Interval = 250;
                timer.Elapsed += (s, e) => observer.OnNext(e);
                timer.Start();
                return Disposable.Create(() =>
                {
                    timer.Stop();
                });
            });

        IObservable<IList<EcgSample>> zipped = timerObservable.Zip(buffered, (t, b) => b);

        _disposable = zipped.SelectMany(buf => buf).Select((item, index) =>
        {
            item.X = index;
            return new Tuple<EcgSample, int>(item, index);

        }).Subscribe(Print);

1 answer
我想做一个坏孩纸
#2 · 2019-06-04 22:04

Your answer shows a single thing you can change to get the behaviour you want, but it's not really the reason why it didn't work the way you expected.

If you want to associate each entry in the Observable with a number, you should actually associate it with a number. The way you're doing it, there's no actual connection between each element in the stream and the number. Your fix just ensures that you handle each item before the next one comes through, so the number happens to be at the right value. But that's a very flaky situation.

If you just want a running count of what item you're up to on the stream, have a look at the overload of Select that gives you the index:

stream.Select((item, index) => new { item, index })
      .Subscribe(data => Debug.WriteLine("Item at index {0} is {1}", data.index, data.item));

Alternately, if you want something that's different from just a count of items on the stream, you could do something like:

stream.Select(item => new { item, index = <some value you calculate> })
...

This way your object and its index are tied together. You can use the item's index at any future point and still know what its index was. Whereas your code relies on getting to each item before the next one is processed.
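To make the failure in the question concrete, here is a minimal sketch that reproduces the same effect without Rx. It assumes the cause is shared mutable state: Repeat() replays the same EcgSample instances, the Select writes a new X into them on every pass, and the buffers waiting behind Zip hold references to those instances rather than copies. The class SharedMutationDemo and the three-sample list are made up for the illustration; ToList() stands in for a producer that runs far ahead of a slow consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class EcgSample
{
    public EcgSample(int y) { Y = y; }
    public int X { get; set; }
    public int Y { get; set; }
}

// Hypothetical demo class, not part of the original question.
public static class SharedMutationDemo
{
    public static void Main()
    {
        // Three samples replayed twice: the second pass yields the SAME instances.
        List<EcgSample> samples = Enumerable.Range(0, 3)
                                            .Select(y => new EcgSample(y))
                                            .ToList();
        IEnumerable<EcgSample> replayed = samples.Concat(samples);

        // ToList() forces all six projections to run before anything is printed,
        // like a fast producer filling buffers that a timer releases later.
        var indexed = replayed
            .Select((item, index) =>
            {
                item.X = index;                    // overwrites state shared between passes
                return Tuple.Create(item, index);  // the index itself is copied by value
            })
            .ToList();

        // Print only the tuples produced during the first pass.
        foreach (var t in indexed.Take(3))
        {
            Console.WriteLine("X : {0} , Y : {1} , Index : {2}", t.Item1.X, t.Item1.Y, t.Item2);
        }
        // Prints:
        // X : 3 , Y : 0 , Index : 0
        // X : 4 , Y : 1 , Index : 1
        // X : 5 , Y : 2 , Index : 2
    }
}
```

The tuple's index is still correct because it was copied when the tuple was built, while X reflects the most recent pass over the list. That would also fit the 187600 in the question, which is an exact multiple of the 1400 samples in the list.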

To address the edits in your question

Firstly, have a look at Observable.Interval. It does what you're trying to do with your timer, but much more easily.
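As a rough sketch of that replacement, assuming the System.Reactive package is referenced: Observable.Interval yields an increasing long on each tick, so the hand-written Observable.Create around a Timer is not needed. The class name IntervalDemo, the Take(4) and the sleep are only there to keep the example short and self-terminating.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical demo class; requires the System.Reactive NuGet package.
public static class IntervalDemo
{
    public static void Main()
    {
        // One tick every 250 ms; the tick counter starts at 0.
        IObservable<long> ticks = Observable.Interval(TimeSpan.FromMilliseconds(250));

        // Disposing the subscription stops the underlying timer.
        using (ticks.Take(4).Subscribe(t => Console.WriteLine("tick {0}", t)))
        {
            Thread.Sleep(1500); // keep the process alive while the ticks arrive
        }
    }
}
```

In the question's pipeline, `ticks` would take the place of `timerObservable` in the call to Zip.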

Secondly, have a look at the below example which reproduces what you're doing in your question. Running this code produces the correct output:

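// Repeat() with no arguments is not part of System.Linq; it is available from
// the System.Interactive (Ix) package, which this example assumes is referenced.
var items = Enumerable.Range(65, 26)       // character codes for 'A'..'Z'
                      .Select(i => (char)i)
                      .Repeat();           // endless A..Z, A..Z, ...

// Pair every character with its position in the stream. Each pair is a new,
// immutable anonymous object, so the index cannot be overwritten later.
var observableItems = items.ToObservable()
                           .Select((c, i) => new { Char = c, Index = i });

var interval = Observable.Interval(TimeSpan.FromSeconds(0.25));

var buffered = observableItems.Buffer(10);
var zipped = buffered.Zip(interval, (buffer, _) => buffer);

// Dump() is a LINQPad extension method that renders the sequence.
zipped.SelectMany(buffer => buffer).Dump();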

You can run that code in LINQPad, which is a very useful tool for exploring Rx (and other parts of .NET).

Lastly - I assume this is a simplified exercise to try to work out what's happening in your situation. It looks like you're possibly trying to cope with sensor data that pushes more updates than you want to handle. Using Zip with an interval won't help much with that. You'll slow the rate of arrival of data, but it will just build up a bigger and bigger queue of data waiting to get through Zip.

If you want to get a data point every 250 milliseconds, look at Sample. If you want to get 250 milliseconds worth of readings at a time, look at the overload of Buffer that takes a timespan instead of a count.
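The two operators can be compared side by side in a small sketch, assuming the System.Reactive package is referenced. The `readings` source is a made-up stand-in for the sensor (one value every 10 ms), and RateLimitDemo is a hypothetical class name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical demo class; requires the System.Reactive NuGet package.
public static class RateLimitDemo
{
    public static void Main()
    {
        // Stand-in for the sensor: a new value every 10 ms.
        // Interval is cold, so each subscription below gets its own timer.
        IObservable<long> readings = Observable.Interval(TimeSpan.FromMilliseconds(10));

        // Sample: only the most recent reading in each 250 ms period; the rest are dropped.
        IObservable<long> sampled = readings.Sample(TimeSpan.FromMilliseconds(250));

        // Buffer(TimeSpan): every reading that arrived in each 250 ms window, as a list.
        IObservable<IList<long>> windows = readings.Buffer(TimeSpan.FromMilliseconds(250));

        using (sampled.Subscribe(r => Console.WriteLine("latest reading: {0}", r)))
        using (windows.Subscribe(w => Console.WriteLine("window with {0} readings", w.Count)))
        {
            Thread.Sleep(1000); // let a few periods elapse before unsubscribing
        }
    }
}
```

Neither operator queues work behind a timer the way Zip does: Sample discards what it does not emit, and Buffer hands over each window as soon as it closes.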
