1 回答

TA貢獻1862條經驗 獲得超7個贊
下面的代碼展示了如何在協程上觀察取消(在我的例子中,是一個異步生成器)。如評論中所述,如果取消異步生成器,它會向生成器注入異常,從那時起,任何試圖獲取生成器中的下一項的嘗試都會引發異常StopAsyncIteration
。請參閱PEP 525。要確定異步生成器是否被取消,只需嘗試/排除異常asyncio.CancelledError
(派生自BaseException
)。
這里還有一些代碼用于展示如何處理普通生成器,這些生成器更寬容一些。如果您保持相同的 try/except 流程,那么在它們GeneratorExit
被取消時會引發異常。
棘手的部分是這些異常中的大多數都派生自類BaseException
,這與StopIteration
我期望的派生Exception
自類的異常不同。
而且,順便說一句,實際取消發生在starlette中。
import asyncio
import time
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
def infinite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
while True:
yield b"some fake data "
def finite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
x = 0
while x < 10000:
yield f"{x}"
x += 1
async def astreamer(generator):
try:
# if it was an async generator we'd do:
# "async for data in generator:"
# (there is no yield from async_generator)
for i in generator:
yield i
await asyncio.sleep(.001)
except asyncio.CancelledError as e:
print('cancelled')
def streamer(generator):
try:
# note: normally we would do "yield from generator"
# but that won't work with next(generator) in the finally statement
for i in generator:
yield i
time.sleep(.001)
except GeneratorExit:
print("cancelled")
finally:
# showing that we can check here to see if all data was consumed
# the except statement above effectively does the same thing
try:
next(generator)
print("we didn't finish")
return
except StopIteration:
print("we finished")
@app.get("/infinite")
async def infinite_stream():
return StreamingResponse(streamer(infinite_generator()))
@app.get("/finite")
async def finite_stream():
return StreamingResponse(streamer(finite_generator()))
@app.get("/ainfinite")
async def infinite_stream():
return StreamingResponse(astreamer(infinite_generator()))
@app.get("/afinite")
async def finite_stream():
return StreamingResponse(astreamer(finite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
添加回答
舉報