How to implement observeLatestOn in RxJava (RxScala)

Posted 2019-02-27 05:18

Question:

I'm trying to implement the ObserveLatestOn operator in RxJava (actually, RxScala).

This operator is useful when we have a fast producer and a slow subscriber, and the subscriber doesn't care about any items it misses while it is busy consuming one.

A marble diagram:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

The = character represents long-running work being performed by the subscriber, and the > character marks the moment that work finishes. As the canonical use case, imagine a producer of data that needs to be displayed, with a screen renderer as the subscriber. Rendering takes quite a long time, but we don't need to render every intermediate step; rendering just the latest one is perfectly good.

In the marble diagram above, the producer emits 1. The subscriber begins to process it, which takes a long time. Meanwhile, the producer emits 2 and 3, and only after that does the subscriber finish its work. It sees that the last item emitted by the producer was 3, so it begins processing that. That is quick and no new item has been produced in the meantime, so the subscriber can rest. Then 5 arrives and the story continues in the same manner.
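
The "keep only the newest item" behavior in the diagram can be modeled, independently of Rx, as a one-element mailbox in which a newer item overwrites an older one that has not been consumed yet. The following is only a minimal sketch of that semantics, not an operator: LatestSlot and LatestSlotDemo are hypothetical names, and the demo replays the diagram by hand on a single thread rather than with real timing.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper (not part of RxJava/RxScala): a one-element mailbox
// in which a newer item overwrites an older, still unconsumed one.
final class LatestSlot[T] {
  private val slot = new AtomicReference[Option[T]](None)

  // Producer side: store the item, discarding whatever was waiting.
  def offer(item: T): Unit = slot.set(Some(item))

  // Consumer side: atomically take the waiting item, if any, and clear the slot.
  def poll(): Option[T] = slot.getAndSet(None)
}

object LatestSlotDemo {
  // Replays the marble diagram by hand and returns what the subscriber sees.
  def run(): List[Int] = {
    val slot = new LatestSlot[Int]
    val seen = List.newBuilder[Int]
    def consume(): Unit = slot.poll().foreach(seen += _)

    slot.offer(1); consume()                      // subscriber picks up 1
    slot.offer(2); slot.offer(3)                  // emitted while 1 is being processed
    consume()                                     // only 3 is left to pick up
    consume()                                     // nothing new: the subscriber rests
    slot.offer(5); consume()
    slot.offer(6); consume()
    slot.offer(7); slot.offer(8); slot.offer(9)   // emitted while 6 is being processed
    consume()                                     // only 9 is left to pick up
    seen.result()
  }
}
```

The demo yields 1, 3, 5, 6, 9, matching the bottom line of the diagram. What the sketch leaves out is exactly the hard part of the question: scheduling the consumer on another thread without keeping that thread busy while the slot is empty.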

I've spent hours trying to implement this seemingly simple operator, but I'm still not satisfied. The very nature of the operator indicates that it should be asynchronous: it should emit its items on a different scheduler than the one it receives them on. At the same time, of course, I don't want a worker to occupy a thread while there is no work to be done.

This is what I've come up with so far:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

It seems to work, but I'm not confident that it is free of race conditions or deadlocks. I'm also not sure whether the solution could be made simpler. I've also been thinking of other approaches, such as:

  • some clever use of OperatorDebounceWithSelector
  • combination of an observable requesting just one item at a time, observeOn and onBackpressureBuffer(1)

I also don't know how to write deterministic unit tests for this. The work scheduled by scheduleRec can't be interrupted when used with TestScheduler, so I need a scheduler that really runs on a different thread. I find it hard to write correct unit tests for race conditions in multi-threaded code.
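
One way to make such tests deterministic, offered here only as a sketch, is to take threads out of the picture: run the hand-off logic against an executor that the test steps by hand, so that each interleaving is chosen explicitly. ManualExecutor and LatestRelay below are hypothetical names, and LatestRelay is a simplified stand-in built on java.util.concurrent.Executor instead of an Rx Scheduler; it handles plain items only, not errors or completion.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable

// Hypothetical test double: queues tasks and runs them only when told to.
final class ManualExecutor extends Executor {
  private val tasks = mutable.Queue.empty[Runnable]

  override def execute(task: Runnable): Unit = tasks.enqueue(task)

  def pending: Int = tasks.size

  // Runs one queued task, if any; returns whether a task actually ran.
  def runOne(): Boolean =
    if (tasks.isEmpty) false else { tasks.dequeue().run(); true }
}

// Hypothetical stand-in for the operator: remembers only the newest item and
// delivers it on the executor, at most one item per scheduled task.
final class LatestRelay[T](executor: Executor, consumer: T => Unit) {
  private val lock = new AnyRef
  private var latest: Option[T] = None // guarded by lock
  private var scheduled = false        // guarded by lock

  def onNext(item: T): Unit = {
    val mustSchedule = lock.synchronized {
      latest = Some(item)
      val first = !scheduled
      scheduled = true
      first
    }
    if (mustSchedule) executor.execute(new Runnable { def run(): Unit = drain() })
  }

  private def drain(): Unit = {
    val next = lock.synchronized {
      val n = latest
      latest = None
      if (n.isEmpty) scheduled = false // nothing waiting: go idle
      n
    }
    next.foreach { item =>
      consumer(item)
      // Look again later, in case something arrived in the meantime.
      executor.execute(new Runnable { def run(): Unit = drain() })
    }
  }
}

object LatestRelayDemo {
  // Returns the delivered items and the number of tasks still queued.
  def run(): (List[Int], Int) = {
    val executor = new ManualExecutor
    val seen = mutable.ListBuffer.empty[Int]
    val relay = new LatestRelay[Int](executor, item => seen += item)

    relay.onNext(1)   // schedules the first drain task
    executor.runOne() // delivers 1 and schedules a follow-up task
    relay.onNext(2)
    relay.onNext(3)   // overwrites 2; no additional task is queued
    executor.runOne() // delivers 3 and schedules a follow-up task
    executor.runOne() // finds nothing and goes idle
    (seen.toList, executor.pending)
  }
}
```

A test written this way only checks the scheduling logic for the interleavings it scripts (here: 2 and 3 arriving between two drain steps, with no task left queued at the end). It does not prove the absence of races under real concurrency, which would still need stress tests on a real multi-threaded scheduler.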

So the question remains: is my solution correct? Is there a simpler, better, or more correct approach? And how can I test its correctness?

Answer 1:

I recommend using lift to implement this operator. Here is my solution:

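import rx.lang.scala.{Notification, Observable, Scheduler, Subscriber}
import rx.lang.scala.Notification.{OnCompleted, OnError, OnNext}

package object ObservableEx {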

  implicit class ObserveLatestOn[T](val o: Observable[T]) {

    def observeLatestOn(scheduler: Scheduler): Observable[T] = {
      o.lift { (child: Subscriber[T]) =>
        val worker = scheduler.createWorker
        child.add(worker)

        val parent = new Subscriber[T] {

          private val lock = new AnyRef

          // protected by "lock"
          private var latest: Notification[T] = null
          // protected by "lock"
          // true when no task is currently running on the worker
          private var idle = true

          private var done = false

          override def onStart(): Unit = {
            request(Long.MaxValue)
          }

          override def onNext(v: T): Unit = {
            if (!done) {
              emit(OnNext(v))
            }
          }

          override def onCompleted(): Unit = {
            if (!done) {
              done = true
              emit(OnCompleted)
            }
          }

          override def onError(e: Throwable): Unit = {
            if (!done) {
              done = true
              emit(OnError(e))
            }
          }

          def emit(v: Notification[T]): Unit = {
            var shouldSchedule = false
            lock.synchronized {
              latest = v
              if (idle) {
                // worker is idle so we should schedule a task
                shouldSchedule = true
                // We will schedule a task, so the worker will be busy
                idle = false
              }
            }
            if (shouldSchedule) {
              worker.schedule {
                var n: Notification[T] = null
                var exit = false
                while (!exit) {
                  lock.synchronized {
                    if (latest == null) {
                      // No new item arrives and we are leaving the worker, so set "idle"
                      idle = true
                      exit = true
                    } else {
                      n = latest
                      latest = null
                    }
                  }
                  if (!exit) {
                    n.accept(child)
                  }
                }
              }
            }
          }
        }

        child.add(parent)

        parent
      }
    }
  }

}

And a unit test:

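import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.TestScheduler

import ObservableEx.ObserveLatestOn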

@Test
def testObserveLatestOn(): Unit = {
  val scheduler = TestScheduler()
  val xs = mutable.ArrayBuffer[Long]()
  var completed = false
  Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => {
    scheduler.advanceTimeBy(200 milliseconds)
    xs += v
  },
    e => e.printStackTrace(),
    () => completed = true
  )
  scheduler.advanceTimeBy(100 milliseconds)
  assert(completed === true)
  assert(xs === List(0, 2, 4, 6, 8))
}


Answer 2:

I have a PR that adds an onBackpressureLatest() operator, which should give the expected behavior. You still need concurrency, which you can get by using observeOn as usual.