Julia: How to give multiple workers access to func

2019-07-17 06:21发布

I have the following test module (MyMod.jl) to store some functions in Julia. Some of the core functions are written in serial. Other functions call the core functions in parallel.

module MyMod

export Dummy,distribute_data,recombine_data,regular_test,parallel_test

function Dummy(icol,model,data,A,B)
    nz,nx,nh = size(model)  # = size(A) = size(B)
    for ih = 1:nh
        for ix = 1:nx
            for iz = 1:nz
                data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
            end  
        end
    end
end

function distribute_data(X, obj_name_on_worker::Symbol;mod=Main)
    dim = length(size(X))
    size_per_worker = floor(Int,size(X,1) / nworkers())
    StartIdx = 1
    EndIdx = size_per_worker
    for (idx, pid) in enumerate(workers())
        if idx == nworkers()
            EndIdx = size(X,1)
        end
        println(StartIdx:EndIdx)
        if dim == 3
            @spawnat(pid, eval(mod, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
        elseif dim == 2
            @spawnat(pid, eval(mod, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
        end
        StartIdx = EndIdx + 1
        EndIdx = EndIdx + size_per_worker - 1
    end
end

function recombine_data(Data::Symbol;mod=Main)
    Results = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        Results[idx] = fetch(@spawnat(pid, getfield(mod, Data)))
    end
    return vcat(Results...)
end

function regular_test(model,data,A,B)
    ncol=size(data,2)
    map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end

function parallel_test(model,data,A,B)
    distribute_data(model, :model)
    distribute_data(A, :A)
    distribute_data(B, :B)
    distribute_data(data, :data)
    @everywhere ncol=size(data,2)
    @everywhere begin
        if myid() != 1
            map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
        end
    end
    P_Data = recombine_data(:data)
    return P_Data
end

end

This module works as expected. When I open a Julia session and run the following commands I find that the regular_test gives the same result as the parallel_test without any errors.

addprocs(3)

@everywhere using MyMod

nx = 250;
nz = 350;
nh = 150;
ncol = 125;

model = rand(nz,nx,nh);
data = SharedArray(Float64,nz,ncol);
A = rand(nz,nx,nh);
B = rand(nz,nx,nh);

@time P_Data = parallel_test(model,data,A,B);
@time regular_test(model,data,A,B); 

P_Data == data 

For larger/complicated functions it becomes quite messy to store modules in this way. Previously I have cleaned things up by storing each function as a separate file and then using include(..) to bring them into the module. The following module is my attempt to do just that:

module MyMod_2

export Dummy,distribute_data,recombine_data,regular_test,parallel_test

@everywhere include("Dummy.jl")
@everywhere include("distribute_data.jl")
@everywhere include("recombine_data.jl")
@everywhere include("regular_test.jl")
@everywhere include("parallel_test.jl")

end

where each of the functions are stored as a separate file in the same directory as MyMod2. However when I open a Julia session and try to run the same set of commands as before I get the following lengthy error when executing @everywhere using MyMod_2

WARNING: Module MyMod_2 not defined on process 4
fatal error on WARNING: Module MyMod_2 not defined on process 3
4: fatal error on WARNING: Module MyMod_2 not defined on process 2
3: fatal error on 2: ERROR: UndefVarError: MyMod_2 not defined
 in deserialize at serialize.jl:504
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:560
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:538
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:696
 in deserialize_datatype at serialize.jl:651
 in handle_deserialize at serialize.jl:465
 in message_handler_loop at multi.jl:862
 in process_tcp_streams at multi.jl:851
 in anonymous at task.jl:63
ERROR: UndefVarError: MyMod_2 not defined
 in deserialize at serialize.jl:504
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:560
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:538
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:696
 in deserialize_datatype at serialize.jl:651
 in handle_deserialize at serialize.jl:465
 in message_handler_loop at multi.jl:862
 in process_tcp_streams at multi.jl:851
 in anonymous at task.jl:63
ERROR: UndefVarError: MyMod_2 not defined
 in deserialize at serialize.jl:504
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:560
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:538
 in handle_deserialize at serialize.jl:465
 in deserialize at serialize.jl:696
 in deserialize_datatype at serialize.jl:651
 in handle_deserialize at serialize.jl:465
 in message_handler_loop at multi.jl:862
 in process_tcp_streams at multi.jl:851
 in anonymous at task.jl:63
Worker 2 terminated.
ERROR: ProcessExitedException()
 in yieldto at ./task.jl:71
 in wait at ./task.jl:371
 in wait at ./task.jl:286
 in wait at ./channels.jl:63
 in take! at ./channels.jl:53
 in take! at ./multi.jl:803
 in remotecall_fetch at multi.jl:729
 in remotecall_fetch at multi.jl:734
 in anonymous at multi.jl:1380

...and 3 other exceptions.

 in sync_end at ./task.jl:413
 in anonymous at multi.jl:1389
Worker 3 terminated.ERROR (unhandled task failure): EOFError: read end of file


julia> Worker 4 terminated.ERROR (unhandled task failure): EOFError: read end of file
julia> 
fatal error on ERROR (unhandled task failure): EOFError: read end of file
1: ERROR: attempt to send to unknown socket
fatal error on 1: ERROR: attempt to send to unknown socket

Is there something I can change to in MyMod_2 to fix this error and get it to load into the Julia session correctly?

1条回答
迷人小祖宗
2楼-- · 2019-07-17 06:55

Small tweak - in your module, omit the @everywhere macro. Just using the single @everywhere when loading your module should do the trick. Otherwise, you get into a strange recursive type situation where the workers are all executing the code in your module (from the @everywhere using MyMod_2, and then each of those workers is also calling @everywhere include())

查看更多
登录 后发表回答