Splitting a list in equal sized chunks in Erlang

2019-04-05 16:25发布

问题:

I want to split:

[1,2,3,4,5,6,7,8]

into:

[[1,2],[3,4],[5,6],[7,8]]

It generally works great with:

[ lists:sublist(List, X, 2) || X <- lists:seq(1,length(List),2) ] .

But it is really slow this way. 10000 Elements take amazing 2.5 seconds on my netbook. I have also written a really fast recursive function, but I am simply interested: Could this list comprehension also be written in a different way, so that it is faster?

回答1:

Try this:

part(List) ->
        part(List, []).
part([], Acc) ->
        lists:reverse(Acc);
part([H], Acc) ->
        lists:reverse([[H]|Acc]);
part([H1,H2|T], Acc) ->
        part(T, [[H1,H2]|Acc]).

Test in erlang-shell (I've declared this function in module part):

2> part:part([1,2,3,4,5,6,7,8]).
[[1,2],[3,4],[5,6],[7,8]]
3> 
3> timer:tc(part, part, [lists:seq(1,10000)]).
{774,
 [[1,2],
  [3,4],
  [5,6],
  [7,8],
  "\t\n","\v\f",
  [13,14],
  [15,16],
  [17,18],
  [19,20],
  [21,22],
  [23,24],
  [25,26],
  [27,28],
  [29,30],
  [31,32],
  "!\"","#$","%&","'(",")*","+,","-.","/0","12","34",
  [...]|...]}

Just 774 microseconds (which is ~0,8 milliseconds)



回答2:

Here are two quick solutions for you that are both flexible. One is easy to read, but only slightly faster than your proposed solution. The other is quite fast, but is a bit cryptic to read. And note that both of my proposed algorithms will work for lists of anything, not just numeric ordered lists.

Here is the "easy-to-read" one. Call by n_length_chunks(List,Chunksize). For example, to get a list of chunks 2 long, call n_length_chunks(List,2). This works for chunks of any size, ie, you could call n_length_chunks(List,4) to get [[1,2,3,4],[5,6,7,8],...]

n_length_chunks([],_) -> [];
n_length_chunks(List,Len) when Len > length(List) ->
    [List];
n_length_chunks(List,Len) ->
    {Head,Tail} = lists:split(Len,List),
    [Head | n_length_chunks(Tail,Len)].

The much faster one is here, but is definitely harder to read, and is called in the same way: n_length_chunks_fast(List,2) (I've made one change to this compared with the one above, in that it pads the end of the list with undefined if the length of the list isn't cleanly divisible by the desired chunk length.

n_length_chunks_fast(List,Len) ->
  LeaderLength = case length(List) rem Len of
      0 -> 0;
      N -> Len - N
  end,
  Leader = lists:duplicate(LeaderLength,undefined),
  n_length_chunks_fast(Leader ++ lists:reverse(List),[],0,Len).

n_length_chunks_fast([],Acc,_,_) -> Acc;
n_length_chunks_fast([H|T],Acc,Pos,Max) when Pos==Max ->
    n_length_chunks_fast(T,[[H] | Acc],1,Max);
n_length_chunks_fast([H|T],[HAcc | TAcc],Pos,Max) ->
    n_length_chunks_fast(T,[[H | HAcc] | TAcc],Pos+1,Max);
n_length_chunks_fast([H|T],[],Pos,Max) ->
    n_length_chunks_fast(T,[[H]],Pos+1,Max).

Tested on my (really old) laptop:

  • Your proposed solution took about 3 seconds.
  • My slow-but-readable one was slightly faster and takes about 1.5 seconds (still quite slow)
  • My fast version takes about 5 milliseconds.
  • For completeness, Isac's solution took about 180 milliseconds on my same machine.

Edit: wow, I need to read the complete question first. Oh well I'll keep here for posterity if it helps. As far as I can tell, there's not a good way to do this using list comprehensions. Your original version is slow because each iteration of sublist needs to traverse the list each time to get to each successive X, resulting in complexity just under O(N^2).



回答3:

Or with a fold:

  lists:foldr(fun(E, []) -> [[E]]; 
                 (E, [H|RAcc]) when length(H) < 2 -> [[E|H]|RAcc] ;
                 (E, [H|RAcc]) -> [[E],H|RAcc]
              end, [], List).


回答4:

I want to submit slightly complicated but more flexible (and mostly faster) solution of one proposed by @Tilman

split_list(List, Max) ->
    element(1, lists:foldl(fun
        (E, {[Buff|Acc], C}) when C < Max ->
            {[[E|Buff]|Acc], C+1};
        (E, {[Buff|Acc], _}) ->
            {[[E],Buff|Acc], 1};
        (E, {[], _}) ->
            {[[E]], 1}
    end, {[], 0}, List)).

so function part can be implemented as

part(List) ->
     RevList = split_list(List, 2),
     lists:foldl(fun(E, Acc) ->
         [lists:reverse(E)|Acc]
     end, [], RevList).

update I've added reverse in case if you want to preserve order, but as I can see it adds no more than 20% of processing time.



回答5:

You could do it like this:

1> {List1, List2} = lists:partition(fun(X) -> (X rem 2) == 1 end, List).
{[1,3,5|...],[2,4,6|...]}
2> lists:zipwith(fun(X, Y) -> [X, Y] end, List1, List2).
[[1,2],[3,4],[5,6]|...]

This takes ~73 milliseconds with a 10000 elements List on my computer. The original solution takes ~900 miliseconds.

But I would go with the recursive function anyway.



回答6:

I've slightly altered the implementation from @JLarky to remove the guard expression, which should be slightly faster:

split_list(List, Max) ->
    element(1, lists:foldl(fun
        (E, {[Buff|Acc], 1}) ->
            {[[E],Buff|Acc], Max};
        (E, {[Buff|Acc], C}) ->
            {[[E|Buff]|Acc], C-1};
        (E, {[], _}) ->
            {[[E]], Max}
    end, {[], Max}, List)).


回答7:

I was looking for a partition function which can split a large list to small amount of workers. With lkuty's partition you might get that one worker gets almost double work than all the others. If that's not what you want, here is a version which sublist lengths differ by at most 1.

Uses PropEr for testing.

%% @doc Split List into sub-lists so sub-lists lengths differ most by 1.
%% Does not preserve order.
-spec split_many(pos_integer(), [T]) -> [[T]] when T :: term().
split_many(N, List) ->
    PieceLen = length(List) div N,
    lists:reverse(split_many(PieceLen, N, List, [])).

-spec split_many(pos_integer(), pos_integer(), [T], [[T]]) ->
    [[T]] when T :: term().
split_many(PieceLen, N, List, Acc) when length(Acc) < N ->
    {Head, Tail} = lists:split(PieceLen, List),
    split_many(PieceLen, N, Tail, [Head|Acc]);

split_many(_PieceLen, _N, List, Acc) ->
    % Add an Elem to each list in Acc
    {Appendable, LeaveAlone} = lists:split(length(List), Acc),
    Appended = [[Elem|XS] || {Elem, XS} <- lists:zip(List, Appendable)],
    lists:append(Appended, LeaveAlone).

Tests:

split_many_test_() ->
    [
     ?_assertEqual([[1,2]], elibs_lists:split_many(1, [1,2])),
     ?_assertEqual([[1], [2]], elibs_lists:split_many(2, [1,2])),
     ?_assertEqual([[1], [3,2]], elibs_lists:split_many(2, [1,2,3])),
     ?_assertEqual([[1], [2], [4,3]], elibs_lists:split_many(3, [1,2,3,4])),
     ?_assertEqual([[1,2], [5,3,4]], elibs_lists:split_many(2, [1,2,3,4,5])),
     ?_assert(proper:quickcheck(split_many_proper1())),
     ?_assert(proper:quickcheck(split_many_proper2()))
    ].


%% @doc Verify all elements are preserved, number of groups is correct,
%% all groups have same number of elements (+-1)
split_many_proper1() ->
    ?FORALL({List, Groups},
            {list(), pos_integer()},
            begin
                Split = elibs_lists:split_many(Groups, List),

                % Lengths of sub-lists
                Lengths = lists:usort(lists:map(fun erlang:length/1, Split)),

                length(Split) =:= Groups andalso
                lists:sort(lists:append(Split)) == lists:sort(List) andalso
                length(Lengths) =< 2 andalso
                case Lengths of
                    [Min, Max] -> Max == Min + 1;
                    [_] -> true
                end
            end
           ).

%% @doc If number of groups is divisable by number of elements, ordering must
%% stay the same
split_many_proper2() ->
    ?FORALL({Groups, List},
            ?LET({A, B},
                 {integer(1, 20), integer(1, 10)},
                 {A, vector(A*B, term())}),
            List =:= lists:append(elibs_lists:split_many(Groups, List))
           ).


回答8:

Here is a more general answer that works with any sublist size.

1> lists:foreach(fun(N) -> io:format("~2.10.0B -> ~w~n",[N, test:partition([1,2,3,4,5,6,7,8,9,10],N)]    ) end, [1,2,3,4,5,6,7,8,9,10]).
01 -> [[1],[2],[3],[4],[5],[6],[7],[8],[9],[10]]
02 -> [[1,2],[3,4],[5,6],[7,8],[9,10]]
03 -> [[1,2,3],[4,5,6],[7,8,9],[10]]
04 -> [[1,2,3,4],[5,6,7,8],[10,9]]
05 -> [[1,2,3,4,5],[6,7,8,9,10]]
06 -> [[1,2,3,4,5,6],[10,9,8,7]]
07 -> [[1,2,3,4,5,6,7],[10,9,8]]
08 -> [[1,2,3,4,5,6,7,8],[10,9]]
09 -> [[1,2,3,4,5,6,7,8,9],[10]]
10 -> [[1,2,3,4,5,6,7,8,9,10]]

And the code to achieve this is stored inside a file called test.erl:

-module(test).
-compile(export_all).

partition(List, N) ->
    partition(List, 1, N, []).

partition([], _C, _N, Acc) ->
    lists:reverse(Acc) ;

partition([H|T], 1, N, Acc) ->
    partition(T, 2, N, [[H]|Acc]) ;

partition([H|T], C, N, [HAcc|TAcc]) when C < N ->
    partition(T, C+1, N, [[H|HAcc]|TAcc]) ;

partition([H|T], C, N, [HAcc|TAcc]) when C == N ->
    partition(T, 1, N, [lists:reverse([H|HAcc])|TAcc]) ;

partition(L, C, N, Acc) when C > N ->
    partition(L, 1, N, Acc).

It could probably be more elegant regarding the special case where C > N. Note that C is the size of the current sublist being constructed. At start, it is 1. And then it increments until it reaches the partition size of N.

We could also use a modified version of @chops code to let the last list contains the remaining items even if its size < N :

-module(n_length_chunks_fast).

-export([n_length_chunks_fast/2]).

n_length_chunks_fast(List,Len) ->
    SkipLength = case length(List) rem Len of
        0 -> 0;
        N -> Len - N
    end,
    n_length_chunks_fast(lists:reverse(List),[],SkipLength,Len).

n_length_chunks_fast([],Acc,_Pos,_Max) -> Acc;

n_length_chunks_fast([H|T],Acc,Pos,Max) when Pos==Max ->
    n_length_chunks_fast(T,[[H] | Acc],1,Max);

n_length_chunks_fast([H|T],[HAcc | TAcc],Pos,Max) ->
    n_length_chunks_fast(T,[[H | HAcc] | TAcc],Pos+1,Max);

n_length_chunks_fast([H|T],[],Pos,Max) ->
    n_length_chunks_fast(T,[[H]],Pos+1,Max).


标签: erlang