尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

rabbitmq+websocket实时通知

rabbitmq+websocket实时通知
📅 发布时间:2026/6/30 2:38:04

一、业务背景

在网约车场景中,乘客下单后会进入「等待司机接单」页面。

如果只靠 HTTP 轮询(每隔几秒查一次订单状态),会有这些问题:

  • 延迟高:轮询间隔越大,用户感知越慢
  • 浪费资源:大量无效请求,服务器压力大
  • 体验差:页面需要不断刷新或定时请求

更好的方案是:服务端主动推送。

乘客打开等待页时建立 WebSocket 长连接;司机抢单成功后,后端立刻把通知推送到乘客浏览器。

但这里有个架构问题:

  • 抢单逻辑在 rpc-order 服务
  • WebSocket 连接在 api-gateway 服务

两个服务不在同一进程,不能直接调用hub.Push()。

中间就需要一个消息队列做解耦——本项目用的是 RabbitMQ。


二、整体架构

完整链路如下:

司机抢单成功

↓

Redis Stream(异步落库)

↓

rpc-order 更新 MySQL

↓

发布 RabbitMQ 消息(OrderGrabbedEvent)

↓

api-gateway 消费消息

↓

WebSocket Hub 按 userId 精准推送

↓

乘客浏览器收到「司机已接单」通知

三个角色分工:

rpc-order

  • 抢单、落库、发布 MQ 事件

RabbitMQ

  • 跨服务传递「接单通知」

api-gateway

  • 维护 WebSocket 连接,消费 MQ 并推送

三、RabbitMQ:Direct 路由模式

3.1 为什么选 Direct 路由模式

RabbitMQ 有多种交换机类型(fanout、topic、direct 等)。

本项目用的是 Direct 路由模式:

  • 生产者发消息时带上 routing key
  • 交换机按 key 精确匹配,把消息路由到对应队列
  • 适合「一种事件类型 → 一个消费队列」的场景

3.2 拓扑结构

生产者(rpc-order)

──Publish──► Exchange: order.notify.exchange

│ routing key: order.grabbed

▼

Queue: order.notify.queue

│

消费者(api-gateway) ◄──Consume──┘

对应常量:

  • 交换机:order.notify.exchange
  • 路由键:order.grabbed
  • 队列:order.notify.queue

3.3 事件结构

消息体是一个 JSON 对象,主要字段:

  • event:固定为driver_accepted,前端用来判断消息类型
  • user_id:乘客 ID,Gateway 用它查找 WebSocket 连接
  • order_no:订单号
  • driver_name:司机姓名
  • car_number:车牌号
  • car_type:车型
  • rating:司机评分

前端拿到这些字段可以直接渲染,不需要再调接口。

3.4 发布时机

发布放在 MySQL 落库成功之后,保证「通知出去时,数据库已经是已接单状态」。

调用链:

GrabOrder(Lua 抢单)

→ Redis Stream

→ OrderGrabbedConsumer 消费

→ updateOrderGrabbed(MySQL)

→ publishOrderGrabbedNotify()

→ rmq.PublishOrderGrabbed()

设计要点:

  • 从 MySQL 查订单和司机信息,不依赖 Redis 缓存(抢单后缓存已被删除)
  • 发布失败只打日志,不影响 Stream 消费 ACK(通知与落库解耦)

3.5 Confirm 模式

发送端开启了 Publisher Confirm:

  • 消息发出后等待 Broker 的 ACK
  • 确保 RabbitMQ 确实收到了
  • 避免「以为发出去了,实际丢了」

四、WebSocket:Hub 连接管理

4.1 为什么用 Hub 模式

WebSocket 连接散落在各个 HTTP 请求里,需要一个中心来统一管理。

本项目用的是经典的 Hub 模式:

  • 数据结构:map[userId] → WebSocket 连接
  • 线程安全:RWMutex保护 map,单连接写操作用Mutex保护
  • 职责:注册连接、移除连接、按 userId 推送消息

4.2 连接建立流程

路由:GET /ws/order?token=xxx

流程:

  1. 从 URL 参数或 Header 取 JWT token
  2. 解析 token 得到userId
  3. HTTP Upgrade 为 WebSocket
  4. hub.Register(userId, conn)注册连接
  5. 阻塞ReadMessage保持连接,断开时hub.Unregister

为什么 token 放 query 参数?

浏览器 WebSocket 无法自定义 Header,所以 token 只能放 URL 里。

4.3 心跳保活

  • 服务端每 30 秒发 Ping 帧
  • 60 秒无响应则断开连接
  • 防止「僵尸连接」占用 Hub 内存

4.4 重复连接处理

同一userId重复连接时,关闭旧连接,只保留最新一条。

避免多端登录导致重复推送。

4.5 Unregister 的坑

移除连接时,要校验 conn 指针是否匹配,避免误删新连接:

旧连接断开 → 触发 Unregister

但此时用户可能已经建立了新连接

如果不校验指针,会把新连接也删掉


五、MQ 消费者 → WebSocket 推送

api-gateway 启动时,在 HTTP 服务之前启动 RabbitMQ 消费者:

  1. 连接 RabbitMQ,绑定order.notify.queue
  2. 收到消息后解析OrderGrabbedEvent
  3. 调用hub.Push(userId, body)推送给在线乘客
  4. 手动 Ack 确认消费

推送逻辑:

  • 乘客在线 → 写入 WebSocket → 返回 true
  • 乘客离线 → 返回 false,MVP 方案直接 Ack,不做离线补偿

当前是 单实例 Gateway MVP 方案:

Hub 存在内存里,多实例部署需要额外方案(如 Redis Pub/Sub 广播到各实例 Hub)。


六、完整时序

乘客 api-gateway RabbitMQ rpc-order

│ │ │ │

│── GET /ws/order ────────►│ │ │

│◄── WebSocket 建立 ───────│ │ │

│ │ hub.Register(userId) │ │

│ │ │ │

│ │ │ │◄── 司机抢单

│ │ │ │── MySQL 落库

│ │ │◄── Publish ───────│

│ │◄── Consume ──────────│ │

│ │ hub.Push(userId) │ │

│◄── JSON 推送 ────────────│ │ │

│ {event: driver_accepted}│ │ │


七、关键设计决策

7.1 为什么不用 HTTP 轮询

WebSocket 是全双工长连接,服务端可以主动推。

对于「等司机接单」这种实时场景,比轮询更合适。

7.2 为什么 rpc-order 不直接推 WebSocket

  • 微服务职责分离:order 服务管订单,gateway 管连接
  • order 服务不知道乘客连在哪台 gateway 上
  • MQ 解耦:order 只管发事件,gateway 只管推连接

7.3 离线不补偿

乘客没开等待页或已离开,MQ 消息仍然 Ack。

后续可以扩展:

  • 离线消息存 Redis / DB
  • 乘客下次登录时拉取
  • 或配合 App Push 通知

7.4 手动 Ack vs 自动 Ack

消费者用手动 Ack:

  • 业务处理成功才确认
  • 失败可以 Nack 重新入队
  • JSON 格式错误等不可恢复的错误,直接 Ack 丢弃,避免无限重试

八、今天学到的核心知识点

  1. RabbitMQ Direct 路由模式:Exchange + Routing Key + Queue 三件套
  2. Publisher Confirm:发送端确认 Broker 收到消息
  3. WebSocket Hub 模式:集中管理连接,按 userId 精准推送
  4. 跨服务实时通知:MQ 做桥梁,解耦生产者和推送端
  5. JWT + WebSocket:token 放 query 参数,因为浏览器 WS 不能自定义 Header
  6. 心跳保活:Ping/Pong + ReadDeadline,防止僵尸连接
  7. 并发安全:Hub 用 RWMutex,单连接写用 Mutex

九、后续可优化方向

  • Gateway 多实例:Hub 改 Redis Pub/Sub 或专用推送服务
  • 离线消息补偿:MQ 消费时落库,乘客上线后补推
  • RabbitMQ 连接池:当前每次发布建独立连接,生产环境应复用
  • 生产环境 CheckOrigin:当前开发环境允许所有跨域,上线需校验 Origin

十、总结

这套方案的本质是:用 RabbitMQ 打通微服务之间的实时通知链路,用 WebSocket 打通服务端到浏览器的最后一跳。

乘客体验上,从「每隔几秒刷新看看有没有司机接单」,变成「司机一点接单,页面立刻弹出通知」——这就是 RabbitMQ + WebSocket 在这个项目里的价值。

相关新闻

  • OpenClaw(龙虾)2026 最新安装部署终极指南
  • 中小微企业建站首选!PageAdmin CMS,零代码搞定官网运维
  • chunk重叠overlap设多少:切断上下文的坑

最新新闻

  • 卡美德生物科普:CD40L(免疫信号信使)
  • 内衣、家居服品牌如何突破“万级SKU”管理难题?
  • 读论文:IoTGA-SRC²,如何让遗传算法更懂 deadline?
  • 终极指南:3步免费解决广色域显示器色彩过饱和问题
  • 7大场景揭秘:为什么iTransformer是时间序列预测的最佳选择?
  • 如何永久保存微信聊天记录?这款免费工具让你真正拥有数据主权

日新闻

  • 【计算机毕业设计案例】基于 Spring Boot+Vue 的电影售票系统设计与实现 前后端分离架构下影院在线购票管理平台(程序+文档+讲解+定制)
  • 到底 TMD 用哪个: npm, pnpm, Yarn, Bun, Deno? 傻瓜, 当然用 npm 啦
  • Google限制Meta使用Gemini模型 凸显AI授权竞争白热化

周新闻

  • Windows字体自定义终极方案:No!! MeiryoUI完全指南
  • Deepin Boot Maker:告别命令行,3分钟制作Linux启动盘的智能解决方案
  • Plain Craft Launcher 2:重新定义你的Minecraft游戏体验

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号