I wrote a script using asyncio, but the number of coroutines running simultaneously is too large and the script often ends up hanging.
I would like to limit the number of coroutines running concurrently: once the limit is reached, a new coroutine should start only after one of the running ones has finished.
My current code is something like the following:
loop = asyncio.get_event_loop()
p = map(my_func, players)
result = loop.run_until_complete(asyncio.gather(*p))
async def my_func(player):
    # something done with `await`
players is a list and contains many elements (say, 12000). Running all of them simultaneously with asyncio.gather(*p) needs too many computational resources, so I would like at most 200 players to be processed at a time. As soon as the number of running coroutines drops to 199, the next one should start.
Is this possible in asyncio?
I suggest using asyncio.BoundedSemaphore.
import asyncio

async def my_func(player, asyncio_semaphore):
    # At most 200 coroutines can be inside this block at once
    async with asyncio_semaphore:
        pass  # do stuff

async def main():
    asyncio_semaphore = asyncio.BoundedSemaphore(200)
    jobs = []
    # `players` is the list from the question
    for player in players:
        jobs.append(asyncio.ensure_future(my_func(player, asyncio_semaphore)))
    await asyncio.gather(*jobs)

if __name__ == '__main__':
    # debug=True is the equivalent of loop.set_debug(True)
    asyncio.run(main(), debug=True)
This way, at most 200 tasks can hold the semaphore and use system resources at any one time, while the rest of the 12000 tasks wait their turn.
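To check that the semaphore really caps concurrency, here is a minimal sketch that records how many coroutines are inside the guarded block at the same time. The `stats` dictionary, the `limit` parameter, the `asyncio.sleep` stand-in for real work, and the small list of 50 integer "players" are assumptions made for illustration only; they are not part of the code above.

```python
import asyncio

async def my_func(player, semaphore, stats):
    async with semaphore:
        # Track how many coroutines are inside the guarded block right now
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)  # stand-in for the real awaited work
        stats["running"] -= 1
        return player * 2

async def main(players, limit):
    semaphore = asyncio.BoundedSemaphore(limit)
    stats = {"running": 0, "peak": 0}
    results = await asyncio.gather(
        *(my_func(player, semaphore, stats) for player in players)
    )
    return results, stats["peak"]

# 50 hypothetical players, at most 5 processed concurrently
results, peak = asyncio.run(main(list(range(50)), limit=5))
print("peak concurrency:", peak)
```

The recorded peak should never exceed the limit passed to the semaphore. Note that all tasks are still created up front; the semaphore only limits how many of them do work at once.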
You might want to consider using aiostream.stream.map with the task_limit argument:
from aiostream import stream, pipe

async def main():
    xs = stream.iterate(players)
    ys = stream.map(xs, my_func, task_limit=100)
    zs = stream.list(ys)
    results = await zs
Same approach using pipes:
async def main():
    results = await (
        stream.iterate(players) |
        pipe.map(my_func, task_limit=100) |
        pipe.list())
See the aiostream project page and the documentation for further information.
Disclaimer: I am the project maintainer.
You can wrap your gather and enforce a Semaphore:
import asyncio

async def semaphore_gather(num, coros, return_exceptions=False):
    semaphore = asyncio.Semaphore(num)

    async def _wrap_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *(_wrap_coro(coro) for coro in coros), return_exceptions=return_exceptions
    )
# Usage:
# async def a():
#     return 1
#
# print(asyncio.run(semaphore_gather(10, [a() for _ in range(100)])))
# [1, 1, 1, ..., 1]  (a list of 100 ones)
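As a further usage sketch of the semaphore_gather wrapper above: because it delegates to asyncio.gather, results come back in input order even when later jobs finish first, and return_exceptions=True returns exceptions in place instead of raising. The `square` coroutine, its delays, and the failing input are hypothetical, chosen only to illustrate this.

```python
import asyncio

async def semaphore_gather(num, coros, return_exceptions=False):
    # Same wrapper as above, repeated so this example is self-contained
    semaphore = asyncio.Semaphore(num)

    async def _wrap_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *(_wrap_coro(coro) for coro in coros), return_exceptions=return_exceptions
    )

async def square(n):
    # Hypothetical job: larger inputs sleep for a shorter time
    await asyncio.sleep(0.01 * (5 - n))
    if n == 3:
        raise ValueError("bad input")
    return n * n

async def demo():
    # At most 2 jobs run at once; the failure is collected, not raised
    return await semaphore_gather(
        2, [square(n) for n in range(5)], return_exceptions=True
    )

outcome = asyncio.run(demo())
print(outcome)
```

With return_exceptions left at its default of False, the first exception would propagate out of semaphore_gather instead of being returned in the list.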