Python - Using threads or a queue to iterate over

Posted 2019-05-10 06:48

Question:

I'm fairly new to Python and am making a script that brings point cloud data from other programs into Autodesk Maya. I have my script functioning fine, but what I'm trying to do is make it faster. I have a for loop that iterates through a list of numbered files, i.e. datafile001.txt, datafile002.txt and so on. What I'm wondering is whether there is a way to have it do more than one at a time, possibly using threads or a queue? Below is the code I have been working on:

from threading import Thread

def threadedFuntion(args):
    if len(sourceFiles) > 3:
        for count, item in enumerate(sourceFiles):
            # filenumber1..filenumber4 are never updated inside the loop,
            # so every pass re-submits the same four files
            t1 = Thread(target=convertPcToPdc, args=(sourceFiles[filenumber1], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
            t1.start()
            t2 = Thread(target=convertPcToPdc, args=(sourceFiles[filenumber2], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
            t2.start()
            t3 = Thread(target=convertPcToPdc, args=(sourceFiles[filenumber3], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
            t3.start()
            t4 = Thread(target=convertPcToPdc, args=(sourceFiles[filenumber4], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
            t4.start()

This obviously doesn't work, for a number of reasons. First, it will only ever create 4 threads; I would like to be able to offer an option for more or fewer. Second, it errors because it's trying to reuse a thread? Like I said, I'm quite new to Python and a little over my head. I've been reading several posts on here but can't get one to work quite right. I think a queue might be something I need but I couldn't quite figure it out; I experimented with the condition statement and with the join statement, but once again couldn't get what I want.

I guess to be more specific, what I want to achieve is this: the function reads through a text file, retrieves coords, and then exports them as a binary file for Maya to read. It's common for one of these text files to have 5-10 million x,y,z coords, which takes quite some time: around 30 mins to 1 hour per file on a pretty beastly computer. Task manager says Python is only using 12% of the processor and around 1% of the RAM, so if I could do several of these at once, it would make those 100 or more files go by a lot faster. I wouldn't have thought it would be too hard to multithread/queue up a for loop, but I've been lost and trying failing solutions for about a week.
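
(For reference, here is a minimal sketch of the "queue feeding a fixed number of worker threads" pattern the question is reaching for. It is illustrative only: the print stands in for the real convertPcToPdc call, and the file names are made up. One caveat: pure-Python parsing is CPU-bound, and Python threads all share the interpreter's GIL, so for a real speed-up the same pattern may need multiprocessing instead of threading.)

import threading
from queue import Queue   # on Python 2 this module is named Queue

def worker(q):
    # each worker repeatedly takes a file name off the queue until it
    # receives the None sentinel that tells it to quit
    while True:
        srcfile = q.get()
        if srcfile is None:
            break
        print("processing %s" % srcfile)   # stand-in for convertPcToPdc(...)

def run(srcfiles, num_threads=4):
    q = Queue()
    threads = [threading.Thread(target=worker, args=(q,))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for f in srcfiles:
        q.put(f)
    for _ in threads:       # one sentinel per worker so they all exit
        q.put(None)
    for t in threads:
        t.join()
    print("all files processed")

if __name__ == "__main__":
    run(["datafile%03d.txt" % i for i in range(1, 7)], num_threads=3)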

Thank you all for any help; I really appreciate it, and I think this website is amazing. This is my first post, but I feel like I have completely learned Python just from reading on here.

Answer 1:

Subclass threading.Thread and put your work function in that class as part of run().

import threading
import time
import random

class Worker(threading.Thread):
    def __init__(self, srcfile, printlock,**kwargs):
        super(Worker,self).__init__(**kwargs)
        self.srcfile = srcfile
        self.lock = printlock # so threads don't step on each other's prints

    def run(self):
        with self.lock:
            print("starting %s on %s" % (self.ident,self.srcfile))
        # do whatever you need to, return when done
        # example, sleep for a random interval up to 10 seconds
        time.sleep(random.random()*10)
        with self.lock:
            print("%s done" % self.ident)


def threadme(srcfiles):
    printlock = threading.Lock()
    threadpool = []
    for file in srcfiles:
        threadpool.append(Worker(file,printlock))

    for thr in threadpool:
        thr.start()

    # this loop blocks until all threads are done; join() waits on them
    # in list order, but every thread gets joined eventually, regardless
    # of which one finishes first
    for thr in threadpool:
        thr.join()

    print("all threads are done")

if __name__ == "__main__":
    threadme(["abc","def","ghi"])

As requested, to limit the number of threads, use the following:

def threadme(infiles,threadlimit=None,timeout=0.01):
    assert threadlimit is None or threadlimit > 0, \
           "need at least one thread"
    printlock = threading.Lock()
    srcfiles = list(infiles)
    threadpool = []

    # keep going while work to do or being done
    while srcfiles or threadpool:

        # while there's room, remove source files
        # and add to the pool
        while srcfiles and \
           (threadlimit is None \
            or len(threadpool) < threadlimit):
            file = srcfiles.pop()
            wrkr = Worker(file,printlock)
            wrkr.start()
            threadpool.append(wrkr)

        # remove completed threads from the pool; iterate over a copy,
        # because removing items from a list while looping over it
        # silently skips the following element
        for thr in threadpool[:]:
            thr.join(timeout=timeout)
            if not thr.is_alive():
                threadpool.remove(thr)

    print("all threads are done")

if __name__ == "__main__":
    for lim in (1,2,3,4):
        print("--- Running with thread limit %i ---" % lim)
        threadme(("abc","def","ghi"),threadlimit=lim)

Note that this will actually process the sources in reverse order (because of the list pop()). If you need them done in order, reverse the list somewhere, or use a deque and popleft(), as sketched below.
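
(A minimal illustration of that deque variant, separate from the answer's code: in threadme() above you would build srcfiles with deque(infiles) and take work with popleft() instead of pop().)

from collections import deque

def ordered_sources(infiles):
    # deque.popleft() is O(1) and hands back the files in their original
    # order, unlike list.pop(), which takes them from the end
    srcfiles = deque(infiles)
    while srcfiles:
        yield srcfiles.popleft()

if __name__ == "__main__":
    for name in ordered_sources(("abc", "def", "ghi")):
        print(name)   # prints abc, def, ghi, in that order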



Answer 2:

I would recommend using mrjob for this.

mrjob is a Python implementation of MapReduce.

Below is the mrjob code to do a parallel word count over a lot of text files:

from mrjob.job import MRJob

class MRWordCounter(MRJob):
    def get_words(self, key, line):
        # mapper: emit a (word, 1) pair for every word on the input line
        for word in line.split():
            yield word, 1

    def sum_words(self, word, occurrences):
        # reducer: add up all the counts emitted for a given word
        yield word, sum(occurrences)

    def steps(self):
        return [self.mr(self.get_words, self.sum_words)]

if __name__ == '__main__':
    MRWordCounter.run()

This code maps all the files in parallel (extracting and counting the words in each file), then reduces the various per-file counts into a single total count for each word.
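
(Usage note, an assumption beyond the original answer: an mrjob script like this is run from the command line, for example python wordcount.py -r local datafile*.txt. mrjob's default inline runner is single-process; the local runner is the one that spawns multiple subprocesses on one machine.)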