我正在嘗試找到一種方法來從 Celery 隊列中刪除所有當前排隊的具有特定給定名稱的任務。從官方文件中,我知道我可以通過查找工人的姓名然后獲取他們的 ID 來檢查工人并撤銷任務,例如:def drop_celery_task(options): def _get_tasks_id(workers: list, tasks_ids: list, task_name: str): """ Get task ids with the given name included inside the given `workers` tasks. {'worker1.example.com': [ {'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf', 'args': '(8,)', 'kwargs': '{}'}] } """ for worker in workers: if not workers[worker]: continue for _task in workers[worker]: if _task["name"].split(".")[-1] == task_name: tasks_ids.append(_task["id"]) task_name = options.drop_celery_task["name"] i = Inspect(app=celery_app) # Inspect all nodes. registered = i.registered() if not registered: raise Exception("No registered tasks found") if not any(task_name == worker.split(".")[-1] for worker in chain(*list(registered.values()))): raise Exception(f"Task not registered: {task_name}") tasks_ids = [] _get_tasks_id(i.active(), tasks_ids, task_name) _get_tasks_id(i.scheduled(), tasks_ids, task_name) _get_tasks_id(i.reserved(), tasks_ids, task_name) if tasks_ids: for task_id in tasks_ids: Control(app=celery_app).revoke(task_id) else: logging.info(f"No active/scheduled/registered task found with the name {task_name}")但此代碼僅撤銷 celery 工作人員獲取或預取的任務,而不是仍在隊列中的任務(使用 Redis 作為后端)。關于如何使用 celery 命令刪除 Redis 中的任務,或阻止工作人員接受具有給定名稱的任務,有什么建議嗎?
1 回答

長風秋雁
TA貢獻1757條經驗 獲得超7個贊
我最終在 Redis 中使用我想要的名稱標識了任務的 ID(使用 Redis 客戶端,而不是 celery 命令),然后通過命令撤銷這些 ID Control(app=celery_app).revoke(task_id)
。在 Redis 中,隊列是帶有隊列名稱的鍵下的列表對象。
添加回答
舉報
0/150
提交
取消