当前位置: 首页 > news >正文

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

在使用 FastAPI 构建 AI 对话应用时,StreamingResponse 是实现打字机效果的绝佳工具。通过 yield 逐步返回内容,用户体验非常流畅。但一个棘手的问题随之而来:如果用户在 AI 回答的过程中取消对话或中断了连接,后端会发生什么?我们如何确保对话记录等重要数据依然能被可靠地保存到数据库中?

问题出现过程

1. 客户端发起流式对话请求

我们从一个典型的流式对话接口开始。我们使用依赖注入来获取一个 SQLAlchemy 的 AsyncSession,在对话开始时创建消息,在对话结束后更新 AI 的回答。

流式对话原始代码(伪代码)
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

2. 客户端取消对话(主动断开)

当用户取消发送时,会抛出这个异常

pymysql.err.InterfaceError: 

原因:当客户端断开时 ,FastAPI 会立即把它的 session连接回收掉,底层的那个物理连接被标记为 Cancelled,然后执行finally的时候,再往下传原来session连接就不对了,save_conversation函数就会抛pymysql.err.InterfaceError。


问题解决尝试

尝试一:在 save_conversation 函数中创建新连接

一个自然的想法是:既然旧的 session 不能用了,那就在保存的时候检查一下,如果不可用就创建一个新的。

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

结果:失败! 没想到,即使创建了新的 session,依然抛出了 pymysql.err.InterfaceError

原因分析:之所以还会抛错误,原因是这个新会话 依然在使用已经被取消的连接池资源,因为 FastAPI/Starlette 在主请求取消时,会把整个 AsyncSessionLocal() 对象的连接都标记为 “cancelled”。即便你重新 async with AsyncSessionLocal(),底层复用的还是同一个数据库连接池里的连接,而那个连接刚被 cancel。

重新创建个数据库引擎 是肯定可以的,但是只是对话后更新,这么搞完全没必要。
或者创建个独立线程,在新线程中去创建新连接,应该是可以的,个人还是感觉比较重,浪费资源。


尝试二:创建个协程去执行save_conversation

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息# 创建协程执行asyncio.create_task(save_conversation(session, full_response))print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

疑惑点:asyncio.create_task 启动的协程仍然跑在同一个线程和进程里,也会复用那个全局的连接池,理论上确实还有可能拿到刚才那个被 cancel 的连接啊?

主要在于操作的时序和上下文隔离

  1. 先清理,后执行

    当原始请求被取消后,FastAPI 会立即开始清理与该请求相关的资源(包括回收它持有的数据库连接)。这个清理动作在 finally 块中调用 create_task 之前就已经触发了。我们派生出的后台任务是在这个清理逻辑之后才启动的。

  2. 上下文隔离

    这个后台协程已经完全不挂在 HTTP 请求的上下文上了。客户端断开与否,都影响不了它的独立运行。只要连接池中还有任意一个好连接,它就能完成写入。

  3. 高成功率

    因为顺序已经变成了:先断开、先清理 → 再新建、再执行,所以新任务向连接池请求时,拿到那个“坏掉”连接的概率已经大大降低。连接池会优先分配一个健康的、空闲的连接。

即使在极端情况下又拿到了旧连接,它也很有可能在 session.begin() 阶段就失败,我们还可以在后台任务的 try...except 块里加入重试逻辑(比如 await asyncio.sleep(0.1) 后重试),进一步提高健壮性。

大量测试后,发现真没问题😁。

结论

对于需要确保最终操作(如数据写入)一定执行的流式 API,asyncio.create_task 提供了一种轻量级且非常有效的解决方案。它避免了引入像 Celery 这样复杂的任务队列,同时优雅地解决了因客户端断连导致资源状态污染的问题。

通过将关键的收尾工作与不稳定的 HTTP 请求生命周期解耦,我们能构建出更加健壮和可靠的 FastAPI 应用。"

http://www.rkmt.cn/news/60594.html

相关文章:

  • 2025 长效阻垢马桶权威榜单:95% 阻垢率才达标,告别管路发黄烦恼
  • 2025 年最新推荐冲击试验机优质厂家排行榜:摆锤 / 落锤 / 低温型设备精选,助力企业精准采购优质供应商低温冲击试验机/冲击试验机低温槽/冲击试验机缺口拉床公司推荐
  • 争取孩子抚养权找哪个律师靠谱?婚姻纠纷律师选择参考
  • 2025 最新硅芯管源头厂家推荐排行榜:权威甄选高密度聚乙烯 / 通信 / 光缆用优质管材供应企业通信用硅芯管/光缆保护用硅芯管/高强度硅芯管/内壁润滑硅芯管公司推荐
  • 2025年11月山东石材雕刻机/墓碑雕刻机/绳锯机综合选购指南与十大推荐:山东永福泰登顶
  • 时间序列信息预测:14种机器学习与深度学习模型
  • 2025年口碑好的成都制造业短视频运营公司最新权威实力榜
  • 2025 最新推荐!金刚石量子传感器厂家权威榜单:聚焦技术创新、产业应用与国际测评领先品牌金刚石量子磁力仪/金刚石量子探针扫描仪/金刚石量子显微镜/金刚石量子温度探针公司推荐
  • rman备份顺序详解
  • 创业企业如何选云?AWS、Azure、Google Cloud差异全解析(IDC Gartner洞察)
  • 2025海外云服务器推荐报告:Why AWS Dominates the Global Cloud Market
  • 国标GB28181算法算力平台EasyGBS助力构建食品安全监督管理系统全流程可视化监管方案
  • 创建同值数组
  • mapvthree 地理投影设计分析——自动转换与统一接口的设计理念
  • 安阳一对一家教辅导机构 TOP5 排行榜:2026年综合测评
  • 2025年双组份喷涂泵专用喷枪优质厂家权威推荐:高压无气喷涂机专用喷枪/无气喷涂机专用喷枪/双组份喷漆泵实力厂商精选
  • 2025年企业独栋招商机构口碑对比排行榜,办公场地/企业独栋/园区企业独栋出售哪个好
  • 2025-11-25 NOIP 模拟赛9 赛后总结
  • 2025.11.25
  • 实用指南:[论文阅读] 从 5MB 到 1.6GB 数据:Java/Scala/Python 在 Spark 中的性能表现全解析
  • 2025年立式内圆磨床优质厂家权威推荐榜单:高品质立式内圆磨床‌/高质量立式内圆磨床‌/新型立式内圆磨床‌源头厂家精选
  • 2025年11月乐清装修,半包,全包,全屋定制,别墅装修公司口碑推荐榜单TOP5精选指南
  • 【IEEE和ACM双出版 | 连续4届稳定EI检索 | 会议录用率高】第五届计算建模、仿真与数据分析国际学术会议(CMSDA 2025)
  • 【2025-11-24】又到周末
  • 2025年广告边框铝型材制造厂权威推荐榜单:葡萄架铝合金型材/门窗铝合金型材/工业铝型材源头厂家精选
  • (让 Java IA MCP 更简单 )Solon AI v3.7.2 发布
  • P14568 【MX-S12-T3】排列
  • SQL分区裁剪 - --
  • 2025年包头钢材/无缝钢管/螺纹管/型材/钢板行业场实力厂家盘点:优质源头厂家精选指南
  • 2025 最新太原山西菜馆推荐!权威测评认证的山西菜馆排行榜,探寻非遗传承与地道风味的匠心之选