亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

從事件循環返回異步生成器數據可能嗎?

從事件循環返回異步生成器數據可能嗎?

智慧大石 2023-06-20 16:07:56
我想使用httpx從協程中同時讀取多個 HTTP 流請求,并將數據返回給我運行事件循環的非異步函數,而不是僅僅返回最終數據。但是,如果我讓我的異步函數產生而不是返回,我會收到抱怨并asyncio.as_completed()期望loop.run_until_complete()協程或 Future,而不是異步生成器。所以我能讓它工作的唯一方法是收集每個協程內的所有流數據,并在請求完成后返回所有數據。然后收集所有協程結果,最后將其返回給非異步調用函數。這意味著我必須將所有內容保存在內存中,并等到最慢的請求完成后才能獲取所有數據,這破壞了流式傳輸 http 請求的全部意義。有什么辦法可以完成這樣的事情嗎?我當前的愚蠢實現如下所示:def collect_data(urls):? ? """Non-async function wishing it was a non-async generator"""? ? async def stream(async_client, url, payload):? ? ? ? data = []? ? ? ? async with async_client.stream("GET", url=url) as ar:? ? ? ? ? ? ar.raise_for_status()? ? ? ? ? ? async for line in ar.aiter_lines():? ? ? ? ? ? ? ? data.append(line)? ? ? ? ? ? ? ? # would like to yield each line here? ? ? ? return data? ? async def execute_tasks(urls):? ? ? ? all_data = []? ? ? ? async with httpx.AsyncClient() as async_client:? ? ? ? ? ? tasks = [stream(async_client, url) for url in urls]? ? ? ? ? ? for coroutine in asyncio.as_completed(tasks):? ? ? ? ? ? ? ? all_data += await coroutine? ? ? ? ? ? ? ? # would like to iterate and yield each line here? ? ? ? return all_events? ? try:? ? ? ? loop = asyncio.get_event_loop()? ? ? ? data = loop.run_until_complete(execute_tasks(urls=urls))? ? ? ? return data? ? ? ? # would like to iterate and yield the data here as it becomes available? ? finally:? ? ? ? loop.close()我也嘗試了一些使用asyncio.Queue和trio內存通道的解決方案,但由于我只能從異步范圍內的那些中讀取,所以它并沒有讓我更接近解決方案。我想從非異步生成器中使用它的原因是我想從使用 Django Rest Framework 流式 API 的 Django 應用程序中使用它。
查看完整描述

3 回答

?
泛舟湖上清波郎朗

TA貢獻1818條經驗 獲得超3個贊

通常你應該只做collect_data異步,并在整個過程中使用異步代碼——這就是 asyncio 的設計用途。但如果由于某種原因這不可行,您可以通過應用一些膠水代碼手動迭代異步迭代器:


def iter_over_async(ait, loop):

    ait = ait.__aiter__()

    async def get_next():

        try:

            obj = await ait.__anext__()

            return False, obj

        except StopAsyncIteration:

            return True, None

    while True:

        done, obj = loop.run_until_complete(get_next())

        if done:

            break

        yield obj

__anext__上面的工作方式是通過提供一個異步閉包,它使用魔術方法從異步迭代器中不斷檢索值,并在對象到達時返回它們。run_until_complete()此異步閉包在普通同步生成器內的循環中調用。(閉包實際上返回一對完成指示符和實際對象,以避免StopAsyncIteration通過傳播run_until_complete,這可能不受支持。)


有了這個,您可以制作execute_tasks一個異步生成器 ( async defwith yield) 并使用以下方法對其進行迭代:


for chunk in iter_over_async(execute_tasks(urls), loop):

    ...

請注意,此方法與 不兼容asyncio.run,并且可能會在以后導致問題。


查看完整回答
反對 回復 2023-06-20
?
慕尼黑5688855

TA貢獻1848條經驗 獲得超2個贊

有一個很好的庫可以執行此操作(以及更多?。Q為pypeln:

import pypeln as pl

import asyncio

from random import random


async def slow_add1(x):

? ? await asyncio.sleep(random()) # <= some slow computation

? ? return x + 1


async def slow_gt3(x):

? ? await asyncio.sleep(random()) # <= some slow computation

? ? return x > 3


data = range(10) # [0, 1, 2, ..., 9]?


stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)

stage = pl.task.filter(slow_gt3, stage, workers=2)


data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]


查看完整回答
反對 回復 2023-06-20
?
當年話下

TA貢獻1890條經驗 獲得超9個贊

解決方案以使用asyncio.run_coroutine_threadsafe而不是loop.run_until_complete.


import asyncio

from typing import Any, AsyncGenerator


def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):

? ? ait = async_generator.__aiter__()


? ? async def get_next() -> tuple[bool, Any]:

? ? ? ? try:

? ? ? ? ? ? obj = await ait.__anext__()

? ? ? ? ? ? done = False


? ? ? ? except StopAsyncIteration:

? ? ? ? ? ? obj = None

? ? ? ? ? ? done = True


? ? ? ? return done, obj


? ? while True:

? ? ? ? done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()


? ? ? ? if done:

? ? ? ? ? ? break


? ? ? ? yield obj

我還想補充一點,我發現像這樣的工具在將同步代碼分段轉換為異步代碼的過程中非常有用。


查看完整回答
反對 回復 2023-06-20
  • 3 回答
  • 0 關注
  • 192 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號