how to multiprocess large text files in python?

2019-08-04 18:02发布

问题:

I tried to digest lines of a DictReader object after I read in a 60 MB csv file. I asked the question here: how to chunk a csv (dict)reader object in python 3.2?. (Code repated below.)

However, now I realize that chunking up the original text file might as well do the trick (and do the DictRead and the line-by-line digest later on). However, I found no io tool that multiprocessing.Pool could use.

Thanks for any thoughts!

source = open('/scratch/data.txt','r')
def csv2nodes(r):
    strptime = time.strptime
    mktime = time.mktime
    l = []
    ppl = set()
    for row in r:
        cell = int(row['cell'])
        id = int(row['seq_ei'])
        st = mktime(strptime(row['dat_deb_occupation'],'%d/%m/%Y'))
        ed = mktime(strptime(row['dat_fin_occupation'],'%d/%m/%Y'))
        # collect list
        l.append([(id,cell,{1:st,2: ed})])
        # collect separate sets
        ppl.add(id)
    return (l,ppl)


def csv2graph(source):
    r = csv.DictReader(source,delimiter=',')
    MG=nx.MultiGraph()
    l = []
    ppl = set()
    # Remember that I use integers for edge attributes, to save space! Dic above.
    # start: 1
    # end: 2
    p = Pool(processes=4)
    node_divisor = len(p._pool)*4
    node_chunks = list(chunks(r,int(len(r)/int(node_divisor))))
    num_chunks = len(node_chunks)
    pedgelists = p.map(csv2nodes,
                       zip(node_chunks))
    ll = []
    for l in pedgelists:
        ll.append(l[0])
        ppl.update(l[1])
    MG.add_edges_from(ll)
    return (MG,ppl)