当前位置: 首页 > news >正文

[python]FastAPI + 自建SSE 踩坑全记录

1、什么是SSE服务

一种服务端向客户端主动推送消息的协议,适合用于服务端完成异步任务后主动向客户端推送消息。

SSE 的优点:

  • 浏览器原生支持 EventSource
  • 实现简单
  • 适合服务端单向推送
  • 不需要 WebSocket 那样的握手和协议控制

2、技术背景

  • 后端使用python+FastAPI;
  • 前端使用vue

3、后端实现代码

services层sse_manage.py
提供SSE的创建和底层功能。

# app/services/sse_manager.py# 简单的 SSE 管理器,支持多个客户端连接,用于向前端主动发送事件通知,例如数据更新完成等importasyncioimportjsonimportloggingfromtypingimportDict,Setfromstarlette.responsesimportStreamingResponse logger=logging.getLogger(__name__)classSSEManager:"""简单的 SSE 管理器,支持多个客户端连接"""def__init__(self):# Dict[str, Set[asyncio.Queue]]: 事件类型 -> 订阅该事件的客户端队列集合# Dict字典,Set集合,asyncio.Queue异步队列self._clients:Dict[str,Set[asyncio.Queue]]={}self.shutdown_event=asyncio.Event()# 用于优雅关闭asyncdefsubscribe(self,event_type:str)->asyncio.Queue:""" 订阅指定事件类型,返回一个 asyncio.Queue 用于接收事件消息 - event_type: 事件类型字符串,例如 "data_update" - 返回值: asyncio.Queue 对象,客户端可以从中异步获取事件消息 - 注意:调用方需要负责调用 unsubscribe 来取消订阅并清理资源 - 示例用法: queue = await sse_manager.subscribe("data_update") while True: message = await queue.get() # 处理消息,例如发送给前端 """queue=asyncio.Queue()# 每个订阅者拥有一个独立的消息队列self._clients.setdefault(event_type,set()).add(queue)returnqueueasyncdefunsubscribe(self,event_type:str,queue:asyncio.Queue):""" 取消订阅指定事件类型,移除对应的 asyncio.Queue - event_type: 事件类型字符串,例如 "data_update" - queue: 之前 subscribe 返回的 asyncio.Queue 对象 - 注意:调用方需要确保传入正确的 queue 对象,否则可能无法正确取消订阅 """ifevent_typeinself._clients:# discard方法会安全地移除元素,如果元素不存在也不会抛出异常self._clients[event_type].discard(queue)asyncdefsend_event(self,event_type:str,data:dict):""" 向所有订阅了指定事件类型的客户端发送事件消息 - event_type: 事件类型字符串,例如 "data_update" - data: 要发送的数据,以字典形式提供,会被转换为 JSON 字符串发送给客户端 - 注意:如果没有订阅该事件类型的客户端,则不会发送任何消息 """ifevent_typenotinself._clients:return# dumps方法将Python对象转换为JSON字符串,event:和data:是SSE协议的格式要求,\n\n表示消息结束message=f"event:{event_type}\ndata:{json.dumps(data)}\n\n"forqueueinself._clients[event_type]:awaitqueue.put(message)asyncdefshutdown(self):print("SSE shutdown 开始...")self.shutdown_event.set()# 通知所有 SSE 任务退出# 全局单例sse_manager=SSEManager()

routes层see.py
提供用于前端订阅的接口,同时会作为客户端长期运行维持SSE。

# app/api/routes/sse.pyimportasynciofromfastapiimportAPIRouterfromstarlette.responsesimportStreamingResponsefromapp.services.sse_managerimportsse_manager router=APIRouter(prefix="/sse",tags=["实时推送"])@router.get("/subscribe/{event_type}")asyncdefsubscribe(event_type:str):""" 订阅指定事件类型的 SSE 流,前端可以通过 EventSource 连接到这个接口来接收实时事件推送 - event_type: 事件类型字符串,例如 "data_update",前端可以根据这个事件类型来区分不同的事件流 - 返回值: StreamingResponse 对象,内容类型为 "text/event-stream",符合 SSE 协议要求 - 注意:前端需要使用 EventSource 来连接这个接口,例如: const eventSource = new EventSource("/api/sse/subscribe/data_update"); eventSource.onmessage = (event) => { const data = JSON.parse(event.data); console.log("Received data update event:", data); }; """queue=awaitsse_manager.subscribe(event_type)asyncdefevent_generator():try:whileTrue:done,pending=awaitasyncio.wait([asyncio.create_task(queue.get()),asyncio.create_task(sse_manager.shutdown_event.wait())],timeout=10,# 心跳间隔return_when=asyncio.FIRST_COMPLETED)# shutdown_event 触发 → 退出ifsse_manager.shutdown_event.is_set():breakifnotdone:yield"event: heartbeat\ndata: {}\n\n"continue# queue.get() 返回message=done.pop().result()ifmessageisNone:breakyieldmessageawaitasyncio.sleep(0.1)exceptasyncio.CancelledError:# 不再抛出,直接忽略,让连接自然关闭# await sse_manager.unsubscribe(event_type, queue)passfinally:awaitsse_manager.unsubscribe(event_type,queue)# StreamingResponse 用于创建一个流式响应,event_generator 是一个异步生成器函数,负责从队列中获取消息并发送给前端# 流式响应是一种特殊的HTTP响应,允许服务器持续发送数据给客户端,而不需要等待所有数据准备好后一次性发送,这对于实时推送非常有用returnStreamingResponse(event_generator(),media_type="text/event-stream",headers={"Cache-Control":"no-cache","Connection":"keep-alive","Access-Control-Allow-Origin":"*",# 允许前端跨域"X-Accel-Buffering":"no",})

4、前端实现代码

composables层useSSE.ts
提供前端的持续消息接收服务。

exportfunctionuseSSE(eventType:string,callback:(data:any)=>void){leteventSource:EventSource|null=nullletlastHeartbeat=Date.now()constcreateConnection=()=>{eventSource=newEventSource(`${import.meta.env.VITE_API_BASE_URL}/sse/subscribe/${eventType}`)// 正常业务事件eventSource.addEventListener(eventType,(event)=>{constdata=JSON.parse(event.data)callback(data)})// 心跳事件eventSource.addEventListener("heartbeat",()=>{lastHeartbeat=Date.now()})// 服务器关闭事件eventSource.addEventListener('server_shutdown',()=>{console.log('服务器即将关闭,SSE 连接主动断开')eventSource?.close()})// 出错时自动重连(排除正常关闭)eventSource.onerror=()=>{if(eventSource?.readyState===EventSource.CLOSED)returneventSource?.close()setTimeout(createConnection,3000)}}createConnection()// 心跳超时检测(关键)setInterval(()=>{if(Date.now()-lastHeartbeat>15000){console.log("心跳超时,服务器可能已关闭,主动断开 SSE")eventSource?.close()}},5000)window.addEventListener('beforeunload',()=>eventSource?.close())returneventSource}

前段使用SSE的方法:在APP.vue下配置如下
订阅task_completed消息并实时弹出弹窗提示。

<script setup lang="ts">import{useSSE}from'@/composables/useSSE'import{ElNotification}from'element-plus'import{onMounted,onBeforeUnmount}from'vue'letsse:EventSource|null=null// 监听任务完成事件onMounted(()=>{sse=useSSE('task_completed',(data)=>{ElNotification({title:'任务完成',message:data.message||'操作已成功',type:data.type||'success',duration:5000,})})})onBeforeUnmount(()=>{sse?.close()})</script>

5、该方案的弊端

该方案可以顺利实现SSE服务前后端消息推送功能,但是会导致后端服务无法正常关闭。
在开发中一般使用如下命令启动python后端服务器

uvicorn app.main:app--reload

该指令让后端服务可以随着后端文件修改,按下ctrl+s后自动重启后端更新程序,同时还可以ctrl+c中止程序。
但是由于在routes层接口配置了while true的客户端连接,这会导致uvicorn一直等待连接的关闭而卡住,除非到达超时时间触发uvicorn 的强制关闭。

可以通过如下的方式配置超时参数来减少超时等待延迟

uvicorn app.main:app--reload--timeout-graceful-shutdown5

但是超时时间到达后由于SSE连接被强制关闭,会导致后端出现一大片报错。
参考如下

(strategy-env)PS E:\2025\机器学习\Strategy-Forge\backend>uvicorn app.main:app--reload--timeout-graceful-shutdown30INFO: Willwatchforchangesinthese directories:['E:\\2025\\机器学习\\Strategy-Forge\\backend']INFO: Uvicorn running on http://127.0.0.1:8000(Press CTRL+C to quit)INFO: Started reloader process[5872]using StatReload INFO: Started server process[20604]INFO: Waitingforapplication startup. 📋 API 文档:http://127.0.0.1:8000/docs INFO: Application startup complete. INFO:127.0.0.1:53078 -"GET /sse/subscribe/task_completed HTTP/1.1"200OK INFO: Shutting down INFO: Waitingforconnections to close.(CTRL+C to force quit)ERROR: Cancel1running task(s),timeoutgracefulshutdownexceeded INFO: Waitingforapplication shutdown. SSEshutdown开始... INFO: Applicationshutdowncomplete. INFO: Finished server process[20604]ERROR: ExceptioninASGI application Traceback(most recent call last): File"E:\Anaconda\envs\strategy-env\Lib\site-packages\uvicorn\protocols\http\h11_impl.py", line415,inrun_asgi result=await app(# type: ignore[func-returns-value]^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ self.scope, self.receive, self.send ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^)^ File"E:\Anaconda\envs\strategy-env\Lib\site-packages\uvicorn\middleware\proxy_headers.py", line63,in__call__returnawait self.app(scope, receive, send)^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File"E:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\applications.py", line1159,in__call__ await super().__call__(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\applications.py", line90,in__call__ await self.middleware_stack(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\errors.py", line164,in__call__ await self.app(scope, receive, _send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\cors.py", line96,in__call__ await self.simple_response(scope, receive, send,request_headers=headers)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\cors.py", line154,insimple_response await self.app(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\exceptions.py", line63,in__call__ await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\_exception_handler.py", line42,inwrapped_app await app(scope, receive, sender)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\middleware\asyncexitstack.py", line18,in__call__ await self.app(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\routing.py", line660,in__call__ await self.middleware_stack(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\routing.py", line680,inapp await route.handle(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\routing.py", line276,inhandle await self.app(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\routing.py", line134,inapp await wrap_app_handling_exceptions(app, request)(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\_exception_handler.py", line42,inwrapped_app await app(scope, receive, sender)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\routing.py", line121,inapp await response(scope, receive, send)File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py", line274,in__call__ async with anyio.create_task_group()as task_group: ~~~~~~~~~~~~~~~~~~~~~~~^^ File"E:\Anaconda\envs\strategy-env\Lib\site-packages\anyio\_backends\_asyncio.py", line803,in__aexit__ raise exc_val File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py", line281,in__call__ await wrap(partial(self.listen_for_disconnect, receive))File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py", line277,inwrap await func()File"E:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py", line244,inlisten_for_disconnect message=await receive()^^^^^^^^^^^^^^^ File"E:\Anaconda\envs\strategy-env\Lib\site-packages\uvicorn\protocols\http\h11_impl.py", line536,inreceive await self.message_event.wait()File"E:\Anaconda\envs\strategy-env\Lib\asyncio\locks.py", line213,inwaitawait fut asyncio.exceptions.CancelledError: Task cancelled,timeoutgracefulshutdownexceeded INFO: Stopping reloader process[5872]

一开始,我的解决方式是在程序的生命周期结束时主动触发SSE关闭。即在main.py中配置app的生命周期如下:

@asynccontextmanagerasyncdeflifespan(app:FastAPI):print("📋 API 文档:http://127.0.0.1:8000/docs")yield# 该任务本意用来在控制面板ctrl+c关闭后端的时候主动关闭sse服务# 但是由于unicorn关闭会先于shutdown触发,因此总是会导致sse先被异常关闭掉awaitsse_manager.shutdown()

上述方法中yield前的程序会在后端开始生命周期时,正式运行前执行,yield后的程序则会在生命周期结束时执行,即sse_manager.shutdown()。
事实证明,该方案是不可行的,从上面的报错例子中也可以看到,先触发了ERROR,才打印出了sse_manager.shutdown()内部的print,这表示按下ctrl+c后,程序等待超时触发了强制关闭,导致了报错后,才触发了我的方法,这为时已晚。
由于上述框架中生命周期的设置,想要在FastAPI中正常关闭我的SSE似乎是不可能的了,我也尝试过通过心跳机制让前端发送断开连接,但是此时后端已经在“死亡的路上”,依然无法正常关闭。

6、应该如何实现优雅的后端的消息推送

最终,只能遗憾地判断,我的SSE无法和FastAPI兼容,虽然功能得以实现,但是服务端关闭时的报错让人难以接收。
最终查询发现,其实FastAPI有提供SSE服务(居然完全不需要我自己写吗!!!)
框架官方提供的SSE应该和它的生命周期是可以兼容的,应该考虑使用该方案来实现功能的同时又能优雅地关闭服务。
同时还可以考虑用WebSocket协议来实现后端消息的推送,作为全双工的协议,其应当可以轻松实现主动断开连接防止重复等待。

7、补充

FastAPI提供的SSE文档只是对于SSE格式数据的封装,并没有提供SSE服务,依然无法改善我的问题。

后续有两个方案可以改进我的问题:
1、仅临时通讯时使用SSE,例如前端触发事件后开启SSE通讯,等待超时或者SSE正常返回完成后主动关闭SSE,这样就不会出现服务端关闭时SSE客户端持续等待的问题;
2、改为客户端主动也想服务端发送心跳包,超时则主动关闭,该修改仍然需要服务端等待一段时间。
3、换用WebSocket协议来构建长时间的前后端相互通讯通道(websocket似乎也会有类似问题,仍然需要测试)。

http://www.rkmt.cn/news/1514128.html

相关文章:

  • 告别命令行恐惧:用GROMACS和Travis插件可视化RDF与SDF的保姆级流程
  • Articraft:一种用于可扩展关节 3D 资产生成的智体系统
  • jQuery树形组件完整示例包:含静态渲染、数据库异步加载和父子联动多选功能
  • 从‘九鼎之局’到旋转数独:我是如何用贪心和斜线法登上最强大脑榜一的
  • 新公司注册下来之后必须做账报税吗?
  • 一台电脑,四人同乐:Nucleus Co-Op分屏游戏终极指南
  • 别再凭感觉画线了!用KiCad/Eagle实战演示:如何根据电流和板厂工艺精准设置PCB线宽
  • 别再被网站屏蔽了!Chromedp无头浏览器隐藏WebDriver指纹的保姆级教程
  • 3分钟学会:OBS背景移除插件让普通摄像头变专业绿幕
  • ISP Tuning新手到高手:我的三段式学习法,从调参数到懂原理
  • IR-Protocol 已正式上线,面向AI记忆链与人文学交互AI 开放标准文档
  • 从‘开发’到‘验证’:一张图看懂DO-178C工具鉴定等级(TQL)怎么定,附工具选型避坑建议
  • “AI大语言模型”助力大气科学相关交叉领域实践技术应用
  • 避坑指南:N32G45X移植LVGL 8.3到ILI9488屏幕,我遇到的三个“坑”及填平方法
  • WPF自定义窗口避坑指南:WindowChrome最大化时内容被任务栏遮挡?一招搞定!
  • 从RDF到3D SDF:一次搞懂GROMACS后处理如何揭示分子间的“爱恨情仇”
  • GLASS LAI数据月度合成实战:如何用Python+ArcPy脚本智能区分平闰年,实现MVC最大值合成
  • 2026年成都专业销毁中心服务现状与口碑观察:从文件保密到食品环保的多元选择 - 优质品牌商家
  • AI 驱动的响应式布局生成:从设计意图到自适应代码,前端开发的视觉自动化
  • 2026年移动式径向偏差测量仪选购指南:技术参数与工程实践深度分析 - 优质品牌商家
  • 新手避坑指南:在1kHz控制频率下,如何让你的Franka机械臂libfranka代码跑得更稳?
  • 2026装企管理软件选型指南:技术、成本、服务三维度实测对比 - 优质品牌商家
  • MySQL表约束体系全解:从基础语法到实战设计,吃透所有约束类型与核心坑点
  • GEE新手避坑指南:获取MODIS NDVI数据时,为什么你的值域总是不对?
  • 别再手动改文献了!用Better BibTex插件5分钟搞定Zotero导出格式,完美对齐Google Scholar
  • VMware Workstation Pro 17 虚拟化技术指南:许可证管理与企业级部署方案
  • i.MX21架构解析:异构计算与低功耗设计如何重塑嵌入式多媒体
  • 别再只会用装饰器了!用Python Hook机制给你的Flask/Django应用加个‘插件’功能
  • 线程管理特点 线程属性 线程状态之间切换
  • 2026年浙江牛皮纸扑克牌源头厂家专业实力与选型全解析 - 品牌鉴赏官2026