I am confused about how to play around with the asyncio module in Python 3.4. I have a searching API for a search engine, and I want each search request to run either in parallel or asynchronously, so that I don't have to wait for one search to finish before starting another.
Here is my high-level searching API, which builds some objects from the raw search results. The search engine itself uses some kind of asyncio mechanism, so I won't bother with that.
    # No asyncio module used here now
    class search(object):
        ...
        self.s = some_search_engine()
        ...
        def searching(self, *args, **kwargs):
            ret = {}
            # do some raw searching according to args and kwargs and build the wrapped results
            ...
            return ret
To try to make the requests asynchronous, I wrote the following test case to see how my code can interact with the asyncio module.
    # Here is my testing script
    @asyncio.coroutine
    def handle(f, *args, **kwargs):
        r = yield from f(*args, **kwargs)
        return r

    s = search()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
    loop.close()
When I run this with pytest, it raises RuntimeError: Task got bad yield : {results from searching...} when it hits the line r = yield from ....
I also tried another way.
    # same handle as above
    def handle(..):
        ....

    s = search()
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.async(handle(s.searching, arg11, arg12, ...)),
        asyncio.async(handle(s.searching, arg21, arg22, ...)),
        ...
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
When I run this test case with pytest, it passes, but a strange exception is raised from the search engine, along with the message Future/Task exception was never retrieved.
Things I wish to ask:
- For my 1st try, is that the right way to use yield from, by returning the actual result from a function call?
- I think I need to add some sleep to my 2nd test case to wait for the tasks to finish, but how should I do that? And how can I get the return values of my function calls in my 2nd test case?
- Is creating an async handler to handle requests a good way to use asyncio with an existing module?
- If the answer to question 2 is NO, does every client call to the class search need to include something like loop = get_event_loop() to make the requests asynchronous?
The problem is that you can't just call existing synchronous code as if it were an asyncio.coroutine and get asynchronous behavior. When you call yield from searching(...), you only get asynchronous behavior if searching itself is actually an asyncio.coroutine, or at least returns an asyncio.Future. Right now, searching is just a regular synchronous function that returns a plain dict, so yield from searching(...) raises an error: the value it hands to the event loop is not a Future or coroutine.
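The difference can be reproduced in a few lines. This is only a minimal sketch: `searching_sync` and `searching_coro` are hypothetical stand-ins for the real `searching` method, and it uses the `async`/`await` syntax from Python 3.5+ rather than the `@asyncio.coroutine`/`yield from` style shown in the question, since generator-based coroutines were removed in Python 3.11. With `await`, the failure surfaces as a `TypeError` instead of the `RuntimeError: Task got bad yield` seen on 3.4, but the cause is the same.

```python
import asyncio


def searching_sync(query):
    # Hypothetical stand-in for the blocking searching() method:
    # it returns a plain dict, not an awaitable.
    return {"query": query}


async def searching_coro(query):
    # Hypothetical coroutine version; asyncio.sleep(0) stands in
    # for real non-blocking I/O.
    await asyncio.sleep(0)
    return {"query": query}


async def handle(f, *args):
    # Only works when f(*args) returns an awaitable
    # (a coroutine, Future, or Task).
    return await f(*args)


try:
    asyncio.run(handle(searching_sync, "a"))
    sync_error = None
except TypeError as exc:
    # A plain dict cannot be awaited.
    sync_error = exc

coro_result = asyncio.run(handle(searching_coro, "a"))
```

Here `handle` is the same wrapper as in the question; it adds nothing on its own, because whether the call is asynchronous is decided entirely by what `f` returns.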
To get the behavior you want, you'll need an asynchronous version of searching in addition to the synchronous version (or just drop the synchronous version altogether if you don't need it). You have a few options to support both:
- Rewrite searching as an asyncio.coroutine so that it uses asyncio-compatible calls to do its I/O, rather than blocking I/O. This will make it work in an asyncio context, but it means you won't be able to call it directly in a synchronous context anymore. Instead, you'd need to also provide an alternative synchronous searching method that starts an asyncio event loop and calls return loop.run_until_complete(self.searching(...)). See this question for more details on that.
- Keep your synchronous implementation of searching, and provide an alternative asynchronous API that uses BaseEventLoop.run_in_executor to run the searching method in a background thread:
    import asyncio
    import functools

    class search(object):
        ...
        self.s = some_search_engine()
        ...
        def searching(self, *args, **kwargs):
            ret = {}
            ...
            return ret

        @asyncio.coroutine
        def searching_async(self, *args, **kwargs):
            # Pop 'loop' so it is not forwarded (assuming searching doesn't take loop as an arg)
            loop = kwargs.pop('loop', None) or asyncio.get_event_loop()
            # run_in_executor only forwards positional arguments, so bind kwargs with functools.partial
            func = functools.partial(self.searching, *args, **kwargs)
            # Passing None tells asyncio to use the default ThreadPoolExecutor
            r = yield from loop.run_in_executor(None, func)
            return r
Testing script:
    s = search()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.searching_async(arg1, arg2, ...))
    loop.close()
This way, you can keep your synchronous code as is, and at least provide methods that can be used in asyncio code without blocking the event loop. It's not as clean a solution as it would be if you actually used asynchronous I/O in your code, but it's better than nothing.
- Provide two completely separate versions of searching, one that uses blocking I/O, and one that's asyncio-compatible. This gives ideal implementations for both contexts, but requires twice the work.
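As a rough illustration of the first option, here is a minimal sketch of a coroutine `searching` with a synchronous wrapper next to it. The names `Search`, `searching_sync`, and `search_many` are hypothetical, `asyncio.sleep` stands in for real non-blocking I/O against the search engine, and the code uses Python 3.5+ `async`/`await` syntax instead of the 3.4-era `@asyncio.coroutine`. It also shows one way to run several searches concurrently and collect their return values, using `asyncio.gather`.

```python
import asyncio


class Search:
    async def searching(self, query):
        # Hypothetical async implementation; asyncio.sleep stands in
        # for non-blocking I/O against the search engine.
        await asyncio.sleep(0.01)
        return {"query": query}

    def searching_sync(self, query):
        # Synchronous wrapper: runs the coroutine to completion on a
        # private event loop. It must not be called from code that is
        # already running inside an event loop.
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(self.searching(query))
        finally:
            loop.close()


async def search_many(s, queries):
    # gather runs the coroutines concurrently and returns their
    # results in the same order as the inputs.
    return await asyncio.gather(*(s.searching(q) for q in queries))


s = Search()
single = s.searching_sync("first")
many = asyncio.run(search_many(s, ["a", "b"]))
```

No explicit sleep is needed to wait for the tasks: awaiting `gather` (or passing it to `run_until_complete`) returns only once every search has finished, and any exception raised by a search propagates to the caller instead of being reported as never retrieved.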