RX框架:执行是在超时的动作,而不会中断原观察到的序列(Rx Framework: execute

2019-09-23 17:09发布

鉴于可观察到的源,通过轮询产生的低级别的设备的状态下的(的变化)...

// observable source metacode:
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(tick => new DeviceState(_device.ReadValue()))
    .DistinctUntilChanged();

...和消费者是更新UI ...

// UI metacode:
service.GetObservableDeviceStates()
    .Subscribe(state => viewModel.CurrentState = state.ToString());

......我需要执行源的“无为”的x秒后自定义操作,无需中断订阅源。 事情是这样的:

// UI metacode:
service.GetObservableDeviceStates()
    .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
    .Subscribe(state => viewModel.CurrentState = state.ToString());

什么是最好的做法? 浮现在脑海中可能的解决方案是(我是小白的Rx):

  1. 缓冲区 (即使它不是那么可读)
  2. 玩了这个超时超负荷 ;
  3. 返回一些特殊的“服务端”时没有什么变化(而不是使用DistinctUntilChanged),并用它的UI代码进行处理:

    service.GetObservableDeviceStates().Subscribe(?状态=> viewModel.CurrentState = state.Special “空闲”:state.ToString());

编辑:作为报道的答案 ,解决的办法是:

        service.GetObservableDeviceStates()
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .Subscribe(onTimeout);

EDIT2(警告)

如果onNext和onTimeout更新UI组件,以避免CrossThreadExceptions 2个 ObserveOn(uiSynchronizationContext)都需要,因为油门工作在另一个线程!

        service.GetObservableDeviceStates()
            .ObserveOn(uiSynchronizationContext)
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .ObserveOn(uiSynchronizationContext)
            .Subscribe(onTimeout);

Answer 1:

超时或多或少意味着它代表一个异步操作观测-例如,希望返回默认值或者OnError如果说观察到的未通知你在一定的时间。

你要找的操作是油门 ,即使它似乎并不像它在第一。 Throttle(p)给你产生当源数据流还没有产生对周期的值的值的流p

平行于现有的代码,你可以使用source.Throttle(period).Do(...side effect)



Answer 2:

我个人会避免这种情况的待办事项方法。 这确实让本例中的代码很容易,但我发现,一旦使用“做”潜入你很快就会有意大利面条的代码库。

你也可以考虑使用大使,定时器,TakeUntil,油门等的组合,得到的结果您正在寻找的同时仍保持单子*。 或者简单地说,我想你最好想有未来通过,而不是状态值的序列要求需要把一个计时器在你的代码(即关闭其负载的服务)。

public IObservable<DeviceStatus> GetObservableDeviceStates(TimeSpan silencePeriod)
{
    return Observable.Create<DeviceStatus>(
    o=>
    {
        var idle = Observable.Timer(silencePeriod).Select(_=>new DeviceStatus("Idle"));

        var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5))
                        .Select(tick => new DeviceStatus(_device.ReadValue()))
                        .DistinctUntilChanged()
                        .Publish();

        var subscription = (from status in polledStatus
                            from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus))
                            select cont)
                     .Subscribe(o);

        return new CompositeDisposable(subscription, polledStatus.Connect());
    });
}

此代码现在有服务返回空闲状态值,一旦发生指定的更改的沉默期。

这意味着你的UI元代码保持简单和相关DeviceStatus逻辑保持它所属

// UI metacode:
service.GetObservableDeviceStates(TimeSpan.FromSeconds(2))
    .Subscribe(state => viewModel.CurrentState = state.ToString());


文章来源: Rx Framework: execute an action on timeout without interrupting the original observable sequence