Python's asyncio works synchronously

Posted 2019-02-25 03:36

I'm trying to use Python's new asyncio library to send asynchronous HTTP requests. I want to wait a few milliseconds (the timeout variable) before sending each request, but of course send them all asynchronously, without waiting for a response after each request is sent.

I'm doing something like the following:

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                               headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            time.sleep(timeout)
            yield from asyncio.async(self.handle_line(json.dumps(json_event), destination))


loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))

The output that I am getting (by printing the 200 responses) looks like this code is running synchronously. What am I doing wrong?

1 answer
时光不老,我们不散
#2 · 2019-02-25 04:38

There are a couple of issues here:

  1. You should use asyncio.sleep, not time.sleep, because the latter will block the event loop.

  2. You shouldn't use yield from on the asyncio.async(self.handle_line(...)) call, because that suspends send_data until the self.handle_line coroutine has completed. Nothing ends up running concurrently: you process a line, wait for its request to finish, then move on to the next line. Instead, make all the asyncio.async calls without waiting on them, save the returned Task objects in a list, and use asyncio.wait to wait for them all once every one has been started.
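The difference described in point 2 can be seen in a small timing sketch. It is written with the newer async/await syntax (Python 3.7+) rather than the generator-based style used in this thread, and fake_request is a hypothetical stand-in that only sleeps; it is not part of the original code.

```python
import asyncio
import time


async def fake_request(i, delay=0.1):
    # Hypothetical stand-in for an HTTP call: sleeps without blocking the loop.
    await asyncio.sleep(delay)
    return i


async def sequential(n):
    # Awaiting each call right away: the next one starts only after
    # the previous one has finished.
    return [await fake_request(i) for i in range(n)]


async def concurrent(n):
    # Schedule every call first, then wait for all of them together.
    # asyncio.ensure_future is the newer name for asyncio.async.
    tasks = [asyncio.ensure_future(fake_request(i)) for i in range(n)]
    return await asyncio.gather(*tasks)


def timed(coro):
    # Run a coroutine to completion and report how long it took.
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


seq_result, seq_time = timed(sequential(5))
con_result, con_time = timed(concurrent(5))
print("sequential: {:.2f}s, concurrent: {:.2f}s".format(seq_time, con_time))
```

With five simulated requests, the sequential version should take roughly five times the per-request delay, while the concurrent version should take roughly one delay, since all the sleeps overlap.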

Putting that all together:

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                               headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    tasks = []
    with open(filename) as log_file:
        for line in log_file:
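            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
                continue  # skip this line; json_event would be unset or stale
            yield from asyncio.sleep(timeout)
            # Schedule the coroutine without waiting for it. On Python 3.4.4+
            # the same function is named asyncio.ensure_future.
            tasks.append(asyncio.async(
                 self.handle_line(json.dumps(json_event), destination)))
    yield from asyncio.wait(tasks)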


asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))
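For readers on Python 3.7 or later, where async is a reserved word and asyncio.async can no longer be written, the same staggered-start pattern might look roughly like the sketch below. It is an illustration under assumptions, not the code from this thread: send_all and fake_send are hypothetical names, the lines come from a list instead of a file, and the HTTP POST is replaced by a stand-in coroutine so the example runs without aiohttp.

```python
import asyncio
import json


async def send_all(lines, delay, send):
    """Start one send() per valid JSON line, pausing `delay` seconds between starts."""
    tasks = []
    for line in lines:
        try:
            event = json.loads(line)
        except ValueError:
            print("Error parsing json event")
            continue  # nothing to send for this line
        # asyncio.sleep yields to the event loop, so sends that were
        # already started keep making progress during the pause.
        await asyncio.sleep(delay)
        tasks.append(asyncio.ensure_future(send(json.dumps(event))))
    # Wait for everything; results come back in the order the sends were started.
    return await asyncio.gather(*tasks)


async def fake_send(payload):
    # Hypothetical stand-in for the POST request; swap in a real HTTP client call.
    await asyncio.sleep(0.05)
    return len(payload)


results = asyncio.run(
    send_all(['{"a": 1}', 'not json', '{"b": 2}'], 0.01, fake_send)
)
print(results)
```

Passing the sender in as a parameter is only a convenience for the sketch; in the original code that role is played by handle_line.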