Observable.Generate in RxJava?

2019-07-11 03:02发布

问题:

.NET Reactive Extensions has a neat method to generate sequences using corecursion which is called Observable.Generate.

Is there analogues method in RxJava that allows data generation via corecursion? If not, could it be implemented based on existing methods?

回答1:

It's not an exact match but we have SyncOnSubscribe (and AsyncOnSubscriber) that can generate values, for example:

@Test
public void testRange() {
    final int start = 1;
    final int count = 4000;
    OnSubscribe<Integer> os = SyncOnSubscribe.createStateful(new Func0<Integer>(){
        @Override
        public Integer call() {
            return start;
        }}, 
        new Func2<Integer, Observer<? super Integer>, Integer>() {
            @Override
            public Integer call(Integer state, Observer<? super Integer> subscriber) {
                subscriber.onNext(state);
                if (state == count) {
                    subscriber.onCompleted();
                }
                return state + 1;
            }
        });

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder inOrder = inOrder(o);

    Observable.create(os).subscribe(o);

    verify(o, never()).onError(any(TestException.class));
    inOrder.verify(o, times(count)).onNext(any(Integer.class));
    inOrder.verify(o).onCompleted();
    inOrder.verifyNoMoreInteractions();
}