I am trying to write a simple program using RxJava to generate an infinite sequence of natural numbers. So far, I have found two ways to generate a sequence of numbers: Observable.timer() and Observable.interval(). I am not sure if these functions are the right way to approach this problem. I was expecting a simple function like the one we have in Java 8 to generate infinite natural numbers.
IntStream.iterate(1, value -> value +1).forEach(System.out::println);
I tried using IntStream with Observable, but that does not work correctly: it sends the infinite stream of numbers only to the first subscriber. How can I correctly generate an infinite sequence of natural numbers?
import rx.Observable;
import rx.functions.Action1;
import java.util.stream.IntStream;

public class NaturalNumbers {
    public static void main(String[] args) {
        Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
            IntStream stream = IntStream.iterate(1, val -> val + 1);
            stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
        });
        Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
        Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
        Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
        naturalNumbers.subscribe(first);
        naturalNumbers.subscribe(second);
        naturalNumbers.subscribe(third);
    }
}
The problem is that on naturalNumbers.subscribe(first); the OnSubscribe you implemented is called, and it runs a forEach over an infinite stream on the calling thread. That call never returns, so the second and third subscribe calls are never reached and your program never terminates.
One way you could deal with it is to subscribe asynchronously, each subscriber on a different thread. To make the results easy to see, I introduced a sleep into the Stream processing:
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
    IntStream stream = IntStream.iterate(1, i -> i + 1);
    stream.peek(i -> {
        try {
            // Added to visibly see printing
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }).forEach(subscriber::onNext);
});

final Subscription subscribe1 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(first);
final Subscription subscribe2 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(second);
final Subscription subscribe3 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(third);

Thread.sleep(1000);
System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");
Observable.generate is exactly the operator to solve this class of problem reactively. I also assume this is a pedagogical example, since using an iterable for this is probably better anyway.
Your code produces the whole stream on the subscriber's thread. Since it is an infinite stream, the subscribe call will never complete. Aside from that obvious problem, unsubscribing is also going to be problematic, since you aren't checking for it in your loop.
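The unsubscription point can be illustrated without RxJava. The following is a minimal plain-Java sketch, not RxJava API: the class CancellableNaturals, the method emitUntilCancelled, and the AtomicBoolean flag are hypothetical names standing in for a subscriber and its unsubscribed state. The loop checks the flag before each emission, so it stops as soon as the consumer cancels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

class CancellableNaturals {

    /**
     * Emits 1, 2, 3, ... to onNext until the cancelled flag is set.
     * The flag is a hypothetical stand-in for a subscription's unsubscribed state.
     */
    public static void emitUntilCancelled(AtomicBoolean cancelled, IntConsumer onNext) {
        int next = 1;
        // Check for cancellation before every emission, not just once up front
        while (!cancelled.get()) {
            onNext.accept(next++);
        }
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<Integer> received = new ArrayList<>();
        emitUntilCancelled(cancelled, n -> {
            received.add(n);
            if (n == 5) {
                cancelled.set(true); // the consumer "unsubscribes" after five values
            }
        });
        System.out.println(received); // prints [1, 2, 3, 4, 5]
    }
}
```

In RxJava 1, the equivalent check inside the create callback would typically be made against subscriber.isUnsubscribed().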
You want to use a scheduler to solve this problem. Certainly do not use subscribeOn, since that would burden all observers. Schedule the delivery of each number to onNext and, as a last step in each scheduled action, schedule the next one.
Essentially this is what Observable.generate gives you: each iteration is scheduled on the provided scheduler (which defaults to one that introduces concurrency if you don't specify it). Scheduled operations can be cancelled, and they avoid thread starvation.
Rx.NET solves it like this (actually there is an async/await model that's better, but it is not available in Java, as far as I know):
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(0, (i, self) =>
        {
            if (i < count)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
                self(i + 1);
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}
Two things to note here:
- The call to Schedule returns a subscription handle that is passed back to the observer.
- The Schedule is recursive: the self parameter is a reference to the scheduler used to call the next iteration. This allows for unsubscription to cancel the operation.
Not sure how this looks in RxJava, but the idea should be the same. Again, Observable.generate will probably be simpler for you, as it was designed to take care of this scenario.
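The recursive scheduling idea can be sketched in plain Java as well. This is only an illustration under stated assumptions, not how RxJava implements anything: it uses a java.util.concurrent ExecutorService in place of an Rx scheduler, and the names RecursiveNaturals, Handle, start, and collectFirst are hypothetical. Each task emits one value and then schedules its successor, so the caller is never blocked and cancellation takes effect between iterations.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

class RecursiveNaturals {

    /** Hypothetical cancellation handle, standing in for an Rx subscription. */
    static final class Handle {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        public void cancel() { cancelled.set(true); }
        public boolean isCancelled() { return cancelled.get(); }
    }

    /** Starts emitting 1, 2, 3, ... on the executor and returns immediately. */
    public static Handle start(ExecutorService executor, IntConsumer onNext) {
        Handle handle = new Handle();
        schedule(executor, handle, 1, onNext);
        return handle;
    }

    private static void schedule(ExecutorService executor, Handle handle,
                                 int value, IntConsumer onNext) {
        try {
            executor.execute(() -> {
                if (handle.isCancelled()) {
                    return; // stop the chain: nothing further is scheduled
                }
                onNext.accept(value);
                // Last step of each action: schedule the next iteration
                schedule(executor, handle, value + 1, onNext);
            });
        } catch (RejectedExecutionException e) {
            // The executor has been shut down, so production simply ends
        }
    }

    /** Demo helper: waits for n values, cancels, and returns the first n received. */
    public static List<Integer> collectFirst(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch enough = new CountDownLatch(n);
        Handle handle = start(executor, value -> {
            received.add(value);
            enough.countDown();
        });
        try {
            enough.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel();
            executor.shutdown();
        }
        synchronized (received) {
            // More than n values may have arrived before cancel() took effect
            return new ArrayList<>(received.subList(0, Math.min(n, received.size())));
        }
    }

    public static void main(String[] args) {
        System.out.println(collectFirst(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Because start returns right after scheduling the first task, several independent consumers could each call it without one starving the others, which is the property the blocking forEach version lacks.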
When creating infinite sequences, care should be taken to:
- subscribe and observe on different threads; otherwise you will only serve a single subscriber
- stop generating values as soon as the subscription terminates; otherwise runaway loops will eat your CPU
The first issue is solved by using subscribeOn(), observeOn() and various schedulers.
The second issue is best solved by using the library-provided methods Observable.generate() or Observable.fromIterable(). They do proper checking.
Check this:
Observable<Integer> naturalNumbers =
    Observable.<Integer, Integer>generate(() -> 1, (s, g) -> {
        logger.info("generating {}", s);
        g.onNext(s);
        return s + 1;
    }).subscribeOn(Schedulers.newThread());

Disposable sub1 = naturalNumbers
    .subscribe(v -> logger.info("1 got {}", v));
Disposable sub2 = naturalNumbers
    .subscribe(v -> logger.info("2 got {}", v));
Disposable sub3 = naturalNumbers
    .subscribe(v -> logger.info("3 got {}", v));

Thread.sleep(100);
logger.info("unsubscribing...");
sub1.dispose();
sub2.dispose();
sub3.dispose();
Thread.sleep(1000);
logger.info("done");