Python 3.5 async for blocks the ioloop

Posted 2019-06-28 06:40

I have a simple aiohttp server with two handlers. The first one does some computation in an async for loop; the second one just returns a text response. not_so_long_operation returns the 30th Fibonacci number using the slowest possible recursive implementation, which takes about one second.

import aiohttp.web

# Naive recursive Fibonacci; fib(30) takes roughly a second.
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def not_so_long_operation():
    return fib(30)

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            return i
        else:
            raise StopAsyncIteration

# GET /
async def index(request):
    print('request!')
    l = []
    async for i in arange(20):
        print(i)
        l.append(not_so_long_operation())

    return aiohttp.web.Response(text='%d\n' % l[0])

# GET /lol/
async def lol(request):
    print('request!')
    return aiohttp.web.Response(text='just respond\n')

When I fetch / and then /lol/, the response to the second request arrives only after the first one has finished.
What am I doing wrong, and how can I make the index handler release the ioloop on each iteration?

3 Answers
Melony? · 2019-06-28 07:04

An asynchronous iterator is not really needed here. Instead, you can simply give control back to the event loop inside your loop. In Python 3.4, this is done with a bare yield:

import asyncio

@asyncio.coroutine
def index(request):
    for i in range(20):
        not_so_long_operation()
        yield

In Python 3.5, you can define an Empty object that does essentially the same thing:

class Empty:
    def __await__(self):
        yield

Then use it with the await syntax:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await Empty()

Or simply use asyncio.sleep(0), which has recently been optimized:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await asyncio.sleep(0)

You could also run not_so_long_operation in a thread using the default executor:

async def index(request):
    loop = asyncio.get_event_loop()
    for i in range(20):
        await loop.run_in_executor(None, not_so_long_operation)
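
For completeness, a minimal sketch of how a handler like this could be registered and served with aiohttp; the route paths and port are assumptions, not taken from the question:

from aiohttp import web

# `index` and `lol` are the handlers shown above / in the question.
app = web.Application()
app.router.add_get('/', index)
app.router.add_get('/lol/', lol)
web.run_app(app, port=8080)  # arbitrary port choice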
手持菜刀,她持情操 · 2019-06-28 07:10

Since fib(30) is CPU-bound and shares little data, you should probably use a ProcessPoolExecutor (as opposed to a ThreadPoolExecutor):

async def index(request):
    loop = request.app.loop
    executor = request.app["executor"]
    result = await loop.run_in_executor(executor, fib, 30)
    return web.Response(text="%d" % result)

Set up the executor when you create the app:

from concurrent.futures import ProcessPoolExecutor

app = Application(...)
app["executor"] = ProcessPoolExecutor()
chillily · 2019-06-28 07:20

Your example has no yield points (await statements) for switching between tasks. An asynchronous iterator lets you use await inside __aiter__/__anext__, but it doesn't insert yield points into your code automatically.

Say,

import asyncio

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration

should work as you expected.

In a real application you most likely won't need the await asyncio.sleep(0) calls, because you will be awaiting database access and similar activities anyway.
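
For illustration, a sketch of what such an iterator might look like with a genuine await point; fetch_row is a hypothetical coroutine standing in for a database query:

import asyncio

async def fetch_row(i):
    # Hypothetical stand-in for an actual async database call.
    await asyncio.sleep(0.01)
    return {"id": i}

class rows:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        row = await fetch_row(self.i)  # real yield point, no sleep(0) needed
        self.i += 1
        return row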
