-->

Combine tornado gen.coroutine and joblib mem.cache

Posted 2019-06-09 10:56

Question:

Imagine a function that performs a heavy computation and that we want to execute asynchronously in a Tornado application. We would also like to memoize it: store its results on disk, so that the function is never rerun for the same arguments.

Without caching the result (memoization) one would do the following:

def complex_computation(arguments):
    ...
    return result

@gen.coroutine
def complex_computation_caller(arguments):
    ...
    result = complex_computation(arguments)
    raise gen.Return(result)

Suppose we choose the Memory class from joblib to achieve memoization. Simply decorating the function with @mem.cache memoizes it:

@mem.cache
def complex_computation(arguments):
    ...
    return result

where mem can be something like mem = Memory(cachedir=get_cache_dir()).

Now consider combining the two, where we execute the computationally complex function on an executor:

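from concurrent import futures

from joblib import Memory
from tornado import gen
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop

class TaskRunner(object):
    def __init__(self, loop=None, number_of_workers=1):
        # run_on_executor submits the call to self.executor by default.
        self.executor = futures.ThreadPoolExecutor(number_of_workers)
        self.loop = loop or IOLoop.instance()

    @run_on_executor
    def run(self, func, *args, **kwargs):
        return func(*args, **kwargs)

mem = Memory(cachedir=get_cache_dir())
# Pass the worker count by keyword; the first positional parameter is the loop.
_runner = TaskRunner(number_of_workers=1)

@mem.cache
def complex_computation(arguments):
    ...
    return result

@gen.coroutine
def complex_computation_caller(arguments):
    result = yield _runner.run(complex_computation, arguments)
    ...
    raise gen.Return(result)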

The first question: is the approach above technically correct?

Now let's consider the following scenario:

@gen.coroutine
def first_coroutine(arguments):
    ...
    result = yield second_coroutine(arguments)
    raise gen.Return(result)

@gen.coroutine
def second_coroutine(arguments):
    ...
    result = yield third_coroutine(arguments)
    raise gen.Return(result)

The second question is how to memoize second_coroutine. Is it correct to do something like the following?

@gen.coroutine
def first_coroutine(arguments):
    ...
    mem = Memory(cachedir=get_cache_dir())
    mem_second_coroutine = mem.cache(second_coroutine)
    result = yield mem_second_coroutine(arguments)
    raise gen.Return(result)

@gen.coroutine
def second_coroutine(arguments):
    ...
    result = yield third_coroutine(arguments)
    raise gen.Return(result)

[UPDATE I] The question "Caching and reusing a function result in Tornado" discusses using functools.lru_cache or repoze.lru.lru_cache as a solution to the second question.

Answer 1:

The Future objects returned by Tornado coroutines are reusable, so in-memory caches such as functools.lru_cache generally work, as explained in the question linked in the update above. Just be sure to put the caching decorator above @gen.coroutine, so that it wraps the coroutine and caches the Future it returns.
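
As a rough illustration of why caching the Future works, here is a minimal sketch that uses only the standard library's asyncio rather than Tornado (a substitution made to keep the example self-contained; on Python 3, Tornado 5 and later use asyncio's Future). The names _compute, cached_compute, and calls are hypothetical. Here asyncio.ensure_future plays the role that @gen.coroutine plays in Tornado: it turns the call into a Future that can be awaited more than once.

```python
import asyncio
import functools

calls = []  # records each real computation, for illustration only

async def _compute(x):
    calls.append(x)
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return x * 2

@functools.lru_cache(maxsize=None)
def cached_compute(x):
    # Must be called while an event loop is running.
    # The cache stores the Future itself, not the resolved value.
    return asyncio.ensure_future(_compute(x))

async def main():
    first = await cached_compute(21)
    second = await cached_compute(21)  # same Future, already resolved
    return first, second

results = asyncio.run(main())
```

Both awaits receive the same value and _compute runs only once. Note that a cached Future stays bound to the event loop that created it, and a Future that failed stays in the cache along with its exception.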

On-disk caching (which seems to be implied by the cachedir argument to Memory) is trickier, since Future objects cannot generally be written to disk. Your TaskRunner example should work, but it does something fundamentally different from the others because complex_computation is not a coroutine: it is a plain function, so the cache stores its return value. Your last example will not work, because it tries to put the Future object in the cache.
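
To make the serialization problem concrete, the following small sketch tries to pickle a Future. It uses concurrent.futures.Future from the standard library as a stand-in (an assumption; this is the kind of object a ThreadPoolExecutor hands back, and the exact error for other Future types may differ). The variable names are hypothetical.

```python
import pickle
from concurrent.futures import Future

future = Future()
future.set_result({"answer": 42})

try:
    pickle.dumps(future)
    future_is_picklable = True
except (TypeError, pickle.PicklingError):
    # The Future holds a lock internally, which cannot be serialized.
    future_is_picklable = False

# The resolved value, by contrast, serializes and round-trips normally.
restored = pickle.loads(pickle.dumps(future.result()))
```

This is why an on-disk cache has to store the value obtained after waiting on the Future, not the Future itself.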

Instead, if you want to cache things with a decorator, you'll need a decorator that wraps the inner coroutine with a second coroutine. Something like this:

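from tornado import gen

cache = {}  # in-memory store for results, keyed by the positional arguments

def cached_coroutine(f):
    @gen.coroutine
    def wrapped(*args):
        if args in cache:
            raise gen.Return(cache[args])
        result = yield f(*args)
        cache[args] = result  # store the result, not the function or its Future
        raise gen.Return(result)
    return wrapped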
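
The same wrapping idea can be extended to an on-disk cache. The following is only a sketch under stated assumptions: it uses asyncio native coroutines and pickle from the standard library instead of Tornado and joblib, and disk_cached_coroutine, CACHE_DIR, and calls are hypothetical names. The important step is that the wrapper awaits the inner coroutine first and writes the plain result to disk.

```python
import asyncio
import functools
import hashlib
import os
import pickle
import tempfile

CACHE_DIR = tempfile.mkdtemp()  # hypothetical cache location for this sketch

def disk_cached_coroutine(func):
    """Cache the awaited result of func on disk, keyed by its positional arguments."""
    @functools.wraps(func)
    async def wrapped(*args):
        key = hashlib.sha256(pickle.dumps((func.__name__, args))).hexdigest()
        path = os.path.join(CACHE_DIR, key + ".pkl")
        if os.path.exists(path):
            with open(path, "rb") as fh:
                return pickle.load(fh)
        result = await func(*args)  # resolve first, then store the plain value
        with open(path, "wb") as fh:
            pickle.dump(result, fh)
        return result
    return wrapped

calls = []  # records each real computation, for illustration only

@disk_cached_coroutine
async def second_coroutine(x):
    calls.append(x)
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return {"value": x + 1}

async def main():
    return await second_coroutine(1), await second_coroutine(1)

first, second = asyncio.run(main())
```

The second call is served from the file written by the first. The file reads and writes in this sketch block the event loop, so for large results it may be better to move them onto an executor, and two concurrent calls with the same arguments may both run the computation.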