Python multiprocess/multithreading to speed up fil

Posted 2019-02-15 18:26

Question:

I have a program that copies large numbers of files from one location to another: 100,000+ files (I'm copying 314 GB of image sequences at the moment). Both locations are on very fast, heavily RAID'd network storage. I'm using shutil to copy the files sequentially and it takes a long time, so I'm trying to find the best way to optimize this. I've noticed that some software I use effectively multi-threads reading files off the network, with huge gains in load times, so I'd like to try doing the same in Python.

I have no experience programming with multithreading or multiprocessing. Does this seem like the right direction? If so, what's the best way to do it? I've looked at a few other SO posts about threaded file copying in Python, and they all seemed to say that you get no speed gain, but I don't think that will be the case given my hardware. I'm nowhere near my I/O cap at the moment, and resource usage is sitting around 1% (I have 40 cores and 64 GB of RAM locally).

  • Spencer

Answer 1:

UPDATE:

I never did get gevent working (see the other answer) because I couldn't install the module without an internet connection, which I don't have on my workstation. However, I was able to cut file copy times by a factor of 8 using just Python's built-in threading module (which I have since learned how to use), and I wanted to post it as an additional answer for anyone interested. My code is below. Note that the 8x speedup will most likely differ between environments, depending on your hardware and network setup.

import os
import queue
import shutil
import threading

fileQueue = queue.Queue()
destPath = 'path/to/cop'

class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    lock = threading.Lock()

    def __init__(self):
        with open("filelist.txt", "r") as txt:  # text file with one source path per line
            fileList = txt.read().splitlines()

        if not os.path.exists(destPath):
            os.makedirs(destPath)

        self.totalFiles = len(fileList)

        print(str(self.totalFiles) + " files to copy.")
        self.threadWorkerCopy(fileList)

    def CopyWorker(self):
        while True:
            fileName = fileQueue.get()
            try:
                shutil.copy(fileName, destPath)
            except (IOError, OSError) as err:
                # Report the failure and keep the worker alive so join() cannot hang
                print("Could not copy " + fileName + ": " + str(err))
            with self.lock:
                self.copyCount += 1  # counts files processed, including failures
                percent = (self.copyCount * 100) // self.totalFiles
                print(str(percent) + " percent copied.")
            # Signal completion last, so join() returns only after the progress line is printed
            fileQueue.task_done()

    def threadWorkerCopy(self, fileNameList):
        # Start 16 daemon workers; they exit with the main thread
        for i in range(16):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()  # block until every queued file has been processed

ThreadedCopy()
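
For comparison, the same worker-pool idea can be sketched with the standard library's concurrent.futures module, which manages the queue and the threads for you. This is only a minimal sketch assuming Python 3; the function name copy_all and its workers parameter are made up for illustration and are not part of the code above.

```python
import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed


def copy_all(paths, dest_dir, workers=16):
    """Copy every path in `paths` into `dest_dir` using a pool of threads.

    Returns (copied, failed): a list of destination paths and a list of
    (source_path, exception) pairs. Both names are illustrative.
    """
    os.makedirs(dest_dir, exist_ok=True)
    copied, failed = [], []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Map each future back to its source path for error reporting.
        futures = {pool.submit(shutil.copy, p, dest_dir): p for p in paths}
        for future in as_completed(futures):
            try:
                # shutil.copy returns the path of the newly created file.
                copied.append(future.result())
            except OSError as err:
                failed.append((futures[future], err))
    return copied, failed
```

Calling copy_all(fileList, destPath) with the list read from filelist.txt would be the rough equivalent of the class above. The right value for workers depends on the storage and network, so it is worth timing a few settings rather than assuming 16 is best.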


Answer 2:

This can be parallelized by using gevent in Python.

I would recommend the following logic to speed up copying 100k+ files:

  1. Put the names of all 100k+ files that need to be copied in a CSV file, for example 'input.csv'.

  2. Then split that CSV file into chunks. The number of chunks should be based on the number of processors/cores in your machine.

  3. Pass each chunk to a separate worker (a gevent greenlet in the snippet below).

  4. Each worker reads the filenames in its chunk sequentially and copies each file from one location to another.

Here is the Python code snippet:

import sys
import os
import multiprocessing

from gevent import monkey
monkey.patch_all()

from gevent.pool import Pool

def _copyFile(file):
    # over here, you can put your own logic of copying a file from source to destination
    pass  # placeholder so the function body is valid

def _worker(csv_file, chunk):
    # chunk is a (start offset, length) pair measured in bytes
    with open(csv_file, "rb") as f:
        f.seek(chunk[0])
        for file in f.read(chunk[1]).decode().splitlines():
            _copyFile(file)


def _getChunks(file, size):
    # Yield (start, length) byte ranges, each extended to the end of a line
    with open(file, "rb") as f:  # binary mode allows relative seeks
        while True:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()
            yield start, f.tell() - start
            if not s:
                break

if __name__ == "__main__":
    if len(sys.argv) > 1:
        csv_file_name = sys.argv[1]
    else:
        print("Please provide a csv file as an argument.")
        sys.exit(1)

    no_of_procs = multiprocessing.cpu_count() * 4

    file_size = os.stat(csv_file_name).st_size

    # Integer division: seek() needs a whole number of bytes
    file_size_per_chunk = file_size // no_of_procs

    pool = Pool(no_of_procs)

    for chunk in _getChunks(csv_file_name, file_size_per_chunk):
        pool.apply_async(_worker, (csv_file_name, chunk))

    pool.join()

Save the file as file_copier.py. Open a terminal and run:

$ python file_copier.py input.csv
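
To see what the chunking step produces, here is a small standalone sketch (Python 3, no gevent) that splits a list of names into byte ranges the same way and reads them back. The helper names iter_chunks, read_chunk and check_chunks are invented for this illustration. The point is that every chunk ends on a line boundary, so each filename lands in exactly one chunk.

```python
import os
import tempfile


def iter_chunks(path, size):
    """Yield (start, length) byte ranges that each end on a line boundary."""
    with open(path, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)      # jump roughly one chunk ahead
            tail = f.readline()  # then extend to the end of the current line
            yield start, f.tell() - start
            if not tail:         # nothing left to read: this was the last range
                break


def read_chunk(path, start, length):
    """Return the lines stored in the byte range [start, start + length)."""
    with open(path, "rb") as f:
        f.seek(start)
        return f.read(length).decode().splitlines()


def check_chunks(names, size):
    """Write `names` to a temporary file and read them back chunk by chunk."""
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("\n".join(names) + "\n")
    try:
        seen = []
        for start, length in iter_chunks(tmp.name, size):
            seen.extend(read_chunk(tmp.name, start, length))
        return seen
    finally:
        os.remove(tmp.name)
```

For any chunk size, check_chunks(names, size) should return the original list in order, which is a quick way to convince yourself that no file is skipped or copied twice before pointing the real script at 100k+ files.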