一、业务背景
在网约车场景中,乘客下单后会进入「等待司机接单」页面。
如果只靠 HTTP 轮询(每隔几秒查一次订单状态),会有这些问题:
- 延迟高:轮询间隔越大,用户感知越慢
- 浪费资源:大量无效请求,服务器压力大
- 体验差:页面需要不断刷新或定时请求
更好的方案是:服务端主动推送。
乘客打开等待页时建立 WebSocket 长连接;司机抢单成功后,后端立刻把通知推送到乘客浏览器。
但这里有个架构问题:
- 抢单逻辑在 rpc-order 服务
- WebSocket 连接在 api-gateway 服务
两个服务不在同一进程,不能直接调用hub.Push()。
中间就需要一个消息队列做解耦——本项目用的是 RabbitMQ。
二、整体架构
完整链路如下:
司机抢单成功
↓
Redis Stream(异步落库)
↓
rpc-order 更新 MySQL
↓
发布 RabbitMQ 消息(OrderGrabbedEvent)
↓
api-gateway 消费消息
↓
WebSocket Hub 按 userId 精准推送
↓
乘客浏览器收到「司机已接单」通知
三个角色分工:
rpc-order
- 抢单、落库、发布 MQ 事件
RabbitMQ
- 跨服务传递「接单通知」
api-gateway
- 维护 WebSocket 连接,消费 MQ 并推送
三、RabbitMQ:Direct 路由模式
3.1 为什么选 Direct 路由模式
RabbitMQ 有多种交换机类型(fanout、topic、direct 等)。
本项目用的是 Direct 路由模式:
- 生产者发消息时带上 routing key
- 交换机按 key 精确匹配,把消息路由到对应队列
- 适合「一种事件类型 → 一个消费队列」的场景
3.2 拓扑结构
生产者(rpc-order)
──Publish──► Exchange: order.notify.exchange
│ routing key: order.grabbed
▼
Queue: order.notify.queue
│
消费者(api-gateway) ◄──Consume──┘
对应常量:
- 交换机:
order.notify.exchange - 路由键:
order.grabbed - 队列:
order.notify.queue
3.3 事件结构
消息体是一个 JSON 对象,主要字段:
event:固定为driver_accepted,前端用来判断消息类型user_id:乘客 ID,Gateway 用它查找 WebSocket 连接order_no:订单号driver_name:司机姓名car_number:车牌号car_type:车型rating:司机评分
前端拿到这些字段可以直接渲染,不需要再调接口。
3.4 发布时机
发布放在 MySQL 落库成功之后,保证「通知出去时,数据库已经是已接单状态」。
调用链:
GrabOrder(Lua 抢单)
→ Redis Stream
→ OrderGrabbedConsumer 消费
→ updateOrderGrabbed(MySQL)
→ publishOrderGrabbedNotify()
→ rmq.PublishOrderGrabbed()
设计要点:
- 从 MySQL 查订单和司机信息,不依赖 Redis 缓存(抢单后缓存已被删除)
- 发布失败只打日志,不影响 Stream 消费 ACK(通知与落库解耦)
3.5 Confirm 模式
发送端开启了 Publisher Confirm:
- 消息发出后等待 Broker 的 ACK
- 确保 RabbitMQ 确实收到了
- 避免「以为发出去了,实际丢了」
四、WebSocket:Hub 连接管理
4.1 为什么用 Hub 模式
WebSocket 连接散落在各个 HTTP 请求里,需要一个中心来统一管理。
本项目用的是经典的 Hub 模式:
- 数据结构:
map[userId] → WebSocket 连接 - 线程安全:
RWMutex保护 map,单连接写操作用Mutex保护 - 职责:注册连接、移除连接、按 userId 推送消息
4.2 连接建立流程
路由:GET /ws/order?token=xxx
流程:
- 从 URL 参数或 Header 取 JWT token
- 解析 token 得到
userId - HTTP Upgrade 为 WebSocket
hub.Register(userId, conn)注册连接- 阻塞
ReadMessage保持连接,断开时hub.Unregister
为什么 token 放 query 参数?
浏览器 WebSocket 无法自定义 Header,所以 token 只能放 URL 里。
4.3 心跳保活
- 服务端每 30 秒发 Ping 帧
- 60 秒无响应则断开连接
- 防止「僵尸连接」占用 Hub 内存
4.4 重复连接处理
同一userId重复连接时,关闭旧连接,只保留最新一条。
避免多端登录导致重复推送。
4.5 Unregister 的坑
移除连接时,要校验 conn 指针是否匹配,避免误删新连接:
旧连接断开 → 触发 Unregister
但此时用户可能已经建立了新连接
如果不校验指针,会把新连接也删掉
五、MQ 消费者 → WebSocket 推送
api-gateway 启动时,在 HTTP 服务之前启动 RabbitMQ 消费者:
- 连接 RabbitMQ,绑定
order.notify.queue - 收到消息后解析
OrderGrabbedEvent - 调用
hub.Push(userId, body)推送给在线乘客 - 手动 Ack 确认消费
推送逻辑:
- 乘客在线 → 写入 WebSocket → 返回 true
- 乘客离线 → 返回 false,MVP 方案直接 Ack,不做离线补偿
当前是 单实例 Gateway MVP 方案:
Hub 存在内存里,多实例部署需要额外方案(如 Redis Pub/Sub 广播到各实例 Hub)。
六、完整时序
乘客 api-gateway RabbitMQ rpc-order
│ │ │ │
│── GET /ws/order ────────►│ │ │
│◄── WebSocket 建立 ───────│ │ │
│ │ hub.Register(userId) │ │
│ │ │ │
│ │ │ │◄── 司机抢单
│ │ │ │── MySQL 落库
│ │ │◄── Publish ───────│
│ │◄── Consume ──────────│ │
│ │ hub.Push(userId) │ │
│◄── JSON 推送 ────────────│ │ │
│ {event: driver_accepted}│ │ │
七、关键设计决策
7.1 为什么不用 HTTP 轮询
WebSocket 是全双工长连接,服务端可以主动推。
对于「等司机接单」这种实时场景,比轮询更合适。
7.2 为什么 rpc-order 不直接推 WebSocket
- 微服务职责分离:order 服务管订单,gateway 管连接
- order 服务不知道乘客连在哪台 gateway 上
- MQ 解耦:order 只管发事件,gateway 只管推连接
7.3 离线不补偿
乘客没开等待页或已离开,MQ 消息仍然 Ack。
后续可以扩展:
- 离线消息存 Redis / DB
- 乘客下次登录时拉取
- 或配合 App Push 通知
7.4 手动 Ack vs 自动 Ack
消费者用手动 Ack:
- 业务处理成功才确认
- 失败可以 Nack 重新入队
- JSON 格式错误等不可恢复的错误,直接 Ack 丢弃,避免无限重试
八、今天学到的核心知识点
- RabbitMQ Direct 路由模式:Exchange + Routing Key + Queue 三件套
- Publisher Confirm:发送端确认 Broker 收到消息
- WebSocket Hub 模式:集中管理连接,按 userId 精准推送
- 跨服务实时通知:MQ 做桥梁,解耦生产者和推送端
- JWT + WebSocket:token 放 query 参数,因为浏览器 WS 不能自定义 Header
- 心跳保活:Ping/Pong + ReadDeadline,防止僵尸连接
- 并发安全:Hub 用 RWMutex,单连接写用 Mutex
九、后续可优化方向
- Gateway 多实例:Hub 改 Redis Pub/Sub 或专用推送服务
- 离线消息补偿:MQ 消费时落库,乘客上线后补推
- RabbitMQ 连接池:当前每次发布建独立连接,生产环境应复用
- 生产环境 CheckOrigin:当前开发环境允许所有跨域,上线需校验 Origin
十、总结
这套方案的本质是:用 RabbitMQ 打通微服务之间的实时通知链路,用 WebSocket 打通服务端到浏览器的最后一跳。
乘客体验上,从「每隔几秒刷新看看有没有司机接单」,变成「司机一点接单,页面立刻弹出通知」——这就是 RabbitMQ + WebSocket 在这个项目里的价值。