I'm in the process of moving some synchronous code to asyncio using aiohttp. The synchronous code was taking 15 minutes to run, so I'm hoping to improve this.
I have some working code which gets data from some URLs and returns the body of each. But this is just against 1 lab site; I have 70+ actual sites.
So if I wrote a loop to create a list of all the URLs for all sites, that would make 700 URLs in a list to be processed. I don't think processing them is a problem?
But I'm not sure how to program doing 'stuff' with the results. I already have code that will do 'stuff' to each of the results that are returned, but I'm not sure how to program against the right type of result.
When the code runs, does it process all the URLs and, depending on how long each one takes, return the results in an unknown order?
Do I need a function that will process any type of result?
import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup

def page_content(page):
    return BeautifulSoup(page, 'html.parser')

async def fetch(session, url):
    with aiohttp.Timeout(15, loop=session.loop):
        async with session.get(url) as response:
            return page_content(await response.text())

async def get_url_data(urls, username, password):
    tasks = []
    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
        for i in urls:
            task = asyncio.ensure_future(fetch(session, i))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        # you now have all response bodies in this variable
        for i in responses:
            print(i.title.text)
        return responses

def main():
    username = 'monitoring'
    password = '*********'
    ip = '10.10.10.2'
    urls = [
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'),
    ]
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_url_data(urls,username,password))
    data = loop.run_until_complete(future)
    print(data)

if __name__ == "__main__":
    main()
Your code isn't far from the mark. asyncio.gather returns the results in the order of the arguments, so order is preserved here, but page_content will not be called in order.
A few tweaks:
First of all, you do not need ensure_future here. Creating a Task is only needed if you are trying to have a coroutine outlive its parent, i.e. if the task has to continue running even though the function that created it is done. Here, what you need instead is to call asyncio.gather directly with your coroutines.
But calling this would schedule all the fetches at the same time, and with a high number of URLs this is far from optimal. Instead, you should choose a maximum concurrency and ensure that at most X fetches are running at any time. To implement this, you can use an asyncio.Semaphore(20). This semaphore can be held by at most 20 coroutines at once, so the others will wait to acquire it until a spot is available.
This way, all the fetches are started immediately, but only 20 of them will be able to acquire the semaphore. The others will block at the first async with instruction and wait until another fetch is done.
I have also replaced the aiohttp.Timeout with the official asyncio equivalent here.
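The tweaks described above could look roughly like the following. This is a minimal sketch, assuming Python 3.7+ (for asyncio.run). To keep it runnable without a network, fake_fetch is a hypothetical stand-in for the aiohttp request. MAX_CONCURRENCY, FETCH_TIMEOUT, bounded_fetch, fetch_all, and the stats dictionary are illustrative names, not part of any library. asyncio.wait_for is used as one asyncio-native way to apply the timeout.

```python
import asyncio

MAX_CONCURRENCY = 20  # assumed limit; tune it for your devices
FETCH_TIMEOUT = 15    # seconds, same value as in the question

# Bookkeeping for the demo only: how many fetches run at once.
stats = {"running": 0, "peak": 0}


async def fake_fetch(url):
    # Hypothetical stand-in for session.get(url) + response.text().
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    try:
        await asyncio.sleep(0.01)
        return "body of " + url
    finally:
        stats["running"] -= 1


async def bounded_fetch(sem, url):
    # At most MAX_CONCURRENCY coroutines are inside this block at once;
    # the others wait here until a slot is released.
    async with sem:
        # wait_for cancels the fetch if it exceeds the timeout.
        return await asyncio.wait_for(fake_fetch(url), FETCH_TIMEOUT)


async def fetch_all(urls):
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    # Coroutines go straight into gather; no ensure_future needed.
    bodies = await asyncio.gather(*(bounded_fetch(sem, u) for u in urls))
    # gather preserves argument order, so each body lines up with its URL.
    return list(zip(urls, bodies))


if __name__ == "__main__":
    urls = ["http://example.invalid/{}".format(i) for i in range(100)]
    results = asyncio.run(fetch_all(urls))
    print(len(results), "results, peak concurrency", stats["peak"])
```

Because the results come back as (url, body) pairs in the same order as the input list, the code that does 'stuff' with each result can decide what to do based on the URL, regardless of which request finished first.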
Finally, for the actual processing of the data, if you are limited by CPU time, asyncio will probably not help you much. You will need to use a ProcessPoolExecutor here to parallelise the actual work across other CPUs. run_in_executor will probably be of use too.
Here's an example with concurrent.futures.ProcessPoolExecutor. If it's created without specifying max_workers, the implementation will use os.cpu_count instead. Also note that asyncio.wrap_future is public but undocumented. Alternatively, there's AbstractEventLoop.run_in_executor.
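One possible shape for the executor approach is sketched below, using the run_in_executor route. It assumes Python 3.7+. parse_body is a hypothetical CPU-bound step standing in for the BeautifulSoup parsing and the 'stuff' done to each result, and process_bodies and main are illustrative names. Arguments and return values must be picklable to cross the process boundary, so it is safer to pass the raw response text than a parsed object.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def parse_body(body):
    # Hypothetical CPU-bound work; must be a module-level function
    # so that worker processes can import it.
    return len(body.split())


async def process_bodies(bodies, executor):
    loop = asyncio.get_running_loop()
    # One awaitable per body; the event loop stays free while the
    # executor's workers do the actual processing.
    jobs = [loop.run_in_executor(executor, parse_body, b) for b in bodies]
    # gather returns the results in the same order as the inputs.
    return await asyncio.gather(*jobs)


def main():
    bodies = ["one two three", "four five", ""]
    # No max_workers given, so the pool sizes itself from the CPU count.
    with ProcessPoolExecutor() as pool:
        print(asyncio.run(process_bodies(bodies, pool)))
```

When running this as a script, call main() under an if __name__ == "__main__": guard, which process pools require on platforms that start workers by re-importing the main module. Passing None as the executor makes run_in_executor fall back to the loop's default thread pool, which is handy for quick tests but does not help with CPU-bound work.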