1 回答
TA贡献1862条经验 获得超7个赞
下面的代码展示了如何在协程上观察取消(在我的例子中,是一个异步生成器)。如评论中所述,如果取消异步生成器,它会向生成器注入异常,从那时起,任何试图获取生成器中的下一项的尝试都会引发异常StopAsyncIteration
。请参阅PEP 525。要确定异步生成器是否被取消,只需尝试/排除异常asyncio.CancelledError
(派生自BaseException
)。
这里还有一些代码用于展示如何处理普通生成器,这些生成器更宽容一些。如果您保持相同的 try/except 流程,那么在它们GeneratorExit
被取消时会引发异常。
棘手的部分是这些异常中的大多数都派生自类BaseException
,这与StopIteration
我期望的派生Exception
自类的异常不同。
而且,顺便说一句,实际取消发生在starlette中。
import asyncio
import time
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
def infinite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
while True:
yield b"some fake data "
def finite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
x = 0
while x < 10000:
yield f"{x}"
x += 1
async def astreamer(generator):
try:
# if it was an async generator we'd do:
# "async for data in generator:"
# (there is no yield from async_generator)
for i in generator:
yield i
await asyncio.sleep(.001)
except asyncio.CancelledError as e:
print('cancelled')
def streamer(generator):
try:
# note: normally we would do "yield from generator"
# but that won't work with next(generator) in the finally statement
for i in generator:
yield i
time.sleep(.001)
except GeneratorExit:
print("cancelled")
finally:
# showing that we can check here to see if all data was consumed
# the except statement above effectively does the same thing
try:
next(generator)
print("we didn't finish")
return
except StopIteration:
print("we finished")
@app.get("/infinite")
async def infinite_stream():
return StreamingResponse(streamer(infinite_generator()))
@app.get("/finite")
async def finite_stream():
return StreamingResponse(streamer(finite_generator()))
@app.get("/ainfinite")
async def infinite_stream():
return StreamingResponse(astreamer(infinite_generator()))
@app.get("/afinite")
async def finite_stream():
return StreamingResponse(astreamer(finite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
添加回答
举报