Best way to perform multiprocessing on a large fil

2019-07-30 02:05发布

问题:

I have a python script that would traverse a list(>1000 elements), find the variable in a large file and then output the result. I am reading the entire file >1000 times. I tried using multiprocessing, but not of much help. Here's what I am trying to do:

import gzip
from multiprocessing.pool import ThreadPool as Pool

def getForwardIP(clientIP, requestID):
   with gzip.open("xyz.log") as infile:
     for lines in infile:
        line= lines.split(" ")
        myRequestID= line[0]
        forwardIP= line[1]
        if myRequestID==requestID:
           print forwardIP
if __name__== "__main__":
    pool_size=8
    pool= Pool(pool_size)
    request_id_list= list()
    #request_id_list contains >1000 elements
    for id in request_id_list:
      pool.apply_async(getForwardIP, ("1.2.3.4.", id, ))
   pool.close()
   pool.join()

Is there a faster way? Any help will be appreciated. Thanks!

EDIT

(I AM WRITING MY ENTIRE CODE HERE) Thanks everyone for the suggestions. Now I am writing the file into a list rather than reading 1000 times. I tried to multi-process the for loop, but it didn't work. Below is the code:

import gzip
import datetime
from multiprocessing.pool import ThreadPool as Pool

def getRequestID(r_line_filename):
  requestIDList= list()
  with gzip.open(r_line_filename) as infile:
  #r_line_filename is a file with request_id and client_ip
    for lines in infile:
        line= lines.split(" ")
        requestID= line[1].strip("\n")
        myclientIP= line[0]
        if myclientIP==clientIP:
            requestIDList.append(requestID)  
   print "R line List Ready!"
   return(requestIDList)  



def getFLineList(fFilename):
   fLineList= list()
   with gzip.open(fFilename) as infile:
   #fFilename is a file with format request_id, forward_ip, epoch time
     for lines in infile:
        fLineList.append(lines.split())
  print "F line list ready!"
  return(fLineList)

def forwardIP(lines, requestID):
 myrequestID= lines[0]
 forwardIP= lines[1]
 epoch= int(lines[2].split(".")[0])
 timex= datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S')
 if myrequestID==requestID:
    print "%s %s %s"%(clientIP, timex, forwardIP)

if __name__== "__main__":
 pool= Pool()
 clientIP= "x.y.z.a"
 rLineList= getRequestID("rLine_subset.log.gz")
 fLineList= getFLineList("fLine_subset.log.gz")
 for RID in rLineList:
    for lines in fLineList:
        pool.apply_async(forwardIP, (lines, RID,))
    pool.close()
    pool.join()

The multi-processing part is not working. Actually, this one is much slower. If I don't do multi-processing and simply traverse the list, it is faster. Thanks for your help in advance!

回答1:

There is indeed a faster way. Don't read and parse the file in 1000 times. Instead, read it in once, parse it once, and store it. File I/O is one of the slowest things you can do (in any language). In memory processing is much faster!

Something like this (obviously untested since I don't have "xyz.log" accessible to me. And for the hawks: obviously I didn't profile it either, but I have a sneaky suspicion reading a file once is faster than reading it 1000 times):

import gzip

def readFile():
  my_lines = []
  with gzip.open("xyz.log") as infile:
     for lines in infile:
        line = lines.split(" ")
        my_lines.append(line)

  return my_lines

def getForwardIp(lines, requestID): #Doesn't look like you need client IP (yet), so I nuked it
  myRequestID= line[0]
  forwardIP= line[1]
  if myRequestID==requestID:
     print forwardIP

if __name__ == "__main__":
  parsed_lines = readFile()
  request_id_list= list()
  #request_id_list contains >1000 elements
  for id in request_id_list:
    getForwardIp(parsed_lines, requestID)


回答2:

I agree with mwm314 that you shouldn't be reading the file 1000 times.

I'm assuming you haven't given us the complete code because the client_ip parameter seems to be unused, but here I have rewritten it to only open the file once and to only iterate through each line in the file once. I've also modified getForwardIP to take a list of request ids and immediately turn it into a set for optimal lookup performance.

import gzip


def getForwardIP(client_ip, request_ids):
    request_ids = set(request_ids)  # to get O(1) lookup
    with gzip.open("xyz.log") as infile:
        for lines in infile:
            line = lines.split(" ")
            found_request_id = line[0]
            found_forward_ip = line[1]
            if found_request_id in request_ids:
                print found_forward_ip


if __name__ == "__main__":
    request_id_list = list()
    # request_id_list contains >1000 elements
    getForwardIP("1.2.3.4.", request_id_list)


回答3:

I would probably scan through the single large file for all of the requested IDs, then fully utilize the ThreadPool for invoking getForwardIP().

You could partition the single large file into multiple regions and have multiple workers process different partitions of the file, but this approach has some challenges and might not work on all file systems.