在后台执行的函数,同时使用核/线程的数量有限和队列额外处决?在后台执行的函数,同时使用核/线程的数量

2019-05-12 13:39发布

我想使用的线程数量有限(最多2 )运行在一个类中的功能在后台磁盘上删除一些文件。 同一类中我的代码的其余部分是独立的这一背景功能,并可能会执行数十倍比背景功能的更多。 不过,我仍然需要强制执行的核心/线程限制。 因此,它是可能的后台作业可能超过2 ,我需要排队它们。 请注意,我的背景函数带任何参数。

我是很新的多线程和多处理,但我想我已经做了功课,看了看很多帖子在这里对堆栈溢出 ,并试图一对夫妇的做法。 然而,没有这些方法似乎为我工作。 这里是我的代码的结构:

class myClass(object):
    def __init__(self):
        #some stuff
    def backgroundFunc(self):
        # delete some files on disk
    def mainFunc(self, elem):
        # Do some other things
        self.backgroundFunc() #I want to run this in the background

下面是如何运行的代码

import myClass

myClassInstance = myClass()
For element in someList:
    myClassInstance.mainFunc(elem=element)

请注意,在东西之前,我无法启动后台作业mainFunc已开始运行。

这是我的第一次尝试threading在我的类文件:

from threading import Thread
class myClass(object):
    def __init__(self):
        #some stuff
    def backgroundFunc(self):
        # delete some files on disk
    def mainFunc(self, elem):
        # Do some other things
        thr = Thread(target=self.backgroundFunc)
        thr.start()

但是,这种方法的问题是程序崩溃随机时间; 有时就在prpgram执行,有时年初以后埃罗消息每次也不同。 我想这可能是因为线程不会阻止一块内存,事情可能会被写入/从这些存储单元读取。 或者,不太可能的,也许这是因为我的服务器上运行我的代码,并有从分配的资源服务器强制一些限制。 此外,我不能设置线程数限制,不能做到排队,万一mainFunc而我已经运行两个后台作业代码被执行两倍以上。

这里的另一个尝试multiprocessing.Process

from multiprocessing import Process
class myClass(object):
    def __init__(self):
        #some stuff
    def backgroundFunc(self):
        # delete some files on disk
    def mainFunc(self, elem):
        # Do some other things
        p = Process(target=self.backgroundFunc)
        p.start()

这种方法的问题是,过程要使用尽可能多线程/内核,我的机器在其处置和因为我的代码的其余部分将自动被并行运行,一切都变得超慢速度非常快。

我最终抵达multiprocessing.Pool但我还是我如何能有效地使用它很困惑。 不管怎么说,这是我尝试用Pool

from multiprocessing import Pool
class myClass(object):
    def __init__(self):
        #some stuff
        self.pool = Pool(processes=2)
    def backgroundFunc(self):
        # delete some files on disk
        print('some stuff')
    def mainFunc(self, elem):
        # Do some other things
        self.pool.apply_async(self.backgroundFunc)

然而, apply_async似乎不起作用。 没有任何的print ,我在发言backgroundFunc在屏幕上打印任何东西。 我加self.pool.close()apply_async ,但我得到了一些错误后不久,第二个过程将开始。 我试图使用像self.pool.apply和其他一些人,但他们似乎需要一个函数,有限的参数。 但我的backgroundFunc不带任何参数。 最后,我不知道我该怎么做的queuing ,我刚才解释使用游泳池。

另外,我想有过多少次,当我想运行控制backgroundFunc 。 此外, mainFunc不应该等待所有线程完成运行在退出前。 如果出现这种情况,我也不会从多线程中获益,因为后台功能可能需要很长时间才能完成。 也许我本来应该在问题更加清晰; 对于那个很抱歉。

所以,我真的很感激,如果有人能帮助我与此有关。 我很困惑。 提前致谢!

Answer 1:

程序随机崩溃。 我猜

它会更容易集中在一个时间一个问题,没有猜测,那么,什么是崩溃

下面是一个测试threading可能会激发你的基础上,对例如queue

#!python3
#coding=utf-8
""" https://stackoverflow.com/q/49081260/ """

import sys, time, threading, queue

print(sys.version)

class myClass:
    """ """

    def __init__(self):
        """ """
        self.q = queue.Queue()
        self.threads = []
        self.num_worker_threads = 2

    def backgroundFunc(self):
        """ """
        print("thread started")
        while True:
            item = self.q.get()
            if item is None:
                #self.q.task_done()
                break
            print("working on ", item)
            time.sleep(0.5)
            self.q.task_done()
        print("thread stopping")

    def mainFunc(self):
        """ """

        print("starting thread(s)")
        for i in range(self.num_worker_threads):
            t = threading.Thread(target=self.backgroundFunc)
            t.start()
            self.threads.append(t)

        print("giving thread(s) some work")
        for item in range(5):
            self.q.put(item)

        print("giving thread(s) more work")
        for item in range(5,10):
            self.q.put(item)

        # block until all tasks are done
        print("waiting for thread(s) to finish")
        self.q.join()

        # stop workers
        print("stopping thread(s)")
        for i in range(self.num_worker_threads):
            self.q.put(None)
        for t in self.threads:
            self.q.join()

        print("finished")



if __name__ == "__main__":
    print("instance")
    myClassInstance = myClass()

    print("run")
    myClassInstance.mainFunc()

    print("end.")

它打印

3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 17:54:52) [MSC v.1900 32 bit (Intel)]
instance
run
starting thread(s)
thread started
thread started
giving thread(s) some work
giving thread(s) more work
waiting for thread(s) to finish
working on  0
working on  1
working on  2
working on  3
working on  4
working on  5
working on  6
working on  7
working on  8
working on  9
stopping thread(s)
thread stopping
thread stopping
finished
end.


文章来源: Executing a function in the background while using limited number of cores/threads and queue the extra executions?