我有一个功能,我们称之为my_function(*args, **kwargs)
即根据传递给它的参数从30秒的任何地方需要很多很多时间(天)。 我有不同的参数列表,我想知道的功能需要多久给出的参数。
我还是很新的使用timeit
,但我有足够的学习做这部分; 然而,这不会是我的目的非常高效。 任何一套需要更长的时间超过4小时才能完成的考虑棘手的论点,尽管所有的人都能够在“有限”的时间来解决。 作为参数一些(也许是大多数)将导致运行时间,将采取20小时以上向上,我正在寻找一种方式,4小时后终止试验,这样我就不必浪费时间在那之后它是棘手。
我已经看过超时上一个Python函数调用和一段时间后停止代码 ,这可能是足够的一个问题的接近是重复的,但我有麻烦集成这些问题的答案与timeit
使倍的有4小时数记录像他们应该同时长跑返回一些有效时间大于4小时。
什么是去这样做的最佳方式?
编辑:一个我曾与它的问题是,我已经看到了答案,有需要的func(*args,**kwargs)
而timeit
功能如下:
timeit.Timer('my_function(*args, **kwargs)', setup=setup).timeit(1)
我不知道如何处理这种形式。
编辑:我下面使用线程实际上并没有终止线程提供的原始答案。 这可以很容易通过用下面的函数运行它显示。
def foo(x):
for i in xrange(1, x + 1):
sleep(1)
print i
return x
使用涉及代码multiprocessing.Pool
,这实际上有一个terminate()
允许这一点。
基于中找到了答案使用线程在Python不工作超时功能 。 如果你尝试一下在foo(x)
它确实停止不像我以前的答案计数。
import multiprocessing as mp
import timeit
def timeout(func, args=(), kwargs=None, TIMEOUT=10, default=None, err=.05):
if hasattr(args, "__iter__") and not isinstance(args, basestring):
args = args
else:
args = [args]
kwargs = {} if kwargs is None else kwargs
pool = mp.Pool(processes = 1)
try:
result = pool.apply_async(func, args=args, kwds=kwargs)
val = result.get(timeout = TIMEOUT * (1 + err))
except mp.TimeoutError:
pool.terminate()
return default
else:
pool.close()
pool.join()
return val
def Timeit(command, setup=''):
return timeit.Timer(command, setup=setup).timeit(1)
def timeit_timeout(command, setup='', TIMEOUT=10, default=None, err=.05):
return timeout(Timeit, args=command, kwargs={'setup':setup},
TIMEOUT=TIMEOUT, default=default, err=err)
经过一番摆弄多我根据初步回答使用线程超时功能 。 我还是愿意爱任何人,因为我还是新来这个谁拥有更好的想法听。
def timeout(func, args=None, kwargs=None, TIMEOUT=10, default=None, err=.05):
if args is None:
args = []
elif hasattr(args, "__iter__") and not isinstance(args, basestring):
args = args
else:
args = [args]
kwargs = {} if kwargs is None else kwargs
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = None
def run(self):
try:
self.result = func(*args, **kwargs)
except:
self.result = default
it = InterruptableThread()
it.start()
it.join(TIMEOUT* (1 + err))
if it.isAlive():
return default
else:
return it.result
def timeit_timeout(command, setup='', TIMEOUT=10, default=None, err=.05):
import timeit
f = lambda: timeit.Timer(command, setup=setup).timeit(1)
return timeout(f,TIMEOUT=TIMEOUT, default=default, err=err)
if __name__ == '__main__':
from time import sleep
setup = 'gc.enable()\nfrom time import sleep'
for n in range(1,15):
command = 'sleep({})'.format(n)
print timeit_timeout(command, setup=setup, default=float('Inf'))