为了账号安全,请及时绑定邮箱和手机立即绑定

asyncio.CancelledError 和“从未检索到_GatheringFuture 异常”

asyncio.CancelledError 和“从未检索到_GatheringFuture 异常”

忽然笑 2023-05-16 14:29:25
我正在观看import asyncio:学习 Python 的 AsyncIO #3 - 使用协程。讲师举了下面的例子:import asyncioimport datetimeasync def keep_printing(name):    while True:        print(name, end=" ")        print(datetime.datetime.now())        await asyncio.sleep(0.5)async def main():    group_task = asyncio.gather(                     keep_printing("First"),                     keep_printing("Second"),                     keep_printing("Third")                 )    try:        await asyncio.wait_for(group_task, 3)    except asyncio.TimeoutError:        print("Time's up!")if __name__ == "__main__":    asyncio.run(main())输出有异常:First 2020-08-11 14:53:12.079830Second 2020-08-11 14:53:12.079830Third 2020-08-11 14:53:12.080828 First 2020-08-11 14:53:12.580865Second 2020-08-11 14:53:12.580865Third 2020-08-11 14:53:12.581901 First 2020-08-11 14:53:13.081979Second 2020-08-11 14:53:13.082408Third 2020-08-11 14:53:13.082408 First 2020-08-11 14:53:13.583497Second 2020-08-11 14:53:13.583935Third 2020-08-11 14:53:13.584946First 2020-08-11 14:53:14.079666Second 2020-08-11 14:53:14.081169Third 2020-08-11 14:53:14.115689First 2020-08-11 14:53:14.570694Second 2020-08-11 14:53:14.571668Third 2020-08-11 14:53:14.635769First 2020-08-11 14:53:15.074124Second 2020-08-11 14:53:15.074900Time's up!_GatheringFuture exception was never retrievedfuture: <_GatheringFuture finished exception=CancelledError()>concurrent.futures._base.CancelledError然后讲师就继续讨论其他主题,再也没有回到这个例子来说明如何修复它。幸运的是,通过实验,我发现我们可以通过在async 函数try/except中添加另一个来修复它:事实上,有了这个版本main,我们甚至不需要try...except asyncio.CancelledErrorin keep_printing。它仍然可以正常工作。那是为什么?为什么 catching CancelledErrorin mainwork 而不是 in keep_printing?视频讲师处理此异常的方式只会让我更加困惑。keep_printing他根本不需要更改任何代码!
查看完整描述

3 回答

?
守着星空守着你

TA贡献1799条经验 获得超8个赞

让我们看看发生了什么:


此代码安排三个协程执行并返回Future对象group_task(内部类的实例_GatheringFuture)聚合结果。

group_task = asyncio.gather(

                     keep_printing("First"),

                     keep_printing("Second"),

                     keep_printing("Third")

                 )

此代码等待未来完成超时。如果发生超时,它会取消未来并引发asyncio.TimeoutError.

    try:

        await asyncio.wait_for(group_task, 3)

    except asyncio.TimeoutError:

        print("Time's up!")

发生超时。让我们看看 asyncio 库task.py。wait_for执行以下操作:

timeout_handle = loop.call_later(timeout, _release_waiter, waiter)

...

await waiter

...

await _cancel_and_wait(fut, loop=loop)  # _GatheringFuture.cancel() inside

raise exceptions.TimeoutError()

当我们这样做时_GatheringFuture.cancel(),如果实际上取消了任何子任务,CancelledError则传播

class _GatheringFuture(futures.Future):

    ...

    def cancel(self):

        ...

        for child in self._children:

            if child.cancel():

                ret = True

        if ret:

            # If any child tasks were actually cancelled, we should

            # propagate the cancellation request regardless of

            # *return_exceptions* argument.  See issue 32684.

            self._cancel_requested = True

        return ret

然后


...

if outer._cancel_requested:

    # If gather is being cancelled we must propagate the

    # cancellation regardless of *return_exceptions* argument.

    # See issue 32684.

    outer.set_exception(exceptions.CancelledError())

else:

    outer.set_result(results)

因此,从收集中提取结果或异常更为正确future

async def main():

    group_task = asyncio.gather(

                     keep_printing("First"),

                     keep_printing("Second"),

                     keep_printing("Third")

                 )

    try:

        await asyncio.wait_for(group_task, 3)

    except asyncio.TimeoutError:

        print("Time's up!")


    try:

        result = await group_task

    except asyncio.CancelledError:

        print("Gather was cancelled")


查看完整回答
反对 回复 2023-05-16
?
烙印99

TA贡献1829条经验 获得超13个赞

我认为你需要await放在asyncio.gather. 因此,此调用取自您的代码:

    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )

需要改成:

    group_task = await asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )

不知道为什么,我还在学习这些东西。


查看完整回答
反对 回复 2023-05-16
?
繁星点点滴滴

TA贡献1803条经验 获得超3个赞

当aw因为超时被取消时,wait_for等待aw被取消。如果将 CancelledError 处理到协程中,则会出现超时错误。这在 3.7 版中发生了变化。

例子


import asyncio

import datetime


async def keep_printing(name):

    print(datetime.datetime.now())

    try:

        await asyncio.sleep(3600)

    except asyncio.exceptions.CancelledError:

        print("done")


async def main():

    try:

        await asyncio.wait_for(keep_printing("First"), timeout=3)

    except asyncio.exceptions.TimeoutError:

        print("timeouted")



if __name__ == "__main__":

    asyncio.run(main())

用于从 Task 或 Future 检索结果的 gather 方法,你有一个无限循环并且从不返回任何结果。如果 aws 序列中的任何 Task 或 Future 被取消(wait_for 发生了什么),它会被视为引发了 CancelledError——在这种情况下 gather() 调用不会被取消。这是为了防止取消一个已提交的任务/未来导致其他任务/未来被取消。

对于保护聚集法,你可以将它覆盖到盾牌上。


import asyncio

import datetime



async def keep_printing(name):

    while True:

        print(name, datetime.datetime.now())

        try:

            await asyncio.sleep(0.5)

        except asyncio.exceptions.CancelledError:

            print(f"canceled {name}")

            return None


async def main():

    group_task = asyncio.shield(asyncio.gather(

                     keep_printing("First"),

                     keep_printing("Second"),

                     keep_printing("Third"))

                    )

    try:

        await asyncio.wait_for(group_task, 3)

    except asyncio.exceptions.TimeoutError:

        print("Done")



if __name__ == "__main__":

    asyncio.run(main())


查看完整回答
反对 回复 2023-05-16
  • 3 回答
  • 0 关注
  • 244 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信