n-ary Cartesian product in RxJava

Posted 2019-06-24 00:50

Question:

I have an Observable<Observable<Integer>>. How can I transform it into an Observable<int[]> that emits the n-ary Cartesian product?

For example:

Observable<Observable<Integer>> ob = Observable.just(
  Observable.just(0,1),
  Observable.just(2,3),
  Observable.just(4,5)
  );
ob...... ->   (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5)

Answer 1:

First of all, you need a fixed number of input Observables. Second, there is no need for blocking, but caching is likely required because the second, third, and later Observables must be consumed multiple times.

import java.util.*;

import io.reactivex.Observable;

public class Cartesian {

    static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) {
        return sources.toList().flatMapObservable(list -> cartesian(list));
    }

    static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
        if (sources.size() == 0) {
            return Observable.<int[]>empty();
        }
        Observable<int[]> main = sources.get(0).map(v -> new int[] { v });

        for (int i = 1; i < sources.size(); i++) {
            int j = i;
            Observable<Integer> o = sources.get(i).cache();
            main = main.flatMap(v -> {
                return o.map(w -> {
                    int[] arr = Arrays.copyOf(v, j + 1);
                    arr[j] = w;
                    return arr;
                });
            });
        }

        return main;
    }

    public static void main(String[] args) {
        cartesian(Observable.just(
            Observable.just(0, 1), 
            Observable.just(2, 3), 
            Observable.just(4, 5)
        ))
        .subscribe(v -> System.out.println(Arrays.toString(v)));
    }
}
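
To make the fold in the code above easier to follow, here is a minimal sketch of the same idea in plain Java, with in-memory lists standing in for the cached Observables. The class name ListCartesian is hypothetical and not part of RxJava or the answer; the sketch only illustrates how each pass extends every partial tuple with every value of the next source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class (an assumption for illustration): the same
// left-to-right fold, with lists standing in for cached Observables.
class ListCartesian {

    static List<int[]> cartesian(List<List<Integer>> sources) {
        List<int[]> main = new ArrayList<>();
        if (sources.isEmpty()) {
            return main; // mirrors Observable.empty() for zero sources
        }
        // Seed with one-element tuples taken from the first source.
        for (int v : sources.get(0)) {
            main.add(new int[] { v });
        }
        // Extend every partial tuple with every value of the next source.
        for (int i = 1; i < sources.size(); i++) {
            List<int[]> extended = new ArrayList<>();
            for (int[] prefix : main) {
                for (int w : sources.get(i)) {
                    int[] arr = Arrays.copyOf(prefix, i + 1);
                    arr[i] = w;
                    extended.add(arr);
                }
            }
            main = extended;
        }
        return main;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
            Arrays.asList(0, 1),
            Arrays.asList(2, 3),
            Arrays.asList(4, 5));
        for (int[] tuple : cartesian(sources)) {
            System.out.println(Arrays.toString(tuple));
        }
    }
}
```

With the three sources from the question, the list version produces eight tuples in lexicographic order, starting with [0, 2, 4], [0, 2, 5], [0, 3, 4]. The Observable version may emit in a different order when the sources are asynchronous, because flatMap does not guarantee ordering; concatMap would preserve it.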


Answer 2:

Creating a Cartesian product asynchronously is hard, and in some sense impossible. If blocking is acceptable, you can do something like this:

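import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.Single;

public class Main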
{

    static class ProductIterator<T> implements Iterator<T[]>
    {
        private final List<List<T>> componentsList;
        private final Class<T> componentClass;
        private final int[] indices;
        private boolean hasNext;

        public ProductIterator(List<List<T>> componentsList, Class<T> componentClass)
        {
            this.componentsList = componentsList;
            this.componentClass = componentClass;
            this.indices = new int[componentsList.size()];
            // The product is empty if there are no components or if any component is empty.
            this.hasNext = !componentsList.isEmpty() && componentsList.stream().noneMatch(List::isEmpty);
        }

        @Override
        public boolean hasNext()
        {
            return hasNext;
        }

        @Override
        public T[] next()
        {
            T[] res = (T[]) Array.newInstance(componentClass, componentsList.size());
            for (int i = 0; i < componentsList.size(); i++)
            {
                res[i] = componentsList.get(i).get(indices[i]);
            }

            // move next
            indices[0]++;
            for (int i = 0; i < componentsList.size() - 1; i++)
            {
                if (indices[i] == componentsList.get(i).size())
                {
                    indices[i] = 0;
                    indices[i + 1]++;
                }
            }
            hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();

            return res;
        }
    }

    public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass)
    {
        return Observable.fromIterable(new Iterable<T[]>()
        {
            @Override
            public Iterator<T[]> iterator()
            {
                // postpone blocking up until iterator is requested 
                // and by this point we can't postpone anymore 
                Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList();
                return new ProductIterator<T>(componentsList.blockingGet(), componentClass);
            }
        });
    }

    public static void main(String[] args) throws Exception
    {
        Observable<Observable<Integer>> ob = Observable.just(
                Observable.just(0, 1),
                Observable.just(2, 3),
                Observable.just(4, 5)
        );

        Observable<Integer[]> product = product(ob, Integer.class);
        product.forEach(a -> System.out.println(Arrays.toString(a)));
    }
}

It is possible to improve this code to avoid blocking, but you will still have to cache all results from all Observables, and the code will be much more complicated. Most likely, if blocking is not acceptable for you, computing a Cartesian product is a bad idea anyway.
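
The index bookkeeping in ProductIterator above works like an odometer whose first position advances fastest. As a rough illustration, the sketch below isolates that counter using plain int arrays. The class OdometerDemo and the method indexTuples are hypothetical names, and the early return for empty inputs is an assumption added here rather than something the answer specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class (an assumption for illustration): the index
// "odometer" reduced to int arrays so the iteration order is easy to see.
class OdometerDemo {

    // sizes[i] is the number of elements in component i.
    // Returns every index tuple, with position 0 advancing fastest.
    static List<int[]> indexTuples(int[] sizes) {
        List<int[]> result = new ArrayList<>();
        // Assumption: zero components, or any empty component, yields no tuples.
        if (sizes.length == 0) {
            return result;
        }
        for (int s : sizes) {
            if (s == 0) {
                return result;
            }
        }

        int[] indices = new int[sizes.length];
        int last = sizes.length - 1;
        while (indices[last] < sizes[last]) {
            result.add(indices.clone());
            // Advance position 0 and carry overflow into the next position.
            indices[0]++;
            for (int i = 0; i < last; i++) {
                if (indices[i] == sizes[i]) {
                    indices[i] = 0;
                    indices[i + 1]++;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (int[] tuple : indexTuples(new int[] { 2, 2, 2 })) {
            System.out.println(Arrays.toString(tuple));
        }
    }
}
```

For three components of size two, the tuples come out as [0, 0, 0], [1, 0, 0], [0, 1, 0], [1, 1, 0], and so on, so the first component varies fastest. That differs from the lexicographic order a left-to-right nested loop would give, which matters only if the consumer depends on emission order.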



Answer 3:

Well, I managed to solve it myself, but is there a more elegant way?
(This uses the RxJava 1.x API. The toArray method, not shown, converts an Observable<Integer> to an int[].)

    Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) {
        List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last();
        return Observable.create(new SyncOnSubscribe<int[], int[]>() {
            @Override
            protected int[] generateState() {
                // A new int[] is already zero-filled: one index per source, all starting at 0.
                return new int[list.size()];
            }

            @Override
            protected int[] next(int[] state, Observer<? super int[]> observer) {
                int[] next = new int[list.size()];
                for (int i = 0; i < next.length; i++) {
                    next[i] = list.get(i)[state[i]];
                }
                observer.onNext(next);
                state[state.length - 1]++;
                for (int i = state.length - 1; i >= 0; i--) {
                    int delta = list.get(i).length - state[i];
                    if (delta > 0) {
                        break;
                    } else if (delta == 0) {
                        state[i] = 0;
                        if (i == 0) {
                            observer.onCompleted();
                            break;
                        }
                        state[i - 1]++;
                    }
                }
                return state;
            }
        });
    }