How to optimize the receive loop for thousands of

Posted 2019-05-02 19:52

In the chapter "Programming Multicore CPUs" of the Programming Erlang book, Joe Armstrong gives a nice example of parallelization of a map function:

pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = lists:map(fun(I) ->
        spawn(fun() -> do_f(S, Ref, F, I) end)
    end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;

gather([], _) ->
    [].

It works nicely, but I believe it contains a bottleneck that makes it run really slowly on lists with 100,000+ elements.

When gather() runs, it takes the first Pid from the Pids list and tries to match it against the messages in the main process mailbox. But what if the oldest message in the mailbox is not from that Pid? Then the receive tries each later message in turn until it finds a match. So every call to gather() may have to scan the whole mailbox to find the reply from the Pid at the head of the list. For a list of size N the worst case is O(N^2): if all N replies are already queued in the reverse of the spawn order, the first receive scans N messages, the next one N-1, and so on, about N(N+1)/2 comparisons in total.

I have even managed to prove the existence of this bottleneck:

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is:
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
    end;

How can I avoid this bottleneck?

3 answers
霸刀☆藐视天下
#2 · 2019-05-02 20:23

In this case you can use a dict (mapping the pid of each spawned process to its index in the original list) instead of the Pids list.
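
A minimal sketch of that idea, assuming the standard `dict` module. The module name `pmap_dict` and the variable names are made up for illustration and are not from the book. The receive leaves Pid unbound, so it takes whichever reply is oldest, and the dict tells us which list position that reply belongs to.

```erlang
-module(pmap_dict).
-export([pmap/2]).

%% Parallel map that accepts replies in arrival order.
%% (Hypothetical module, for illustration only.)
pmap(F, L) ->
    S = self(),
    Ref = erlang:make_ref(),
    %% Spawn one worker per element and remember Pid -> position.
    {PidToIndex, N} = lists:foldl(
        fun(I, {D, K}) ->
            Pid = spawn(fun() -> S ! {self(), Ref, (catch F(I))} end),
            {dict:store(Pid, K, D), K + 1}
        end, {dict:new(), 0}, L),
    Tagged = gather(N, Ref, PidToIndex, []),
    %% Restore the input order from the stored positions.
    [Ret || {_, Ret} <- lists:keysort(1, Tagged)].

%% N counts the replies still outstanding.
gather(0, _Ref, _PidToIndex, Acc) ->
    Acc;
gather(N, Ref, PidToIndex, Acc) ->
    receive
        %% Pid is unbound, so the oldest reply carrying Ref matches.
        {Pid, Ref, Ret} ->
            K = dict:fetch(Pid, PidToIndex),
            gather(N - 1, Ref, PidToIndex, [{K, Ret} | Acc])
    end.
```

For example, `pmap_dict:pmap(fun(X) -> X * 2 end, [1, 2, 3])` returns `[2,4,6]`. The final sort costs O(N log N), and no reply is skipped over in the mailbox while waiting for another.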

虎瘦雄心在
#3 · 2019-05-02 20:43

Joe's example is neat, but in practice you want a more heavyweight solution to your problem. Take a look at http://code.google.com/p/plists/source/browse/trunk/src/plists.erl for instance.

In general, there are three things you want to do:

  1. Pick a work unit which is "big enough". If the work unit is too small, you die by processing overhead. If it is too big, you die by workers being idle, especially if your work is not evenly divided by element count in the list.

  2. Put an upper bound on the number of simultaneous workers. Psyeugenic proposes splitting it by schedulers; I propose splitting it by a job count limit, say 100 jobs. That is, you start off 100 jobs and then wait until some of them complete before you start more.

  3. Consider giving up the order of elements if possible. It is much faster if you don't need to take the order into account, and for many problems this is possible. If the order does matter, then use a dict to store the results in, as proposed. It is faster for lists with many elements.
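
Point 2 can be sketched roughly as follows. This is an illustration under assumptions, not the plists implementation: the module name `pmap_bounded` and the `Max` parameter are made up here, each list element is treated as one work unit (the chunking from point 1 is not shown), and results are tagged with their position so the order can be restored at the end.

```erlang
-module(pmap_bounded).
-export([pmap/3]).

%% Parallel map with at most Max workers alive at once.
%% Results are returned in input order.
%% (Hypothetical module, for illustration only.)
pmap(F, L, Max) when is_integer(Max), Max > 0 ->
    Ref = erlang:make_ref(),
    Indexed = lists:zip(lists:seq(1, length(L)), L),
    Tagged = loop(F, Ref, Indexed, 0, Max, []),
    [Ret || {_, Ret} <- lists:keysort(1, Tagged)].

%% All work handed out and nothing still running: done.
loop(_F, _Ref, [], 0, _Max, Acc) ->
    Acc;
%% Work left and room for another worker: start one.
loop(F, Ref, [{K, I} | Rest], Running, Max, Acc) when Running < Max ->
    S = self(),
    spawn(fun() -> S ! {K, Ref, (catch F(I))} end),
    loop(F, Ref, Rest, Running + 1, Max, Acc);
%% Otherwise wait for any reply; K is unbound, so the oldest one matches.
loop(F, Ref, Todo, Running, Max, Acc) ->
    receive
        {K, Ref, Ret} ->
            loop(F, Ref, Todo, Running - 1, Max, [{K, Ret} | Acc])
    end.
```

For example, `pmap_bounded:pmap(fun(X) -> X * X end, [1, 2, 3, 4, 5], 2)` returns `[1,4,9,16,25]` while never running more than two workers at a time. If the order does not matter, the final keysort can simply be dropped.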

The basic rule is that as soon as you want parallelism, you rarely want a list-based representation of your data. The list has an inherent linearity to it, which you don't want. There is a talk by Guy Steele on the very subject: http://vimeo.com/6624203

Evening l夕情丶
#4 · 2019-05-02 20:45

The problem is that if you want to have a correct solution you still have to:

  • check if a given reply comes from one of the processes you have spawned
  • ensure proper result order

Here's a solution which uses counters instead of a list of pids, so the mailbox does not have to be traversed multiple times. Matching on Ref ensures that the messages we receive come from our children. Proper order is ensured by sorting the results with lists:keysort/2 at the very end of pmap. The sort adds some overhead, but at O(n log n) it stays well below O(n^2).

-module(test).

-compile(export_all).

pmap(F, L) ->
    S = self(),
    % make_ref() returns a unique reference
    % we'll match on this later
    Ref = erlang:make_ref(),
    Count = lists:foldl(fun(I, C) ->
                                spawn(fun() ->
                                              do_f(C, S, Ref, F, I)
                                      end),
                                C+1
                        end, 0, L),
    % gather the results
    Res = gather(0, Count, Ref),
    % reorder the results
    element(2, lists:unzip(lists:keysort(1, Res))).


do_f(C, Parent, Ref, F, I) ->
    Parent ! {C, Ref, (catch F(I))}.


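gather(Count, Count, _) ->
    [];
gather(N, Count, Ref) ->
    receive
        % C is unbound here, so the oldest reply carrying Ref matches
        % no matter which worker sent it; N only counts the replies.
        {C, Ref, Ret} -> [{C, Ret}|gather(N+1, Count, Ref)]
    end.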