Python 3.5 async for blocks the ioloop

Posted 2019-06-28 06:41

Question:

I have a simple aiohttp server with two handlers. The first one does some computation inside an async for loop. The second one just returns a text response. not_so_long_operation returns the 30th Fibonacci number using the slowest recursive implementation, which takes about one second.

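import aiohttp.web

# fib was not shown in the question; assumed to be the naive recursive version
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def not_so_long_operation():
    return fib(30)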

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):  # plain method since Python 3.5.2; a coroutine here fails on 3.7+
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            return i
        else:
            raise StopAsyncIteration

# GET /
async def index(request):
    print('request!')
    l = []
    async for i in arange(20):
        print(i)
        l.append(not_so_long_operation())

    return aiohttp.web.Response(text='%d\n' % l[0])

# GET /lol/
async def lol(request):
    print('request!')
    return aiohttp.web.Response(text='just respond\n')

When I fetch / and then /lol/, the response to the second request arrives only after the first one has finished.
What am I doing wrong, and how can I make the index handler release the ioloop on each iteration?

Answer 1:

Your example has no yield points (await statements) at which the event loop could switch between tasks. An asynchronous iterator allows you to use await inside __anext__, but it does not insert one into your code automatically.

For example,

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):  # plain method since Python 3.5.2; a coroutine here fails on 3.7+
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration

should work as you expect.

In a real application you most likely won't need await asyncio.sleep(0) calls, because the handler will already be awaiting database access and similar I/O.
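The effect of the yield point can be checked without aiohttp. The following is a minimal sketch, assuming Python 3.7+ (for asyncio.run and a plain-method __aiter__); the worker and demo helpers are hypothetical names introduced only for this illustration.

```python
import asyncio


class arange:
    """Async counterpart of range(n) that yields to the event loop on each step."""

    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        i = self.i
        self.i += 1
        await asyncio.sleep(0)  # yield point: other tasks may run here
        return i


async def worker(name, n, log):
    # Hypothetical helper: records which task handled which step.
    async for i in arange(n):
        log.append((name, i))


async def demo():
    log = []
    await asyncio.gather(worker("a", 3, log), worker("b", 3, log))
    return log


events = asyncio.run(demo())
print(events)
```

If the await asyncio.sleep(0) line is removed, worker "a" runs to completion before worker "b" gets a turn, which is the behaviour described in the question.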



Answer 2:

Since fib(30) is CPU-bound and shares little data, you should probably use a ProcessPoolExecutor (as opposed to a ThreadPoolExecutor):

async def index(request):
    loop = request.app.loop
    executor = request.app["executor"]
    result = await loop.run_in_executor(executor, fib, 30)
    return web.Response(text="%d" % result)

Set up the executor when you create the app:

from concurrent.futures import ProcessPoolExecutor

app = Application(...)
app["executor"] = ProcessPoolExecutor()
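The same pattern can be tried outside aiohttp. This is a rough sketch, assuming Python 3.7+; compute and main are hypothetical names, and fib is assumed to be the naive recursive version from the question.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    """Deliberately slow recursive Fibonacci (CPU bound)."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)


async def compute(executor, n):
    # Hand the blocking call to the executor; the event loop stays free meanwhile.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, fib, n)


async def main():
    # Worker processes sidestep the GIL for CPU-bound work.
    with ProcessPoolExecutor() as executor:
        results = await asyncio.gather(compute(executor, 20), compute(executor, 21))
    print(results)
```

To run it, call asyncio.run(main()) under an if __name__ == "__main__": guard, which is needed on platforms that start worker processes by spawning. Passing None as the executor falls back to the loop's default thread pool.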


Answer 3:

An asynchronous iterator is not really needed here. Instead, you can simply give control back to the event loop inside your loop. In Python 3.4, this is done with a bare yield:

@asyncio.coroutine
def index(request):
    for i in range(20):
        not_so_long_operation()
        yield

In Python 3.5, you can define an Empty awaitable that does essentially the same thing:

class Empty:
    def __await__(self):
        yield

Then use it with the await syntax:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await Empty()
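A small standalone check of the Empty awaitable, sketched under the assumption of Python 3.7+ (asyncio.run); busy and demo are hypothetical helpers, and the list append stands in for a chunk of blocking work.

```python
import asyncio


class Empty:
    """Awaitable that suspends the current task for one event-loop iteration."""

    def __await__(self):
        yield  # a bare yield makes asyncio reschedule the task


async def busy(name, steps, log):
    for i in range(steps):
        log.append((name, i))  # stand-in for not_so_long_operation()
        await Empty()  # let other tasks run between steps


async def demo():
    log = []
    await asyncio.gather(busy("slow", 3, log), busy("fast", 1, log))
    return log


order = asyncio.run(demo())
print(order)
```

The "fast" task gets its turn before "slow" has finished all of its steps, because each await Empty() hands control back to the loop.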

Or simply use asyncio.sleep(0), which has recently been optimized:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await asyncio.sleep(0)

You could also run not_so_long_operation in a thread using the default executor:

async def index(request):
    # aiohttp passes only the request to a handler, so look the loop up here
    loop = asyncio.get_event_loop()
    for i in range(20):
        await loop.run_in_executor(None, not_so_long_operation)
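One way to see that the loop stays responsive while the thread works is to run a heartbeat task alongside it. This is only a sketch, assuming Python 3.7+; blocking_work, heartbeat and demo are hypothetical names, and time.sleep stands in for not_so_long_operation.

```python
import asyncio
import time


def blocking_work():
    time.sleep(0.2)  # stand-in for a blocking call
    return "done"


async def heartbeat(ticks):
    # Records a timestamp every 10 ms for as long as the loop is free to run it.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def demo():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool executor.
    result = await loop.run_in_executor(None, blocking_work)
    beat.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(demo())
print(result, tick_count)
```

Calling blocking_work() directly inside demo instead would freeze the heartbeat for the whole 0.2 seconds. Note that threads help with blocking I/O, while pure-Python CPU-bound work is still limited by the GIL.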