I have a method called rxToProperty() that turns an Observable into a JavaFX Property.
public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
    ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
    obs.onBackpressureLatest().serialize().subscribe(v -> {
        synchronized (property) {
            property.set(v);
        }
    });
    return property.getReadOnlyProperty();
}
How do I ensure that the returned Property is always up-to-date, especially when it is bound to controls in JavaFX? I have noticed some bizarre, seemingly random behavior that suggests thread safety has been compromised between the RxJava schedulers and JavaFX.
For example, when I use this method to show one source Observable scheduled in several different ways, the results in a TableView are random and haphazard: cells alternate between having values and not having values, and thread activity sometimes never ends.
Whenever I try to schedule on the FX thread using Platform.runLater(), it only makes the behavior more erratic. What is wrong with my rxToProperty() method?
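To make clear what I mean by scheduling on the FX thread, here is a minimal sketch of the hand-off pattern I have in mind, written without JavaFX or RxJava so it runs on its own. A single-thread executor stands in for the JavaFX Application Thread (in the real code that role would be played by Platform.runLater()), and plain worker threads stand in for the schedulers. The names UiThreadHandOff, emitFromWorkers, and fake-ui-thread are hypothetical and exist only for this illustration; this is not the code from my application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UiThreadHandOff {
    // Hypothetical name for the stand-in UI thread.
    static final String UI_THREAD_NAME = "fake-ui-thread";

    // Starts `workers` background threads; each one hands its write over to
    // the single stand-in UI thread. Returns the name of the thread that
    // actually performed each write.
    public static List<String> emitFromWorkers(int workers) throws Exception {
        // Assumption: a single-thread executor plays the role of the
        // JavaFX Application Thread.
        ExecutorService uiThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, UI_THREAD_NAME));
        // This list is only ever touched from tasks running on uiThread.
        List<String> writerThreads = new ArrayList<>();
        try {
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                Thread t = new Thread(() ->
                        // The worker does not write directly; it queues the write.
                        uiThread.execute(() ->
                                writerThreads.add(Thread.currentThread().getName())));
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join(); // all writes are queued once every worker has finished
            }
            // Read the state on the same thread that writes it.
            Callable<List<String>> snapshot = () -> new ArrayList<>(writerThreads);
            return uiThread.submit(snapshot).get();
        } finally {
            uiThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(emitFromWorkers(4));
    }
}
```

In this sketch every write is performed by the one stand-in UI thread, no matter which worker produced the value. That is the guarantee I expected to get for property.set(v) when going through the FX thread.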
// Imports assume RxJava 1.x (rx.*) and Guava (ImmutableList).
import com.google.common.collect.ImmutableList;
import javafx.application.Application;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.ReadOnlyObjectWrapper;
import javafx.scene.Group;
import javafx.scene.Scene;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.stage.Stage;
import rx.Observable;
import rx.schedulers.Schedulers;

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {
        Group root = new Group();
        Scene scene = new Scene(root);
        root.getChildren().add(new ReactiveTable());
        stage.setScene(scene);
        stage.sizeToScene();
        stage.show();
    }

    // One record exposing the same source Observable under different schedulers.
    private static final class ReactiveRecord {
        private final Observable<Number> obs = Observable.just(10, 20, 30, 40, 50, 60).cast(Number.class);

        public Observable<Number> subscribeImmediate() {
            return obs;
        }

        public Observable<Number> subscribeComputation() {
            return obs.subscribeOn(Schedulers.computation());
        }

        public Observable<Number> subscribeNewthread() {
            return obs.subscribeOn(Schedulers.newThread());
        }

        public Observable<Number> subscribeIo() {
            return obs.subscribeOn(Schedulers.io());
        }

        public Observable<Number> subscribeParallel() {
            return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
        }

        public Observable<Number> subscribeTrampoline() {
            return obs.subscribeOn(Schedulers.trampoline());
        }

        public ImmutableList<Observable<Number>> getAll() {
            return ImmutableList.of(subscribeImmediate(), subscribeComputation(), subscribeNewthread(),
                    subscribeIo(), subscribeParallel(), subscribeTrampoline());
        }

        public ImmutableList<String> getHeaders() {
            return ImmutableList.of("IMMEDIATE", "COMPUTATION", "NEW", "IO", "PARALLEL", "TRAMPOLINE");
        }
    }

    // A table with one row and one column per scheduling variant.
    private static final class ReactiveTable extends TableView<ReactiveRecord> {
        private ReactiveTable() {
            ReactiveRecord record = new ReactiveRecord();
            this.getItems().add(record);
            ImmutableList<Observable<Number>> observables = record.getAll();
            ImmutableList<String> headers = record.getHeaders();
            for (int i = 0; i < observables.size(); i++) {
                TableColumn<ReactiveRecord, Number> col = new TableColumn<>(headers.get(i));
                final int index = i;
                // Each call to the cell value factory creates a new subscription.
                col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
                this.getColumns().add(col);
            }
        }
    }

    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
        obs.onBackpressureLatest().serialize().subscribe(v -> {
            synchronized (property) {
                // Log which thread performs the write.
                System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
                property.set(v);
            }
        });
        return property.getReadOnlyProperty();
    }

    public static void main(String[] args) {
        launch(args);
    }
}