In the chapter "Programming Multicore CPUs" of the Programming Erlang book, Joe Armstrong gives a nice example of parallelization of a map function:

    pmap(F, L) ->
        S = self(),
        %% make_ref() returns a unique reference
        %% we'll match on this later
        Ref = erlang:make_ref(),
        Pids = lists:map(fun(I) ->
                             spawn(fun() -> do_f(S, Ref, F, I) end)
                         end, L),
        %% gather the results
        gather(Pids, Ref).

    do_f(Parent, Ref, F, I) ->
        Parent ! {self(), Ref, (catch F(I))}.

    gather([Pid|T], Ref) ->
        receive
            {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
        end;
    gather([], _) ->
        [].

It works nicely, but I believe there is a bottleneck in it that makes it really slow on lists with 100,000+ elements.
When the gather() function is executed, it tries to match the first Pid from the Pids list against a message in the main process's mailbox. But what if the oldest message in the mailbox is not from this very Pid? Then it tries all the other messages until it finds a match. So there is a real chance that, while executing gather(), we have to loop through all the mailbox messages to find the one from the Pid we took from the Pids list. That is an N * N worst case for a list of size N.
I have even managed to prove the existence of this bottleneck:

    gather([Pid|T], Ref) ->
        receive
            {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
            %% Here it is:
            Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
        end;

How can I avoid this bottleneck?
The problem is that if you want to have a correct solution you still have to:
- check if a given reply comes from one of the processes you have spawned
- ensure proper result order
Here's a solution that uses counters instead of lists, which eliminates the need to traverse the inbox multiple times. Matching on Ref ensures that the messages we receive are from our children. Proper order is ensured by sorting the results with lists:keysort/2 at the very end of pmap, which adds some overhead, but at O(n log n) it is likely to be cheaper than the O(n^2) mailbox scanning.

    -module(test).
    -compile(export_all).

    pmap(F, L) ->
        S = self(),
        % make_ref() returns a unique reference
        % we'll match on this later
        Ref = erlang:make_ref(),
        % spawn one worker per element, tagging each with its index
        Count = lists:foldl(fun(I, C) ->
                                spawn(fun() ->
                                          do_f(C, S, Ref, F, I)
                                      end),
                                C+1
                            end, 0, L),
        % gather the results
        Res = gather(0, Count, Ref),
        % reorder the results
        element(2, lists:unzip(lists:keysort(1, Res))).

    do_f(C, Parent, Ref, F, I) ->
        Parent ! {C, Ref, (catch F(I))}.

    gather(C, C, _) ->
        [];
    gather(C, Count, Ref) ->
        receive
            % Index is unbound here, so the oldest reply tagged with Ref
            % matches, whichever worker sent it
            {Index, Ref, Ret} -> [{Index, Ret}|gather(C+1, Count, Ref)]
        end.

Joe's example is neat, but in practice you want a more heavyweight solution to your problem. Take a look at http://code.google.com/p/plists/source/browse/trunk/src/plists.erl for instance.
In general, there are three things you want to do:
- Pick a work unit which is "big enough". If the work unit is too small, the processing overhead kills you. If it is too big, you lose to idle workers, especially if the work is not evenly divided by element count in the list.
- Put an upper bound on the number of simultaneous workers. Psyeugenic proposes splitting it by schedulers; I propose splitting it by a job count limit, say 100 jobs. That is, you start 100 jobs and then wait until some of them complete before you start more.
- Consider ignoring the order of elements if possible. It is much faster if you don't need to take the order into account, and for many problems this is possible. If the order does matter, then use a dict to store the results in, as proposed. It is faster for large lists.
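The second and third points can be sketched roughly as follows. This is only an illustration, not how plists does it: the module name pmap_bounded, the pmap/3 signature and the Limit argument are all made up for the example, and it still uses one job per list element rather than larger work units. Results are taken in whatever order they arrive and put back in order with a single sort at the end.

```erlang
-module(pmap_bounded).
-export([pmap/3]).

%% Hypothetical sketch: at most Limit workers are running at any one time.
pmap(F, L, Limit) when is_integer(Limit), Limit > 0 ->
    Ref = erlang:make_ref(),
    %% Tag every element with its position so order can be restored later.
    Indexed = lists:zip(lists:seq(1, length(L)), L),
    Results = run(F, Indexed, Limit, 0, Ref, []),
    [Ret || {_, Ret} <- lists:keysort(1, Results)].

%% Nothing left to start and nothing running: done.
run(_F, [], _Limit, 0, _Ref, Acc) ->
    Acc;
%% Below the limit and work remains: start one more worker.
run(F, [{N, I} | T], Limit, Running, Ref, Acc) when Running < Limit ->
    Parent = self(),
    spawn(fun() -> Parent ! {N, Ref, (catch F(I))} end),
    run(F, T, Limit, Running + 1, Ref, Acc);
%% At the limit, or only waiting for stragglers: take any finished result.
%% N is unbound, so the oldest reply tagged with Ref matches.
run(F, Todo, Limit, Running, Ref, Acc) ->
    receive
        {N, Ref, Ret} ->
            run(F, Todo, Limit, Running - 1, Ref, [{N, Ret} | Acc])
    end.
```

For example, pmap_bounded:pmap(fun(X) -> 2 * X end, lists:seq(1, 5), 2) never has more than two workers alive. If the order really does not matter, the keysort step can simply be dropped.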
The basic rule is that as soon as you want parallelism, you rarely want a list-based representation of your data. The list has an inherent linearity to it, which you don't want. There is a talk by Guy Steele on the very subject: http://vimeo.com/6624203
In this case you can use a dict (from the pid of a spawned process to its index in the original list) as Pids instead.
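A minimal sketch of that idea, assuming the stdlib dict module and a made-up module name pmap_by_pid: the parent keeps a dict from each worker's pid to its position in the list, accepts replies in arrival order, and looks up where each one belongs.

```erlang
-module(pmap_by_pid).
-export([pmap/2]).

%% Hypothetical sketch: module and helper names are not from the book.
pmap(F, L) ->
    Parent = self(),
    Ref = erlang:make_ref(),
    %% Spawn one worker per element and remember each worker's position.
    {Count, Index} =
        lists:foldl(fun(I, {N, D}) ->
                        Pid = spawn(fun() ->
                                        Parent ! {self(), Ref, (catch F(I))}
                                    end),
                        {N + 1, dict:store(Pid, N, D)}
                    end, {0, dict:new()}, L),
    Results = gather(Count, Ref, Index, []),
    %% Sort by original position, then drop the positions.
    [Ret || {_, Ret} <- lists:keysort(1, Results)].

%% Take replies in arrival order. Pid is unbound in the pattern, so the
%% oldest message tagged with Ref matches.
gather(0, _Ref, _Index, Acc) ->
    Acc;
gather(N, Ref, Index, Acc) ->
    receive
        {Pid, Ref, Ret} ->
            gather(N - 1, Ref, Index, [{dict:fetch(Pid, Index), Ret} | Acc])
    end.
```

Compared with sending the index along in the message, this keeps the worker protocol identical to Joe's original ({Pid, Ref, Result}) at the cost of one dict lookup per reply.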