3 回答

TA貢獻1818條經驗 獲得超3個贊
通常你應該只做collect_data異步,并在整個過程中使用異步代碼——這就是 asyncio 的設計用途。但如果由于某種原因這不可行,您可以通過應用一些膠水代碼手動迭代異步迭代器:
def iter_over_async(ait, loop):
ait = ait.__aiter__()
async def get_next():
try:
obj = await ait.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
__anext__上面的工作方式是通過提供一個異步閉包,它使用魔術方法從異步迭代器中不斷檢索值,并在對象到達時返回它們。run_until_complete()此異步閉包在普通同步生成器內的循環中調用。(閉包實際上返回一對完成指示符和實際對象,以避免StopAsyncIteration通過傳播run_until_complete,這可能不受支持。)
有了這個,您可以制作execute_tasks一個異步生成器 ( async defwith yield) 并使用以下方法對其進行迭代:
for chunk in iter_over_async(execute_tasks(urls), loop):
...
請注意,此方法與 不兼容asyncio.run,并且可能會在以后導致問題。

TA貢獻1848條經驗 獲得超2個贊
有一個很好的庫可以執行此操作(以及更多?。Q為pypeln:
import pypeln as pl
import asyncio
from random import random
async def slow_add1(x):
? ? await asyncio.sleep(random()) # <= some slow computation
? ? return x + 1
async def slow_gt3(x):
? ? await asyncio.sleep(random()) # <= some slow computation
? ? return x > 3
data = range(10) # [0, 1, 2, ..., 9]?
stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)
data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

TA貢獻1890條經驗 獲得超9個贊
解決方案以使用asyncio.run_coroutine_threadsafe而不是loop.run_until_complete.
import asyncio
from typing import Any, AsyncGenerator
def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):
? ? ait = async_generator.__aiter__()
? ? async def get_next() -> tuple[bool, Any]:
? ? ? ? try:
? ? ? ? ? ? obj = await ait.__anext__()
? ? ? ? ? ? done = False
? ? ? ? except StopAsyncIteration:
? ? ? ? ? ? obj = None
? ? ? ? ? ? done = True
? ? ? ? return done, obj
? ? while True:
? ? ? ? done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()
? ? ? ? if done:
? ? ? ? ? ? break
? ? ? ? yield obj
我還想補充一點,我發現像這樣的工具在將同步代碼分段轉換為異步代碼的過程中非常有用。
添加回答
舉報