rxjava delay: How to get variable delay on each item

Posted 2020-04-05 06:50

I want custom delays between the items emitted from an observable list, where each delay is a function of the item itself. Say we have a list of (item, delay) pairs:

[("item1", 2),("item2", 1),("item3", 2),("item4", 3),("item5", 2),("item6", 3)]

I want the output to be something like:

0 seconds: 
1 seconds: 
item1
2 seconds: 
item2
3 seconds: 
4 seconds: 
item3
5 seconds: 
6 seconds: 
7 seconds: 
item4
8 seconds: 
9 seconds: 
item5
10 seconds: 
11 seconds: 
12 seconds: 
item6
Completed!
13 seconds: 

I am not sure how best to accomplish this with the delay/timer operators. I went through the delay documentation but couldn't figure out a straightforward way. Any pointers would be helpful. Thanks!

Tags: java rx-java
5 answers
老娘就宠你
#2 · 2020-04-05 07:10

I have a possible solution, shown below. I suspect there is a better way to implement it, but this works for now. Do suggest alternatives or improvements if you see ways to make it better:

import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

public class Watchtower {
    public static void main(String[] arg) throws Exception {

        // Boilerplate for setup
        System.out.println("Welcome to Watchtower");

        class SuperHero {
            public String name;
            public long delay;

            public SuperHero(String name, int delay) {
                this.name = name;
                this.delay = (long) delay;
            }
        }

        Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS).take(14);

        clock.subscribe(tick -> System.out.println(tick + " seconds: "));

        List<SuperHero> jla = new ArrayList<>(Arrays.asList(
            new SuperHero("Bruce", 2),
            new SuperHero("Barry", 1),
            new SuperHero("Clark", 2),
            new SuperHero("Hal", 3),
            new SuperHero("Arthur", 2),
            new SuperHero("Diana", 3)
            )
        );

        // RxJava stuff starts from here:

        Observable<SuperHero> jl = Observable.from(jla);

        final long[] cumDelay = {0};

        Observable<SuperHero> delays = jl
            .doOnNext(s -> cumDelay[0] += s.delay)
            .delay(s -> Observable.timer(cumDelay[0], TimeUnit.SECONDS));

        Observable.zip(jl, delays, (s, d) -> s)
            .doOnNext(s -> System.out.println(s.name + " just came..."))
            .doOnCompleted(() -> System.out.println("Everybody came!"))
            .subscribe();

        // Just to have program remain alive and run on normal command line
        Thread.sleep(15000);
    }
}

The output it produces:

Welcome to Watchtower
0 seconds: 
1 seconds: 
Bruce just came...
2 seconds: 
Barry just came...
3 seconds: 
4 seconds: 
Clark just came...
5 seconds: 
6 seconds: 
7 seconds: 
Hal just came...
8 seconds: 
9 seconds: 
Arthur just came...
10 seconds: 
11 seconds: 
12 seconds: 
Diana just came...
Everybody came!
13 seconds: 
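
The `cumDelay` bookkeeping in the answer above amounts to a running sum over the per-item delays. Here is a minimal sketch of just that arithmetic in plain Java, with no RxJava involved; the class and method names (`DelaySchedule`, `cumulativeOffsets`) are made up for illustration, and the delays are the ones from the question:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelaySchedule {
    // For each item, returns the number of seconds after the start at which it
    // should appear: the sum of all delays up to and including its own.
    public static List<Long> cumulativeOffsets(List<Long> delays) {
        List<Long> offsets = new ArrayList<>();
        long total = 0;
        for (long d : delays) {
            total += d;          // same role as cumDelay[0] += s.delay
            offsets.add(total);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Delays of item1..item6 from the question.
        List<Long> delays = Arrays.asList(2L, 1L, 2L, 3L, 2L, 3L);
        System.out.println(cumulativeOffsets(delays)); // prints [2, 3, 5, 8, 10, 13]
    }
}
```

These offsets line up with the output above if you keep in mind that the clock's tick N is printed N + 1 seconds after the start, so "1 seconds:" appears at the 2-second mark, right before the first item.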
贪生不怕死
#3 · 2020-04-05 07:15

You can also make use of the timer operator. Have a look at this:

Integer[] arr = {1, 2, 3, 4, 5, 6, 7};
Observable.fromArray(arr)
        // timer returns a 0 value, note the nested map operator usage
        // in order to preserve original integers
        .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                    .map(y -> x))
        .timestamp()
        .subscribe(timedIntegers ->
                Log.i(TAG, "Timed String: "
                        + timedIntegers.value()
                        + " "
                        + timedIntegers.time()));
家丑人穷心不美
#4 · 2020-04-05 07:18

To delay each step you can use zip to pair every item emitted by your Observable.from with a tick of an interval of X time units. Note that this gives the same fixed delay between all items rather than a per-item one.

/**
 * To delay every single item emitted in the pipeline we need a workaround.
 * One option is to zip the source with an interval, so that every item has to
 * wait until the interval emits its next tick.
 */
@Test
public void delay() {
    long start = System.currentTimeMillis();
    // rx.observers.TestSubscriber, subscribed directly so it sees the terminal event
    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    Observable.zip(Observable.from(Arrays.asList(1, 2, 3)),
                   Observable.interval(200, TimeUnit.MILLISECONDS),
                   (i, t) -> i)
              .doOnNext(n -> System.out.println("time:" + (System.currentTimeMillis() - start)))
              .subscribe(subscriber);
    // Block the test thread until the stream completes (or 3 seconds pass)
    subscriber.awaitTerminalEvent(3000, TimeUnit.MILLISECONDS);
}

This will print

          time:537
          time:738
          time:936

More practical examples here: https://github.com/politrons/reactive

唯我独甜
#5 · 2020-04-05 07:25

No need for anything fancy. Just use the concatMap and delay operators:

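// jla is the List<SuperHero> from the answer above, so wrap it in an Observable first
Observable.from(jla)
  .concatMap(s -> Observable.just(s).delay(s.delay, TimeUnit.SECONDS))
  .subscribe(s1 -> System.out.println(s1.name + " just came..."),
             e -> e.printStackTrace(),
             () -> System.out.println("Everybody came!"));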
forever°为你锁心
#6 · 2020-04-05 07:31

You may try this overload of .delay(): http://reactivex.io/RxJava/javadoc/rx/Observable.html#delay(rx.functions.Func1)

It seems to be exactly what you need.

The code would be something like:

yourObservable.delay((item) -> Observable.timer(item.getDelay(), TimeUnit.SECONDS))
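
One caveat, as far as I understand this overload: each item's delay is measured from the moment the source emits that item. A source built from a list emits everything almost at once, so all the timers start together and each item arrives after its own delay only, not after the sum of the delays before it. The sketch below is a plain-Java model of that timing, not RxJava code; the `PerItemDelayOrder` class, the `Item` type, and the way ties are ordered are assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PerItemDelayOrder {
    // Hypothetical (name, delay in seconds) pair, mirroring the question's list.
    public static final class Item {
        final String name;
        final long delay;
        public Item(String name, long delay) {
            this.name = name;
            this.delay = delay;
        }
    }

    // Models timers that all start at the same instant: arrival order is then
    // simply the order of the individual delays.
    public static List<String> arrivalOrder(List<Item> items) {
        List<Item> sorted = new ArrayList<>(items);
        // Stable sort: items with equal delays keep their list order here.
        // A real scheduler gives no such guarantee for ties.
        sorted.sort(Comparator.comparingLong((Item i) -> i.delay));
        List<String> out = new ArrayList<>();
        for (Item i : sorted) {
            out.add(i.delay + "s: " + i.name);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item("item1", 2), new Item("item2", 1), new Item("item3", 2),
            new Item("item4", 3), new Item("item5", 2), new Item("item6", 3));
        // prints [1s: item2, 2s: item1, 2s: item3, 2s: item5, 3s: item4, 3s: item6]
        System.out.println(arrivalOrder(items));
    }
}
```

If that model is right, item2 would overtake item1 with the question's data, so to get the output asked for you would still need a running total of the delays, or the concatMap approach from the earlier answer.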