It is possible to pull on demand from a number (say two for simplicity) of sources using streams (lazy lists). Iteratees can be used to process data coming from a single source.
Is there an Iteratee-like functional concept for processing multiple input sources? I could imagine an Iteratee whose state signals which source it wants to pull from.
Conduit has a `zip` primitive that takes two upstreams and combines them as a stream of tuples. (It can be built for Pipes too, but that code hasn't been released yet.)

We're using Machines in Scala to pull in not just two, but an arbitrary number of sources.
Two examples of binary joins are provided by the library itself, in the `Tee` module: `mergeOuterJoin` and `hashJoin`. Here is what the code for `hashJoin` looks like (it assumes both streams are sorted):

This code builds up a `Plan`, which is "compiled" to a `Machine` with the `repeatedly` method. The type being built here is `Tee[A, B, (A, B)]`, which is a machine with two inputs. You request inputs on the left and right with `awaits(left)` and `awaits(right)`, and you output with `emit`.

There is also a Haskell version of Machines.
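To make the two-input idea concrete, here is a minimal, library-free Haskell sketch. It is not the Machines API: `Tee`, `AwaitL`, `AwaitR`, `runTee`, `zipTee` and `mergeSorted` are hypothetical names chosen for illustration, and plain lists stand in for the sources. Each step of the plan states which side it wants to pull from, which is roughly the "state signals the source" idea from the question.

```haskell
-- Hypothetical sketch; these names are NOT taken from the machines library.

-- A plan that can pull from a left source (a) or a right source (b)
-- and emit outputs (o). Each await carries a fallback plan that is
-- used when the requested source is exhausted.
data Tee a b o
  = Stop
  | Emit o (Tee a b o)
  | AwaitL (a -> Tee a b o) (Tee a b o)
  | AwaitR (b -> Tee a b o) (Tee a b o)

-- Drive a plan, using two lists as stand-ins for the real sources.
runTee :: Tee a b o -> [a] -> [b] -> [o]
runTee Stop         _      _      = []
runTee (Emit o k)   as     bs     = o : runTee k as bs
runTee (AwaitL k _) (a:as) bs     = runTee (k a) as bs
runTee (AwaitL _ d) []     bs     = runTee d [] bs
runTee (AwaitR k _) as     (b:bs) = runTee (k b) as bs
runTee (AwaitR _ d) as     []     = runTee d as []

-- Pair elements up, stopping as soon as either side runs dry.
zipTee :: Tee a b (a, b)
zipTee = AwaitL (\a -> AwaitR (\b -> Emit (a, b) zipTee) Stop) Stop

-- Merge two ascending inputs into a single ascending output.
mergeSorted :: Ord a => Tee a a a
mergeSorted = AwaitL haveL drainR
  where
    haveL a = AwaitR (both a) (Emit a drainL)    -- holding a left value, pull right
    haveR b = AwaitL (`both` b) (Emit b drainR)  -- holding a right value, pull left
    both a b
      | a <= b    = Emit a (haveR b)
      | otherwise = Emit b (haveL a)
    drainL = AwaitL (\a -> Emit a drainL) Stop   -- right is empty, flush left
    drainR = AwaitR (\b -> Emit b drainR) Stop   -- left is empty, flush right
```

In GHCi, `runTee mergeSorted [1,4,6] [2,3,7]` evaluates to `[1,2,3,4,6,7]`, and `runTee zipTee [1,2,3] "ab"` evaluates to `[(1,'a'),(2,'b')]`. The driver decides how a request is satisfied, so the same plan could in principle be run against effectful sources instead of lists.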
Check out the pipes library, where vertical concatenation might do what you want. For example,
The sequencing operator `(>>)` vertically concatenates the sources, yielding the output (on a `runPipe`)

To do this using pipes you nest the Pipe monad transformer within itself, once for each producer you wish to interact with. For example:
Just like a Haskell curried function of multiple variables, you partially apply it to each source using composition and runPipe:
The above function outputs when run:
This trick works for yielding or awaiting to any number of pipes upstream or downstream. It also works for proxies, the bidirectional analogs to pipes.
Edit: Note that this also works for any iteratee library, not just `pipes`. In fact, John Millikin and Oleg were the original advocates for this approach and I just stole the idea from them.
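Since the nesting trick does not depend on a particular library, it can be sketched with a tiny hand-rolled consumer type. Everything below is an assumption made for illustration: `Consumer`, `await`, `liftC`, `feed` and `pairUp` are hypothetical names and are not the pipes API. The point is only that one transformer layer is stacked per source, and each layer is then discharged by feeding it its own input.

```haskell
import Data.Functor.Identity (runIdentity)

-- Hypothetical sketch; not the pipes API.
-- A consumer of inputs of type i, layered over a base monad m.
data Consumer i m r
  = Done r
  | Await (Maybe i -> Consumer i m r)  -- Nothing signals end of input
  | Lift (m (Consumer i m r))          -- run an effect in the layer below

instance Monad m => Functor (Consumer i m) where
  fmap f (Done r)  = Done (f r)
  fmap f (Await k) = Await (fmap f . k)
  fmap f (Lift m)  = Lift (fmap (fmap f) m)

instance Monad m => Applicative (Consumer i m) where
  pure = Done
  cf <*> cx = cf >>= \f -> fmap f cx

instance Monad m => Monad (Consumer i m) where
  Done r  >>= f = f r
  Await k >>= f = Await (\i -> k i >>= f)
  Lift m  >>= f = Lift (fmap (>>= f) m)

-- Request the next input of this layer.
await :: Consumer i m (Maybe i)
await = Await Done

-- Lift an action from the layer below into this layer.
liftC :: Monad m => m r -> Consumer i m r
liftC = Lift . fmap Done

-- Feed a list to the outermost layer, leaving a computation in the layer below.
feed :: Monad m => [i] -> Consumer i m r -> m r
feed _      (Done r)  = pure r
feed []     (Await k) = feed [] (k Nothing)
feed (x:xs) (Await k) = feed xs (k (Just x))
feed xs     (Lift m)  = m >>= feed xs

-- Two nested layers: the outer one reads source A, the inner one source B.
pairUp :: Monad m => Consumer a (Consumer b m) [(a, b)]
pairUp = do
  ma <- await          -- pull from the outer source
  mb <- liftC await    -- pull from the inner source
  case (ma, mb) of
    (Just a, Just b) -> ((a, b) :) <$> pairUp
    _                -> pure []

-- Apply the consumer to one source per layer, outermost first.
example :: [(Int, Char)]
example = runIdentity (feed "ab" (feed [1, 2, 3] pairUp))
```

Here `example` evaluates to `[(1,'a'),(2,'b')]`. Each call to `feed` removes one layer, much like partially applying a curried function to one argument at a time; adding a third source would mean one more layer and one more `liftC`.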