亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

LangGraph與Streamlit的橋梁:實現語言模型生成內容的實時流式傳輸詳解

在最近的一个项目中,当我实现LangGraph时,我遇到了一个重大挑战:如何有效地将LangGraph状态机中的推理状态和输出令牌实时传输到Streamlit前端。但是这些框架并没有设计成能无缝协作。经过多次尝试,我找到了一个解决方案,利用FastAPI作为中间层来解决这个问题。

你可以在这里查看仓库页面: https://github.com/yigit353/LangGraph-FastAPI-Streamlit

集成挑战:从LangGraph到Streamlit

LangGraph 是 LangChain 的一个扩展,用于构建具有状态的多步骤 LLM 应用程序,在其自身生态系统内提供了出色的流处理能力。然而,将这些流连接到前端框架如 Streamlit 会遇到一些技术难题。

  1. 异步流不兼容:LangGraph采用异步操作,而Streamlit的架构并不直接处理这种流模式。
  2. 多种流类型:LangGraph状态机生成多种类型的输出(推理状态和内容令牌),这些输出在UI中需要分开处理。
  3. 框架边界:数据需要跨框架传输时保持干净,这要求仔细实现。

LangGraph的文档说明了如何在控制台程序中进行流式处理,但与Web框架集成需要额外的步骤,而这在官方示例中未涵盖。

解决方案:一种三层架构设计

我的方法引入了一个FastAPI层,作为连接LangGraph与任何前端的桥梁,其中,Streamlit作为一个演示实现。

LangGraph(状态图)→ FastAPI(流处理API)→ Streamlit(UI)

这种架构清晰地将不同关注领域分离开来:

  • LangGraph 负责大语言模型的工作流逻辑
  • FastAPI 负责流式通信
  • Streamlit(或其他前端工具)则负责可视化
技术实现方法

这个笑话和诗歌的示例来自官方LangGraph文档这里,请参阅更多详情:https://langchain-ai.github.io/langgraph/how-tos/streaming-token(点击链接查看)。

1. LangGraph 状态机的实现

查看代码中,LangGraph的实现代码创建了一个包含两个节点的图,首先生成一个笑话,生成一首诗:

    class State(TypedDict):  
        topic: str  
        joke: str  
        poem: str  

    async def generate_joke(state, config):  
        topic = state["topic"]  
        print("打印中...\n")  
        joke_response = await joke_model.ainvoke(  
            [{"role": "user", "content": f"写一个关于 {topic} 的笑话吧"}],  
            config,  
        )  
        return {"joke": joke_response.content}  

    async def generate_poem(state, config):  
        topic = state["topic"]  
        print("\n打印中...\n")  
        poem_response = await poem_model.ainvoke(  
            [{"role": "user", "content": f"写一首简短的关于 {topic} 的诗吧"}],  
            config,  
        )  
        return {"poem": poem_response.content}

在构建该图的过程中,这些节点按顺序连接如下。

    def create_graph():  
        workflow = StateGraph(state)  

        # 添加节点  
        workflow.add_node("generate_joke", generate_joke)  
        workflow.add_node("generate_poem", generate_poem)  

        # 设置起点  
        workflow.set_entry_point("generate_joke")  

        # 添加边  
        workflow.add_edge("generate_joke", "generate_poem")  

        # 设置结束节点  
        workflow.set_finish_point("generate_poem")  

        return workflow.compile()

这使得系统创建了一个有方向的流程(或步骤),系统首先生成一个笑话,接着生成同样主题的诗。

2. 基于 FastAPI 的服务器:流媒体桥梁

解决方案的核心在于一个基于FastAPI的服务器,它将LangGraph与任何前端连接起来。关键的创新在于它如何从LangGraph的状态机中流式传输推理结果和内容令牌。

@app.post("/generate")  
async def generate_content(request: Request):  
    data = await request.json()  
    topic = data.get("topic", "cats")  

    async def stream_generator():  
        thinking_started = False  

        async for msg, metadata in graph.astream(  
            {"topic": topic},  
            stream_mode="messages",  
        ):  
            node = metadata["langgraph_node"]  
            if node == "generate_joke":  
                if msg.content:  
                    if thinking_started:  
                        print("\n</thinking>\n")  
                        thinking_started = False  
                    print(msg.content, end="", flush=True)  
                    yield sse_format(  
                        {"content": msg.content, "type": "joke", "thinking": False}  
                    )  
                if "reasoning_content" in msg.additional_kwargs:  
                    if not thinking_started:  
                        print("<thinking>")  
                        thinking_started = True  
                    print(  
                        msg.additional_kwargs["reasoning_content"], end="", flush=True  
                    )  
                    yield sse_format(  
                        {  
                            "content": msg.additional_kwargs["reasoning_content"],  
                            "type": "joke",  
                            "thinking": True,  
                        }  
                    )  
            if node == "generate_poem":  
                print(msg.content, end="", flush=True)  
                yield sse_format(  
                    {"content": msg.content, "type": "poem", "thinking": False}  
                )

这个实现显示了几个重要的技术要点:

  1. 它使用 LangGraph 中的 astream 方法,并设置为 stream_mode="messages" 来访问原始消息对象
  2. 它检查 metadata["langgraph_node"] 以确定当前激活的节点是哪个
  3. 对于笑话节点,FastAPI 特别处理常规内容 (msg.content) 和推理部分 (msg.additional_kwargs["reasoning_content"])
  4. 它将每个部分格式化为带有特定类型信息的 Server-Sent Event (SSE),以区分笑话、诗歌和思考内容

The sse_format 辅助函数负责,正确处理 SSE 格式:

# 将有效负载格式化为SSE数据格式
def sse格式化(payload):  
    # 返回格式化后的SSE数据格式
    return f"data: {json.dumps(payload)}\n\n"

这种方法产生了一个干净的数据流,前端可以异步处理。

3. 部分 Streamlit 用户界面(UI):流数据可视化

Streamlit UI 采用了一种复杂的方式展示推理过程和内容流:

    # 处理事件  
    for event in client.events():  
        data = json.loads(event.data)  
        message_type = data.get("type", "N/A")  
        content = data.get("content", "")  
        is_thinking = data.get("thinking", False)  
        if message_type == "joke":  
            if is_thinking:  
                if not thinking_started:  
                    thinking_started = True  
                    joke_spinner.start()  # 启动-spinner  
                    think_time_start = time.time()  
                # 累积思考内容  
                thinking_content += content  
                joke_thinking_container.markdown(  
                    thinking_template.format(thinking_content=thinking_content),  
                    unsafe_allow_html=True,  
                )  
            else:  
                if not thinking_completed:  
                    thinking_completed = True  
                    # 停止-spinner  
                    joke_spinner.stop()  
                    joke_thinking_container()  
                    thinking_time = time.time() - think_time_start  
                    # 在折叠面板中显示  
                    with thinking_expander_container.expander(  
                        "思考了 {:.2f} 秒。查看。".format(  
                            thinking_time  
                        )  
                    ):  
                        st.text(thinking_content)  
                    # 启动-spinner  
                    joke_spinner = ControlledSpinner("正在创建笑话...")  
                    joke_spinner.start()  
                    joke_container.markdown(joke_content)  
                joke_content += content  
                joke_container.markdown(joke_content)  
      elif message_type == "poem":  
        if not joke_completed:  
            joke_completed = True  
            # 停止-spinner  
            joke_spinner.stop()  

            # 添加分隔线  
            divider.markdown("---")  

            # 启动诗歌-spinner  
            poem_spinner.start()  
            poem_container.markdown(poem_content)  

        poem_content += content  
        poem_container.markdown(poem_content)

这个实现解决了几个UI(用户界面)方面的难题:

  1. 顺序处理:首先处理推理令牌,然后处理内容令牌,这种顺序处理方式避免了带宽问题。
  2. 视觉分离:思考内容最初显示在一个蓝色高亮框内,完成后会折叠进一个展开器中,保持界面整洁。思考内容通过这种视觉提示在推理和生成阶段提供了反馈。
  3. 进度指示器:自定义的 ControlledSpinner 对象在推理和生成阶段提供了视觉指示。由于有标准的异步流实现 with st.spinner(),因此不显示自定义的进度指示器。
  4. 内容累积:思考和输出内容逐步积累,允许在令牌到达时进行实时更新。

该 UI 解决方案专门处理从思考到内容生成的转变,并提供相应的视觉引导。

    if not thinking_completed:  
        thinking_completed = True  
        # 停止思考动画  
        笑话思考动画停止()  
        笑话思考容器()  
        thinking_time = time.time() - 思考开始时间  
        # 在展开面板中显示思考时间  
        with thinking_expander_container.expander(  
            "思考了 {:.2f} 秒。查看思考步骤。".format(  
                thinking_time  
            )  
        ):  
            st.text(思考内容)

这实现了一个干净的过渡,保留了访问推理过程的方式,而不使界面变得杂乱。

4. 谈谈如何应对DeepSeek-R1集成的挑战

代码演示了处理DeepSeek R1特定实现细节的方法,其中推理令牌通过additional_kwargs参数提供,而不是在主内容中。

    if "reasoning_content" in msg.additional_kwargs:  
        如果还没有开始思考:  
            print('正在思考...')  
            thinking_started = True  
        print(  
            msg.additional_kwargs["reasoning_content"], end="", flush=True  
        )  
        生成并发送sse格式的消息(  
            {  
                "content": msg.additional_kwargs["思考内容"],  
                "type": "笑话",  
                "thinking": True,  
            }  
        )

这项实现特别检查并提取DeepSeek的additional_kwargs字段里的推理数据,适应模型的特定流式特性。

我也创建了另一个这样的项目,在这个项目中,思维令牌是从DeepSeek-R1流式传输到RAG聊天环境中。你可以在这里查看该项目: https://github.com/yigit353/DeepSeekRAGChat

5. 测试实施情况

test_client.py 文件提供了一个简单的验证方法来检查整个栈是否运行正常:

    def test_generate():  
        """测试带有SSE流的生成端点。"""  
        url = "http://localhost:8000/generate"  
        headers = {"Content-Type": "application/json"}  
        data = {"topic": "dogs"}  

        # 使用流请求获取SSE事件流  
        response = requests.post(url, json=data, headers=headers, stream=True)  

        # 创建SSE客户端实例  
        client = sseclient.SSEClient(response)  

        # 处理每个事件  
        for event in client.events():  
            data = json.loads(event.data)  
            print("-" * 50)  
            print(f"类型: {data.get('type', 'N/A')}")  
            print(f"内容: {data.get('content', 'N/A')}")  
            print(f"思绪: {data.get('thinking', 'N/A')}")

此测试脚本展示了如何连接到FastAPI服务器并使用SSE流,为其他非Streamlit的替代前端实现提供了一个清晰的参考模板。

开始使用这个仓库指南

该仓库提供了三个逐步的例子来帮助你理解并应用这种模式。

该仓库提供了三个逐步的例子来帮助你更好地理解和实现这个模式。

  1. 基本的 AsyncIO 控制台单元图:一个简单的起点,只有一个节点同时处理笑话和诗歌的生成。
  2. AsyncIO 控制台基本图:一个更复杂的图,不同类型的节点分别处理不同类型的内容。
  3. FastAPI + LangGraph + Streamlit:包含所有三层的完整实现,提供了简洁高效的应用程序接口、语言生成图和易于使用的数据展示界面。

如何设置并运行实例:

    # 克隆仓库,
    git clone https://github.com/yigit353/LangGraph-FastAPI-Streamlit.git  

    cd LangGraph-FastAPI-Streamlit # 设置好你的环境  
    python -m venv venv  
    source venv/bin/activate  # 在 Windows 上使用:venv\Scripts\activate  
    pip install -r requirements.txt# 配置你的 API 密钥  
    cp example.env .env  

    # 将你的 DeepSeek API 密钥添加到 .env 文件中  
    cd 03_fastapi_langgraph_streamlit  
    python server.py  

    # 在新的终端里  
    streamlit run streamlit_ui.py
实际的应用领域:如何将理论知识应用到实践中

这种架构模式适用于几个重要的应用场景。

  • 教育用途的LLM应用:让学习过程更加透明,向学生展示AI是如何逐步得出结论的。
  • 调试AI的推理过程:通过观察每一步来找到推理出错的地方。
  • 复杂工作流的可视化:可视化多步骤的LangGraph流程,帮助研究、分析或规划。
  • AI可解释性:让用户更容易理解AI为什么做出了某些推荐,提高AI决策过程的透明度。
技术考虑和未来工作

这个实现满足了LLM应用程序开发中的一个具体需求,但还有很多可以深入研究的地方。

  • WebSocket 实现:为了实现更实时的双向通信
  • 复杂图形可视化:扩展方法来可视化分支决策路径
  • React 实现:创建更复杂的 UI 组件来处理不同类型的数据流
  • 人机循环扩展:增加用户干预推理过程的能力

当前的实现侧重于顺序流(首先进行推理,然后提供内容),这避免了带宽问题,并提供更好的使用体验。

结论部分

使用LangGraph构建可视化应用时,需要解决不同框架之间的流处理集成难题。这种基于FastAPI的方案提供了一个干净的解决方案,同时保持了关注分离,并且可以实时展现推理状态和内容生成。

虽然 LangGraph 的文档为构建状态机提供了出色的起点,但这种实现填补了与前端框架(如 Streamlit)之间的空白,使开发人员更容易用可视化方式构建 LLM 应用程序。

代码已上传至 GitHub,欢迎社区成员提供反馈和建议,帮助我们继续改进这些适用于 LLM 开发的模式。

點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消