为了账号安全,请及时绑定邮箱和手机立即绑定

如何在调用 `async for in` 后获取异步生成器的下一次迭代

如何在调用 `async for in` 后获取异步生成器的下一次迭代

斯蒂芬大帝 2023-03-16 16:16:43
使用 FastAPI 我试图检测StreamingResponse是否已完全被客户端使用或是否已被取消。我有以下示例应用程序:import asyncioimport uvicornfrom fastapi import FastAPIfrom fastapi.responses import StreamingResponseapp = FastAPI()async def ainfinite_generator():    while True:        yield b"some fake data "        await asyncio.sleep(.001)async def astreamer(generator):    try:        async for data in generator:            yield data    except Exception as e:        # this isn't triggered by a cancelled request        print(e)    finally:        # this always throws a StopAsyncIteration exception        # no matter whether the generator was consumed or not        leftover = await generator.__anext__()        if leftover:            print("we didn't finish")        else:            print("we finished")@app.get("/")async def infinite_stream():    return StreamingResponse(astreamer(ainfinite_generator()))if __name__ == "__main__":    uvicorn.run(app, host="0.0.0.0", port=8000)它似乎是第一个async for in generator“astreamer消耗”异步生成器的。在该循环之后,进一步尝试获得下一次迭代失败并出现异常StopAsyncIteration,即使生成器如上定义的那样是“无限的”。我查看了PEP-525,我唯一看到的是,如果将异常抛入生成器,它将导致进一步尝试从生成器读取以抛出 StopAsyncIteration 异常,但我看不到它在哪里会发生。至少,我没有在 Starlette 的 StreamingResponse类中看到这一点(它似乎与“内容”无关)。执行后生成器不会“释放”吗async for in gen?
查看完整描述

1 回答

?
牧羊人nacy

TA贡献1862条经验 获得超7个赞

下面的代码展示了如何在协程上观察取消(在我的例子中,是一个异步生成器)。如评论中所述,如果取消异步生成器,它会向生成器注入异常,从那时起,任何试图获取生成器中的下一项的尝试都会引发异常StopAsyncIteration。请参阅PEP 525。要确定异步生成器是否被取消,只需尝试/排除异常asyncio.CancelledError(派生自BaseException)。

这里还有一些代码用于展示如何处理普通生成器,这些生成器更宽容一些。如果您保持相同的 try/except 流程,那么在它们GeneratorExit被取消时会引发异常。

棘手的部分是这些异常中的大多数都派生自类BaseException,这与StopIteration我期望的派生Exception自类的异常不同。

而且,顺便说一句,实际取消发生在starlette中。

import asyncio

import time


import uvicorn

from fastapi import FastAPI

from fastapi.responses import StreamingResponse


app = FastAPI()



def infinite_generator():

    # not blocking, so doesn't need to be async

    # but if it was blocking, you could make this async and await it

    while True:

        yield b"some fake data "



def finite_generator():

    # not blocking, so doesn't need to be async

    # but if it was blocking, you could make this async and await it

    x = 0

    while x < 10000:

        yield f"{x}"

        x += 1



async def astreamer(generator):

    try:

        # if it was an async generator we'd do:

        # "async for data in generator:"

        # (there is no yield from async_generator)

        for i in generator:

            yield i

            await asyncio.sleep(.001)


    except asyncio.CancelledError as e:

        print('cancelled')



def streamer(generator):

    try:

        # note: normally we would do "yield from generator"

        # but that won't work with next(generator) in the finally statement

        for i in generator:

            yield i

            time.sleep(.001)


    except GeneratorExit:

        print("cancelled")

    finally:

        # showing that we can check here to see if all data was consumed

        # the except statement above effectively does the same thing

        try:

            next(generator)

            print("we didn't finish")

            return

        except StopIteration:

            print("we finished")



@app.get("/infinite")

async def infinite_stream():

    return StreamingResponse(streamer(infinite_generator()))



@app.get("/finite")

async def finite_stream():

    return StreamingResponse(streamer(finite_generator()))



@app.get("/ainfinite")

async def infinite_stream():

    return StreamingResponse(astreamer(infinite_generator()))



@app.get("/afinite")

async def finite_stream():

    return StreamingResponse(astreamer(finite_generator()))



if __name__ == "__main__":

    uvicorn.run(app, host="0.0.0.0", port=8000)


查看完整回答
反对 回复 2023-03-16
  • 1 回答
  • 0 关注
  • 84 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信