I'm new to Reactive Extensions (Rx) and am trying to do the following in .NET (though it should be the same in JS and other languages):
I have an incoming stream of objects, each containing a string and a bool property. The stream is infinite. The following conditions apply:
- The first object should always be printed.
- After that, all incoming objects should be skipped until an object arrives with the bool property set to true.
- When an object arrives with the bool property set to true, that object should be skipped, but the next object should be printed (no matter what its properties are).
- It continues this way: every object that follows an object with the property set to true should be printed.
Example:
("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)
Expected result:
"one"--"three"--"six"--"seven"--"eight"--"ten"
Note that "six" and "seven" are printed because they follow an object with the property set to true, even though their own property is also set to true.
Simple .NET program to test it:
```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {
            // Emits the ten example items, one per second.
            var source =
                Observable.Create<Result>(f =>
                {
                    f.OnNext(new Result() { Text = "one", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "two", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "three", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "four", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "five", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "six", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "seven", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "eight", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "nine", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "ten", Flag = false });
                    return () => Console.WriteLine("Observer has unsubscribed");
                });
        }
    }
}
```
I tried to use the `.Scan` and `.Buffer` extensions, but I don't know exactly how to use them in my scenario.
Performance should of course be as good as possible, because in the end the stream will be infinite.
This is the way I would code it in TypeScript
The basic idea is to use a higher-order function, `printItem`, which returns a function that knows whether and what to print. The returned function is stored in a variable, `printItemFunction`.
For each item emitted by the source Observable, the first thing to do is execute `printItemFunction`, passing it the data notified by the source Observable.
The second thing is to evaluate the `printItem` function and store the result in the variable `printItemFunction`, so that it is ready for the next notification.
At the beginning of the program, `printItemFunction` is initialized with `true`, so that the first item is always printed.
I am not familiar enough with C# to give you an answer for .NET.
Here are a number of ways to do it:
I'm partial to the `Scan` one, but really, it's up to you.
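A minimal sketch of what a `Scan`-based version could look like, assuming Rx.NET (`System.Reactive`) and a `Result` class shaped like the one in the question. It is not necessarily the code this answer had in mind; the `ScanSketch`, `Sample`, and `AfterFlag` names are made up for illustration. The accumulator carries the previous item's flag forward, so each item can be judged by its predecessor without buffering anything.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Result
{
    public bool Flag { get; set; }
    public string Text { get; set; }
}

public static class ScanSketch
{
    // Hypothetical helper: the ten sample items from the question as a cold observable.
    public static IObservable<Result> Sample() =>
        new[]
        {
            ("one", false), ("two", true), ("three", false), ("four", false), ("five", true),
            ("six", true), ("seven", true), ("eight", false), ("nine", true), ("ten", false)
        }
        .Select(t => new Result { Text = t.Item1, Flag = t.Item2 })
        .ToObservable();

    // Passes an item through if it is the first one or if the item before it had Flag == true.
    public static IObservable<Result> AfterFlag(IObservable<Result> source) =>
        source
            // State per item: whether to emit it, its own flag (used for the next item), and the item.
            // The seed's Flag is true so that the very first item is emitted.
            .Scan(
                (Emit: false, Flag: true, Item: (Result)null),
                (state, item) => (Emit: state.Flag, Flag: item.Flag, Item: item))
            .Where(state => state.Emit)
            .Select(state => state.Item);

    public static void Main()
    {
        // ToObservable() replays the array synchronously, so Subscribe returns after the last item.
        AfterFlag(Sample()).Subscribe(r => Console.WriteLine(r.Text));
        // prints one, three, six, seven, eight, ten (one per line)
    }
}
```

Because the state is a small value tuple and nothing is replayed or buffered, this should behave the same on an infinite stream.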
I've found a way to do it thanks to the answer of @Picci:
However, I'm not quite sure this is the best way, since it seems a little strange to me to rely on side effects rather than Subscribe(). In the end I don't only want to print the values but also call a web service with them.
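A minimal sketch of how the higher-order-function idea could be ported to C# while keeping the work inside `Subscribe`, assuming Rx.NET and a `Result` class like the one in the question. This is not the code from this answer; `ClosureSketch`, `Sample`, `Run`, and the `sink` parameter are hypothetical names. The `sink` delegate stands in for whatever should happen with a selected item, such as printing it or calling a web service.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Result
{
    public bool Flag { get; set; }
    public string Text { get; set; }
}

public static class ClosureSketch
{
    // Hypothetical helper: the ten sample items from the question as a cold observable.
    public static IObservable<Result> Sample() =>
        new[]
        {
            ("one", false), ("two", true), ("three", false), ("four", false), ("five", true),
            ("six", true), ("seven", true), ("eight", false), ("nine", true), ("ten", false)
        }
        .Select(t => new Result { Text = t.Item1, Flag = t.Item2 })
        .ToObservable();

    // Higher-order function: returns a handler that forwards an item to `sink` only when `print` is true.
    public static Action<Result> PrintItem(bool print, Action<Result> sink) =>
        item => { if (print) sink(item); };

    // Subscribes to `source`; `sink` receives the first item and every item that follows a flagged one.
    public static IDisposable Run(IObservable<Result> source, Action<Result> sink)
    {
        var printItemFunction = PrintItem(true, sink); // the first item is always handled
        return source.Subscribe(item =>
        {
            printItemFunction(item);                        // decision was made by the previous item
            printItemFunction = PrintItem(item.Flag, sink); // prepare the decision for the next item
        });
    }

    public static void Main()
    {
        Run(Sample(), r => Console.WriteLine(r.Text));
        // prints one, three, six, seven, eight, ten (one per line)
    }
}
```

The mutable `printItemFunction` is local to each `Run` call, so separate subscriptions do not share state. An operator-only version (for example with `Scan`) avoids the mutable variable altogether.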
Try this approach:
The above just produces the `source` in a more idiomatic way than your `Observable.Create<Result>` approach.
Now here's the `query`:
The use of `.Publish` here allows the source observable to have only one subscription, while still being used multiple times within the `.Publish` method. Then the standard `Skip(1).Zip` approach can be used to inspect the subsequent values being produced.
Here's the output:
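For reference, the `.Publish` plus `Skip(1).Zip` idea described above could be sketched roughly as follows, assuming Rx.NET and a `Result` class like the one in the question. This is not necessarily the exact query from this answer: the `StartWith` sentinel and the `PublishZipSketch`, `Sample`, and `AfterFlag` names are assumptions made for illustration.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Result
{
    public bool Flag { get; set; }
    public string Text { get; set; }
}

public static class PublishZipSketch
{
    // Hypothetical helper: builds the source from an array instead of Observable.Create.
    public static IObservable<Result> Sample() =>
        new[]
        {
            ("one", false), ("two", true), ("three", false), ("four", false), ("five", true),
            ("six", true), ("seven", true), ("eight", false), ("nine", true), ("ten", false)
        }
        .Select(t => new Result { Text = t.Item1, Flag = t.Item2 })
        .ToObservable();

    // Pairs every item with its predecessor and keeps the items whose predecessor was flagged.
    public static IObservable<Result> AfterFlag(IObservable<Result> source) =>
        source
            // Sentinel with Flag == true, so the first real item has a flagged predecessor.
            .StartWith(new Result { Flag = true })
            // Publish shares a single subscription to the source between both uses of `shared`.
            .Publish(shared =>
                shared.Skip(1)
                    .Zip(shared, (current, previous) => (current, previous))
                    .Where(pair => pair.previous.Flag)
                    .Select(pair => pair.current));

    public static void Main()
    {
        AfterFlag(Sample()).Subscribe(r => Console.WriteLine(r.Text));
        // prints one, three, six, seven, eight, ten (one per line)
    }
}
```

Without `.Publish`, `source.Skip(1).Zip(source, ...)` would subscribe to the source twice, which matters when the source is hot or expensive to start.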
After inspiration from Shlomo, here's my approach using `.Buffer(2, 1)`:
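A minimal sketch of the `.Buffer(2, 1)` idea, assuming Rx.NET and a `Result` class like the one in the question. It may differ from the answerer's exact code; the `StartWith` sentinel and the `BufferSketch`, `Sample`, and `AfterFlag` names are assumptions. `Buffer(2, 1)` yields overlapping windows of two consecutive items, so each window holds a predecessor and the current item.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public class Result
{
    public bool Flag { get; set; }
    public string Text { get; set; }
}

public static class BufferSketch
{
    // Hypothetical helper: the ten sample items from the question as a cold observable.
    public static IObservable<Result> Sample() =>
        new[]
        {
            ("one", false), ("two", true), ("three", false), ("four", false), ("five", true),
            ("six", true), ("seven", true), ("eight", false), ("nine", true), ("ten", false)
        }
        .Select(t => new Result { Text = t.Item1, Flag = t.Item2 })
        .ToObservable();

    // Keeps the second item of every [previous, current] window whose first item is flagged.
    public static IObservable<Result> AfterFlag(IObservable<Result> source) =>
        source
            // Sentinel with Flag == true, so the first real item has a flagged predecessor.
            .StartWith(new Result { Flag = true })
            .Buffer(2, 1) // sliding windows of two items, advancing by one
            // When the source completes, a final window with a single item is emitted; skip it.
            .Where(pair => pair.Count == 2 && pair[0].Flag)
            .Select(pair => pair[1]);

    public static void Main()
    {
        AfterFlag(Sample()).Subscribe(r => Console.WriteLine(r.Text));
        // prints one, three, six, seven, eight, ten (one per line)
    }
}
```

Each window is a small list allocated per item, so this is slightly heavier than carrying a flag through `Scan`, though it reads more directly.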