亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Python - 在 asyncio 中取消任務?

Python - 在 asyncio 中取消任務?

慕無忌1623718 2023-06-20 16:51:11
我在下面為異步池編寫了代碼。在__aexit__任務完成后我取消了 _worker 任務。但是當我運行代碼時,工作任務不會被取消并且代碼會永遠運行。這就是任務的樣子:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>。正在asyncio.wait_for取消但不是工作任務。class AsyncPool:    def __init__(self,coroutine,no_of_workers,timeout):        self._loop           = asyncio.get_event_loop()        self._queue          = asyncio.Queue()        self._no_of_workers  = no_of_workers        self._coroutine      = coroutine        self._timeout        = timeout        self._workers        = None    async def _worker(self):         while True:            try:                ret = False                queue_item           = await self._queue.get()                ret = True                result               = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)            except Exception as e:                print(e)            finally:                if ret:                    self._queue.task_done()    async def push_to_queue(self,item):        self._queue.put_nowait(item)        async def __aenter__(self):        assert self._workers == None        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]        return self        async def __aexit__(self,type,value,traceback):        await self._queue.join()        for worker in self._workers:            worker.cancel()        await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)要使用異步池:async def something(item):    print("got", item)    await asyncio.sleep(item) async def main():    async with AsyncPool(something, 5, 2) as pool:        for i in range(10):            await pool.push_to_queue(i) asyncio.run(main())我終端的輸出:
查看完整描述

3 回答

?
侃侃爾雅

TA貢獻1801條經驗 獲得超16個贊

問題是您的except Exception例外條款也會捕獲取消并忽略它。更令人困惑的是,print(e)在 a 的情況下只打印一個空行CancelledError,這是輸出中空行的來源。(將其更改為print(type(e))顯示發生了什么。)

要更正此問題,請更改except Exception為更具體的內容,例如except asyncio.TimeoutError. Python 3.8 中不需要此更改,它asyncio.CancelledError不再派生自Exception,而是派生自BaseException,因此except Exception不會捕獲它。


查看完整回答
反對 回復 2023-06-20
?
慕慕森

TA貢獻1856條經驗 獲得超17個贊

這似乎有效。這event是一個計數計時器,當它到期時它的cancels任務。


import asyncio

from datetime import datetime as dt

from datetime import timedelta as td

import random

import time


class Program:

    def __init__(self):

        self.duration_in_seconds = 20

        self.program_start = dt.now()

        self.event_has_expired = False

        self.canceled_success = False


        


    async def on_start(self):

        print("On Start Event Start! Applying Overrides!!!")

        await asyncio.sleep(random.randint(3, 9))



    async def on_end(self):

        print("On End Releasing All Overrides!")

        await asyncio.sleep(random.randint(3, 9))

        


    async def get_sensor_readings(self):

        print("getting sensor readings!!!")

        await asyncio.sleep(random.randint(3, 9))   


 

    async def evauluate_data(self):

        print("checking data!!!")

        await asyncio.sleep(random.randint(3, 9))   



    async def check_time(self):

        if (dt.now() - self.program_start > td(seconds = self.duration_in_seconds)):

            self.event_has_expired = True

            print("Event is DONE!!!")


        else:

            print("Event is not done! ",dt.now() - self.program_start)




    async def main(self):

        # script starts, do only once self.on_start()

        await self.on_start()

        print("On Start Done!")


        while not self.canceled_success:


            readings = asyncio.ensure_future(self.get_sensor_readings())

            analysis = asyncio.ensure_future(self.evauluate_data())

            checker = asyncio.ensure_future(self.check_time())

            

            if not self.event_has_expired:

                await readings   

                await analysis           

                await checker

                

            else:

                # close other tasks before final shutdown

                readings.cancel()

                analysis.cancel()

                checker.cancel()

                self.canceled_success = True

                print("cancelled hit!")



        # script ends, do only once self.on_end() when even is done

        await self.on_end()

        print('Done Deal!')



async def main():

    program = Program()

    await program.main()



查看完整回答
反對 回復 2023-06-20
?
紅顏莎娜

TA貢獻1842條經驗 獲得超13個贊

當您asyncio創建然后取消任務時,您仍然有需要“回收”的任務。所以你想要await worker它。然而,一旦你await取消了這樣的任務,因為它永遠不會給你返回預期的返回值,就會asyncio.CancelledError被提高,你需要在某個地方抓住它。


由于這種行為,我認為您不應該為每個取消的任務gather執行它們await,因為它們應該立即返回:


async def __aexit__(self,type,value,traceback):

    await self._queue.join()


    for worker in self._workers:

        worker.cancel()

    for worker in self._workers:

        try:

           await worker

        except asyncio.CancelledError:

           print("worker cancelled:", worker)


查看完整回答
反對 回復 2023-06-20
  • 3 回答
  • 0 關注
  • 381 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號