I have a method named rxToProperty() that turns an Observable into a JavaFX Property.
    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
        obs.onBackpressureLatest().serialize().subscribe(v -> {
            synchronized(property) {
                property.set(v);
            }
        });
        return property.getReadOnlyProperty();
    }
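How do I ensure that the returned Property is always up to date, especially when it is bound to a control in JavaFX? I have noticed some very strange, random behavior that seems to indicate thread safety is being compromised between the schedulers and JavaFX.

For example, I tried using this method to display one source Observable scheduled in several different ways. When the resulting properties are used in a TableView, the results are random and haphazard: cells alternate between having values and having none, and the thread activity sometimes never ends.

Whenever I try to schedule onto the FX thread using Platform.invokeLater(), it only makes the behavior more erratic. What is wrong with my rxToProperty() method?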
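What I am aiming for when I try to schedule onto the FX thread is roughly the following. This is only a minimal sketch in plain Java, with no JavaFX or RxJava involved: a single-thread executor named `fake-ui-thread` stands in for the FX application thread, and `RecordingHolder`, `ConfinedUpdateSketch` and `demo()` are hypothetical names invented for illustration. Values are produced on several worker threads, but every write is handed off to the one "UI" thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConfinedUpdateSketch {

    /** Hypothetical stand-in for a property: records which thread performed each write. */
    static final class RecordingHolder<T> {
        private T value;
        private final List<String> writerThreads = new ArrayList<>();

        // Not synchronized on purpose: it is only ever called on the single "UI" thread.
        void set(T newValue) {
            value = newValue;
            writerThreads.add(Thread.currentThread().getName());
        }
    }

    /** Produces six values on worker threads and returns the names of the threads that wrote them. */
    static List<String> demo() throws Exception {
        // Assumption: one dedicated thread plays the role of the FX application thread.
        ExecutorService uiThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-ui-thread"));
        ExecutorService workers = Executors.newFixedThreadPool(4);
        RecordingHolder<Integer> holder = new RecordingHolder<>();
        int[] values = {10, 20, 30, 40, 50, 60};
        CountDownLatch handedOff = new CountDownLatch(values.length);
        try {
            for (int v : values) {
                workers.execute(() -> {
                    // The worker never touches the holder; it only queues the write.
                    uiThread.execute(() -> holder.set(v));
                    handedOff.countDown();
                });
            }
            handedOff.await(); // all six writes are now queued on the UI thread
            // The snapshot is queued behind the writes, so it observes all of them.
            Callable<List<String>> snapshot = () -> new ArrayList<>(holder.writerThreads);
            return uiThread.submit(snapshot).get();
        } finally {
            workers.shutdown();
            uiThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In this sketch the order in which the six values arrive is not deterministic, because the workers race each other. Only the confinement of the writes to one thread is guaranteed.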
    public class ReactiveTableViewTest extends Application {

        @Override
        public void start(Stage stage) throws Exception {
            Group root = new Group();
            Scene scene = new Scene(root);
            root.getChildren().add(new ReactiveTable());
            stage.setScene(scene);
            stage.sizeToScene();
            stage.show();
        }

        private static final class ReactiveRecord {
            private final Observable<Number> obs = Observable.just(10,20,30,40,50,60).cast(Number.class);

            public Observable<Number> subscribeImmediate() {
                return obs;
            }
            public Observable<Number> subscribeComputation() {
                return obs.subscribeOn(Schedulers.computation());
            }
            public Observable<Number> subscribeNewthread() {
                return obs.subscribeOn(Schedulers.newThread());
            }
            public Observable<Number> subscribeIo() {
                return obs.subscribeOn(Schedulers.io());
            }
            public Observable<Number> subscribeParallel() {
                return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
            }
            public Observable<Number> subscribeTrampoline() {
                return obs.subscribeOn(Schedulers.trampoline());
            }
            public ImmutableList<Observable<Number>> getAll() {
                return ImmutableList.of(subscribeImmediate(),subscribeComputation(),subscribeNewthread(),subscribeIo(),subscribeParallel(),subscribeTrampoline());
            }
            public ImmutableList<String> getHeaders() {
                return ImmutableList.of("IMMEDIATE","COMPUTATION","NEW","IO","PARALLEL","TRAMPOLINE");
            }
        }

        private static final class ReactiveTable extends TableView<ReactiveRecord> {
            private ReactiveTable() {
                ReactiveRecord record = new ReactiveRecord();
                this.getItems().add(record);
                ImmutableList<Observable<Number>> observables = record.getAll();
                ImmutableList<String> headers = record.getHeaders();
                for (int i = 0; i < observables.size(); i++) {
                    TableColumn<ReactiveRecord,Number> col = new TableColumn<>(headers.get(i));
                    final int index = i;
                    col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
                    this.getColumns().add(col);
                }
            }
        }

        public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) {
            ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
            obs.onBackpressureLatest().serialize().subscribe(v -> {
                synchronized(property) {
                    System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
                    property.set(v);
                }
            });
            return property.getReadOnlyProperty();
        }

        public static void main(String[] args) {
            launch(args);
        }
    }
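One thing I am unsure about in the test above: the cell value factory calls rxToProperty() each time it is invoked, so every invocation subscribes again and returns a fresh property. I have not verified how often TableView actually calls the factory, but if it is more than once per cell, the effect could be sketched as below. This is plain Java with no JavaFX or RxJava; `CachedFactorySketch`, `memoize` and `demo()` are hypothetical names, and the counting function is only a stand-in for rxToProperty().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class CachedFactorySketch {

    /** Wraps a factory so that each key is converted at most once and the result is reused. */
    static <K, V> Function<K, V> memoize(Function<K, V> factory) {
        Map<K, V> cache = new ConcurrentHashMap<>();
        return key -> cache.computeIfAbsent(key, factory);
    }

    /** Returns {conversions without caching, conversions with caching} for 3 passes over 6 columns. */
    static int[] demo() {
        AtomicInteger uncachedCalls = new AtomicInteger();
        AtomicInteger cachedCalls = new AtomicInteger();

        // Hypothetical stand-ins for rxToProperty: each call represents a new subscription.
        Function<Integer, String> uncached = column -> {
            uncachedCalls.incrementAndGet();
            return "property-for-column-" + column;
        };
        Function<Integer, String> cached = memoize(column -> {
            cachedCalls.incrementAndGet();
            return "property-for-column-" + column;
        });

        // Assumption: the view asks for each column's value several times (here, 3 passes).
        for (int pass = 0; pass < 3; pass++) {
            for (int column = 0; column < 6; column++) {
                uncached.apply(column);
                cached.apply(column);
            }
        }
        return new int[] {uncachedCalls.get(), cachedCalls.get()};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("uncached=" + counts[0] + ", cached=" + counts[1]);
    }
}
```

Without the cache, the stand-in is called once per request (18 times in this sketch); with it, once per column (6 times). Whether this is related to the behavior I see in the TableView is a guess on my part.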