Observable.Generate in RxJava?

2019-07-11 03:17发布

.NET Reactive Extensions has a neat method to generate sequences using corecursion which is called Observable.Generate.

Is there analogues method in RxJava that allows data generation via corecursion? If not, could it be implemented based on existing methods?

1条回答
甜甜的少女心
2楼-- · 2019-07-11 03:50

It's not an exact match but we have SyncOnSubscribe (and AsyncOnSubscriber) that can generate values, for example:

@Test
public void testRange() {
    final int start = 1;
    final int count = 4000;
    OnSubscribe<Integer> os = SyncOnSubscribe.createStateful(new Func0<Integer>(){
        @Override
        public Integer call() {
            return start;
        }}, 
        new Func2<Integer, Observer<? super Integer>, Integer>() {
            @Override
            public Integer call(Integer state, Observer<? super Integer> subscriber) {
                subscriber.onNext(state);
                if (state == count) {
                    subscriber.onCompleted();
                }
                return state + 1;
            }
        });

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder inOrder = inOrder(o);

    Observable.create(os).subscribe(o);

    verify(o, never()).onError(any(TestException.class));
    inOrder.verify(o, times(count)).onNext(any(Integer.class));
    inOrder.verify(o).onCompleted();
    inOrder.verifyNoMoreInteractions();
}
查看更多
登录 后发表回答