RxJava +物业的JavaFX(RxJava + JavaFX Property)

2019-11-03 20:53发布

我有一个名为方法rxToProperty()轮流的Observable到一个JavaFX Property

 public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }

如何确保返回的Property始终是最新尤其是当它必然要控制在JavaFX的? 我注意到,似乎表明线程安全性受到了损害与调度和JavaFX一些很奇怪的,随机的行为。

例如像,我尝试使用这种方法来显示一个源Observable计划在几种不同的方式,并在使用时的结果是随机的,偶然TableView ,与细胞中具有值的和没有价值,以及线程活动之间交替有时将永远不会结束。

每当我尝试安排使用FX线程Platform.invokeLater()它不仅使行为更加疯狂。 有什么不对我rxToProperty()方法?

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable());

        stage.setScene(scene);
        stage.sizeToScene();
        stage.show();
    }

    private static final class ReactiveRecord {

        private final Observable<Number> obs = Observable.just(10,20,30,40,50,60).cast(Number.class);

        public Observable<Number> subscribeImmediate() { 
            return obs;
        }
        public Observable<Number> subscribeComputation() { 
            return obs.subscribeOn(Schedulers.computation());
        }
        public Observable<Number> subscribeNewthread() { 
            return obs.subscribeOn(Schedulers.newThread());
        }
        public Observable<Number> subscribeIo() { 
            return obs.subscribeOn(Schedulers.io());
        }
        public Observable<Number> subscribeParallel() {
            return obs.flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()));
        }
        public Observable<Number> subscribeTrampoline() { 
            return obs.subscribeOn(Schedulers.trampoline());
        }

        public ImmutableList<Observable<Number>> getAll() { 
            return ImmutableList.of(subscribeImmediate(),subscribeComputation(),subscribeNewthread(),subscribeIo(),subscribeParallel(),subscribeTrampoline());
        }
        public ImmutableList<String> getHeaders() {
            return ImmutableList.of("IMMEDIATE","COMPUTATION","NEW","IO","PARALLEL","TRAMPOLINE");
        }
    }

    private static final class ReactiveTable extends TableView<ReactiveRecord> {


        private ReactiveTable() { 
            ReactiveRecord record = new ReactiveRecord();

            this.getItems().add(record);

            ImmutableList<Observable<Number>> observables = record.getAll();
            ImmutableList<String> headers = record.getHeaders();

            for (int i = 0; i < observables.size(); i++) { 
                 TableColumn<ReactiveRecord,Number> col = new TableColumn<>(headers.get(i));
                 final int index = i;
                 col.setCellValueFactory(cb -> rxToProperty(observables.get(index)));
                 this.getColumns().add(col);
            }
        }
    }
    public static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.onBackpressureLatest().serialize().subscribe(v -> { 
            synchronized(property) { 
                System.out.println("Emitting val " + v + " on " + Thread.currentThread().getName());
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

Answer 1:

托马斯Mikula给了一些非常有益的见解,以这种行为,以及解决方案,在ReactFX GitHub的项目。

https://github.com/TomasMikula/ReactFX/issues/22



文章来源: RxJava + JavaFX Property