I'm trying to implement the ObserveLatestOn operator in RxJava (actually, RxScala).
This operator is useful when we have a fast producer and a slow subscriber, but the subscriber doesn't care about items that are lost while it is busy consuming another one.
A marble diagram:

```
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
```
The `=` character represents long-running work being performed by the subscriber, and the `>` character represents that work finishing. As a canonical example, imagine a producer of some data that needs displaying, and a screen renderer of that data as the subscriber. Rendering takes quite long, but we don't need to render every step on the screen; rendering just the latest one is perfectly good.
In the marble diagram above, the producer signals 1. The subscriber begins to process it, and this takes a long time. Meanwhile, the producer emits 2 and 3, and only after that does the subscriber finish its work. It sees that the last item emitted by the producer was 3, so it begins processing that. This is quick, and no new item has been produced in the meantime, so the subscriber can rest. Then 5 arrives, and the story goes on in the same manner.
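The intended semantics can be pinned down with a small, single-threaded model. This is a sketch under simplifying assumptions: integer time steps and a constant processing time per item (the diagram's work durations vary). `LatestOnModel.simulate` is a hypothetical helper, not part of RxScala; it returns the (start time, value) pairs that the slow subscriber actually processes.

```scala
object LatestOnModel {
  /** Hypothetical model: `emissions` are (time, value) pairs in time order;
    * each processed item keeps the subscriber busy for `workTime` steps.
    * Returns (startTime, value) for every item the subscriber actually processes. */
  def simulate(emissions: List[(Int, Int)], workTime: Int): List[(Int, Int)] = {
    var freeAt = Int.MinValue          // time at which the subscriber becomes idle
    var pending: Option[Int] = None    // latest item emitted while the subscriber was busy
    val started = List.newBuilder[(Int, Int)]

    def start(at: Int, value: Int): Unit = {
      started += (at -> value)
      freeAt = at + workTime
    }

    for ((t, v) <- emissions) {
      // The subscriber finished at or before t: it first picks up the pending (latest) item.
      if (freeAt <= t) pending.foreach { p => start(freeAt, p); pending = None }
      // Idle: start v right away. Busy: v replaces whatever was pending (older items are lost).
      if (freeAt <= t) start(t, v) else pending = Some(v)
    }
    // After the last emission, the subscriber still processes the latest pending item.
    pending.foreach(p => start(freeAt, p))
    started.result()
  }
}
```

With the emission positions from the diagram, `simulate(List((2, 1), (6, 2), (10, 3), (21, 5), (28, 6), (31, 7), (33, 8), (35, 9)), workTime = 9)` processes 1, 3, 5, 6 and 9, the same items as the bottom row of the marble diagram.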
I've spent hours trying to implement this seemingly simple operator, but I'm still not satisfied. The very nature of the operator suggests that it should be asynchronous: it should emit its items on a different scheduler from the one it receives them on. At the same time, of course, I don't want a thread to be occupied by a worker while there is no work to be done.
This is what I've come up with so far:
```scala
import rx.lang.scala.{Notification, Observable, Scheduler}
import rx.lang.scala.Notification.{OnCompleted, OnError, OnNext}

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] =
  Observable(subscriber ⇒ {
    // Per-subscription state: the latest undelivered notification,
    // and whether a worker is currently draining it.
    @volatile var maybeNextItem: Option[Notification[T]] = None
    @volatile var isWorkScheduled = false
    val itemsQueueLock = new Object()

    def signalToSubscriber(materializedItem: Notification[T]): Unit =
      materializedItem match {
        case OnNext(item)   ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted()  ⇒ subscriber.onCompleted()
      }

    def queueItem(item: Notification[T]): Unit = {
      // Overwrite any pending item; schedule work only if no worker is draining yet.
      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }
      if (shouldScheduleWork) {
        // Create a worker only when work is actually scheduled.
        val worker = scheduler.createWorker
        // scheduleRec re-runs this block until the worker is unsubscribed.
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              // Nothing left to deliver: stop the loop and release the worker.
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }
          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted())
    )
  })
```
It seems to work, but I'm not confident that it is free of race conditions and deadlocks. I'm also not sure whether the solution could be made simpler. I've been thinking about other approaches too, such as:
- some clever use of `OperatorDebounceWithSelector`
- a combination of an observable requesting just one item at a time, `observeOn` and `onBackpressureBuffer(1)`
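For comparison, the same "schedule work only when idle" idea can be sketched without a lock, using a work-in-progress counter (the queue-drain pattern, similar to what RxJava's own operators commonly do internally). This sketch assumes a plain `java.util.concurrent.Executor` stands in for the Rx `Scheduler`; `LatestOnExecutor` and `LatestOnDemo` are hypothetical names, not RxJava or RxScala API, and completion/error handling is left out.

```scala
import java.util.concurrent.{CountDownLatch, Executor, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.mutable

/** Hypothetical helper: hands only the most recent item to `sink`, running on `executor`. */
final class LatestOnExecutor[T](executor: Executor, sink: T => Unit) {
  // Latest item not yet handed to the sink (None when nothing is pending).
  private val latest = new AtomicReference[Option[T]](None)
  // Offers not yet accounted for by the drain loop; 0 means no drain task is scheduled or running.
  private val wip = new AtomicInteger(0)

  private val drain: Runnable = new Runnable {
    def run(): Unit = {
      var missed = 1
      while (missed != 0) {
        latest.getAndSet(None).foreach(sink)
        // Subtract the offers handled so far; a non-zero result means newer items arrived meanwhile.
        missed = wip.addAndGet(-missed)
      }
    }
  }

  def offer(item: T): Unit = {
    latest.set(Some(item))
    // Only the 0 -> 1 transition submits a task, so at most one drain runs at a time
    // and no task holds a thread while there is nothing to deliver.
    if (wip.getAndIncrement() == 0) executor.execute(drain)
  }
}

object LatestOnDemo {
  /** Offers 1..n as fast as possible to a slow sink; returns what the sink actually received. */
  def run(n: Int = 1000): List[Int] = {
    val pool = Executors.newSingleThreadExecutor()
    val seen = mutable.ArrayBuffer.empty[Int] // written only from inside the drain loop
    val lastSeen = new CountDownLatch(1)
    val latestOn = new LatestOnExecutor[Int](pool, { i =>
      Thread.sleep(1) // simulate a slow subscriber
      seen += i
      if (i == n) lastSeen.countDown()
    })
    (1 to n).foreach(latestOn.offer)
    lastSeen.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    seen.toList
  }
}
```

Whatever the interleaving, the last offered item is always delivered and the delivered items are strictly increasing; how many intermediate items get skipped depends on timing.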
I also don't know how to write deterministic unit tests for this. The work scheduled by `scheduleRec` can't be interrupted when used with `TestScheduler`, so I need to use a scheduler that really runs on a different thread. I find it hard to write correct unit tests for race conditions in multi-threaded code.
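One way to make such a test deterministic, sketched here under the assumption that the scheduling logic is written against a plain `java.util.concurrent.Executor` instead of an RxScala `Scheduler`, is to inject an executor that only queues tasks and let the test decide when each one runs. `ManualExecutor`, `LockBasedLatestOn` and `ReplayMarble` are hypothetical names. `LockBasedLatestOn` mirrors the `maybeNextItem`/`isWorkScheduled` logic of the code in the question, with `scheduleRec` approximated by a task that resubmits itself after each delivered item. This controls the ordering of "producer emits" versus "worker runs", but it does not exercise genuine thread races.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable

/** Test double: queues tasks instead of running them, so the test decides when "later" happens. */
final class ManualExecutor extends Executor {
  private val tasks = mutable.Queue.empty[Runnable]
  override def execute(task: Runnable): Unit = tasks.enqueue(task)
  def pending: Int = tasks.size
  def runNext(): Unit = tasks.dequeue().run()
}

/** Hypothetical port of the lock-based logic: one pending slot plus an "is work scheduled" flag. */
final class LockBasedLatestOn[T](executor: Executor, sink: T => Unit) {
  private val lock = new Object
  private var maybeNextItem: Option[T] = None
  private var isWorkScheduled = false

  private val step: Runnable = new Runnable {
    def run(): Unit = {
      val next = lock.synchronized {
        val result = maybeNextItem
        if (result.isEmpty) isWorkScheduled = false // nothing left: the worker goes idle
        maybeNextItem = None
        result
      }
      next.foreach { item =>
        sink(item)
        executor.execute(this) // like scheduleRec: check again for a newer item
      }
    }
  }

  def offer(item: T): Unit = {
    val shouldSchedule = lock.synchronized {
      val result = !isWorkScheduled
      maybeNextItem = Some(item)
      isWorkScheduled = true
      result
    }
    if (shouldSchedule) executor.execute(step)
  }
}

object ReplayMarble {
  /** Replays the start of the marble diagram step by step; returns delivered items and leftover tasks. */
  def run(): (List[Int], Int) = {
    val exec = new ManualExecutor
    val delivered = mutable.ListBuffer.empty[Int]
    val op = new LockBasedLatestOn[Int](exec, delivered += _)
    op.offer(1)
    exec.runNext() // the worker delivers 1
    op.offer(2)    // 2 and 3 arrive before the worker checks again
    op.offer(3)
    exec.runNext() // the re-check finds only the latest item, 3
    exec.runNext() // the re-check finds nothing: the worker goes idle
    op.offer(5)    // only now is a new task scheduled
    exec.runNext()
    exec.runNext()
    (delivered.toList, exec.pending)
  }
}
```

`ReplayMarble.run()` yields the delivered items 1, 3 and 5, and zero queued tasks at the end, which checks that no task is left occupying the executor once there is nothing to deliver.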
So the question remains: is my solution correct? Is there a simpler, better or more correct approach? And how can I test its correctness?