亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

帶有 flask 或 aiohttp 服務器的 asyncio 應用程序

帶有 flask 或 aiohttp 服務器的 asyncio 應用程序

ITMISS 2022-12-20 11:00:31
我改進了去年的應用程序。一開始有兩個不同的 python 應用程序——第一個用于統計數據,第二個——帶有 GET 請求的網絡服務器 gunicorn+flask。(這兩種服務都在 centos 中)Statistics 進行計數并將所有內容存儲在 Postgres 中。Web 服務器連接到該 Postgres 數據庫并響應 GET 請求。在重寫的版本中,我使用 pandas 框架進行了所有統計,現在我想將這兩個應用程序合并為一個。我使用 asyncio 來獲取數據和計數統計。一切正常,現在我要添加 Web 服務器以響應 GET。部分代碼:import asynciofrom contextlib import closingimport db_cl, tks_dbfrom formdf_cl import FormatDFgetinfofromtks = tks_db.TKS() # Class object to connect to third party databaseformatdf = FormatDF() # counting class object, that stores some datadbb = db_cl.MyDatabase('mydb.ini') # Class object to connect to my databaseasync def get_some_data():    # getting information from third party database every 5 seconds.    await asyncio.sleep(5)    ans_inc, ans_out = getinfofromtks.getdf()    return ans_inc, ans_out # two huge dataframes in pandasasync def process(ans_inc, ans_out):    # counting data on CPU    await asyncio.sleep(0)    formatdf.leftjoin(ans_inc, ans_out)    # storing statistics in my Database    dbb.query_database('INSERT INTO statistic (timestamp, outgoing, incoming, stats) values (%s, %s,%s, %s)',                       formatdf.make_count())    dbb.commit_query()async def main():    while True:        ans_inc, ans_out = await get_some_data()  # blocking, get data from third party database        asyncio.ensure_future(process(ans_inc, ans_out))  # computingif __name__ == "__main__":    with closing(asyncio.get_event_loop()) as event_loop:        event_loop.run_until_complete(main())現在我希望將 http 服務器添加為線程應用程序(使用 flask 或 aiohttp),它將使用類對象“formatdf”中的參數響應 GET 請求。包含這些功能的最佳方式是什么?
查看完整描述

2 回答

?
胡說叔叔

TA貢獻1804條經驗 獲得超8個贊

我設法添加了一個 http 服務器作為協程。首先我嘗試使用 aiohttp,但最終我找到了 Quart(與 Flask 相同,但它使用 Asyncio)。在 Quart 上運行 http 服務器的示例代碼:


import quart

from quart import request

import json

import time


app = quart.Quart(__name__)


def resp(code, data):

    return quart.Response(

        status=code,

        mimetype="application/json",

        response=to_json(data)

    )


def to_json(data):

    return json.dumps(data) + "\n"


@app.route('/')

def root():

    return quart.redirect('/api/status2')



@app.errorhandler(400)

def page_not_found(e):

    return resp(400, {})



@app.errorhandler(404)

def page_not_found(e):

    return resp(400, {})



@app.errorhandler(405)

def page_not_found(e):

    return resp(405, {})



@app.route('/api/status2', methods=['GET'])

def get_status():

    timestamp = request.args.get("timestamp")

    delay = request.args.get("delay")

    if timestamp:

        return resp(200, {"time": time.time()})

    elif delay:

        return resp(200, {"test is:": '1'})

    else:

        return resp(200, {"", "ask me about time"})



if __name__ == "__main__":

    app.run(debug=True, host='0.0.0.0', port=5000)

為了將此代碼添加為協同程序,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。像這樣更改了我的問題中的代碼:


async def launcher_main():

    while True:

        ans_inc, ans_out = await get_some_data()

        asyncio.ensure_future(process(ans_inc, ans_out))



async def main():

    await asyncio.gather(launcher_main(),

                         restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))

剩下的最后一個問題是使“formatdf”類對象的可用參數到我的 http 服務器。我已經實現了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函數添加行。從 quart 調用它:


elif button:

    return resp(200, {"ok": app.config["formatdf"].attr})


查看完整回答
反對 回復 2022-12-20
?
精慕HU

TA貢獻1845條經驗 獲得超8個贊

我只需要為我的應用程序解決這個問題。這是在現有異步應用程序中運行 aiohttp 服務器的方法。


https://docs.aiohttp.org/en/stable/web_lowlevel.html


import asyncio

from aiohttp import web


async def web_server():

    print(f'Configuring Web Server')

    app = web.Application()

    app.add_routes([web.get('/hello', web_hello)])

    runner = web.AppRunner(app)

    await runner.setup()

    site = web.TCPSite(runner)    

    print(f'Starting Web Server')

    await site.start()

    print(f'Web Server Started')


    #run forever and ever

    await asyncio.sleep(100*3600)


async def web_hello(request):

    return web.Response(text="Hello World")


async def main():

    tasks = []


    #Web Server

    tasks.append(asyncio.create_task(web_server()))


    results = await asyncio.gather(*tasks)


if __name__ == '__main__':

    asyncio.run(main())


查看完整回答
反對 回復 2022-12-20
  • 2 回答
  • 0 關注
  • 190 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號