Python Multiprocessing using Queue to write to same file

Posted 2020-05-24 05:22

Question:

I know there are many posts on Stack Exchange about writing results from multiprocessing to a single file, and I developed my code after reading those posts. What I am trying to achieve is to run the 'RevMapCoord' function in parallel and write its results to one single file using a multiprocessing Queue. But I am having a problem while queuing my jobs. My code:

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print ('Echo from Feeder: %s' % (par))
        queue.put(par)
    print ('**Feeder finished queing**')

def calc(queueIn, queueOut):
    print ('Worker function started')
    while True:
        try:
            par = queueIn.get(block = False)
            res = RevMapCoord(final_res)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()


feedProc = Process(target = feed , args = (workerQueue, final_res))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno))

feedProc.start()
print ('Feeder is joining')
feedProc.join ()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join ()

When I run this code, the script gets stuck at the "feedProc.start()" step. The last few lines of output on screen show print statements from the end of "feedProc.start()":

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**

But it hangs before executing the next line, "feedProc.join ()". The code gives no error and keeps running but does nothing (hangs). Please tell me what mistake I am making.

Answer 1:

I think you should slim your example down to the basics. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put('Hello')
    q.put('Bye')
    q.put(None)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    with open('file.txt', 'w') as fp:
        while True:
            item = q.get()
            print(item)
            if item is None:
                break
            fp.write(item)
    p.join()

Here I have two processes (the main process and p). p puts strings into a queue, which are retrieved by the main process. When the main process finds None (a sentinel that I am using to indicate "I am done"), it breaks the loop.

Extending this to many processes (or threads) is trivial.
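For example, here is a minimal sketch of the same sentinel idea with several worker processes feeding a single writer through a results queue; the number of workers, the dummy task list, and the item.upper() placeholder work are assumptions for illustration, not from the original post. Each worker puts a None sentinel when it is done, and the main process stops writing once it has seen one sentinel per worker:

from multiprocessing import Process, Queue

NUM_WORKERS = 4  # assumption: pick this to match your CPU count

def worker(task_q, result_q):
    # Pull tasks until the sentinel is seen, then tell the writer we are done.
    while True:
        item = task_q.get()
        if item is None:
            result_q.put(None)
            break
        result_q.put(item.upper())  # placeholder for the real computation

if __name__ == '__main__':
    task_q, result_q = Queue(), Queue()
    workers = [Process(target=worker, args=(task_q, result_q))
               for _ in range(NUM_WORKERS)]
    for p in workers:
        p.start()

    for item in ['hello', 'bye']:    # enqueue the work
        task_q.put(item)
    for _ in range(NUM_WORKERS):     # one sentinel per worker
        task_q.put(None)

    finished = 0
    with open('file.txt', 'w') as fp:
        while finished < NUM_WORKERS:
            res = result_q.get()
            if res is None:
                finished += 1
            else:
                fp.write(res + '\n')

    for p in workers:
        p.join()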



Answer 2:

I achieved writing results from multiprocessing to a single file by using the 'map_async' function in Python 3. Here is the function I wrote:

from multiprocessing import Pool

def PPResults(module, alist):  # parallel processing
    npool = Pool(int(nproc))
    res = npool.map_async(module, alist)
    results = res.get()  # results are returned as a list
    return results

So, I provide this function with a list of parameters in 'alist', and 'module' is a function that does the processing and returns a result. The function above keeps collecting the results in a list and returns once all the parameters from 'alist' have been processed. The results might not be in the correct order, but as order was not important for me this worked well. The 'results' list can then be iterated and the individual results written to a file like this:

fh_out = open('./TestResults', 'w')
for i in results:  # write results from the list to a file
    fh_out.write(i)
fh_out.close()
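For reference, here is a self-contained sketch of how the pieces above could fit together; the RevMapCoord body, the value of nproc, and the input list are placeholder assumptions, not my real data:

from multiprocessing import Pool

nproc = 4  # assumption: number of worker processes

def RevMapCoord(par):
    # placeholder for the real "read a file, find string" work
    return par + '\n'

def PPResults(module, alist):
    # run 'module' over 'alist' in parallel and collect the results as a list
    npool = Pool(int(nproc))
    res = npool.map_async(module, alist)
    results = res.get()
    npool.close()
    npool.join()
    return results

if __name__ == '__main__':
    params = ['AK779,AT61680', 'AK832,AT30210']  # hypothetical input
    results = PPResults(RevMapCoord, params)
    with open('./TestResults', 'w') as fh_out:
        for i in results:
            fh_out.write(i)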

To keep the results in order we might need to use queues, similar to what I mentioned in my question (above). Though I was able to fix that code, I believe it does not need to be posted here.

Thanks

AK