Parallel execution and file writing in Python

Posted 2020-06-18 09:38

Question:

I have a very large dataset distributed across 10 big clusters. The task is to do some computation for each cluster and write (append) the results line by line into 10 files, where each file contains the results for one of the 10 clusters. Each cluster can be computed independently, and I want to parallelize the code across ten CPUs (or threads) so that I can process all the clusters at once. A simplified pseudo code for my task is the following:

for c in range(10):   # this is the loop over the clusters
    for line in read_lines_from_cluster(c):
        # do some computations for line `line` in cluster c
        # append the results to a file named "cluster_c", one file for each cluster c

Answer 1:

#!/usr/bin/env python
from multiprocessing import Pool

def compute_cluster(c):
    """each cluster can be computed independently"""
    ... # compute a cluster here 

if __name__ == "__main__":
    pool = Pool(10)  # run at most 10 tasks in parallel
    pool.map(compute_cluster, range(10))
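
To connect this back to the question, compute_cluster can read the lines of its own cluster and append its results to its own file, so the workers never touch the same file. A minimal sketch, assuming hypothetical read_lines_from_cluster(c) and compute(line) helpers standing in for your own I/O and computation:

def compute_cluster(c):
    """Process cluster c and append its results to its own output file."""
    with open("cluster_%d" % c, "a") as out:
        for line in read_lines_from_cluster(c):
            out.write(str(compute(line)) + "\n")   # one result per line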


Answer 2:

You can use joblib to parallelize the analysis. If you have a function process_line:

from joblib import Parallel, delayed
data = Parallel(n_jobs=-1)(delayed(process_line)(line)
                           for line in open('bigfile'))

You then want to save the results serially. Depending on the ratio of computation time to the size of the data to be saved, you can use different approaches:

Lots of computing time to get a few numbers

The overhead of communicating between workers is very small. The simplest option then is for each process to write to an independent file and just cat the results together at the end. You can make sure you are not overwriting by passing an index and using it to create the file name, as in the sketch below.
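
A minimal sketch of that idea, with a hypothetical process_chunk handling one chunk of lines and the index keeping each worker in its own file:

from joblib import Parallel, delayed

def process_chunk(idx, lines):
    """Compute one chunk and write its results to its own indexed file."""
    with open('results_%d.txt' % idx, 'w') as out:    # the index avoids overwriting
        for line in lines:
            out.write(str(len(line)) + '\n')          # dummy per-line computation

chunks = [['a', 'bb'], ['ccc', 'dddd']]               # toy data standing in for the clusters
Parallel(n_jobs=-1)(delayed(process_chunk)(i, chunk) for i, chunk in enumerate(chunks))
# afterwards: cat results_*.txt > results.txt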

A more advanced solution is to pass the file handler as an argument and write to the file only after acquiring a multiprocessing.Lock. The only problem is that if many processes try to acquire the lock at the same time, they will be taking up CPU resources but not computing.

def process_line(line, outfile, lock):
    data = line[0]
    lock.acquire()
    print >> outfile, data
    lock.release()
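
A caveat: a plain multiprocessing.Lock (and an open file handle) generally cannot be pickled and shipped to joblib workers, so in practice you would share a Manager lock and open the file in append mode inside each worker. A minimal Python 3 sketch of that variation, with a dummy per-line computation standing in for the real one:

from multiprocessing import Manager
from joblib import Parallel, delayed

def process_line(line, lock):
    data = line.upper()                      # dummy per-line computation
    with lock:                               # only one worker writes at a time
        with open('out.txt', 'a') as out:
            out.write(data + '\n')

if __name__ == "__main__":
    manager = Manager()
    lock = manager.Lock()                    # proxy lock, safe to pass to workers
    Parallel(n_jobs=4)(delayed(process_line)(line, lock)
                       for line in ['foo', 'bar', 'baz'])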

Shorter computing time

If you have more data, writing to a file could induce some overhead, especially if you are going to reload it into memory afterwards. Here you have two options:

  • All the data fits in memory: you are lucky. With joblib, just make it the return value of the function. At the end you will have a list with all your results, in order.
  • The data does not fit in memory, so you have to consume it on the fly. You need a producer-consumer pattern. Something like:

    from multiprocessing import Process, JoinableQueue
    from joblib import Parallel, delayed
    
    def saver(q):
        with open('out.txt', 'w') as out:
            while True:
                val = q.get()
                if val is None: break
                print >> out, val
                q.task_done()
            # Finish up
            q.task_done()
    
    def foo(x):
        q.put(x**3+2)
    
    q = JoinableQueue()
    p = Process(target=saver, args=(q,))
    p.start()
    Parallel(n_jobs=2, verbose=0)(delayed(foo)(i) for i in xrange(1000))
    q.put(None) # Poison pill
    q.join()
    p.join()
    

If the amount of data is very big compared to the computing time, you will find a lot of overhead just transferring the data from one process to the others. If that is your bottleneck, then you should use more advanced technology, like OpenMP, and perhaps Cython, to get rid of the GIL and use threads instead of processes.
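
If the heavy part of the per-chunk computation already releases the GIL (large NumPy operations generally do, as would a Cython nogil section), a plain thread pool avoids transferring data between processes entirely. A minimal sketch under that assumption, with a dummy process_chunk standing in for the real computation:

from concurrent.futures import ThreadPoolExecutor
import numpy as np

def process_chunk(chunk):
    """Dummy heavy computation; large NumPy operations release the GIL."""
    return np.dot(chunk, chunk.T).sum()

chunks = [np.random.rand(500, 500) for _ in range(8)]    # toy data
with ThreadPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(process_chunk, chunks))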

Note that I have not specified how small "small" is, as that depends very much on the configuration of your cluster: how fast the communication is, the underlying file system, and so on. But it is nothing you can't experiment with fairly easily, for example by timing how long it takes a dummy program to send a line to another process.
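
A minimal sketch of such a timing experiment, assuming a plain multiprocessing.Queue is what you would use for the real communication:

import time
from multiprocessing import Process, Queue

def sink(q, n):
    for _ in range(n):
        q.get()                              # just drain the queue

if __name__ == "__main__":
    n = 100000
    q = Queue()
    p = Process(target=sink, args=(q, n))
    p.start()
    start = time.time()
    for i in range(n):
        q.put('a dummy line of text %d' % i)
    p.join()
    print('%.2f microseconds per line' % ((time.time() - start) / n * 1e6))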



Answer 3:

Just like the answer from @Davidmh, but working in Python 3:

from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def saver(q):
    with open('out.txt', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
            q.task_done()
        # Finish up
        q.task_done()

def foo(x):
    import os
    q.put(str(os.getpid()) + '-' + str(x**3+2))

q = JoinableQueue()
p = Process(target=saver, args=(q,))
p.start()
Parallel(n_jobs=-1, verbose=0)(delayed(foo)(i) for i in range(1000))
q.put(None) # Poison pill
p.join()

PS: I've also added the PID to each output line in order to check that everything is working as expected ;-)
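
A quick way to do that check is to count the distinct PIDs in the output (a small sketch, assuming out.txt was produced by the snippet above):

with open('out.txt') as f:
    pids = {line.split('-')[0] for line in f}
print('results written by %d distinct worker processes' % len(pids))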