Why do I need to cancel the tasks in this queue example?

Posted 2019-08-09 01:31

I am reading the Python documentation to study for my work. I am new to Python and to programming in general, and I do not yet understand concepts like async operations very well.

I am using Fedora 29 with Python 3.7.3 to try out the queue examples for the asyncio library.

Here is the queue example with async operations:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())

Why do I need to cancel the tasks in this example? And why can I remove this part of the code

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

and the example still works fine?

1 answer
倾城 Initia
#2 · 2019-08-09 01:50

Why do I need to cancel the tasks in this example?

Because otherwise they will remain hanging indefinitely, waiting for a new item in the queue that will never arrive. In that particular example you are exiting the event loop anyway (asyncio.run() cancels any tasks still pending when main() returns), so there's no harm from them "hanging". But if you did that as part of a utility function in a longer-running program, you would create a coroutine leak: every call would leave behind workers that never finish.
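To make the leak concrete, here is a minimal sketch that wraps the queue example in a utility function and counts how many worker tasks are still pending once the queue has been drained. The names `process`, `count_pending` and `demo` are hypothetical helpers made up for this illustration; they are not part of asyncio or of the original example.

```python
import asyncio


async def worker(queue):
    # Same shape as the worker in the question: loop forever on queue.get().
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()


async def process(items, cancel_workers):
    """Hypothetical utility function: drain `items` with three workers."""
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)

    tasks = [asyncio.create_task(worker(queue)) for _ in range(3)]
    await queue.join()

    if cancel_workers:
        for task in tasks:
            task.cancel()
        # Wait until the cancellation has actually been processed.
        await asyncio.gather(*tasks, return_exceptions=True)
    return tasks


async def count_pending(cancel_workers):
    tasks = await process([0.01, 0.02, 0.03], cancel_workers)
    # A task that is not done is still parked inside queue.get().
    return sum(not task.done() for task in tasks)


def demo():
    leaked = asyncio.run(count_pending(cancel_workers=False))
    cleaned = asyncio.run(count_pending(cancel_workers=True))
    return leaked, cleaned


if __name__ == "__main__":
    print(demo())  # (3, 0)
```

Without the cancellation, all three workers are still alive after `process` returns. In this sketch they only disappear because `asyncio.run()` tears the loop down; in a long-running loop they would pile up with every call.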

In other words, canceling the workers tells them to exit because their services are no longer necessary, and is needed to ensure that resources associated with them get freed.
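As a rough illustration of how a worker "hears" that it should exit: `task.cancel()` causes `asyncio.CancelledError` to be raised inside the worker at the `await` where it is currently suspended, so a `try`/`finally` around the loop gets a chance to release whatever the worker holds. The `log` list below is only an assumption for the sake of the demo, standing in for real cleanup work.

```python
import asyncio


async def worker(queue, log):
    try:
        while True:
            item = await queue.get()  # cancellation is delivered here
            log.append(f"processed {item}")
            queue.task_done()
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # re-raise so the task ends up in the cancelled state
    finally:
        # Stand-in for closing connections, files, and so on.
        log.append("cleaned up")


async def main():
    log = []
    queue = asyncio.Queue()
    queue.put_nowait("job")

    task = asyncio.create_task(worker(queue, log))
    await queue.join()

    task.cancel()
    # return_exceptions=True collects the CancelledError as a result
    # instead of raising it into main().
    results = await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled(), results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This is also why the original example awaits `asyncio.gather(*tasks, return_exceptions=True)` after cancelling: `cancel()` only requests cancellation, and the gather waits until each worker has actually unwound.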
