2 回答
TA贡献1802条经验 获得超10个赞
我遇到了这个答案,并查看了 aiostream 库。这是我想出的用于合并多个异步生成器的代码。它不使用任何库。
async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
pending = gens.copy()
pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
while len(pending_tasks) > 0:
done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
for d in done:
try:
result = d.result()
yield result
dg = pending_tasks[d]
pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
except StopAsyncIteration as sai:
print("Exception in getting result", sai)
finally:
del pending_tasks[d]
希望这对您有所帮助,如果有任何错误,请告诉我。
添加回答
举报
