Suppose I have two possibly infinite streams:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
I want to merge the streams and then map the merged stream with a slowish asynchronous operation (e.g. in Bacon with fromPromise and flatMapConcat).
I can combine them with merge:
me = a12b3.c45d6.7e...
And then map with the slow asynchronous operation:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..
As you can see, the greedier s2 stream gets an advantage in the long run. This is undesired behaviour.
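Concretely, that greedy pipeline would look something like the sketch below (slowAsyncFunc stands for the slow asynchronous operation):

// greedy version: merge first, then run the slow operation one value at a time
var mm = s1.merge(s2).flatMapConcat(function (x) {
  return Bacon.fromPromise(slowAsyncFunc(x));
});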
The merge behaviour is not OK, as I want some kind of backpressure to get a more interleaved, "fair", "round-robin" merge. A few examples of the desired behaviour:
s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
One way to think about this is that s1 and s2 send tasks to a worker which can handle only one task at a time. With merge and flatMapConcat I get a greedy task manager, but I want a fairer one.
I'd like to find a simple and elegant solution. It would be nice if it were easily generalisable to an arbitrary number of streams:
// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
A solution using RxJS or another Rx library is fine too.
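For illustration, a hypothetical call with two finite input streams could look like this (Bacon.sequentially and slowAsyncFunc are only placeholders):

// hypothetical usage: two input streams producing values at different rates
var s1 = Bacon.sequentially(300, ['a', 'b', 'c', 'd', 'e']);
var s2 = Bacon.sequentially(100, [1, 2, 3, 4, 5, 6, 7]);

var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
mm.onValue(function (x) { console.log(x); });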
Clarifications
Not zipAsArray
I don't want:
function roundRobinPromiseMap(streams, f) {
  // zips the streams pairwise, so a value from one stream has to wait
  // for a matching value from every other stream before it can proceed
  return Bacon.zipAsArray.apply(null, streams)
    .flatMap(Bacon.fromArray)
    .flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
}
Compare the example marble diagram:
s1 = a.....b..............c.......
s2 = ..1.2.3......................
mm = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based
Yes, I'll run into buffering issues
... but so will I with the straightforward unfair version:
function greedyPromiseMap(streams, f) {
  // merge everything eagerly, then run the slow operation one value at a time
  return Bacon.mergeAll(streams).flatMapConcat(function (x) {
    return Bacon.fromPromise(f(x));
  });
}
Marble diagram
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
merge = a...1...2...3...b....c...
The core challenge here was to understand how to formalise fairness. In the question I already mentioned the worker analogy. It turned out that an obvious fairness criterion is to pick a stream that has generated fewer events than the others, or, taken even further, the stream whose generated events have waited for less time.
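For illustration, a hypothetical helper capturing that criterion could pick, among the non-empty queues, the stream that has been served the fewest times so far:

// hypothetical fairness helper: among non-empty queues, pick the stream
// that has had the fewest values dispatched so far; -1 if all queues are empty
function fairestIndex(queues, counts) {
  var best = -1;
  queues.forEach(function (q, i) {
    if (q.length > 0 && (best === -1 || counts[i] < counts[best])) {
      best = i;
    }
  });
  return best;
}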
After that it was quite trivial to formalise the desired output using denotational semantics; the code is on GitHub.
I didn't have time to develop the denotational combinators to include withStateMachine from Bacon.js, so the next step was to reimplement it in JavaScript with Bacon.js directly. The whole runnable solution is available as a gist. The idea is to make a state machine into which the output of the whole system is fed back, so we can dequeue the next event when the previously flatMapped stream has ended.
For that I had to make a somewhat ugly rec combinator. Its type is (EventStream a -> EventStream a) -> EventStream a; the type resembles other recursion combinators, e.g. fix. It could be given better system-wide behaviour, as Bus breaks unsubscription propagation; we have to work on that.
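A minimal sketch of such a Bus-based rec (not necessarily the gist's exact code) could look like this:

// rec : (EventStream a -> EventStream a) -> EventStream a
// Feeds the resulting stream back into the Bus that was given to f,
// closing the feedback loop. Note the Bus caveat mentioned above.
function rec(f) {
  var bus = new Bacon.Bus();
  var result = f(bus);
  bus.plug(result);   // feed the output back in
  return result;
}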
The second helper function is stateMachine, which takes an array of streams and turns them into a single state machine. Essentially it's withStateMachine ∘ mergeAll ∘ zipWithIndex.
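A sketch of what that helper might look like (assuming Bacon's withStateMachine(initState, f) API):

// stateMachine: tag each stream's values with the stream index (zipWithIndex),
// merge everything into one stream (mergeAll), then run withStateMachine on it
function stateMachine(streams, initState, f) {
  var indexed = streams.map(function (s, i) {
    return s.map(function (x) { return [i, x]; });
  });
  return Bacon.mergeAll(indexed).withStateMachine(initState, f);
}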
Using these two helpers we can write a not-so-complex fair scheduler:
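The gist contains the full, runnable version; the following is only a rough sketch of how rec, stateMachine, and flatMapConcat might fit together, reusing the fairestIndex helper sketched above and assuming Bacon 1.x-style event objects (event.hasValue(), event.value()). Queue bookkeeping is simplified and end-of-stream handling is omitted:

// Sketch only: one FIFO queue per input stream, dispatch one value at a time,
// and feed the output back so that a finished task dequeues the next value.
function roundRobinPromiseMap(streams, f) {
  return rec(function (feedback) {
    var initial = {
      queues: streams.map(function () { return []; }), // pending values per stream
      counts: streams.map(function () { return 0; }),  // how often each stream was served
      busy: false                                      // is a task currently running?
    };

    // Emit the next value from the fairest non-empty queue, if any.
    function dispatch(state) {
      var i = fairestIndex(state.queues, state.counts);
      if (i === -1) {
        return [{ queues: state.queues, counts: state.counts, busy: false }, []];
      }
      var queues = state.queues.slice();
      var counts = state.counts.slice();
      var value = queues[i][0];
      queues[i] = queues[i].slice(1);
      counts[i] = counts[i] + 1;
      return [{ queues: queues, counts: counts, busy: true }, [new Bacon.Next(value)]];
    }

    // The feedback stream is appended last, so its index is streams.length.
    var dispatched = stateMachine(streams.concat([feedback]), initial, function (state, event) {
      if (!event.hasValue()) {
        return [state, []]; // end/error bookkeeping omitted in this sketch
      }
      var pair = event.value();        // [index, value] from the index tagging
      var index = pair[0];
      if (index === streams.length) {  // a result came back: the previous task finished
        return dispatch(state);
      }
      var queues = state.queues.slice();
      queues[index] = queues[index].concat([pair[1]]);
      var queued = { queues: queues, counts: state.counts, busy: state.busy };
      return state.busy ? [queued, []] : dispatch(queued);
    });

    // Run the slow asynchronous operation one value at a time.
    return dispatched.flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
  });
}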
The rest of the code in the gist is quite straightforward.
Here's a crazy chunk of code that might help.
It turns the input streams into a single stream of 'value' events, then merges them with 'send' events (and 'end' events for bookkeeping). Then, using a state machine, it builds up queues out of the 'value' events, and dispatches values on 'send' events.
Originally I wrote a roundRobinThrottle, but I've moved it to a gist.
Here is a roundRobinPromiseMap that is very similar. The code in the gist is tested, but this is not.