亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在調用 `async for in` 后獲取異步生成器的下一次迭代

如何在調用 `async for in` 后獲取異步生成器的下一次迭代

斯蒂芬大帝 2023-03-16 16:16:43
使用 FastAPI 我試圖檢測StreamingResponse是否已完全被客戶端使用或是否已被取消。我有以下示例應用程序:import asyncioimport uvicornfrom fastapi import FastAPIfrom fastapi.responses import StreamingResponseapp = FastAPI()async def ainfinite_generator():    while True:        yield b"some fake data "        await asyncio.sleep(.001)async def astreamer(generator):    try:        async for data in generator:            yield data    except Exception as e:        # this isn't triggered by a cancelled request        print(e)    finally:        # this always throws a StopAsyncIteration exception        # no matter whether the generator was consumed or not        leftover = await generator.__anext__()        if leftover:            print("we didn't finish")        else:            print("we finished")@app.get("/")async def infinite_stream():    return StreamingResponse(astreamer(ainfinite_generator()))if __name__ == "__main__":    uvicorn.run(app, host="0.0.0.0", port=8000)它似乎是第一個async for in generator“astreamer消耗”異步生成器的。在該循環之后,進一步嘗試獲得下一次迭代失敗并出現異常StopAsyncIteration,即使生成器如上定義的那樣是“無限的”。我查看了PEP-525,我唯一看到的是,如果將異常拋入生成器,它將導致進一步嘗試從生成器讀取以拋出 StopAsyncIteration 異常,但我看不到它在哪里會發生。至少,我沒有在 Starlette 的 StreamingResponse類中看到這一點(它似乎與“內容”無關)。執行后生成器不會“釋放”嗎async for in gen?
查看完整描述

1 回答

?
牧羊人nacy

TA貢獻1862條經驗 獲得超7個贊

下面的代碼展示了如何在協程上觀察取消(在我的例子中,是一個異步生成器)。如評論中所述,如果取消異步生成器,它會向生成器注入異常,從那時起,任何試圖獲取生成器中的下一項的嘗試都會引發異常StopAsyncIteration。請參閱PEP 525。要確定異步生成器是否被取消,只需嘗試/排除異常asyncio.CancelledError(派生自BaseException)。

這里還有一些代碼用于展示如何處理普通生成器,這些生成器更寬容一些。如果您保持相同的 try/except 流程,那么在它們GeneratorExit被取消時會引發異常。

棘手的部分是這些異常中的大多數都派生自類BaseException,這與StopIteration我期望的派生Exception自類的異常不同。

而且,順便說一句,實際取消發生在starlette中。

import asyncio

import time


import uvicorn

from fastapi import FastAPI

from fastapi.responses import StreamingResponse


app = FastAPI()



def infinite_generator():

    # not blocking, so doesn't need to be async

    # but if it was blocking, you could make this async and await it

    while True:

        yield b"some fake data "



def finite_generator():

    # not blocking, so doesn't need to be async

    # but if it was blocking, you could make this async and await it

    x = 0

    while x < 10000:

        yield f"{x}"

        x += 1



async def astreamer(generator):

    try:

        # if it was an async generator we'd do:

        # "async for data in generator:"

        # (there is no yield from async_generator)

        for i in generator:

            yield i

            await asyncio.sleep(.001)


    except asyncio.CancelledError as e:

        print('cancelled')



def streamer(generator):

    try:

        # note: normally we would do "yield from generator"

        # but that won't work with next(generator) in the finally statement

        for i in generator:

            yield i

            time.sleep(.001)


    except GeneratorExit:

        print("cancelled")

    finally:

        # showing that we can check here to see if all data was consumed

        # the except statement above effectively does the same thing

        try:

            next(generator)

            print("we didn't finish")

            return

        except StopIteration:

            print("we finished")



@app.get("/infinite")

async def infinite_stream():

    return StreamingResponse(streamer(infinite_generator()))



@app.get("/finite")

async def finite_stream():

    return StreamingResponse(streamer(finite_generator()))



@app.get("/ainfinite")

async def infinite_stream():

    return StreamingResponse(astreamer(infinite_generator()))



@app.get("/afinite")

async def finite_stream():

    return StreamingResponse(astreamer(finite_generator()))



if __name__ == "__main__":

    uvicorn.run(app, host="0.0.0.0", port=8000)


查看完整回答
反對 回復 2023-03-16
  • 1 回答
  • 0 關注
  • 214 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號