在最近的一个项目中,当我实现LangGraph时,我遇到了一个重大挑战:如何有效地将LangGraph状态机中的推理状态和输出令牌实时传输到Streamlit前端。但是这些框架并没有设计成能无缝协作。经过多次尝试,我找到了一个解决方案,利用FastAPI作为中间层来解决这个问题。
你可以在这里查看仓库页面: https://github.com/yigit353/LangGraph-FastAPI-Streamlit
集成挑战:从LangGraph到StreamlitLangGraph 是 LangChain 的一个扩展,用于构建具有状态的多步骤 LLM 应用程序,在其自身生态系统内提供了出色的流处理能力。然而,将这些流连接到前端框架如 Streamlit 会遇到一些技术难题。
- 异步流不兼容:LangGraph采用异步操作,而Streamlit的架构并不直接处理这种流模式。
- 多种流类型:LangGraph状态机生成多种类型的输出(推理状态和内容令牌),这些输出在UI中需要分开处理。
- 框架边界:数据需要跨框架传输时保持干净,这要求仔细实现。
LangGraph的文档说明了如何在控制台程序中进行流式处理,但与Web框架集成需要额外的步骤,而这在官方示例中未涵盖。
解决方案:一种三层架构设计我的方法引入了一个FastAPI层,作为连接LangGraph与任何前端的桥梁,其中,Streamlit作为一个演示实现。
LangGraph(状态图)→ FastAPI(流处理API)→ Streamlit(UI)
这种架构清晰地将不同关注领域分离开来:
- LangGraph 负责大语言模型的工作流逻辑
- FastAPI 负责流式通信
- Streamlit(或其他前端工具)则负责可视化
这个笑话和诗歌的示例来自官方LangGraph文档这里,请参阅更多详情:https://langchain-ai.github.io/langgraph/how-tos/streaming-token(点击链接查看)。
1. LangGraph 状态机的实现查看代码中,LangGraph的实现代码创建了一个包含两个节点的图,首先生成一个笑话,生成一首诗:
class State(TypedDict):
topic: str
joke: str
poem: str
async def generate_joke(state, config):
topic = state["topic"]
print("打印中...\n")
joke_response = await joke_model.ainvoke(
[{"role": "user", "content": f"写一个关于 {topic} 的笑话吧"}],
config,
)
return {"joke": joke_response.content}
async def generate_poem(state, config):
topic = state["topic"]
print("\n打印中...\n")
poem_response = await poem_model.ainvoke(
[{"role": "user", "content": f"写一首简短的关于 {topic} 的诗吧"}],
config,
)
return {"poem": poem_response.content}
在构建该图的过程中,这些节点按顺序连接如下。
def create_graph():
workflow = StateGraph(state)
# 添加节点
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("generate_poem", generate_poem)
# 设置起点
workflow.set_entry_point("generate_joke")
# 添加边
workflow.add_edge("generate_joke", "generate_poem")
# 设置结束节点
workflow.set_finish_point("generate_poem")
return workflow.compile()
这使得系统创建了一个有方向的流程(或步骤),系统首先生成一个笑话,接着生成同样主题的诗。
2. 基于 FastAPI 的服务器:流媒体桥梁解决方案的核心在于一个基于FastAPI的服务器,它将LangGraph与任何前端连接起来。关键的创新在于它如何从LangGraph的状态机中流式传输推理结果和内容令牌。
@app.post("/generate")
async def generate_content(request: Request):
data = await request.json()
topic = data.get("topic", "cats")
async def stream_generator():
thinking_started = False
async for msg, metadata in graph.astream(
{"topic": topic},
stream_mode="messages",
):
node = metadata["langgraph_node"]
if node == "generate_joke":
if msg.content:
if thinking_started:
print("\n</thinking>\n")
thinking_started = False
print(msg.content, end="", flush=True)
yield sse_format(
{"content": msg.content, "type": "joke", "thinking": False}
)
if "reasoning_content" in msg.additional_kwargs:
if not thinking_started:
print("<thinking>")
thinking_started = True
print(
msg.additional_kwargs["reasoning_content"], end="", flush=True
)
yield sse_format(
{
"content": msg.additional_kwargs["reasoning_content"],
"type": "joke",
"thinking": True,
}
)
if node == "generate_poem":
print(msg.content, end="", flush=True)
yield sse_format(
{"content": msg.content, "type": "poem", "thinking": False}
)
这个实现显示了几个重要的技术要点:
- 它使用 LangGraph 中的
astream
方法,并设置为stream_mode="messages"
来访问原始消息对象 - 它检查
metadata["langgraph_node"]
以确定当前激活的节点是哪个 - 对于笑话节点,FastAPI 特别处理常规内容 (
msg.content
) 和推理部分 (msg.additional_kwargs["reasoning_content"]
) - 它将每个部分格式化为带有特定类型信息的 Server-Sent Event (SSE),以区分笑话、诗歌和思考内容
The sse_format
辅助函数负责,正确处理 SSE 格式:
# 将有效负载格式化为SSE数据格式
def sse格式化(payload):
# 返回格式化后的SSE数据格式
return f"data: {json.dumps(payload)}\n\n"
这种方法产生了一个干净的数据流,前端可以异步处理。
3. 部分 Streamlit 用户界面(UI):流数据可视化Streamlit UI 采用了一种复杂的方式展示推理过程和内容流:
# 处理事件
for event in client.events():
data = json.loads(event.data)
message_type = data.get("type", "N/A")
content = data.get("content", "")
is_thinking = data.get("thinking", False)
if message_type == "joke":
if is_thinking:
if not thinking_started:
thinking_started = True
joke_spinner.start() # 启动-spinner
think_time_start = time.time()
# 累积思考内容
thinking_content += content
joke_thinking_container.markdown(
thinking_template.format(thinking_content=thinking_content),
unsafe_allow_html=True,
)
else:
if not thinking_completed:
thinking_completed = True
# 停止-spinner
joke_spinner.stop()
joke_thinking_container()
thinking_time = time.time() - think_time_start
# 在折叠面板中显示
with thinking_expander_container.expander(
"思考了 {:.2f} 秒。查看。".format(
thinking_time
)
):
st.text(thinking_content)
# 启动-spinner
joke_spinner = ControlledSpinner("正在创建笑话...")
joke_spinner.start()
joke_container.markdown(joke_content)
joke_content += content
joke_container.markdown(joke_content)
elif message_type == "poem":
if not joke_completed:
joke_completed = True
# 停止-spinner
joke_spinner.stop()
# 添加分隔线
divider.markdown("---")
# 启动诗歌-spinner
poem_spinner.start()
poem_container.markdown(poem_content)
poem_content += content
poem_container.markdown(poem_content)
这个实现解决了几个UI(用户界面)方面的难题:
- 顺序处理:首先处理推理令牌,然后处理内容令牌,这种顺序处理方式避免了带宽问题。
- 视觉分离:思考内容最初显示在一个蓝色高亮框内,完成后会折叠进一个展开器中,保持界面整洁。思考内容通过这种视觉提示在推理和生成阶段提供了反馈。
- 进度指示器:自定义的
ControlledSpinner
对象在推理和生成阶段提供了视觉指示。由于有标准的异步流实现with st.spinner()
,因此不显示自定义的进度指示器。 - 内容累积:思考和输出内容逐步积累,允许在令牌到达时进行实时更新。
该 UI 解决方案专门处理从思考到内容生成的转变,并提供相应的视觉引导。
if not thinking_completed:
thinking_completed = True
# 停止思考动画
笑话思考动画停止()
笑话思考容器()
thinking_time = time.time() - 思考开始时间
# 在展开面板中显示思考时间
with thinking_expander_container.expander(
"思考了 {:.2f} 秒。查看思考步骤。".format(
thinking_time
)
):
st.text(思考内容)
这实现了一个干净的过渡,保留了访问推理过程的方式,而不使界面变得杂乱。
4. 谈谈如何应对DeepSeek-R1集成的挑战代码演示了处理DeepSeek R1特定实现细节的方法,其中推理令牌通过additional_kwargs
参数提供,而不是在主内容中。
if "reasoning_content" in msg.additional_kwargs:
如果还没有开始思考:
print('正在思考...')
thinking_started = True
print(
msg.additional_kwargs["reasoning_content"], end="", flush=True
)
生成并发送sse格式的消息(
{
"content": msg.additional_kwargs["思考内容"],
"type": "笑话",
"thinking": True,
}
)
这项实现特别检查并提取DeepSeek的additional_kwargs
字段里的推理数据,适应模型的特定流式特性。
我也创建了另一个这样的项目,在这个项目中,思维令牌是从DeepSeek-R1流式传输到RAG聊天环境中。你可以在这里查看该项目: https://github.com/yigit353/DeepSeekRAGChat
5. 测试实施情况test_client.py
文件提供了一个简单的验证方法来检查整个栈是否运行正常:
def test_generate():
"""测试带有SSE流的生成端点。"""
url = "http://localhost:8000/generate"
headers = {"Content-Type": "application/json"}
data = {"topic": "dogs"}
# 使用流请求获取SSE事件流
response = requests.post(url, json=data, headers=headers, stream=True)
# 创建SSE客户端实例
client = sseclient.SSEClient(response)
# 处理每个事件
for event in client.events():
data = json.loads(event.data)
print("-" * 50)
print(f"类型: {data.get('type', 'N/A')}")
print(f"内容: {data.get('content', 'N/A')}")
print(f"思绪: {data.get('thinking', 'N/A')}")
此测试脚本展示了如何连接到FastAPI服务器并使用SSE流,为其他非Streamlit的替代前端实现提供了一个清晰的参考模板。
开始使用这个仓库指南该仓库提供了三个逐步的例子来帮助你理解并应用这种模式。
该仓库提供了三个逐步的例子来帮助你更好地理解和实现这个模式。
- 基本的 AsyncIO 控制台单元图:一个简单的起点,只有一个节点同时处理笑话和诗歌的生成。
- AsyncIO 控制台基本图:一个更复杂的图,不同类型的节点分别处理不同类型的内容。
- FastAPI + LangGraph + Streamlit:包含所有三层的完整实现,提供了简洁高效的应用程序接口、语言生成图和易于使用的数据展示界面。
如何设置并运行实例:
# 克隆仓库,
git clone https://github.com/yigit353/LangGraph-FastAPI-Streamlit.git
cd LangGraph-FastAPI-Streamlit # 设置好你的环境
python -m venv venv
source venv/bin/activate # 在 Windows 上使用:venv\Scripts\activate
pip install -r requirements.txt# 配置你的 API 密钥
cp example.env .env
# 将你的 DeepSeek API 密钥添加到 .env 文件中
cd 03_fastapi_langgraph_streamlit
python server.py
# 在新的终端里
streamlit run streamlit_ui.py
实际的应用领域:如何将理论知识应用到实践中
这种架构模式适用于几个重要的应用场景。
- 教育用途的LLM应用:让学习过程更加透明,向学生展示AI是如何逐步得出结论的。
- 调试AI的推理过程:通过观察每一步来找到推理出错的地方。
- 复杂工作流的可视化:可视化多步骤的LangGraph流程,帮助研究、分析或规划。
- AI可解释性:让用户更容易理解AI为什么做出了某些推荐,提高AI决策过程的透明度。
这个实现满足了LLM应用程序开发中的一个具体需求,但还有很多可以深入研究的地方。
- WebSocket 实现:为了实现更实时的双向通信
- 复杂图形可视化:扩展方法来可视化分支决策路径
- React 实现:创建更复杂的 UI 组件来处理不同类型的数据流
- 人机循环扩展:增加用户干预推理过程的能力
当前的实现侧重于顺序流(首先进行推理,然后提供内容),这避免了带宽问题,并提供更好的使用体验。
结论部分使用LangGraph构建可视化应用时,需要解决不同框架之间的流处理集成难题。这种基于FastAPI的方案提供了一个干净的解决方案,同时保持了关注分离,并且可以实时展现推理状态和内容生成。
虽然 LangGraph 的文档为构建状态机提供了出色的起点,但这种实现填补了与前端框架(如 Streamlit)之间的空白,使开发人员更容易用可视化方式构建 LLM 应用程序。
代码已上传至 GitHub,欢迎社区成员提供反馈和建议,帮助我们继续改进这些适用于 LLM 开发的模式。
共同學習,寫下你的評論
評論加載中...
作者其他優質文章