I am using RxJava in my Android app and I want to load data from the database.
In this way, I am creating a new Observable using Observable.create()
which returns a list of EventLog
public Observable<List<EventLog>> loadEventLogs() {
return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
@Override
public void call(Subscriber<? super List<EventLog>> subscriber) {
List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
List<EventLog> eventLogs = new ArrayList<>(logs.size());
for (int i = 0; i < logs.size(); i++) {
eventLogs.add(new EventLog(logs.get(i)));
}
subscriber.onNext(eventLogs);
}
});
}
Though it works correctly, I read that using Observable.create()
is not actually a best practice for Rx Java (see here).
So I changed this method in this way.
public Observable<List<EventLog>> loadEventLogs() {
return Observable.fromCallable(new Func0<List<EventLog>>() {
@Override
public List<EventLog> call() {
List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
List<EventLog> eventLogs = new ArrayList<>(logs.size());
for (int i = 0; i < logs.size(); i++) {
eventLogs.add(new EventLog(logs.get(i)));
}
return eventLogs;
}
});
}
Is this a better approach using Rx Java? Why? What is actually the difference among the two methods?
Moreover, since the database load a list of elements, makes sense to emit the entire list at once? Or should I emit one item at a time?
The two methods may look like similar and behave similar but fromCallable
deals with the difficulties of backpressure for you whereas the create
version does not. Dealing with backpressure inside an OnSubscribe
implementation ranges from simple to outright mind-melting; however, if omitted, you may get MissingBackpressureException
s along asynchronous boundaries (such as observeOn
) or even on continuation boundaries (such as concat
).
RxJava tries to offer proper backpressure support for as much factories and operators as possible, however, there are quite a few factories and operators that can't support it.
The second problem with manual OnSubscribe
implementation is the lack of cancellation support, especially if you generate a lot of onNext
calls. Many of these can be replaced by standard factory methods (such as from
) or helper classes (such as SyncOnSubscribe
) that deal with all the complexity for you.
You may find a lot of introductions and examples that (still) use create
for two reasons.
- It is much easier to introduce push-based datastreams by showing how the push of events work in an imperative fashion. In my opinion, such sources spend too much time with
create
proportionally instead of talking about the standard factory methods and showing how certain common tasks (such as yours) can be achieved safely.
- Many of these examples were created the time RxJava didn't require backpressure support or even proper synchronous cancellation support or were just ported from the Rx.NET examples (which to date doesn't support backpressure and synchronous cancellation works somehow, courtesy of C# I guess.) Generating values by calling onNext was worry-free back then. However, such use does lead to buffer bloat and excessive memory usage, therefore, the Netflix team came up with a way of limiting the memory usage by requiring observers to state how many items are they willing to proceed. This became known as backpressure.
For the second question, namely if one should create a List or a sequence of values, it depends on your source. If your source supports some kind of iteration or streaming of individual data elements (such as JDBC), you can just hook onto it and emit one by one (see SyncOnSubscribe
). If it doesn't support it or you need it in List form anyway, then keep it as it is. You can always convert between the two forms via toList
and flatMapIterable
if necessary.
As it's explain in the response you linked, with Observable.create
you'll may need to violate the advanced requirements of RxJava.
For example, you'll need to implement backpressure, or how to unsubscribe.
In you case, you want to emit an item, without having to deal with backpressure or subscription. So Observable.fromCallable
is a good call. RxJava will deal with the rest.