RxJava + JavaFX Property

2019-09-03 02:34发布

问题:

I have a method called rxToProperty() that turns an Observable into a JavaFX Property.

 public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }

How do I ensure that the returned Property is always up-to-date especially when it is bound to controls in JavaFX? I have noticed some very bizarre, random behaviors that seem to indicate thread safety has been compromised with the schedulers and JavaFX.

Like for example, I try to use this method to show one source Observable scheduled in several different ways, and the results are random and haphazard when used in a TableView, with cells alternating between having values and not having values, as well as thread activity that sometimes will never end.

Whenever I try to schedule on the FX thread using Platform.invokeLater() it only makes the behavior more crazy. What is wrong with my rxToProperty() method?

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable());

        stage.setScene(scene);
        stage.sizeToScene();
        stage.show();
    }

    private static final class ReactiveRecord {

        private final Observable<Number> obs = Observable.just(10,20,30,40,50,60).cast(Number.class);

        public Observable<Number> subscribeImmediate() { 
            return obs;
        }
        public Observable<Number> subscribeComputation() { 
            return obs.subscribeOn(Schedulers.computation());
        }
        public Observable<Number> subscribeNewthread() { 
            return obs.subscribeOn(Schedulers.newThread());
        }
        public Observable<Number> subscribeIo() { 
            return obs.subscribeOn(Schedulers.io());
        }
        public Observable<Number> subscribeParallel() {
            return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
        }
        public Observable<Number> subscribeTrampoline() { 
            return obs.subscribeOn(Schedulers.trampoline());
        }

        public ImmutableList<Observable<Number>> getAll() { 
            return ImmutableList.of(subscribeImmediate(),subscribeComputation(),subscribeNewthread(),subscribeIo(),subscribeParallel(),subscribeTrampoline());
        }
        public ImmutableList<String> getHeaders() {
            return ImmutableList.of("IMMEDIATE","COMPUTATION","NEW","IO","PARALLEL","TRAMPOLINE");
        }
    }

    private static final class ReactiveTable extends TableView<ReactiveRecord> {


        private ReactiveTable() { 
            ReactiveRecord record = new ReactiveRecord();

            this.getItems().add(record);

            ImmutableList<Observable<Number>> observables = record.getAll();
            ImmutableList<String> headers = record.getHeaders();

            for (int i = 0; i < observables.size(); i++) { 
                 TableColumn<ReactiveRecord,Number> col = new TableColumn<>(headers.get(i));
                 final int index = i;
                 col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
                 this.getColumns().add(col);
            }
        }
    }
    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

回答1:

Tomas Mikula gave some very helpful insight to this behavior, as well as the solution, on the ReactFX GitHub project.

https://github.com/TomasMikula/ReactFX/issues/22