Now I hold an Observable<Observable<Integer>
, how can I transfer it into Observable<int[]>
that contains the n-ary Cartesian product?
For example:
Observable<Observable<Integer> ob = Observable.just(
Observable.just(0,1),
Observable.just(2,3),
Observable.just(4,5)
);
ob...... -> (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5)
First of all, you need a fixed number of input Observable
s. Second, there is no need for blocking but there is likely the need for caching because the 2nd, 3rd etc Observable
s need to be consumed multiple times.
import java.util.*;
import io.reactivex.Observable;
public class Cartesian {
static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) {
return sources.toList().flatMapObservable(list -> cartesian(list));
}
static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
if (sources.size() == 0) {
return Observable.<int[]>empty();
}
Observable<int[]> main = sources.get(0).map(v -> new int[] { v });
for (int i = 1; i < sources.size(); i++) {
int j = i;
Observable<Integer> o = sources.get(i).cache();
main = main.flatMap(v -> {
return o.map(w -> {
int[] arr = Arrays.copyOf(v, j + 1);
arr[j] = w;
return arr;
});
});
}
return main;
}
public static void main(String[] args) {
cartesian(Observable.just(
Observable.just(0, 1),
Observable.just(2, 3),
Observable.just(4, 5)
))
.subscribe(v -> System.out.println(Arrays.toString(v)));
}
}
Creating Cartesian product in an asynchronous way is hard or in some sense is impossible. And if blocking is OK, you can do something like this
public class Main
{
static class ProductIterator<T> implements Iterator<T[]>
{
private final List<List<T>> componentsList;
private final Class<T> componentClass;
private final int[] indices;
private boolean hasNext;
public ProductIterator(List<List<T>> componentsList, Class<T> componentClass)
{
this.componentsList = componentsList;
this.componentClass = componentClass;
this.indices = new int[componentsList.size()];
this.hasNext = this.indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();
}
@Override
public boolean hasNext()
{
return hasNext;
}
@Override
public T[] next()
{
T[] res = (T[]) Array.newInstance(componentClass, componentsList.size());
for (int i = 0; i < componentsList.size(); i++)
{
res[i] = componentsList.get(i).get(indices[i]);
}
// move next
indices[0]++;
for (int i = 0; i < componentsList.size() - 1; i++)
{
if (indices[i] == componentsList.get(i).size())
{
indices[i] = 0;
indices[i + 1]++;
}
}
hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();
return res;
}
}
public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass)
{
return Observable.fromIterable(new Iterable<T[]>()
{
@Override
public Iterator<T[]> iterator()
{
// postpone blocking up until iterator is requested
// and by this point we can't postpone anymore
Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList();
return new ProductIterator<T>(componentsList.blockingGet(), componentClass);
}
});
}
public static void main(String[] args) throws Exception
{
Observable<Observable<Integer>> ob = Observable.just(
Observable.just(0, 1),
Observable.just(2, 3),
Observable.just(4, 5)
);
Observable<Integer[]> product = product(ob, Integer.class);
product.forEach(a -> System.out.println(Arrays.toString(a)));
}
}
It is possible to improve this code to avoid blocking but you still will have to cache all results from all Observable
s and code will be much more complicated. Most probably is blocking is not acceptable for you, trying to get Cartesian product is bad idea anyway.
Well, I can resolve it myself. But is there any more elegant way?
(The toArray
method convert an Observable<T>
to T[]
)
Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) {
List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last();
return Observable.create(new SyncOnSubscribe<int[], int[]>() {
@Override
protected int[] generateState() {
int[] array = new int[list.size()];
Arrays.fill(array, 0);
return array;
}
@Override
protected int[] next(int[] state, Observer<? super int[]> observer) {
int[] next = new int[list.size()];
for (int i = 0; i < next.length; i++) {
next[i] = list.get(i)[state[i]];
}
observer.onNext(next);
state[state.length - 1]++;
for (int i = state.length - 1; i >= 0; i--) {
int delta = list.get(i).length - state[i];
if (delta > 0) {
break;
} else if (delta == 0) {
state[i] = 0;
if (i == 0) {
observer.onCompleted();
break;
}
state[i - 1]++;
}
}
return state;
}
});
}