I would like to write a function akin to List.concat/1 that takes an enumerable of lists and emits the concatenated lists as a continuous stream.
It would work like this:
iex> 1..3 |> Stream.map(&([&1])) |> Enum.to_list
[[1], [2], [3]]
iex> 1..3 |> Stream.map(&([&1])) |> MyStream.concat |> Enum.to_list
[1, 2, 3]
What I have come up with so far is this:
defmodule MyStream do
def concat(lists) do
Enumerable.reduce(lists, [], fn(x, acc) -> acc ++ x end)
end
end
This produces the correct result but obviously isn't lazy.
I have unsuccessfully tried using Stream.Lazy
but really fail to understand the inner workings of it. Any explanation on Stream.Lazy
would be greatly appreciated!
Enumerables in Elixir are represented via reducing functions. We can map any structure as long as you tell us how to reduce over it.
The whole idea of Stream is that you can compose those reducing functions. Let's take map as an example:
def map(enumerable, f) do
Lazy[enumerable: enumerable,
fun: fn(f1) ->
fn(entry, acc) ->
f1.(f.(entry), acc)
end
end]
end
YOu receive an enumerable and you want to map over each element with the function f
. The lazy version receives the actual reducing function f1
and returns a new function, which receives entry
and acc
(the same arguments f1
would receive) and then call f.(entry)
effectively mapping the element before you call f1
(the reducing function). Notice how we are mapping over elements one by one.
The flat map variant of this would probably be something like:
def flat_map(enumerable, f) do
Lazy[enumerable: enumerable,
fun: fn(f1) ->
fn(entry, acc) ->
Enumerable.reduce(f.(entry), acc, f1)
end
end]
end
Now, every time you call f.(entry)
, you get a list back and you want to iterate over each element of this new list, instead of iterating over the list as a whole.
I have not tried the code above (and I may have missed some detail) but that's how Streams work in general.
With the help of José Valim it was just a very small step from his code to what I was looking for. I probably posed this question rather badly but what I was really looking for was an equivalent to Python's itertools.chain
function.
def chain(enumerable) do
Stream.Lazy[enumerable: enumerable,
fun: fn(f1) ->
fn(entry, acc) ->
Enumerable.reduce(entry, acc, f1)
end
end]
end
This allows you to chain potentially infinite enumerables of both streams or lists.
iex> 1..1000000 |> Stream.map(&(1..(&1))) |> MyModule.chain |> Enum.take(20)
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5]