尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?
📅 发布时间:2026/6/17 23:11:36

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

在使用 FastAPI 构建 AI 对话应用时,StreamingResponse 是实现打字机效果的绝佳工具。通过 yield 逐步返回内容,用户体验非常流畅。但一个棘手的问题随之而来:如果用户在 AI 回答的过程中取消对话或中断了连接,后端会发生什么?我们如何确保对话记录等重要数据依然能被可靠地保存到数据库中?

问题出现过程

1. 客户端发起流式对话请求

我们从一个典型的流式对话接口开始。我们使用依赖注入来获取一个 SQLAlchemy 的 AsyncSession,在对话开始时创建消息,在对话结束后更新 AI 的回答。

流式对话原始代码(伪代码)
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

2. 客户端取消对话(主动断开)

当用户取消发送时,会抛出这个异常

pymysql.err.InterfaceError: 

原因:当客户端断开时 ,FastAPI 会立即把它的 session连接回收掉,底层的那个物理连接被标记为 Cancelled,然后执行finally的时候,再往下传原来session连接就不对了,save_conversation函数就会抛pymysql.err.InterfaceError。


问题解决尝试

尝试一:在 save_conversation 函数中创建新连接

一个自然的想法是:既然旧的 session 不能用了,那就在保存的时候检查一下,如果不可用就创建一个新的。

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

结果:失败! 没想到,即使创建了新的 session,依然抛出了 pymysql.err.InterfaceError。

原因分析:之所以还会抛错误,原因是这个新会话 依然在使用已经被取消的连接池资源,因为 FastAPI/Starlette 在主请求取消时,会把整个 AsyncSessionLocal() 对象的连接都标记为 “cancelled”。即便你重新 async with AsyncSessionLocal(),底层复用的还是同一个数据库连接池里的连接,而那个连接刚被 cancel。

重新创建个数据库引擎 是肯定可以的,但是只是对话后更新,这么搞完全没必要。
或者创建个独立线程,在新线程中去创建新连接,应该是可以的,个人还是感觉比较重,浪费资源。


尝试二:创建个协程去执行save_conversation

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息# 创建协程执行asyncio.create_task(save_conversation(session, full_response))print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

疑惑点:asyncio.create_task 启动的协程仍然跑在同一个线程和进程里,也会复用那个全局的连接池,理论上确实还有可能拿到刚才那个被 cancel 的连接啊?

主要在于操作的时序和上下文隔离:

  1. 先清理,后执行

    当原始请求被取消后,FastAPI 会立即开始清理与该请求相关的资源(包括回收它持有的数据库连接)。这个清理动作在 finally 块中调用 create_task 之前就已经触发了。我们派生出的后台任务是在这个清理逻辑之后才启动的。

  2. 上下文隔离

    这个后台协程已经完全不挂在 HTTP 请求的上下文上了。客户端断开与否,都影响不了它的独立运行。只要连接池中还有任意一个好连接,它就能完成写入。

  3. 高成功率

    因为顺序已经变成了:先断开、先清理 → 再新建、再执行,所以新任务向连接池请求时,拿到那个“坏掉”连接的概率已经大大降低。连接池会优先分配一个健康的、空闲的连接。

即使在极端情况下又拿到了旧连接,它也很有可能在 session.begin() 阶段就失败,我们还可以在后台任务的 try...except 块里加入重试逻辑(比如 await asyncio.sleep(0.1) 后重试),进一步提高健壮性。

大量测试后,发现真没问题😁。

结论

对于需要确保最终操作(如数据写入)一定执行的流式 API,asyncio.create_task 提供了一种轻量级且非常有效的解决方案。它避免了引入像 Celery 这样复杂的任务队列,同时优雅地解决了因客户端断连导致资源状态污染的问题。

通过将关键的收尾工作与不稳定的 HTTP 请求生命周期解耦,我们能构建出更加健壮和可靠的 FastAPI 应用。"

相关新闻

  • 2025 长效阻垢马桶权威榜单:95% 阻垢率才达标,告别管路发黄烦恼
  • 2025 年最新推荐冲击试验机优质厂家排行榜:摆锤 / 落锤 / 低温型设备精选,助力企业精准采购优质供应商低温冲击试验机/冲击试验机低温槽/冲击试验机缺口拉床公司推荐
  • 争取孩子抚养权找哪个律师靠谱?婚姻纠纷律师选择参考

最新新闻

  • AD pcb设计规则设置和DRC检查
  • 浙江闸阀厂家实力排行:基于工况适配性的客观盘点 - 起跑123
  • 2026无锡网站建设哪家口碑好:实测筛选3家本土靠谱建站服务商,避坑不踩雷 - wxxwlm
  • 2026年五大SEO优化公司推荐:从传统搜索到生成式引擎,五家值得关注的服务商深度选型评测 - 资讯纵览
  • 微交互设计:从状态反馈到情感化动效的工程化实现
  • 【毕业设计】基于 Python+Vue 的习题自测型自主学习系统的设计与实现 基于 Python+Vue 的轻量化线上自主学习服务系统(源码+文档+远程调试,全bao定制等)

日新闻

  • 2026年不锈钢卷板厂家推荐排行榜:冷轧热轧/304/201不锈钢卷板,高颜值耐腐蚀源头厂家实力精选 - 企业推荐官【官方】
  • FLUX.1-dev FP8模型实战指南:24GB以下显卡高效部署方案
  • 2026佛山长途搬家价目表:跨省跨市搬家费用完整计算指南 - 从来都是英雄出少年

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号