如何多线程的操作中Python中的循环(How to Multi-thread an Operati

2019-07-20 18:21发布

说我有一个非常大名单,我执行,像这样的操作:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

我的问题是双重的:

  • 有很多项目
  • api.my_operation需要永远返回

我想使用多线程同时旋转了一堆api.my_operations的,所以我可以同时处理,也许5或10年,甚至100个项目。

如果my_operation()返回一个异常(因为也许我已经处理该项目) - 这是确定。 它不会破坏任何东西。 循环可以继续到下一个项目。

:这是为Python 2.7.3

Answer 1:

首先,在Python中,如果你的代码是CPU密集型的,多线程也无济于事,因为只有一个线程可持有全局解释器锁,因此运行Python代码,在一个时间。 所以,你需要使用的过程,而不是线程。

如果你的操作“需要永远返回”,因为它的IO的限制,也就是说,等待网络或磁盘备份或类似这是不正确的。 我会回来以后。


接下来,在一次处理5个或10个或100个项目的方法是创建5或10或100名工人池,并把物品放入队列,工人的服务。 幸运的是,STDLIB multiprocessingconcurrent.futures库都包了大部分的细节给你。

前者是传统的编程更加强大和灵活; 后者则是,如果你需要共谱等待更简单; 为了微不足道的情况下,它其实并不重要,你选择哪一个。 (在这种情况下,与每个最明显的执行需要3行与futures ,4行multiprocessing )。

如果您使用2.6-2.7或3.0-3.1, futures是不是内置的,但你可以从安装它的PyPI ( pip install futures )。


最后,它通常是简单了很多并行的事情,如果你可以把整个循环迭代到一个函数调用(你可以,例如,传递给map ),让我们做的第一:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

全部放在一起:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

如果你有很多相对较小的职位,多的开销可能会淹没收益。 要解决的方式是批量了工作纳入较大的作业。 例如(使用grouperitertools食谱 ,您可以复制并粘贴到你的代码,或者从一开始more-itertools PyPI上的项目):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

最后,如果你的代码是IO约束呢? 然后线程一样好流程,并以更少的开销(和更少的限制,但这些限制通常不会影响到你的情况是这样)。 有时,“少开销”足以意味着你不需要使用线程配料,但你做的过程,这是一个很好的胜利。

那么,你如何使用线程,而不是流程? 只要改变ProcessPoolExecutorThreadPoolExecutor

如果你的代码是否是CPU绑定或IO的限制还不能确定,只是尝试两者兼得。


我能做到这一点的多重功能在我的python脚本? 举例来说,如果我有另外的for循环中,我想并行代码的其他地方。 是否有可能做两个多线程功能,在同一个脚本?

是。 事实上,有两种不同的方式来做到这一点。

首先,你可以共享相同的(线程或进程)执行,并用它从多个地方,没有问题。 任务和期货的整点是,他们是自包含的; 你不关心他们跑哪里,只是你排队起来,并最终得到答案了。

或者,你可以在没有问题同方案二执行者。 这有一个性能开销,如果你同时使用这两种执行人,你最终会试图在8个内核,这意味着有将是一些上下文切换运行(例如)16条忙线。 但有时这是值得做的事情,因为,比如说,两个执行者在同一时间很少忙,它使你的代码变得简单许多。 或者,也许一个执行程序运行需要一段时间才能完成非常大的任务,以及其他正在运行需要尽快完成,因为反应比吞吐量更重要的是你的计划的一部分,很小的任务。

如果你不知道哪个是适合您的程序,通常它是第一个。



Answer 2:

编辑2018年2月6日 :修订基于此评论

编辑 :忘了提及,这个工程上的Python 2.7.x

有multiprocesing.pool,和下面的示例说明了如何使用其中的一种:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

现在,如果你真的确定你的进程CPU绑定为@abarnert提到,改变线程池的进程池实现(下线程池进口评论)。 你可以在这里找到更多的细节: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers



Answer 3:

您可以分割加工成使用这样的方法的线程指定数量的:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)


文章来源: How to Multi-thread an Operation Within a Loop in Python