Python的parallelising“异步的”(Python parallelising “as

2019-09-27 18:20发布

我在我的龙卷风处理程序下面的方法:

  async def get(self):
      url = 'url here'
      try:
          async for batch in downloader.fetch(url):
              self.write(batch)
              await self.flush()
      except Exception as e:
          logger.warning(e)

这是downloader.fetch()的代码:

async def fetch(url, **kwargs):
    timeout = kwargs.get('timeout', aiohttp.ClientTimeout(total=12))
    response_validator = kwargs.get('response_validator', json_response_validator)
    extractor = kwargs.get('extractor', json_extractor)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                response_validator(resp)
                async for batch in extractor(resp):
                    yield batch

    except aiohttp.client_exceptions.ClientConnectorError:
        logger.warning("bad request")
        raise
    except asyncio.TimeoutError:
        logger.warning("server timeout")
        raise

我想从多个相同常下载者产生“批”的对象。 我想从第一下载等第一可用的批次,直到完成所有下载者。 像这样的东西(这是不工作的代码):

async for batch in [downloader.fetch(url1), downloader.fetch(url2)]:
    ....

这可能吗? 我如何修改我为了能够从多个并行协同程序产生我在做什么?

Answer 1:

我如何修改我为了能够从多个并行协同程序产生我在做什么?

您需要合并两个异步序列合并为一个函数,从一个或另一个并行并产生元件遍历两者,因为它们变得可用。 虽然在目前的标准库中不包括这样的功能,你可以找到一个在aiostream包 。

您也可以编写自己的merge功能,如在这个答案 :

async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret

使用该功能,当环路是这样的:

async for batch in merge(downloader.fetch(url1), downloader.fetch(url2)):
    ....


Answer 2:

编辑:在评论中提到,下面的方法并行不执行给定的程序。

结帐aitertools库。

import asyncio
import aitertools

async def f1():
    await asyncio.sleep(5)
    yield 1

async def f2():
    await asyncio.sleep(6)
    yield 2

async def iter_funcs():
    async for x in aitertools.chain(f2(), f1()):
        print(x)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(iter_funcs())

看来,正在迭代函数必须couroutine。



文章来源: Python parallelising “async for”