I'm trying to rewrite this Python 2.7 code to the new async world order:
```python
import multiprocessing


def get_api_results(func, iterable):
    pool = multiprocessing.Pool(5)
    for res in pool.map(func, iterable):
        yield res
```
`map()` blocks until all results have been computed, so I'm trying to rewrite this as an async implementation that will yield results as soon as they are ready. Like `map()`, return values must be returned in the same order as `iterable`. I tried this (I need `requests` because of legacy auth requirements):
```python
import asyncio
import requests


def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()


async def get_api_results():
    loop = asyncio.get_event_loop()
    futures = []
    for n in range(1, 11):
        futures.append(loop.run_in_executor(None, get, n))
    async for f in futures:
        k, v = await f
        yield k, v


for r in get_api_results():
    print(r)
```
but with Python 3.6 I'm getting:

```
  File "scratch.py", line 16, in <module>
    for r in get_api_results():
TypeError: 'async_generator' object is not iterable
```
How can I accomplish this?
Regarding your older (2.7) code: multiprocessing is a powerful drop-in replacement for the much simpler threading module, meant for concurrently processing CPU-intensive tasks, where threading does not work so well (because of the GIL). Your code is probably not CPU bound, since it just makes HTTP requests, so threading might have been enough to solve your problem.
However, instead of using `threading` directly, Python 3+ has a nice module called `concurrent.futures` with a cleaner API via cool `Executor` classes. This module is also available for Python 2.7 as an external package. The following code works on Python 2 and Python 3.
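It is a sketch that reuses the `get()` helper from your question, so the URL and the ID range are just placeholders:

```python
import concurrent.futures as futures  # on Python 2.7: pip install futures
import requests


def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()


def get_api_results(func, iterable):
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        # submit all jobs up front; they run concurrently in 5 threads
        future_list = [executor.submit(func, i) for i in iterable]
        # as_completed() yields each future as soon as it finishes,
        # i.e. in completion order, not submission order
        for f in futures.as_completed(future_list):
            yield f.result()


for result in get_api_results(get, range(1, 11)):
    print(result)
```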
This code uses `futures.ThreadPoolExecutor`, based on threading. A lot of the magic is in the `as_completed()` call used here.

Your Python 3.6 code above uses `run_in_executor()`, which (with `None` as the first argument) runs the jobs in the event loop's default `futures.ThreadPoolExecutor`, and does not really use asynchronous IO!

If you really want to go forward with asyncio, you will need to use an HTTP client that supports asyncio, such as aiohttp. Here is an example.
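It is only a sketch: the URL and the ID range are again copied from your question, so adapt them to your real API:

```python
import asyncio
import aiohttp


async def get(session, i):
    async with session.get('https://example.com/api/items/%s' % i) as r:
        return i, await r.json()


async def get_api_results():
    async with aiohttp.ClientSession() as session:
        tasks = [get(session, n) for n in range(1, 11)]
        # asyncio.as_completed() gives back awaitables whose results
        # arrive in the order the requests finish
        for coro in asyncio.as_completed(tasks):
            print(await coro)


loop = asyncio.get_event_loop()
loop.run_until_complete(get_api_results())
```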
As you can see, `asyncio` also has an `as_completed()`, now using real asynchronous IO, utilizing only one thread on one process.

You put your event loop in another coroutine. Don't do that. The event loop is the outermost 'driver' of async code, and should be run synchronously.
If you need to process the fetched results, write more coroutines that do so. They could take the data from a queue, or could drive the fetching directly.
You could have a main function that fetches and processes results, for example:
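A sketch of that, keeping the `requests`-based `get()` from your question and running it in the default executor (the `main()` name is just illustrative):

```python
import asyncio
import requests


def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()


async def get_api_results(loop):
    futures = [loop.run_in_executor(None, get, n) for n in range(1, 11)]
    # awaiting the futures in submission order keeps the results ordered,
    # while all requests still run concurrently in the executor's threads
    for f in futures:
        yield await f


async def main(loop):
    # an async generator must be consumed with `async for`, not a plain `for`
    async for k, v in get_api_results(loop):
        print(k, v)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
```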
I'd make the `get()` function properly async too, using an async library like `aiohttp`, so you don't have to use the executor at all.
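Roughly like this (a sketch with the same example endpoint; `aiohttp.ClientSession` stands in for `requests`):

```python
import asyncio
import aiohttp


async def get(session, i):
    async with session.get('https://example.com/api/items/%s' % i) as r:
        return i, await r.json()


async def get_api_results():
    async with aiohttp.ClientSession() as session:
        # schedule all requests at once, then yield results in request order
        futures = [asyncio.ensure_future(get(session, n)) for n in range(1, 11)]
        for f in futures:
            yield await f


async def main():
    async for k, v in get_api_results():
        print(k, v)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```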