I have a large vector of vectors of strings: around 50,000 vectors, each of which contains 2-15 strings of 1-20 characters.
`MyScoringOperation` is a function which operates on a vector of strings (the datum) and returns an array of 10100 scores (as `Float64`s). It takes about 0.01 seconds to run, depending on the length of the datum:
```julia
function MyScoringOperation(state::State, datum::Vector{String})
    ...
    score::Vector{Float64} # Size of score = 10100
end
```
I have what amounts to a nested loop. The outer loop typically runs for 500 iterations:
```julia
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
    score_total = zeros(10100)
    for datum in data
        score_total += MyScoringOperation(state, datum)
    end
end
```
On one computer, with a small test case of 3,000 vectors (rather than 50,000), this takes 100-300 seconds per outer-loop iteration.
I have 3 powerful servers with Julia 0.3.9 installed (and can easily get 3 more, and then hundreds more at the next scale).
I have basic experience with `@parallel`; however, it seems to spend a lot of time copying the constant data (it more or less hangs on the smaller testing case). That looks like:
```julia
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
    score_total = @parallel(+) for datum in data
        MyScoringOperation(state, datum)
    end
    state = update(state, score_total)
end
```
My understanding of the way this implementation works with `@parallel` is that, for each `ii`, it:

1. partitions `data` into a chunk for each worker
2. sends that chunk to each worker
3. has each worker process its chunk
4. has the main procedure sum the results as they arrive.
I would like to remove step 2, so that instead of sending a chunk of data to each worker, I just send a range of indexes to each worker, and they look it up in their own copy of `data`. Or, even better, give each worker only its own chunk, and have it reuse that chunk each time (saving a lot of RAM). A sketch of the first idea is below.
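For example, the index-range idea might look roughly like this (purely illustrative: it assumes `loaddata()` and `MyScoringOperation` are defined on every worker, and hard-codes a two-worker split of the 50,000 indexes by hand):

```julia
@everywhere data = loaddata()    # every worker builds its own copy once

# scores a range of indexes against the worker-local copy of `data`
@everywhere function score_range(state, r::UnitRange{Int})
    total = zeros(10100)
    for jj in r
        total += MyScoringOperation(state, data[jj])
    end
    total
end

# ship only the index ranges, not the data itself
refs = [remotecall(w, score_range, state, r)
        for (w, r) in zip(workers(), (1:25000, 25001:50000))]
score_total = sum(map(fetch, refs))
```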
Profiling backs up my belief about the functioning of `@parallel`. For a similarly scoped problem (with even smaller data), the non-parallel version runs in 0.09 seconds, and the parallel version runs in 185 seconds. The profiler shows almost 100% of that time is spent interacting with network IO.
This should get you started:
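A minimal sketch of the helpers described below (illustrative, written against the Julia 0.3-era `RemoteRef`/`remotecall` API; the exact bodies are an assumption matching the description that follows, not battle-tested code):

```julia
# Split 1:length(data) into one (start, stop) pair per chunk.
function get_chunks(data::Vector, nchunks::Int)
    base, extra = divrem(length(data), nchunks)
    lens = fill(base, nchunks)
    lens[1:extra] += 1                  # spread the remainder over the first chunks
    ends = cumsum(lens)
    starts = [1; ends[1:end-1] + 1]
    zip(starts, ends)
end

# Send each worker its chunk once; keep RemoteRefs to the worker-side copies.
function rchunk_data(data::Vector)
    chunks = collect(get_chunks(data, nworkers()))
    RemoteRef[put!(RemoteRef(pid), data[ch[1]:ch[2]])
              for (pid, ch) in zip(workers(), chunks)]
end

# Nonblocking fetch-and-reduce: combine partial results in completion order.
function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing
    @sync for rr in rem_results
        @async begin
            res = fetch(rr)
            total = total === nothing ? res : red_acc(total, res)
        end
    end
    total
end

# Each worker maps over its (already local) chunk and reduces it,
# then the partial results are reduced together as they arrive.
function prechunked_map_reduce(r_chunks::Vector{RemoteRef},
                               map_fun::Function, red_acc::Function)
    rem_results = RemoteRef[remotecall(rc.where,
                                () -> reduce(red_acc, map(map_fun, fetch(rc))))
                            for rc in r_chunks]
    fetch_reduce(red_acc, rem_results)
end
```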
`rchunk_data` breaks the data into chunks (defined by the `get_chunks` method) and sends each chunk to a different worker, where they are stored in `RemoteRef`s. The `RemoteRef`s are references to memory on your other processes (and potentially computers). `prechunked_map_reduce` does a variation on a kind of map-reduce: each worker first runs `map_fun` on each of its chunk's elements, then reduces over all the elements in its chunk using `red_acc` (a reduction accumulator function). Finally, each worker returns its result, and these are combined by reducing them all together with `red_acc`, this time via `fetch_reduce`, so that we can add the first ones completed first. `fetch_reduce` is a nonblocking fetch-and-reduce operation. I believe it has no race conditions, though this may be because of an implementation detail in `@async` and `@sync`. When Julia 0.4 comes out, it will be easy enough to put a lock in to make it obviously have no race conditions.

This code isn't really battle-hardened. You also might want to look at making the chunk size tunable, so that you can send more data to faster workers (if some have better network or faster CPUs).
You will need to re-express your code as a map-reduce problem, which doesn't look too hard.
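Concretely, in the question's terms the outer loop might then become something like this (illustrative: the anonymous function captures `state`, so `state` is still shipped to each worker every iteration, but `data` is shipped only once, up front):

```julia
r_chunks = rchunk_data(data)   # distribute the data once, before the outer loop
for ii in 1:500
    score_total = prechunked_map_reduce(r_chunks,
                                        datum -> MyScoringOperation(state, datum),
                                        +)
    state = update(state, score_total)
end
```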
Testing that with:
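(The exact test harness isn't shown; something of roughly this shape, with a dummy scoring function and made-up data standing in for the real ones, is what the timings below refer to:)

```julia
@everywhere dummy_score(datum) = ones(10100)   # stand-in for MyScoringOperation
fake_data = [[randstring(10) for jj in 1:5] for ii in 1:3000]

r_chunks = rchunk_data(fake_data)
@time score_total = prechunked_map_reduce(r_chunks, dummy_score, +)
```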
took ~0.03 seconds when distributed across 8 workers (none of them on the same machine as the launcher),
vs running just locally:
took ~0.06 seconds.