2 回答

TA貢獻1804條經驗 獲得超8個贊
我設法添加了一個 http 服務器作為協程。首先我嘗試使用 aiohttp,但最終我找到了 Quart(與 Flask 相同,但它使用 Asyncio)。在 Quart 上運行 http 服務器的示例代碼:
import quart
from quart import request
import json
import time
app = quart.Quart(__name__)
def resp(code, data):
return quart.Response(
status=code,
mimetype="application/json",
response=to_json(data)
)
def to_json(data):
return json.dumps(data) + "\n"
@app.route('/')
def root():
return quart.redirect('/api/status2')
@app.errorhandler(400)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(404)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(405)
def page_not_found(e):
return resp(405, {})
@app.route('/api/status2', methods=['GET'])
def get_status():
timestamp = request.args.get("timestamp")
delay = request.args.get("delay")
if timestamp:
return resp(200, {"time": time.time()})
elif delay:
return resp(200, {"test is:": '1'})
else:
return resp(200, {"", "ask me about time"})
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=5000)
為了將此代碼添加為協同程序,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。像這樣更改了我的問題中的代碼:
async def launcher_main():
while True:
ans_inc, ans_out = await get_some_data()
asyncio.ensure_future(process(ans_inc, ans_out))
async def main():
await asyncio.gather(launcher_main(),
restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))
剩下的最后一個問題是使“formatdf”類對象的可用參數到我的 http 服務器。我已經實現了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函數添加行。從 quart 調用它:
elif button:
return resp(200, {"ok": app.config["formatdf"].attr})

TA貢獻1845條經驗 獲得超8個贊
我只需要為我的應用程序解決這個問題。這是在現有異步應用程序中運行 aiohttp 服務器的方法。
https://docs.aiohttp.org/en/stable/web_lowlevel.html
import asyncio
from aiohttp import web
async def web_server():
print(f'Configuring Web Server')
app = web.Application()
app.add_routes([web.get('/hello', web_hello)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner)
print(f'Starting Web Server')
await site.start()
print(f'Web Server Started')
#run forever and ever
await asyncio.sleep(100*3600)
async def web_hello(request):
return web.Response(text="Hello World")
async def main():
tasks = []
#Web Server
tasks.append(asyncio.create_task(web_server()))
results = await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
添加回答
舉報