Are Java 8 streams similar to RxJava observables?
Java 8 stream definition:
Classes in the new
java.util.stream
package provide a Stream API to support functional-style operations on streams of elements.
Are Java 8 streams similar to RxJava observables?
Java 8 stream definition:
Classes in the new
java.util.stream
package provide a Stream API to support functional-style operations on streams of elements.
TL;DR: All sequence/stream processing libs are offering very similar API for pipeline building. The differences are in API for handling multi-threading and composition of pipelines.
RxJava is quite different from Stream. Of all JDK things, the closest to rx.Observable is perhaps java.util.stream.Collector Stream + CompletableFuture combo (which comes at a cost of dealing with extra monad layer, i. e. having to handle conversion between Stream<CompletableFuture<T>>
and CompletableFuture<Stream<T>>
).
There are significant differences between Observable and Stream:
Stream#parallel()
splits sequence into partitions, Observable#subscribeOn()
and Observable#observeOn()
do not; it is tricky to emulate Stream#parallel()
behavior with Observable, it once had .parallel()
method but this method caused so much confusion that .parallel()
support was moved to separate repository on github, RxJavaParallel. More details are in another answer.Stream#parallel()
does not allow to specify a thread pool to use, unlike most of RxJava methods accepting optional Scheduler. Since all stream instances in a JVM use the same fork-join pool, adding .parallel()
can accidentally affect the behaviour in another module of your programObservable#interval()
, Observable#window()
and many others; this is mostly because Streams are pull-basedtakeWhile()
, takeUntil()
); workaround using Stream#anyMatch()
is limited: it is terminal operation, so you can't use it more than once per streamObservable#using()
); you can wrap IO stream or mutex with it and be sure that the user will not forget to free the resource - it will be disposed automatically on subscription termination; Stream has onClose(Runnable)
method, but you have to call it manually or via try-with-resources. E. g. you have to keep in mind that Files#lines() must be enclosed in try-with-resources block.Round-up: RxJava differs from Streams significantly. Real RxJava alternatives are other implementations of ReactiveStreams, e. g. relevant part of Akka.
Update. There's trick to use non-default fork-join pool for Stream#parallel
, see Custom thread pool in Java 8 parallel stream
Update. All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.
Java 8 Stream and RxJava looks pretty similar. They have look alike operators (filter, map, flatMap...) but are not built for the same usage.
You can perform asynchonus tasks using RxJava.
With Java 8 stream, you'll traverse items of your collection.
You can do pretty much the same thing in RxJava (traverse items of a collection) but, as RxJava is focussed on concurrent task, ..., it use synchronization, latch, ... So the same task using RxJava may be slower than with Java 8 stream.
RxJava can be compared to CompletableFuture
, but that can be able to compute more than just one value.
There are a few technical and conceptional differences, for example, Java 8 streams are single use, pull based, synchronous sequences of values whereas RxJava Observables are re-observable, adaptively push-pull based, potentially asynchronous sequences of values. RxJava is aimed at Java 6+ and works on Android as well.
Java 8 Streams are pull based. You iterate over a Java 8 stream consuming each item. And it could be an endless stream.
RXJava Observable
is by default push based. You subscribe to an Observable and you will get notified when the next item arrives (onNext
), or when the stream is completed (onCompleted
), or when an error occurred (onError
).
Because with Observable
you receive onNext
, onCompleted
, onError
events, you can do some powerful functions like combining different Observable
s to a new one (zip
, merge
, concat
). Other stuff you could do is caching, throttling, ...
And it uses more or less the same API in different languages (RxJava, RX in C#, RxJS, ...)
By default RxJava is single threaded. Unless you start using Schedulers, everything will happen on the same thread.
The existing answers are comprehensive and correct, but a clear example for beginners is lacking. Allow me to put some concrete behind terms like "push/pull-based" and "re-observable". Note: I hate the term Observable
(it's a stream for heaven's sake), so will simply refer to J8 vs RX streams.
Consider a list of integers,
digits = [1,2,3,4,5]
A J8 Stream is a utility to modify the collection. For example even digits can be extracted as,
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
This is basically Python's map, filter, reduce, a very nice (and long overdue) addition to Java. But what if digits weren't collected ahead of time - what if the digits were streaming in while the app was running - could we filter the even's in realtime.
Imagine a separate thread process is outputting integers at random times while the app is running (---
denotes time)
digits = 12345---6------7--8--9-10--------11--12
In RX, even
can react to each new digit and apply the filter in real-time
even = -2-4-----6---------8----10------------12
There's no need to store input and output lists. If you want an output list, no problem that's streamable too. In fact, everything is a stream.
evens_stored = even.collect()
This is why terms like "stateless" and "functional" are more associated with RX
RxJava is also closely related to the reactive streams initiative and considers it self as a simple implementation of the reactive streams API (e.g. compared to the Akka streams implementation). The main difference is, that the reactive streams are designed to be able to handle back pressure, but if you have a look at the reactive streams page, you will get the idea. They describe their goals pretty well and the streams are also closely related to the reactive manifesto.
The Java 8 streams are pretty much the implementation of an unbounded collection, pretty similar to the Scala Stream or the Clojure lazy seq.
Java 8 Streams enable processing of really large collections efficiently, while leveraging multicore architectures. In contrast, RxJava is single-threaded by default (without Schedulers). So RxJava won't take advantage of multi-core machines unless you code that logic yourself.