Question:
I want to have custom delays between each item emitted from an observable list as a function of the items themselves.
Let's say we have a list of (item, delay) pairs:
[("item1", 2),("item2", 1),("item3", 2),("item4", 3),("item5", 2),("item6", 3)]
I want output to be something like:
0 seconds:
1 seconds:
item1
2 seconds:
item2
3 seconds:
4 seconds:
item3
5 seconds:
6 seconds:
7 seconds:
item4
8 seconds:
9 seconds:
item5
10 seconds:
11 seconds:
12 seconds:
item6
Completed!
13 seconds:
I am not sure how best to accomplish this with the delay/timer operators. I went through the delay documentation but couldn't figure out a straightforward way. Any pointers would be helpful. Thanks!
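To make the target timing concrete: in the output above, each item appears once the running total of the delays up to and including its own has elapsed. A minimal sketch of that arithmetic in plain Java follows. No RxJava is involved, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes when each item should appear.
final class EmissionSchedule {
    // Returns, for each delay, the running total of all delays up to and including it.
    static List<Long> cumulativeTimes(long[] delays) {
        List<Long> times = new ArrayList<>();
        long total = 0;
        for (long d : delays) {
            total += d;
            times.add(total);
        }
        return times;
    }

    public static void main(String[] args) {
        long[] delays = {2, 1, 2, 3, 2, 3}; // delays of item1..item6 from the question
        System.out.println(cumulativeTimes(delays)); // prints [2, 3, 5, 8, 10, 13]
    }
}
```

These totals line up with the desired output, where the tick labelled "n seconds:" is printed n + 1 seconds after start.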
Answer 1:
No need for anything fancy. Just use the concatMap and delay operators:
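// Assumes jla is a List<SuperHero> (fields name and delay); wrap it in an Observable first
Observable.from(jla)
        .concatMap(s -> Observable.just(s).delay(s.delay, TimeUnit.SECONDS))
        .subscribe(s1 -> System.out.println(s1.name + " just came..."),
                e -> {},
                () -> System.out.println("Everybody came!"));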
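The reason this gives cumulative timing is that concatMap only starts the next inner observable once the previous one has completed, so each delay begins when the previous item arrives. The sketch below is a plain-JDK analogue of that sequencing, not RxJava itself. The class name and the use of ScheduledExecutorService are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK analogue of concatMap + delay; not RxJava itself.
final class SequentialDelays {
    // Delivers names in order; each one waits its own delay after the previous one arrived.
    static List<String> run(String[] names, long[] delaysMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> arrived = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduleNext(scheduler, names, delaysMillis, 0, arrived, done);
        try {
            done.await(); // block until the last item has arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdown();
        }
        return arrived;
    }

    private static void scheduleNext(ScheduledExecutorService scheduler, String[] names,
            long[] delaysMillis, int index, List<String> arrived, CountDownLatch done) {
        if (index == names.length) {
            done.countDown();
            return;
        }
        // The next timer is only started once this item has arrived, so delays add up.
        scheduler.schedule(() -> {
            arrived.add(names[index]);
            scheduleNext(scheduler, names, delaysMillis, index + 1, arrived, done);
        }, delaysMillis[index], TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Delays from the question, scaled down from seconds to tens of milliseconds.
        String[] names = {"item1", "item2", "item3", "item4", "item5", "item6"};
        long[] delays = {20, 10, 20, 30, 20, 30};
        System.out.println(run(names, delays));
    }
}
```

Because every timer is started from inside the previous task, the source order is always preserved, whatever the individual delays are.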
Answer 2:
You could try this overload of .delay():
http://reactivex.io/RxJava/javadoc/rx/Observable.html#delay(rx.functions.Func1)
It seems to be exactly what you need.
The code would be something like:
yourObservable.delay((item) -> Observable.timer(item.getDelay(), TimeUnit.SECONDS))
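One thing worth checking with this overload: as far as I can tell, each item's delay is counted from the moment the source emits that item, not from the arrival of the previous item. If the source emits everything at once, as Observable.from(list) does, the items would then arrive in order of their own delay rather than at the cumulative times from the question. The sketch below models both behaviours with plain arithmetic. It does not call RxJava, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical arithmetic model of two timing behaviours; no RxJava involved.
final class DelayModels {
    // Every delay starts at time 0, as when the source emits all items at once.
    // Ties are listed in source order here; real timers may interleave them.
    static List<String> perItemFromStart(String[] names, long[] delays) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            order.add(i);
        }
        order.sort(Comparator.comparingLong((Integer i) -> delays[i])); // stable sort
        List<String> out = new ArrayList<>();
        for (int i : order) {
            out.add(names[i] + "@" + delays[i]);
        }
        return out;
    }

    // Each delay starts when the previous item arrives, so arrival times accumulate.
    static List<String> chained(String[] names, long[] delays) {
        List<String> out = new ArrayList<>();
        long time = 0;
        for (int i = 0; i < names.length; i++) {
            time += delays[i];
            out.add(names[i] + "@" + time);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] names = {"item1", "item2", "item3", "item4", "item5", "item6"};
        long[] delays = {2, 1, 2, 3, 2, 3};
        // prints [item2@1, item1@2, item3@2, item5@2, item4@3, item6@3]
        System.out.println(perItemFromStart(names, delays));
        // prints [item1@2, item2@3, item3@5, item4@8, item5@10, item6@13]
        System.out.println(chained(names, delays));
    }
}
```

Only the second list matches the output asked for in the question, so the per-item overload would need cumulative delays as its input to produce it.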
Answer 3:
I have a possible solution, shown below. I feel there may be a better way to implement it, but this works for now. Do suggest alternatives or improvements if you see ways to make it better:
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
public class Watchtower {
    public static void main(String[] arg) throws Exception {
        // Boilerplate for setup
        System.out.println("Welcome to Watchtower");
        class SuperHero {
            public String name;
            public long delay;
            public SuperHero(String name, int delay) {
                this.name = name;
                this.delay = (long) delay;
            }
        }
        Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS).take(14);
        clock.subscribe(tick -> System.out.println(tick + " seconds: "));
        List<SuperHero> jla = new ArrayList<>(Arrays.asList(
                new SuperHero("Bruce", 2),
                new SuperHero("Barry", 1),
                new SuperHero("Clark", 2),
                new SuperHero("Hal", 3),
                new SuperHero("Arthur", 2),
                new SuperHero("Diana", 3)
        ));
        // RxJava stuff starts from here:
        Observable<SuperHero> jl = Observable.from(jla);
        // Running total of the delays seen so far, updated as a side effect
        final long[] cumDelay = {0};
        Observable<SuperHero> delays = jl
                .doOnNext(s -> cumDelay[0] += s.delay)
                .delay(s -> Observable.timer(cumDelay[0], TimeUnit.SECONDS));
        Observable.zip(jl, delays, (s, d) -> s)
                .doOnNext(s -> System.out.println(s.name + " just came..."))
                .doOnCompleted(() -> System.out.println("Everybody came!"))
                .subscribe();
        // Keep the program alive long enough to run from a normal command line
        Thread.sleep(15000);
    }
}
The output it produces:
Welcome to Watchtower
0 seconds:
1 seconds:
Bruce just came...
2 seconds:
Barry just came...
3 seconds:
4 seconds:
Clark just came...
5 seconds:
6 seconds:
7 seconds:
Hal just came...
8 seconds:
9 seconds:
Arthur just came...
10 seconds:
11 seconds:
12 seconds:
Diana just came...
Everybody came!
13 seconds:
Answer 4:
To delay each step, you can use zip to pair every item emitted by your first Observable.from with a tick from an interval of X time units.
/**
 * To delay every single item emitted in the pipeline we need a hack.
 * One possible hack is to zip the items with an interval, so that each item has to wait for the next interval tick.
 */
@Test
public void delay() {
    long start = System.currentTimeMillis();
    // Subscribe a TestSubscriber so the test can wait for the pipeline to finish
    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS), (i, t) -> i)
            .doOnNext(n -> System.out.println("time:" + (System.currentTimeMillis() - start)))
            .subscribe(subscriber);
    subscriber.awaitTerminalEvent(3000, TimeUnit.MILLISECONDS);
}
This will print
time:537
time:738
time:936
More practical examples here: https://github.com/politrons/reactive
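Note that pairing items with an interval spaces them by one fixed period, so on its own it does not give a different delay per item. The sketch below is a plain-JDK analogue of that pacing, assuming a ScheduledExecutorService stands in for the interval. The class name is hypothetical and nothing here is RxJava API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK analogue of zipping a list with an interval.
final class FixedPace {
    // Releases one item per tick; the first tick fires after one full period.
    static List<Integer> run(List<Integer> items, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Iterator<Integer> source = items.iterator();
        List<Integer> arrived = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduler.scheduleAtFixedRate(() -> {
            if (source.hasNext()) {
                arrived.add(source.next()); // pair the next item with this tick
            }
            if (!source.hasNext()) {
                done.countDown(); // source exhausted: signal completion
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdownNow(); // stop the ticker once all items are out
        }
        return arrived;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3), 200)); // items arrive roughly 200 ms apart
    }
}
```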
Answer 5:
You can also make use of the timer operator (the snippet below uses RxJava 2 names such as fromArray). Have a look at this:
Integer[] arr = {1, 2, 3, 4, 5, 6, 7};
Observable.fromArray(arr)
        // timer returns a 0 value, note the nested map operator usage
        // in order to preserve original integers
        .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                .map(y -> x))
        .timestamp()
        .subscribe(timedIntegers ->
                Log.i(TAG, "Timed String: "
                        + timedIntegers.value()
                        + " "
                        + timedIntegers.time()));
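With flatMap, every timer is started as soon as its item is emitted, so each delay is measured from the start rather than from the previous item. For the array above that means arrivals at about 50, 100, ..., 350 ms. The sketch below is a plain-JDK analogue of that behaviour, with a hypothetical class name and ScheduledExecutorService standing in for the timers. It is not RxJava code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK analogue of flatMap + timer; not RxJava itself.
final class IndependentTimers {
    // Starts one timer per value up front; value x arrives after unitMillis * x.
    static List<Integer> run(int[] values, long unitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Integer> arrived = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(values.length);
        for (int x : values) {
            // All timers run concurrently, so arrival order follows the delay, not the input order.
            scheduler.schedule(() -> {
                arrived.add(x);
                done.countDown();
            }, unitMillis * x, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(); // wait until every timer has fired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdown();
        }
        return arrived;
    }

    public static void main(String[] args) {
        // Input order is deliberately shuffled to show that the delays decide the order.
        System.out.println(run(new int[]{3, 1, 2}, 50));
    }
}
```

This suits delays that are absolute offsets from the start. For the question's gap-between-items semantics the delays would first have to be turned into running totals.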