When using Rx (specifically, RxJava), is there an operator that will package the input variable along with the function's output, in order to use both in the next step?
For example, let's say I start with a list of tweet IDs, and my program (a) makes a REST call to get the text of each tweet and (b) saves the ID and text into a local database. A normal "map" returns the text for step (a), but it discards the original ID. In code:
Observable.from(tweet_id_list)
    .specialMap(i -> getTweetText(i)) // is there a "specialMap" that returns the string result AND ALSO the id?
    .map((i, s) -> saveToLocalDB(i, s)); // because we need both for the ensuing function
I hacked getTweetText(i) to return a HashMap containing both values instead of a plain String, but it would be much more useful if an Rx operator could do this without modifying the underlying function.
Create or use a general-purpose Pair or Tuple type and map your stream into it. Something like this:
Observable.from(tweet_id_list)
    .map(id -> Pair.create(id, getTweetText(id))) // Pair: any general-purpose pair/tuple class
    .subscribe(pair -> saveToLocalDB(pair.first(), pair.second()));
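If you need this in several places, the pairing can be packaged once as a reusable helper so that getTweetText itself never changes. The sketch below is plain Java under a few assumptions: Pair is a hand-rolled record, withInput is a hypothetical helper (not an RxJava operator), getTweetText is a stub, and a java.util.stream pipeline stands in for the Observable. In RxJava, the Function returned by withInput could probably be handed to map through a method reference such as withInput(f)::apply.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hand-rolled, general-purpose pair (an assumption; any pair/tuple class would do).
record Pair<A, B>(A first, B second) {
    static <A, B> Pair<A, B> create(A first, B second) {
        return new Pair<>(first, second);
    }
}

final class TweetPairs {
    // Hypothetical helper, not an RxJava operator: wraps any function so that
    // each output travels together with the input that produced it.
    static <T, R> Function<T, Pair<T, R>> withInput(Function<? super T, ? extends R> f) {
        return t -> Pair.create(t, f.apply(t));
    }

    // Stub standing in for the REST call; the real method stays unmodified.
    static String getTweetText(Long id) {
        return "text of tweet " + id;
    }

    static List<Pair<Long, String>> lookUp(List<Long> tweetIds) {
        Function<Long, Pair<Long, String>> lookup = withInput(TweetPairs::getTweetText);
        return tweetIds.stream()
                .map(lookup) // stand-in for Observable.map
                .collect(Collectors.toList());
    }
}
```

Each element of the result keeps its ID in first() and the looked-up text in second(), so the next step can use both.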
If you want a single map with every ID mapped to its text:
Observable.from(tweet_id_list)
    .toMap(id -> id, id -> getTweetText(id))
    .subscribe(tweetMap -> saveAllToLocalDB(tweetMap));
Assuming Integer IDs and String text, this emits a single Map<Integer, String> once the source completes. That is useful if, as assumed above, you can save the whole map to the DB at once instead of making one save call per tweet as in your original code.
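For comparison, the standard library's Collectors.toMap takes the same key-selector/value-selector pair as Rx's toMap and also produces one Map at the end. The sketch below uses it as a stand-in; getTweetText is a stub and saveAllToLocalDB is a hypothetical bulk save that only remembers the map it receives. Note that Collectors.toMap throws IllegalStateException on duplicate keys, so the sketch assumes the ID list has no repeats.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class TweetMapDemo {
    // Stub for the REST lookup (assumed signature).
    static String getTweetText(Integer id) {
        return "text of tweet " + id;
    }

    // Hypothetical bulk save; here it only records what it was given.
    static Map<Integer, String> lastSaved;

    static void saveAllToLocalDB(Map<Integer, String> tweetMap) {
        lastSaved = tweetMap;
    }

    static void run(List<Integer> tweetIds) {
        // Key selector keeps the ID, value selector performs the lookup.
        Map<Integer, String> tweetMap = tweetIds.stream()
                .collect(Collectors.toMap(Function.identity(), TweetMapDemo::getTweetText));
        // One save call for the whole batch.
        saveAllToLocalDB(tweetMap);
    }
}
```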
Make sure to avoid side effects inside your operators. Side effects such as saving to the DB should only happen in the subscribe call, where you observe the results, not inside map itself. If you need the saving to happen in the background, you could add something like .observeOn(Schedulers.io()) before the .subscribe call, depending on your threading needs.
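A rough standard-library analogue of that advice: the stream's map step only builds values, and the write happens at the very end, handed to a single background thread that loosely plays the part of observeOn(Schedulers.io()). The Tweet record, the in-memory db list, the stub getTweetText and the "db-writer" thread name are all illustrative assumptions, not RxJava APIs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SideEffectDemo {
    // Illustrative value type for an (id, text) result.
    record Tweet(long id, String text) {}

    // Stub for the lookup; it only computes a value and has no side effects.
    static String getTweetText(long id) {
        return "text of tweet " + id;
    }

    // Hypothetical "database": saved tweets plus the thread that saved each one.
    static final List<Tweet> db = new CopyOnWriteArrayList<>();
    static final List<String> savedBy = new CopyOnWriteArrayList<>();

    static void saveToLocalDB(Tweet t) {
        db.add(t);
        savedBy.add(Thread.currentThread().getName());
    }

    static void run(List<Long> ids) {
        // Single background thread, loosely standing in for observeOn(Schedulers.io()).
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "db-writer"));
        try {
            ids.stream()
                    .map(id -> new Tweet(id, getTweetText(id)))          // pure transformation only
                    .forEach(t -> io.execute(() -> saveToLocalDB(t))); // side effect at the end, off the caller's thread
        } finally {
            io.shutdown();
            try {
                io.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because there is only one writer thread, the saves still happen in input order.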
If you prefer to handle results individually and save them one by one, you probably want flatMap so the lookups can run in parallel. I'm using defer here so that getTweetText is not called (and does not block) as soon as flatMap's function runs, but only when each inner Observable is subscribed to. This gives us control over the thread the work runs on. Note that since the requests may run concurrently, the results will not necessarily arrive in the same order as tweet_id_list.
Observable.from(tweet_id_list)
    // subscribeOn sits outside defer, so the deferred getTweetText call itself runs on an io thread
    .flatMap(id -> Observable.defer(() -> Observable.just(
            Pair.create(id, getTweetText(id))))
        .subscribeOn(Schedulers.io()))
    .subscribe(pair -> saveToLocalDB(pair.first(), pair.second()));
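Outside Rx, the same fan-out can be sketched with CompletableFuture from the standard library. Each lookup is wrapped in a supplier, so it only runs once a pool thread picks it up (roughly the job defer does above); the ID is paired with its text, and each pair is saved as soon as its own lookup completes, so save order is not guaranteed to match the input. This is a stand-in rather than how RxJava schedules work; IdText, the db map, and the stubs getTweetText and saveToLocalDB are assumptions, and the pool size of 4 is arbitrary.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelLookupDemo {
    // Illustrative (id, text) pair.
    record IdText(long id, String text) {}

    // Stub for the blocking REST call.
    static String getTweetText(long id) {
        return "text of tweet " + id;
    }

    // Hypothetical store; concurrent because saves can arrive from several pool threads.
    static final Map<Long, String> db = new ConcurrentHashMap<>();

    static void saveToLocalDB(long id, String text) {
        db.put(id, text);
    }

    static void run(List<Long> tweetIds) {
        ExecutorService io = Executors.newFixedThreadPool(4); // arbitrary pool size
        try {
            CompletableFuture<?>[] saves = tweetIds.stream()
                    .map(id -> CompletableFuture
                            // the lookup runs only when a pool thread picks up this task
                            .supplyAsync(() -> new IdText(id, getTweetText(id)), io)
                            // each pair is saved as soon as its own lookup finishes
                            .thenAccept(p -> saveToLocalDB(p.id(), p.text())))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(saves).join(); // wait until every save has run
        } finally {
            io.shutdown();
        }
    }
}
```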