Creating Observable without using Observable.create

Posted 2019-01-16 18:49

Question:

I am using RxJava in my Android app and I want to load data from the database.

Currently, I create a new Observable using Observable.create(), which emits a list of EventLog:

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            // Load all rows and map each one to its EventLog model.
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
            // Signal completion so downstream operators (e.g. toList, concat) can finish.
            subscriber.onCompleted();
        }
    });
}

Though it works correctly, I read that using Observable.create() is not actually a best practice in RxJava (see here).

So I changed the method as follows.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

Is this a better approach in RxJava? Why? What is actually the difference between the two methods?

Moreover, since the database loads a list of elements, does it make sense to emit the entire list at once? Or should I emit one item at a time?

Answer 1:

The two methods may look and behave similarly, but fromCallable deals with the difficulties of backpressure for you whereas the create version does not. Dealing with backpressure inside an OnSubscribe implementation ranges from simple to outright mind-melting; however, if omitted, you may get MissingBackpressureExceptions along asynchronous boundaries (such as observeOn) or even on continuation boundaries (such as concat).

RxJava tries to offer proper backpressure support for as many factories and operators as possible; however, there are quite a few factories and operators that can't support it.

The second problem with a manual OnSubscribe implementation is the lack of cancellation support, especially if you generate a lot of onNext calls. Many such implementations can be replaced by standard factory methods (such as from) or helper classes (such as SyncOnSubscribe) that deal with all the complexity for you.
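To make the cancellation problem concrete, here is a minimal plain-Java sketch of the check a hand-written emitter has to perform before every emission. It deliberately does not use RxJava: SimpleSubscription, emitRange and takeFirst are hypothetical names invented for this illustration, standing in for the Subscriber's isUnsubscribed() check you would write inside OnSubscribe.call().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class CancellationSketch {

    // Hypothetical stand-in for a subscription: it only tracks a cancelled flag.
    static final class SimpleSubscription {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        void unsubscribe() {
            cancelled.set(true);
        }

        boolean isUnsubscribed() {
            return cancelled.get();
        }
    }

    // Emits 0..count-1, checking the flag before each emission.
    // Returns the number of items actually emitted.
    static int emitRange(int count, SimpleSubscription subscription, Consumer<Integer> onNext) {
        int emitted = 0;
        for (int i = 0; i < count; i++) {
            if (subscription.isUnsubscribed()) {
                break; // the consumer is gone, so stop producing
            }
            onNext.accept(i);
            emitted++;
        }
        return emitted;
    }

    // Demo consumer that cancels as soon as it has received `limit` items.
    static List<Integer> takeFirst(int count, int limit) {
        SimpleSubscription subscription = new SimpleSubscription();
        List<Integer> received = new ArrayList<>();
        emitRange(count, subscription, value -> {
            received.add(value);
            if (received.size() == limit) {
                subscription.unsubscribe();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(1_000_000, 3)); // prints [0, 1, 2]
    }
}
```

Without the isUnsubscribed() check, the loop would keep running through all one million values even though the consumer stopped listening after three. The standard factory methods handle this for you.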

You may find a lot of introductions and examples that (still) use create for two reasons.

  1. It is much easier to introduce push-based datastreams by showing how the push of events works in an imperative fashion. In my opinion, such sources spend a disproportionate amount of time on create instead of talking about the standard factory methods and showing how certain common tasks (such as yours) can be achieved safely.
  2. Many of these examples were created at a time when RxJava didn't require backpressure support or even proper synchronous cancellation support, or were just ported from the Rx.NET examples (Rx.NET to date doesn't support backpressure, and its synchronous cancellation works somehow, courtesy of C#, I guess). Generating values by calling onNext was worry-free back then. However, such use does lead to buffer bloat and excessive memory usage; therefore, the Netflix team came up with a way of limiting the memory usage by requiring observers to state how many items they are willing to process. This became known as backpressure.
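The idea of observers stating how many items they are willing to process can be sketched in plain Java. This is only an illustration of the request-based contract, not RxJava's implementation: RequestingProducer and receivedAfterRequests are hypothetical names, and the sketch is synchronous and single-threaded, ignoring the reentrancy and concurrency concerns that make real backpressure code hard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BackpressureSketch {

    // Hypothetical producer, not an RxJava type: it emits only as many items as were requested.
    static final class RequestingProducer<T> {
        private final Iterator<T> source;
        private final Consumer<T> onNext;

        RequestingProducer(Iterator<T> source, Consumer<T> onNext) {
            this.source = source;
            this.onNext = onNext;
        }

        // The consumer calls this to say "I can handle n more items".
        // Returns how many items were actually emitted for this request.
        long request(long n) {
            long emitted = 0;
            while (emitted < n && source.hasNext()) {
                onNext.accept(source.next());
                emitted++;
            }
            return emitted;
        }
    }

    // Demo helper: applies each request in turn and returns everything received.
    static List<Integer> receivedAfterRequests(List<Integer> items, long... requests) {
        List<Integer> received = new ArrayList<>();
        RequestingProducer<Integer> producer =
                new RequestingProducer<>(items.iterator(), received::add);
        for (long n : requests) {
            producer.request(n);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(receivedAfterRequests(items, 2));     // prints [1, 2]
        System.out.println(receivedAfterRequests(items, 2, 10)); // prints [1, 2, 3, 4, 5]
    }
}
```

A create-style emitter that calls onNext in a loop has no equivalent of request(n): it pushes everything at once, which is how the buffers downstream fill up.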

For the second question, namely whether one should emit a List or a sequence of values, it depends on your source. If your source supports some kind of iteration or streaming of individual data elements (such as JDBC), you can just hook onto it and emit the elements one by one (see SyncOnSubscribe). If it doesn't support that, or you need the data in List form anyway, then keep it as it is. You can always convert between the two forms via toList and flatMapIterable if necessary.



Answer 2:

As explained in the answer you linked, with Observable.create you may end up violating the advanced requirements of RxJava.

For example, you'll need to implement backpressure and unsubscription handling yourself.

In your case, you want to emit a single item without having to deal with backpressure or unsubscription. So Observable.fromCallable is a good choice; RxJava will deal with the rest.