I have the following test module (MyMod.jl) to store some functions in Julia. Some of the core functions are written in serial. Other functions call the core functions in parallel.
module MyMod
export Dummy,distribute_data,recombine_data,regular_test,parallel_test
function Dummy(icol,model,data,A,B)
nz,nx,nh = size(model) # = size(A) = size(B)
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
function distribute_data(X, obj_name_on_worker::Symbol;mod=Main)
dim = length(size(X))
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
println(StartIdx:EndIdx)
if dim == 3
@spawnat(pid, eval(mod, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
elseif dim == 2
@spawnat(pid, eval(mod, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
end
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
function recombine_data(Data::Symbol;mod=Main)
Results = cell(nworkers())
for (idx, pid) in enumerate(workers())
Results[idx] = fetch(@spawnat(pid, getfield(mod, Data)))
end
return vcat(Results...)
end
function regular_test(model,data,A,B)
ncol=size(data,2)
map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end
function parallel_test(model,data,A,B)
distribute_data(model, :model)
distribute_data(A, :A)
distribute_data(B, :B)
distribute_data(data, :data)
@everywhere ncol=size(data,2)
@everywhere begin
if myid() != 1
map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end
end
P_Data = recombine_data(:data)
return P_Data
end
end
This module works as expected. When I open a Julia session and run the following commands I find that the regular_test
gives the same result as the parallel_test
without any errors.
addprocs(3)
@everywhere using MyMod
nx = 250;
nz = 350;
nh = 150;
ncol = 125;
model = rand(nz,nx,nh);
data = SharedArray(Float64,nz,ncol);
A = rand(nz,nx,nh);
B = rand(nz,nx,nh);
@time P_Data = parallel_test(model,data,A,B);
@time regular_test(model,data,A,B);
P_Data == data
For larger/complicated functions it becomes quite messy to store modules in this way. Previously I have cleaned things up by storing each function as a separate file and then using include(..)
to bring them into the module. The following module is my attempt to do just that:
module MyMod_2
export Dummy,distribute_data,recombine_data,regular_test,parallel_test
@everywhere include("Dummy.jl")
@everywhere include("distribute_data.jl")
@everywhere include("recombine_data.jl")
@everywhere include("regular_test.jl")
@everywhere include("parallel_test.jl")
end
where each of the functions are stored as a separate file in the same directory as MyMod2
. However when I open a Julia session and try to run the same set of commands as before I get the following lengthy error when executing @everywhere using MyMod_2
WARNING: Module MyMod_2 not defined on process 4
fatal error on WARNING: Module MyMod_2 not defined on process 3
4: fatal error on WARNING: Module MyMod_2 not defined on process 2
3: fatal error on 2: ERROR: UndefVarError: MyMod_2 not defined
in deserialize at serialize.jl:504
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:560
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:538
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:696
in deserialize_datatype at serialize.jl:651
in handle_deserialize at serialize.jl:465
in message_handler_loop at multi.jl:862
in process_tcp_streams at multi.jl:851
in anonymous at task.jl:63
ERROR: UndefVarError: MyMod_2 not defined
in deserialize at serialize.jl:504
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:560
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:538
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:696
in deserialize_datatype at serialize.jl:651
in handle_deserialize at serialize.jl:465
in message_handler_loop at multi.jl:862
in process_tcp_streams at multi.jl:851
in anonymous at task.jl:63
ERROR: UndefVarError: MyMod_2 not defined
in deserialize at serialize.jl:504
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:560
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:538
in handle_deserialize at serialize.jl:465
in deserialize at serialize.jl:696
in deserialize_datatype at serialize.jl:651
in handle_deserialize at serialize.jl:465
in message_handler_loop at multi.jl:862
in process_tcp_streams at multi.jl:851
in anonymous at task.jl:63
Worker 2 terminated.
ERROR: ProcessExitedException()
in yieldto at ./task.jl:71
in wait at ./task.jl:371
in wait at ./task.jl:286
in wait at ./channels.jl:63
in take! at ./channels.jl:53
in take! at ./multi.jl:803
in remotecall_fetch at multi.jl:729
in remotecall_fetch at multi.jl:734
in anonymous at multi.jl:1380
...and 3 other exceptions.
in sync_end at ./task.jl:413
in anonymous at multi.jl:1389
Worker 3 terminated.ERROR (unhandled task failure): EOFError: read end of file
julia> Worker 4 terminated.ERROR (unhandled task failure): EOFError: read end of file
julia>
fatal error on ERROR (unhandled task failure): EOFError: read end of file
1: ERROR: attempt to send to unknown socket
fatal error on 1: ERROR: attempt to send to unknown socket
Is there something I can change to in MyMod_2
to fix this error and get it to load into the Julia session correctly?
Small tweak - in your module, omit the
@everywhere
macro. Just using the single@everywhere
when loading your module should do the trick. Otherwise, you get into a strange recursive type situation where the workers are all executing the code in your module (from the@everywhere using MyMod_2
, and then each of those workers is also calling@everywhere include()
)