亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何讓 Flask 網頁(路由)在另一個網頁(路由)上后臺運行

如何讓 Flask 網頁(路由)在另一個網頁(路由)上后臺運行

夢里花落0921 2024-01-15 17:20:02
所以我正在創建這個應用程序,它的一部分是一個網頁,其中交易算法正在使用實時數據進行自我測試。所有這些都有效,但問題是如果我離開(退出)網頁,它就會停止。我想知道如何讓它無限期地在后臺運行,因為我希望算法繼續做它的事情。這是我想在后臺運行的路線。@app.route('/live-data-source')def live_data_source():    def get_live_data():        live_options = lo.Options()        while True:            live_options.run()            live_options.update_strategy()            trades = live_options.get_all_option_trades()            trades = trades[0]            json_data = json.dumps(                {'data': trades})            yield f"data:{json_data}\n\n"            time.sleep(5)    return Response(get_live_data(), mimetype='text/event-stream')我研究過多線程,但不太確定這是否適合這項工作。我對燒瓶還是個新手,所以這是一個可憐的問題。如果您需要更多信息,請發表評論。
查看完整描述

1 回答

?
三國紛爭

TA貢獻1804條經驗 獲得超7個贊

您可以按照以下方式進行操作 - 下面是 100% 有效的示例。請注意,在生產中使用 Celery 來執行此類任務,或者自己編寫另一個守護程序應用程序(另一個進程),并借助消息隊列(例如 RabbitMQ)或公共數據庫的幫助從 http 服務器向其提供任務。

如果對下面的代碼有任何疑問,請隨時提問,這對我來說是很好的練習:

from flask import Flask, current_app

import threading

from threading import Thread, Event

import time

from random import randint


app = Flask(__name__)

# use the dict to store events to stop other treads

# one event per thread !

app.config["ThreadWorkerActive"] = dict()



def do_work(e: Event):

    """function just for another one thread to do some work"""

    while True:

        if e.is_set():

            break  # can be stopped from another trhead

        print(f"{threading.current_thread().getName()} working now ...")

        time.sleep(2)

    print(f"{threading.current_thread().getName()} was stoped ...")



@app.route("/long_thread", methods=["GET"])

def long_thread_task():

    """Allows to start a new thread"""

    th_name = f"Th-{randint(100000, 999999)}"  # not really unique actually

    stop_event = Event()  # is used to stop another thread

    th = Thread(target=do_work, args=(stop_event, ), name=th_name, daemon=True)

    th.start()

    current_app.config["ThreadWorkerActive"][th_name] = stop_event

    return f"{th_name} was created!"



@app.route("/stop_thread/<th_id>", methods=["GET"])

def stop_thread_task(th_id):

    th_name = f"Th-{th_id}"

    if th_name in current_app.config["ThreadWorkerActive"].keys():

        e = current_app.config["ThreadWorkerActive"].get(th_name)

        if e:

            e.set()

            current_app.config["ThreadWorkerActive"].pop(th_name)

            return f"Th-{th_id} was asked to stop"

        else:

            return "Sorry something went wrong..."

    else:

        return f"Th-{th_id} not found"



@app.route("/", methods=["GET"])

def index_route():

    text = ("/long_thread - create another thread.  "

            "/stop_thread/th_id - stop thread with a certain id.  "

            f"Available Threads: {'; '.join(current_app.config['ThreadWorkerActive'].keys())}")

    return text



if __name__ == '__main__':

    app.run(host="0.0.0.0", port=9999)


查看完整回答
反對 回復 2024-01-15
  • 1 回答
  • 0 關注
  • 198 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號