使用下面的代碼,我嘗試使用 asyncio 啟動 2 個無限循環:async def do_job_1(): while True : print('do_job_1') await asyncio.sleep(5)async def do_job_2(): while True : print('do_job_2') await asyncio.sleep(5)if __name__ == '__main__': asyncio.run(do_job_1()) asyncio.run(do_job_2())do_job_1塊do_job_2,因為do_job_2從不打印 do_job_1。我犯了什么錯誤?最終我試圖轉換kafka消費者代碼:from confluent_kafka import Consumer, KafkaErrorsettings = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup', 'client.id': 'client-1', 'enable.auto.commit': True, 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}}c = Consumer(settings)c.subscribe(['mytopic'])try: while True: msg = c.poll(0.1) if msg is None: continue elif not msg.error(): print('Received message: {0}'.format(msg.value())) elif msg.error().code() == KafkaError._PARTITION_EOF: print('End of partition reached {0}/{1}' .format(msg.topic(), msg.partition())) else: print('Error occured: {0}'.format(msg.error().str()))except KeyboardInterrupt: passfinally: c.close()摘自https://www.confluence.io/blog/introduction-to-apache-kafka-for-python-programmers是并發的,這樣我就可以并行處理 Kafka 消息。
1 回答

回首憶惘然
TA貢獻1847條經驗 獲得超11個贊
來自help(asyncio.run):
它應該用作 asyncio 程序的主要入口點,并且最好只調用一次。
但您可以使用asyncio.gather加入任務:
import asyncio
async def do_job_1():
while True :
print('do_job_1')
await asyncio.sleep(5)
async def do_job_2():
while True :
print('do_job_2')
await asyncio.sleep(5)
async def main():
await asyncio.gather(do_job_1(), do_job_2())
if __name__ == '__main__':
asyncio.run(main())
添加回答
舉報
0/150
提交
取消