1 回答

TA貢獻1779條經驗 獲得超6個贊
如果您嘗試借助input(key_press示例中的函數)模擬USB數據流,則必須使用multithreading模塊,因為它input是一個阻塞函數,它將停止asyncio線程中的循環工作。要asyncio與input函數結合,您必須使用另一個線程,請查找以下示例:
import asyncio
import threading
async def do_work(i):
"""take input key and do some work with the input"""
await asyncio.sleep(5)
print(i)
async def main_thread_loop_work(_key):
"""simulate multiple tasks caused by key input"""
t1 = asyncio.create_task(do_work(_key))
t2 = asyncio.create_task(do_work(_key))
t3 = asyncio.create_task(do_work(_key))
await t1
await t2
await t3
def thead_worker(_key):
"""target function for another thread"""
asyncio.run(main_thread_loop_work(_key))
if __name__ == '__main__':
while True:
some_key = input("Please provide any key: ")
th = threading.Thread(target=thead_worker, args=(some_key, ))
th.start()
添加回答
舉報