WebSocket + Netty 构建一个简易的聊天软件
前言
即时通讯(IM)是移动互联网时代的基础设施。从微信到钉钉,从直播弹幕到游戏同步,IM 早已无处不在。如何构建一个高性能、高可用的聊天系统,是每一位后端开发进阶之路上的必修课。
本文将以一个实际项目为例,讲解如何基于Netty + WebSocket + Spring Boot构建完整的聊天系统,涵盖实时消息、文件传输、离线消息的三层可靠性保障、消息幂等去重等实战话题。项目已在生产环境验证,可直接作为 IM 系统的脚手架参考。
一、整体架构
系统的核心模块是信令服务(Signaling Service)——它通过 Netty 管理所有 WebSocket 长连接,同时提供 REST API 用于历史消息查询和文件上传。
┌──────────────┐ HTTP/REST ┌──────────────┐ Dubbo ┌──────────────┐ │ uni-app │ ───────────────> │ API 网关 │ ──────────> │ 用户服务 │ │ (前端) │ │ (Gateway) │ │ (好友/登录) │ └──────┬───────┘ └──────────────┘ └──────────────┘ │ WebSocket ws:// ▼ ┌──────────────────────────────────────────────────────────────────────┐ │ 信令服务 (Signaling Service) │ │ │ │ ┌──────────────────────┐ ┌────────────────────────────────┐ │ │ │ Netty WebSocket │ │ REST Controller │ │ │ │ Server :9090 │ │ /api/chat/history │ │ │ │ /ws?token=jwt │ │ /api/chat/upload │ │ │ └──────────┬───────────┘ │ /api/monitor/offline/* │ │ │ │ └───────────────┬────────────────┘ │ │ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ 核心服务层 │ │ │ │ SessionManager → userId ↔ Channel 双向映射 │ │ │ │ ChatService → 消息持久化 + 三层离线保障 │ │ │ │ CallRecordService→ 通话记录管理 │ │ │ │ FileService → MinIO 对象存储 │ │ │ └──────────────────────────┬───────────────────────────┘ │ │ │ │ │ ┌──────────────┐ ┌──────▼───────┐ ┌──────────────────┐ │ │ │ PostgreSQL │ │ Redis │ │ MinIO (S3) │ │ │ │ 消息持久化 │ │ 在线状态 │ │ 图片/文件存储 │ │ │ │ 离线消息 DB │ │ 离线缓存 │ │ 预签名 URL │ │ │ │ 用户/好友 │ │ Pub/Sub │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────────┘ │ └──────────────────────────────────────────────────────────────────────┘
为什么选择 Netty?
| 对比 | Netty | Tomcat WebSocket (JSR 356) |
|---|---|---|
| 线程模型 | 全异步 Reactor,一个 EventLoop 管数千连接 | 每个连接一个线程,高并发下线程爆炸 |
| 内存管理 | 池化 DirectByteBuf,零拷贝 | 堆内存分配,GC 压力大 |
| 扩展性 | Pipeline 可编排任意处理器 | 注解驱动,灵活性有限 |
简单说:Netty 能用更少的线程支撑更多的连接。对于聊天这种长连接密集场景,这是决定性的优势。
二、Netty 服务器搭建
2.1 ServerBootstrap 配置
@Component @Slf4j public class NettyWebSocketServer { private EventLoopGroup bossGroup = new NioEventLoopGroup(2); private EventLoopGroup workerGroup = new NioEventLoopGroup(3); private Channel channel; @PostConstruct public void start() { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast( new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerCompressionHandler(), new SignalingHandler(), // 核心业务处理器 new WebSocketServerProtocolHandler("/ws"), new IdleStateHandler(60, 0, 0) // 60s 空闲检测 ); } }); channel = bootstrap.bind(9090).channel(); } }Pipeline 各组件职责:
| 处理器 | 作用 |
|---|---|
HttpServerCodec | HTTP 编解码,WebSocket 握手依赖 HTTP 升级 |
HttpObjectAggregator | 将 HTTP 请求聚合成完整消息 |
WebSocketServerCompressionHandler | 数据帧压缩 |
SignalingHandler | 自定义处理器,处理所有业务消息 |
WebSocketServerProtocolHandler | 协议升级、帧聚合、关闭控制 |
IdleStateHandler | 60 秒无读则触发空闲事件,清理死连接 |
2.2 JWT 握手鉴权
WebSocket 标准 API 不支持自定义 Header,因此 Token 通过 URL 查询参数传递:
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { QueryStringDecoder decoder = new QueryStringDecoder(req.uri()); String token = decoder.parameters().get("token").get(0); Claims claims = JwtUtil.validateToken(token); Long userId = claims.get("userId", Long.class); // 注册会话 sessionManager.register(userId, ctx.channel()); // Redis 标记在线(60s TTL,分布式环境自动过期) redisTemplate.opsForValue().set("user:status:" + userId, "online", 60, TimeUnit.SECONDS); // 广播好友上线 redisTemplate.convertAndSend("channel:friend_status", statusJson); // 推送离线消息 chatService.pushOfflineMessages(userId); // 重写 URI 交给 WebSocketServerProtocolHandler req.setUri("/ws"); ctx.fireChannelRead(req.retain()); }三、会话管理:SessionManager
每个连接对应一个 NettyChannel,使用ConcurrentHashMap维护双向映射。
@Component public class SessionManager { private final ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Long> channelUserMap = new ConcurrentHashMap<>(); /** 注册会话,同一用户新连接会踢掉旧连接 */ public void register(Long userId, Channel channel) { Channel old = userChannelMap.put(userId, channel); if (old != null && old.isActive()) { old.writeAndFlush(new TextWebSocketFrame("{\"type\":\"kicked\"}")); old.close(); } channelUserMap.put(channel.id().toString(), userId); } public void sendMessage(Long userId, String message) { Channel ch = userChannelMap.get(userId); if (ch != null && ch.isActive()) { ch.writeAndFlush(new TextWebSocketFrame(message)); } } }设计要点:
同一个用户在多设备登录时,旧连接会被主动踢下线
channelUserMap反向索引解决channelInactive事件中如何知道哪个用户断开的问题不直接暴露 Channel 对象,避免业务代码误操作
四、消息协议设计
所有 WebSocket 消息使用统一 JSON 格式,type字段路由:
@Data public class SignalingMessage { private String type; // chat_message | call | sdp | ice | ... private Long fromUserId; private Long toUserId; private String content; // 聊天文本 private String messageType; // text | image | file | audio private String fileUrl; private String fileName; private Long fileSize; private Long msgId; // 服务端分配的 Snowflake ID private Long clientMsgId; // 客户端临时 ID(ACK 匹配) private String idempotentKey; // 幂等键(客户端去重用) // ... 信令字段(sdp, candidate, roomId 等) }核心消息类型:
| 类型 | 流向 | 说明 |
|---|---|---|
chat_message | C ↔ S | 聊天消息 |
chat_ack | S → C | 送达确认 |
offline_messages | S → C | 离线消息批量推送 |
mark_read | C → S | 已读回执 |
heartbeat / heartbeat_ack | C ↔ S | 心跳保活 |
五、聊天消息核心流程
5.1 消息发送到送达
发送方 信令服务 接收方 │ │ │ │── chat_message ───────>│ │ │ {toUserId,content} │ │ │ │── 1. 持久化到 PostgreSQL │ │ │ │ │<── chat_ack ──────────│ │ │ {msgId,clientMsgId} │ │ │ │── 2. 在线检查 │ │ │ ├── 在线 → WebSocket 转发│ │ │ └── 离线 → 离线消息存储 │ │ │ ↓ │ │ │ 写入DB+Redis(双写) │ │ │ 标记status=0(待推送) │private void handleChatMessage(SignalingMessage msg) { // 1. 持久化消息(Snowflake ID) ChatMessage saved = chatService.saveMessage(fromUserId, toUserId, ...); Long msgId = saved.getId(); // 2. 回 ACK 给发送方 sendAck(fromUserId, msgId, msg.getClientMsgId()); // 3. 构建转发消息 String forwardJson = buildForwardMessage(saved); if (sessionManager.isOnline(toUserId)) { // 在线:直接推送 sessionManager.sendMessage(toUserId, forwardJson); } else { // 离线:幂等双写 chatService.storeOfflineMessage(toUserId, forwardJson, msgId); } }5.2 ACK 确认机制
每条chat_message必须回chat_ack,否则发送方 UI 会停留在"发送中"状态:
前端发送时生成
clientMsgId(前端生成,如 UUID)服务端回 ACK 时携带
msgId + clientMsgId前端匹配到
clientMsgId后,本地消息从"发送中"→"已发送"10 秒无 ACK → 前端超时重试
六、离线消息的三层可靠性保障
离线消息必须在 Redis 崩了、服务重启、网络闪断等各种故障下100% 不丢。
整体设计
存储 读取 ┌─────────────────┐ ┌─────────────────┐ │ PostgreSQL │ │ Redis 优先 │ │ status=0/1/2 │<──────>│ (高吞吐) │ │ (持久化保底) │ │ │ └─────────────────┘ └─────────────────┘ │ │ │ DB 失败? │ Redis 挂了? │ 业务不中断 │ 自动降级 DB ▼ ▼ 消息不丢 ✓ 消息不丢 ✓
第一层:实时推送(用户上线时)
public void pushOfflineMessages(Long userId) { // 1. 优先从 Redis 拉取(高性能) List<String> messages = popFromRedis(userId); // 2. DB 补充拉取 status=0 的未推送消息(去重后合并) List<OfflineMessage> dbRecords = offlineMessageRepository .findByUserIdAndStatusOrderByCreateTimeAsc(userId, STATUS_UNPUSHED); // 3. WebSocket 推送 boolean ok = doPush(userId, messages); // 4. 推送成功 → 标记 DB status=1,Redis 已 leftPop 删除 if (ok) markAllPushed(dbRecords); }第二层:定时补偿(每 5 分钟)
即使第一层因网络瞬断等原因推送失败,定时任务会兜底:
@Component @RequiredArgsConstructor public class OfflineMessageCompensationTask { /** 每 5 分钟扫描未推送的离线消息重新投递 */ @Scheduled(initialDelay = 30_000, fixedRate = 300_000) public void compensateUndelivered() { chatService.resendUndeliveredMessages(); } /** 每日凌晨 3 点清理已推送的过期记录 */ @Scheduled(cron = "0 0 3 * * ?") public void cleanExpired() { chatService.cleanExpiredMessages(); } }public void resendUndeliveredMessages() { // 扫描 status=0 的消息,最多 200 条 List<OfflineMessage> pending = offlineMessageRepository .findByStatusOrderByCreateTimeAsc(STATUS_UNPUSHED, PageRequest.of(0, 200)); for (OfflineMessage msg : pending) { if (!sessionManager.isOnline(msg.getUserId())) continue; boolean ok = doPush(msg.getUserId(), msg.getContent()); if (ok) { offlineMessageRepository.markAsPushed(msg.getId(), now); // → status=1 } else { offlineMessageRepository.markAsFailed(msg.getId(), now); // → status=2, pushCount++ } } }第三层:监控告警
通过 REST API 暴露指标
| 端点 | 说明 |
|---|---|
GET /api/monitor/offline/stats | 全局未推送数、失败数、积压超 5 分钟数 |
GET /api/monitor/offline/backlog/{userId} | 指定用户积压 |
GET /api/monitor/offline/reconcile/{userId} | Redis vs DB 一致性对账 |
幂等性保证(消息不重不漏)
服务端幂等写入:
每条离线消息有全局唯一的idempotentKey = "offline:" + msgId(msgId 为 Snowflake ID):
@Transactional public void storeOfflineMessage(Long userId, String messageJson, Long msgId) { String key = "offline:" + msgId; // 数据层幂等判断 if (offlineMessageRepository.existsByIdempotentKey(key)) { log.debug("幂等跳过: {}", key); return; } OfflineMessage record = new OfflineMessage(); record.setUserId(userId); record.setContent(messageJson); record.setIdempotentKey(key); record.setMsgId(msgId); record.setStatus(OfflineMessage.STATUS_UNPUSHED); offlineMessageRepository.save(record); // Redis 加速 redisTemplate.opsForList().rightPush("offline:msgs:" + userId, messageJson); }客户端 LRU 去重:
前端维护一个最大 500 条的idempotentKey缓存,重复消息直接丢弃:
class IdempotentCache { constructor(maxSize = 500) { this.maxSize = maxSize this.cache = new Map() // 利用 Map 的插入顺序实现 LRU } add(key) { if (this.cache.has(key)) { this.cache.delete(key) // 重新插入以更新访问顺序 } else if (this.cache.size >= this.maxSize) { const oldest = this.cache.keys().next().value this.cache.delete(oldest) // 淘汰最久未访问的 } this.cache.set(key, true) } has(key) { return this.cache.has(key) } } // 在消息入口处去重 function handleChatMessage(data) { if (data.idempotentKey && idempotentCache.has(data.idempotentKey)) { return // 直接丢弃 } idempotentCache.add(data.idempotentKey) // ... 正常处理 }三层保障的可靠性矩阵:
| 故障场景 | 第一层(实时) | 第二层(补偿) | 结果 |
|---|---|---|---|
| Redis 崩溃 | 降级走 DB | ✅ | 不丢 |
| 推送时网络闪断 | ❌ | 5 分钟内重推 | 不丢 |
| 服务重启 | 上线触发推送 | ✅ | 不丢 |
| 消息重复发送 | 幂等跳过 | 幂等跳过 | 不重 |
| 极端并发 | LRU 兜底 | — | 不重 |
七、离线消息数据表设计
CREATE TABLE offline_message ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL, -- 接收者 content TEXT NOT NULL, -- 消息 JSON status INT DEFAULT 0, -- 0=待推送, 1=已推送, 2=推送失败 idempotent_key VARCHAR(255), -- 幂等键 msg_id BIGINT, -- 原始消息 ID push_count INT DEFAULT 0, -- 已重试次数 last_push_time TIMESTAMP, -- 最后推送时间 create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_offline_message_user ON offline_message(user_id); CREATE INDEX idx_offline_message_status ON offline_message(status); CREATE INDEX idx_offline_message_key ON offline_message(idempotent_key);
为什么不是简单的"先写 DB 再删"?
状态机设计(0→1→清理)让每条消息可追踪
失败原因可排查(pushCount 记录重试次数)
定时对账可以发现 DB 和 Redis 的数据不一致
八、心跳保活与连接清理
// 服务端:60 秒无数据则关闭 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { ctx.close(); } }// 前端:每 30 秒发一次心跳 function startHeartbeat() { heartbeatTimer = setInterval(() => { sendMessage({ type: 'heartbeat' }) }, 30000) }为什么前端间隔是 30 秒,服务端超时是 60 秒?
给网络抖动留一个缓冲窗口(丢一个心跳包没关系)
服务端超时 = 前端间隔 × 2,优雅容错
九、分布式部署:Redis Pub/Sub
当信令服务部署多实例时,用户 A 在实例 1 上线,用户 B 在实例 2 需要收到好友状态通知:
实例1 (user:1001 上线) Redis Pub/Sub 实例2 (user:1002 在线) │ │ │ │── set user:status:1001=online │ │ │── publish channel:friend_status ──────────────────> │ │ │ │ 收到消息 │ │ 通知 user:1002 │ 好友 1001 上线
// 状态变更时通过 Redis 广播 private void notifyFriendsStatus(Long userId, String status) { Map<String, Object> msg = Map.of( "type", "friend_status", "userId", userId, "status", status ); redisTemplate.convertAndSend("channel:friend_status", json(msg)); } // 订阅者:收到广播后推送给当前实例的在线用户 @Component public class RedisStatusSubscriber { public void onMessage(String message, String channel) { StatusMessage status = JSON.parseObject(message, StatusMessage.class); sessionManager.broadcastToAllFriends(status.getUserId(), message); } }十、文件消息:图片与文件传输
聊天中的图片和文件通过MinIO (S3 协议对象存储)处理,不走 WebSocket:
选择文件 → POST /api/chat/upload (multipart) → MinIO 存储 → 返回 fileUrl → WebSocket 发送 chat_message { messageType: "image", fileUrl, ... } → 接收方根据 messageType 渲染图片/文件卡片文件访问使用 MinIO 预签名 URL(7 天有效期),无需额外鉴权:
public String generatePresignedUrl(String objectKey) { return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder() .bucket(bucketName) .object(objectKey) .expiry(7, TimeUnit.DAYS) .build()); }十一、完整消息流程图
┌──────┐ ┌──────────────┐ ┌──────┐ │ 发送方 │ │ 信令服务 │ │ 接收方 │ └──┬───┘ └──────┬───────┘ └──┬───┘ │ │ │ │── chat_message ──────────────>│ │ │ │── saveMessage() │ │ │── chat_ack ──────────────────> │ │<── chat_ack ────────────────│ │ │ │ │ │ │── 在线检查 │ │ │ ├── 在线 → 直接转发 ────────>│ │ │ └── 离线 │ │ │ ├── storeOfflineMessage() │ │ │ │ DB(幂等) + Redis │ │ │ │ status=0 │ │ │ │ │ (接收方上线) │ │ │ │<── WebSocket 握手 │ │ │── pushOfflineMessages() │ │ │ ├── Redis 优先拉取 │ │ │ ├── DB status=0 补拉 │ │ │ ├── WebSocket 推送 ────────>│ │ │ │ 带 idempotentKey │ │ │ └── 标记 DB status=1 │ │ │ │ │ (5 分钟后) │ │ │ │── [定时补偿] │ │ │ ├── 扫描 status=0 │ │ │ ├── 用户在线? 重推 ────────>│ │ │ └── 成功→status=1 │ │ │ 失败→status=2,count++ │
十二、总结
本文实现了
| 功能 | 技术方案 |
|---|---|
| 长连接管理 | Netty Reactor 模型 + Pipeline 编排 |
| 用户鉴权 | JWT 握手时验证 |
| 会话映射 | ConcurrentHashMap 双向索引 |
| 消息可靠送达 | ACK + 10s 超时重试 + clientMsgId |
| 离线消息不丢 | DB 持久化 + Redis 加速 + 状态机 |
| 定时补偿 | @Scheduled 每 5 分钟扫描 status=0 |
| 消息幂等 | idempotentKey 数据库去重 + 前端 LRU |
| 监控对账 | REST 端点暴露积压/失败/延迟指标 |
| 在线状态 | Redis TTL + Pub/Sub 跨实例广播 |
| 图片/文件 | MinIO 对象存储 + 预签名 URL |
| 心跳保活 | 30s 客户端心跳 + 60s 服务端 idle |
效果
进一步优化的方向
消息有序性:引入 sequenceId,客户端严格按序渲染
推送通道:集成 Web Push / APNs / FCM,App 后台时也能收到通知
读写分离:历史消息归档到只读库,缓解主库压力
多设备同步:引入 sync 机制,手机和 PC 的消息状态同步
消息加密:端到端加密(E2EE),服务端无法解密内容
项目技术栈
后端:Java 17 + Spring Boot 3.2 + Spring Cloud 2023 + Netty + Dubbo 3.x 存储:PostgreSQL + Redis + MinIO (S3) 注册:Nacos 2.x 前端:Vue 3 + uni-app + WebRTC
