亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將 python asyncio 與 pyee 事件發射器相結合

將 python asyncio 與 pyee 事件發射器相結合

慕桂英3389331 2023-03-01 15:46:14
我正在嘗試使用pyee 庫AsyncIOEventEmitter中的,但沒有成功。由于某種原因,發出的事件“Hi”永遠不會到達完成 asyncio 的未來。我也沒有在網上找到合適的例子。此外,我嘗試提供當前事件并為 ,使用新的事件循環,但兩者產生相同的結果。async_handlerAsyncIOEventEmitter有人可以幫我嗎?下面的示例單元測試:import asyncioimport loggingimport pytestfrom pyee import AsyncIOEventEmitterLOG = logging.getLogger(__name__)@pytest.mark.asyncioasync def test_setup(event_loop):    LOG.info("1 - start")    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())    # Create a new Future object.    future_result = event_loop.create_future()    LOG.info("2 - emit event")    event_emitter.emit("event", "Hi")    @event_emitter.on("event")    async def async_handler(message):        LOG.info(">>> %s", message)        future_result.set_result(message)        return future_result    # Wait until *future_result* has a result and print it.    LOG.info(await future_result)
查看完整描述

1 回答

?
胡說叔叔

TA貢獻1804條經驗 獲得超8個贊

好的,明白了,該async_handler方法必須在測試的早期定義...


這現在有效:


"""Event emitter playground"""

import asyncio

import logging

import pytest

from pyee import AsyncIOEventEmitter


LOG = logging.getLogger(__name__)



@pytest.mark.asyncio

async def test_setup(event_loop):

    """Receive event from emitter and complete future!"""

    LOG.info("1 - start")

    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())


    @event_emitter.on("event")

    def async_handler(message):

        LOG.info(">>> %s", message)

        future_result.set_result(message)


    future_result = event_loop.create_future()

    LOG.info("2 - emit event")

    event_emitter.emit("event", "Hi")


    LOG.info(await future_result)


查看完整回答
反對 回復 2023-03-01
  • 1 回答
  • 0 關注
  • 509 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號