Lazily concatenate an enumerable of lists

Published 2019-04-08 13:40

Question:

I would like to write a function akin to List.concat/1 that takes an enumerable of lists and emits the concatenated lists as a continuous stream.

It would work like this:

iex> 1..3 |> Stream.map(&([&1])) |> Enum.to_list
[[1], [2], [3]]
iex> 1..3 |> Stream.map(&([&1])) |> MyStream.concat |> Enum.to_list
[1, 2, 3]

What I have come up with so far is this:

defmodule MyStream do
  def concat(lists) do
    Enum.reduce(lists, [], fn x, acc -> acc ++ x end)
  end
end

This produces the correct result but obviously isn't lazy.

I have tried using Stream.Lazy without success, and I don't really understand its inner workings. Any explanation of Stream.Lazy would be greatly appreciated!

Answer 1:

Enumerables in Elixir are represented via reducing functions. We can map any structure as long as you tell us how to reduce over it.

The whole idea of Stream is that you can compose those reducing functions. Let's take map as an example:

def map(enumerable, f) do
  Lazy[enumerable: enumerable,
       fun: fn(f1) ->
         fn(entry, acc) ->
           f1.(f.(entry), acc)
         end
       end]
end

You receive an enumerable and you want to map over each element with the function f. The lazy version receives the actual reducing function f1 and returns a new function. That new function receives entry and acc (the same arguments f1 would receive) and calls f.(entry), effectively mapping the element before passing it to f1 (the reducing function). Notice how we are mapping over elements one by one.
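The composition described above can be tried without Stream.Lazy at all. The following is only a minimal sketch using plain anonymous functions; the module name ReducerSketch and the function mapping/1 are made up for illustration and are not part of Elixir.

```elixir
defmodule ReducerSketch do
  # Hypothetical helper: returns a function that takes a reducing
  # function f1 and wraps it so each entry goes through f first.
  def mapping(f) do
    fn f1 ->
      fn entry, acc -> f1.(f.(entry), acc) end
    end
  end
end

# An ordinary reducing function: prepend each entry to the accumulator.
prepend = fn entry, acc -> [entry | acc] end

# Compose the mapping step with the reducing function.
doubling_prepend = ReducerSketch.mapping(&(&1 * 2)).(prepend)

[1, 2, 3]
|> Enum.reduce([], doubling_prepend)
|> Enum.reverse()
|> IO.inspect()
# prints [2, 4, 6]
```

Nothing is computed until Enum.reduce/3 drives the composed function, and each element is doubled right before it reaches prepend.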

The flat map variant of this would probably be something like:

def flat_map(enumerable, f) do
  Lazy[enumerable: enumerable,
       fun: fn(f1) ->
         fn(entry, acc) ->
           Enumerable.reduce(f.(entry), acc, f1)
         end
       end]
end

Now, every time you call f.(entry), you get a list back and you want to iterate over each element of this new list, instead of iterating over the list as a whole.
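The same idea can be sketched with plain functions. This is an illustration under assumptions, not the actual Stream implementation: FlatReducerSketch and flat_mapping/1 are hypothetical names, and the sketch uses Enum.reduce/3 for the inner loop, so it ignores the tagged accumulators ({:cont, acc}, {:halt, acc}) that current Elixir versions use in Enumerable.reduce/3 to stop early.

```elixir
defmodule FlatReducerSketch do
  # Hypothetical helper: wraps the reducing function f1 so that every
  # element produced by f.(entry) is fed to f1 individually.
  def flat_mapping(f) do
    fn f1 ->
      fn entry, acc -> Enum.reduce(f.(entry), acc, f1) end
    end
  end
end

prepend = fn entry, acc -> [entry | acc] end

# Each entry expands into two elements, which are reduced one by one.
expanding_prepend = FlatReducerSketch.flat_mapping(fn x -> [x, x * 10] end).(prepend)

[1, 2, 3]
|> Enum.reduce([], expanding_prepend)
|> Enum.reverse()
|> IO.inspect()
# prints [1, 10, 2, 20, 3, 30]
```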

I have not tried the code above (and I may have missed some detail) but that's how Streams work in general.



Answer 2:

With the help of José Valim it was just a very small step from his code to what I was looking for. I probably posed this question rather badly but what I was really looking for was an equivalent to Python's itertools.chain function.

def chain(enumerable) do
  Stream.Lazy[enumerable: enumerable,
              fun: fn(f1) ->
                fn(entry, acc) ->
                  Enumerable.reduce(entry, acc, f1)
                end
              end]
end

This allows you to chain a potentially infinite enumerable of enumerables, whether they are streams or lists.

iex> 1..1000000 |> Stream.map(&(1..(&1))) |> MyModule.chain |> Enum.take(20)
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
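For readers on a current Elixir release, where the record-style Stream.Lazy[...] syntax shown above is not available, a similar result can probably be had from the standard library alone. The sketch below assumes a recent Elixir version; MyStream.concat/1 is the hypothetical function name from the question, built here on Stream.flat_map/2. The built-in Stream.concat/1 appears to cover the same use case directly.

```elixir
defmodule MyStream do
  # Lazily concatenates an enumerable of enumerables by flat-mapping
  # with the identity function. Inner enumerables are only traversed
  # as far as the consumer demands.
  def concat(enumerables) do
    Stream.flat_map(enumerables, fn enumerable -> enumerable end)
  end
end

1..3 |> Stream.map(&[&1]) |> MyStream.concat() |> Enum.to_list() |> IO.inspect()
# prints [1, 2, 3]

# Only the first few inner ranges are ever built or traversed.
1..1_000_000 |> Stream.map(&(1..&1)) |> MyStream.concat() |> Enum.take(20) |> IO.inspect()
# prints [1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

# The standard library's Stream.concat/1 gives the same result.
1..3 |> Stream.map(&[&1]) |> Stream.concat() |> Enum.to_list() |> IO.inspect()
# prints [1, 2, 3]
```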