n-ary Cartesian product inRxJava

2019-06-24 00:50发布


Now I hold an Observable<Observable<Integer>, how can I transfer it into Observable<int[]> that contains the n-ary Cartesian product?

For example:

Observable<Observable<Integer> ob = Observable.just(
ob...... ->   (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5)


First of all, you need a fixed number of input Observables. Second, there is no need for blocking but there is likely the need for caching because the 2nd, 3rd etc Observables need to be consumed multiple times.

import java.util.*;

import io.reactivex.Observable;

public class Cartesian {

    static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) {
        return sources.toList().flatMapObservable(list -> cartesian(list));

    static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
        if (sources.size() == 0) {
            return Observable.<int[]>empty();
        Observable<int[]> main = sources.get(0).map(v -> new int[] { v });

        for (int i = 1; i < sources.size(); i++) {
            int j = i;
            Observable<Integer> o = sources.get(i).cache();
            main = main.flatMap(v -> {
                return o.map(w -> {
                    int[] arr = Arrays.copyOf(v, j + 1);
                    arr[j] = w;
                    return arr;

        return main;

    public static void main(String[] args) {
            Observable.just(0, 1), 
            Observable.just(2, 3), 
            Observable.just(4, 5)
        .subscribe(v -> System.out.println(Arrays.toString(v)));


Creating Cartesian product in an asynchronous way is hard or in some sense is impossible. And if blocking is OK, you can do something like this

public class Main

    static class ProductIterator<T> implements Iterator<T[]>
        private final List<List<T>> componentsList;
        private final Class<T> componentClass;
        private final int[] indices;
        private boolean hasNext;

        public ProductIterator(List<List<T>> componentsList, Class<T> componentClass)
            this.componentsList = componentsList;
            this.componentClass = componentClass;
            this.indices = new int[componentsList.size()];
            this.hasNext = this.indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();

        public boolean hasNext()
            return hasNext;

        public T[] next()
            T[] res = (T[]) Array.newInstance(componentClass, componentsList.size());
            for (int i = 0; i < componentsList.size(); i++)
                res[i] = componentsList.get(i).get(indices[i]);

            // move next
            for (int i = 0; i < componentsList.size() - 1; i++)
                if (indices[i] == componentsList.get(i).size())
                    indices[i] = 0;
                    indices[i + 1]++;
            hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();

            return res;

    public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass)
        return Observable.fromIterable(new Iterable<T[]>()
            public Iterator<T[]> iterator()
                // postpone blocking up until iterator is requested 
                // and by this point we can't postpone anymore 
                Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList();
                return new ProductIterator<T>(componentsList.blockingGet(), componentClass);

    public static void main(String[] args) throws Exception
        Observable<Observable<Integer>> ob = Observable.just(
                Observable.just(0, 1),
                Observable.just(2, 3),
                Observable.just(4, 5)

        Observable<Integer[]> product = product(ob, Integer.class);
        product.forEach(a -> System.out.println(Arrays.toString(a)));

It is possible to improve this code to avoid blocking but you still will have to cache all results from all Observables and code will be much more complicated. Most probably is blocking is not acceptable for you, trying to get Cartesian product is bad idea anyway.


Well, I can resolve it myself. But is there any more elegant way?
(The toArray method convert an Observable<T> to T[])

    Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) {
        List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last();
        return Observable.create(new SyncOnSubscribe<int[], int[]>() {
            protected int[] generateState() {
                int[] array = new int[list.size()];
                Arrays.fill(array, 0);
                return array;

            protected int[] next(int[] state, Observer<? super int[]> observer) {
                int[] next = new int[list.size()];
                for (int i = 0; i < next.length; i++) {
                    next[i] = list.get(i)[state[i]];
                state[state.length - 1]++;
                for (int i = state.length - 1; i >= 0; i--) {
                    int delta = list.get(i).length - state[i];
                    if (delta > 0) {
                    } else if (delta == 0) {
                        state[i] = 0;
                        if (i == 0) {
                        state[i - 1]++;
                return state;