当前位置: 首页 > news >正文

WebSocket + Netty 构建一个简易的聊天软件

前言

即时通讯(IM)是移动互联网时代的基础设施。从微信到钉钉,从直播弹幕到游戏同步,IM 早已无处不在。如何构建一个高性能、高可用的聊天系统,是每一位后端开发进阶之路上的必修课。

本文将以一个实际项目为例,讲解如何基于Netty + WebSocket + Spring Boot构建完整的聊天系统,涵盖实时消息、文件传输、离线消息的三层可靠性保障、消息幂等去重等实战话题。项目已在生产环境验证,可直接作为 IM 系统的脚手架参考。


一、整体架构

系统的核心模块是信令服务(Signaling Service)——它通过 Netty 管理所有 WebSocket 长连接,同时提供 REST API 用于历史消息查询和文件上传。

┌──────────────┐ HTTP/REST ┌──────────────┐ Dubbo ┌──────────────┐ │ uni-app │ ───────────────> │ API 网关 │ ──────────> │ 用户服务 │ │ (前端) │ │ (Gateway) │ │ (好友/登录) │ └──────┬───────┘ └──────────────┘ └──────────────┘ │ WebSocket ws:// ▼ ┌──────────────────────────────────────────────────────────────────────┐ │ 信令服务 (Signaling Service) │ │ │ │ ┌──────────────────────┐ ┌────────────────────────────────┐ │ │ │ Netty WebSocket │ │ REST Controller │ │ │ │ Server :9090 │ │ /api/chat/history │ │ │ │ /ws?token=jwt │ │ /api/chat/upload │ │ │ └──────────┬───────────┘ │ /api/monitor/offline/* │ │ │ │ └───────────────┬────────────────┘ │ │ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ 核心服务层 │ │ │ │ SessionManager → userId ↔ Channel 双向映射 │ │ │ │ ChatService → 消息持久化 + 三层离线保障 │ │ │ │ CallRecordService→ 通话记录管理 │ │ │ │ FileService → MinIO 对象存储 │ │ │ └──────────────────────────┬───────────────────────────┘ │ │ │ │ │ ┌──────────────┐ ┌──────▼───────┐ ┌──────────────────┐ │ │ │ PostgreSQL │ │ Redis │ │ MinIO (S3) │ │ │ │ 消息持久化 │ │ 在线状态 │ │ 图片/文件存储 │ │ │ │ 离线消息 DB │ │ 离线缓存 │ │ 预签名 URL │ │ │ │ 用户/好友 │ │ Pub/Sub │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────────┘ │ └──────────────────────────────────────────────────────────────────────┘

为什么选择 Netty?

对比NettyTomcat WebSocket (JSR 356)
线程模型全异步 Reactor,一个 EventLoop 管数千连接每个连接一个线程,高并发下线程爆炸
内存管理池化 DirectByteBuf,零拷贝堆内存分配,GC 压力大
扩展性Pipeline 可编排任意处理器注解驱动,灵活性有限

简单说:Netty 能用更少的线程支撑更多的连接。对于聊天这种长连接密集场景,这是决定性的优势。


二、Netty 服务器搭建

2.1 ServerBootstrap 配置

@Component @Slf4j public class NettyWebSocketServer { ​ private EventLoopGroup bossGroup = new NioEventLoopGroup(2); private EventLoopGroup workerGroup = new NioEventLoopGroup(3); private Channel channel; ​ @PostConstruct public void start() { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast( new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerCompressionHandler(), new SignalingHandler(), // 核心业务处理器 new WebSocketServerProtocolHandler("/ws"), new IdleStateHandler(60, 0, 0) // 60s 空闲检测 ); } }); channel = bootstrap.bind(9090).channel(); } }

Pipeline 各组件职责

处理器作用
HttpServerCodecHTTP 编解码,WebSocket 握手依赖 HTTP 升级
HttpObjectAggregator将 HTTP 请求聚合成完整消息
WebSocketServerCompressionHandler数据帧压缩
SignalingHandler自定义处理器,处理所有业务消息
WebSocketServerProtocolHandler协议升级、帧聚合、关闭控制
IdleStateHandler60 秒无读则触发空闲事件,清理死连接

2.2 JWT 握手鉴权

WebSocket 标准 API 不支持自定义 Header,因此 Token 通过 URL 查询参数传递:

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { QueryStringDecoder decoder = new QueryStringDecoder(req.uri()); String token = decoder.parameters().get("token").get(0); Claims claims = JwtUtil.validateToken(token); Long userId = claims.get("userId", Long.class); ​ // 注册会话 sessionManager.register(userId, ctx.channel()); // Redis 标记在线(60s TTL,分布式环境自动过期) redisTemplate.opsForValue().set("user:status:" + userId, "online", 60, TimeUnit.SECONDS); // 广播好友上线 redisTemplate.convertAndSend("channel:friend_status", statusJson); // 推送离线消息 chatService.pushOfflineMessages(userId); // 重写 URI 交给 WebSocketServerProtocolHandler req.setUri("/ws"); ctx.fireChannelRead(req.retain()); }

三、会话管理:SessionManager

每个连接对应一个 NettyChannel,使用ConcurrentHashMap维护双向映射。

@Component public class SessionManager { private final ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Long> channelUserMap = new ConcurrentHashMap<>(); ​ /** 注册会话,同一用户新连接会踢掉旧连接 */ public void register(Long userId, Channel channel) { Channel old = userChannelMap.put(userId, channel); if (old != null && old.isActive()) { old.writeAndFlush(new TextWebSocketFrame("{\"type\":\"kicked\"}")); old.close(); } channelUserMap.put(channel.id().toString(), userId); } ​ public void sendMessage(Long userId, String message) { Channel ch = userChannelMap.get(userId); if (ch != null && ch.isActive()) { ch.writeAndFlush(new TextWebSocketFrame(message)); } } }

设计要点

  • 同一个用户在多设备登录时,旧连接会被主动踢下线

  • channelUserMap反向索引解决channelInactive事件中如何知道哪个用户断开的问题

  • 不直接暴露 Channel 对象,避免业务代码误操作


四、消息协议设计

所有 WebSocket 消息使用统一 JSON 格式,type字段路由:

@Data public class SignalingMessage { private String type; // chat_message | call | sdp | ice | ... private Long fromUserId; private Long toUserId; private String content; // 聊天文本 private String messageType; // text | image | file | audio private String fileUrl; private String fileName; private Long fileSize; private Long msgId; // 服务端分配的 Snowflake ID private Long clientMsgId; // 客户端临时 ID(ACK 匹配) private String idempotentKey; // 幂等键(客户端去重用) // ... 信令字段(sdp, candidate, roomId 等) }

核心消息类型

类型流向说明
chat_messageC ↔ S聊天消息
chat_ackS → C送达确认
offline_messagesS → C离线消息批量推送
mark_readC → S已读回执
heartbeat / heartbeat_ackC ↔ S心跳保活

五、聊天消息核心流程

5.1 消息发送到送达

发送方 信令服务 接收方 │ │ │ │── chat_message ───────>│ │ │ {toUserId,content} │ │ │ │── 1. 持久化到 PostgreSQL │ │ │ │ │<── chat_ack ──────────│ │ │ {msgId,clientMsgId} │ │ │ │── 2. 在线检查 │ │ │ ├── 在线 → WebSocket 转发│ │ │ └── 离线 → 离线消息存储 │ │ │ ↓ │ │ │ 写入DB+Redis(双写) │ │ │ 标记status=0(待推送) │
private void handleChatMessage(SignalingMessage msg) { // 1. 持久化消息(Snowflake ID) ChatMessage saved = chatService.saveMessage(fromUserId, toUserId, ...); Long msgId = saved.getId(); ​ // 2. 回 ACK 给发送方 sendAck(fromUserId, msgId, msg.getClientMsgId()); ​ // 3. 构建转发消息 String forwardJson = buildForwardMessage(saved); ​ if (sessionManager.isOnline(toUserId)) { // 在线:直接推送 sessionManager.sendMessage(toUserId, forwardJson); } else { // 离线:幂等双写 chatService.storeOfflineMessage(toUserId, forwardJson, msgId); } }

5.2 ACK 确认机制

每条chat_message必须回chat_ack,否则发送方 UI 会停留在"发送中"状态:

  • 前端发送时生成clientMsgId(前端生成,如 UUID)

  • 服务端回 ACK 时携带msgId + clientMsgId

  • 前端匹配到clientMsgId后,本地消息从"发送中"→"已发送"

  • 10 秒无 ACK → 前端超时重试


六、离线消息的三层可靠性保障

离线消息必须在 Redis 崩了、服务重启、网络闪断等各种故障下100% 不丢

整体设计

存储 读取 ┌─────────────────┐ ┌─────────────────┐ │ PostgreSQL │ │ Redis 优先 │ │ status=0/1/2 │<──────>│ (高吞吐) │ │ (持久化保底) │ │ │ └─────────────────┘ └─────────────────┘ │ │ │ DB 失败? │ Redis 挂了? │ 业务不中断 │ 自动降级 DB ▼ ▼ 消息不丢 ✓ 消息不丢 ✓

第一层:实时推送(用户上线时)

public void pushOfflineMessages(Long userId) { // 1. 优先从 Redis 拉取(高性能) List<String> messages = popFromRedis(userId); // 2. DB 补充拉取 status=0 的未推送消息(去重后合并) List<OfflineMessage> dbRecords = offlineMessageRepository .findByUserIdAndStatusOrderByCreateTimeAsc(userId, STATUS_UNPUSHED); // 3. WebSocket 推送 boolean ok = doPush(userId, messages); // 4. 推送成功 → 标记 DB status=1,Redis 已 leftPop 删除 if (ok) markAllPushed(dbRecords); }

第二层:定时补偿(每 5 分钟)

即使第一层因网络瞬断等原因推送失败,定时任务会兜底:

@Component @RequiredArgsConstructor public class OfflineMessageCompensationTask { /** 每 5 分钟扫描未推送的离线消息重新投递 */ @Scheduled(initialDelay = 30_000, fixedRate = 300_000) public void compensateUndelivered() { chatService.resendUndeliveredMessages(); } /** 每日凌晨 3 点清理已推送的过期记录 */ @Scheduled(cron = "0 0 3 * * ?") public void cleanExpired() { chatService.cleanExpiredMessages(); } }
public void resendUndeliveredMessages() { // 扫描 status=0 的消息,最多 200 条 List<OfflineMessage> pending = offlineMessageRepository .findByStatusOrderByCreateTimeAsc(STATUS_UNPUSHED, PageRequest.of(0, 200)); for (OfflineMessage msg : pending) { if (!sessionManager.isOnline(msg.getUserId())) continue; boolean ok = doPush(msg.getUserId(), msg.getContent()); if (ok) { offlineMessageRepository.markAsPushed(msg.getId(), now); // → status=1 } else { offlineMessageRepository.markAsFailed(msg.getId(), now); // → status=2, pushCount++ } } }

第三层:监控告警

通过 REST API 暴露指标

端点说明
GET /api/monitor/offline/stats全局未推送数、失败数、积压超 5 分钟数
GET /api/monitor/offline/backlog/{userId}指定用户积压
GET /api/monitor/offline/reconcile/{userId}Redis vs DB 一致性对账

幂等性保证(消息不重不漏)

服务端幂等写入

每条离线消息有全局唯一的idempotentKey = "offline:" + msgId(msgId 为 Snowflake ID):

@Transactional public void storeOfflineMessage(Long userId, String messageJson, Long msgId) { String key = "offline:" + msgId; // 数据层幂等判断 if (offlineMessageRepository.existsByIdempotentKey(key)) { log.debug("幂等跳过: {}", key); return; } OfflineMessage record = new OfflineMessage(); record.setUserId(userId); record.setContent(messageJson); record.setIdempotentKey(key); record.setMsgId(msgId); record.setStatus(OfflineMessage.STATUS_UNPUSHED); offlineMessageRepository.save(record); // Redis 加速 redisTemplate.opsForList().rightPush("offline:msgs:" + userId, messageJson); }

客户端 LRU 去重

前端维护一个最大 500 条的idempotentKey缓存,重复消息直接丢弃:

class IdempotentCache { constructor(maxSize = 500) { this.maxSize = maxSize this.cache = new Map() // 利用 Map 的插入顺序实现 LRU } add(key) { if (this.cache.has(key)) { this.cache.delete(key) // 重新插入以更新访问顺序 } else if (this.cache.size >= this.maxSize) { const oldest = this.cache.keys().next().value this.cache.delete(oldest) // 淘汰最久未访问的 } this.cache.set(key, true) } has(key) { return this.cache.has(key) } } // 在消息入口处去重 function handleChatMessage(data) { if (data.idempotentKey && idempotentCache.has(data.idempotentKey)) { return // 直接丢弃 } idempotentCache.add(data.idempotentKey) // ... 正常处理 }

三层保障的可靠性矩阵

故障场景第一层(实时)第二层(补偿)结果
Redis 崩溃降级走 DB不丢
推送时网络闪断5 分钟内重推不丢
服务重启上线触发推送不丢
消息重复发送幂等跳过幂等跳过不重
极端并发LRU 兜底不重

七、离线消息数据表设计

CREATE TABLE offline_message ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL, -- 接收者 content TEXT NOT NULL, -- 消息 JSON status INT DEFAULT 0, -- 0=待推送, 1=已推送, 2=推送失败 idempotent_key VARCHAR(255), -- 幂等键 msg_id BIGINT, -- 原始消息 ID push_count INT DEFAULT 0, -- 已重试次数 last_push_time TIMESTAMP, -- 最后推送时间 create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_offline_message_user ON offline_message(user_id); CREATE INDEX idx_offline_message_status ON offline_message(status); CREATE INDEX idx_offline_message_key ON offline_message(idempotent_key);

为什么不是简单的"先写 DB 再删"?

  • 状态机设计(0→1→清理)让每条消息可追踪

  • 失败原因可排查(pushCount 记录重试次数)

  • 定时对账可以发现 DB 和 Redis 的数据不一致


八、心跳保活与连接清理

// 服务端:60 秒无数据则关闭 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { ctx.close(); } }
// 前端:每 30 秒发一次心跳 function startHeartbeat() { heartbeatTimer = setInterval(() => { sendMessage({ type: 'heartbeat' }) }, 30000) }

为什么前端间隔是 30 秒,服务端超时是 60 秒?

  • 给网络抖动留一个缓冲窗口(丢一个心跳包没关系)

  • 服务端超时 = 前端间隔 × 2,优雅容错


九、分布式部署:Redis Pub/Sub

当信令服务部署多实例时,用户 A 在实例 1 上线,用户 B 在实例 2 需要收到好友状态通知:

实例1 (user:1001 上线) Redis Pub/Sub 实例2 (user:1002 在线) │ │ │ │── set user:status:1001=online │ │ │── publish channel:friend_status ──────────────────> │ │ │ │ 收到消息 │ │ 通知 user:1002 │ 好友 1001 上线
// 状态变更时通过 Redis 广播 private void notifyFriendsStatus(Long userId, String status) { Map<String, Object> msg = Map.of( "type", "friend_status", "userId", userId, "status", status ); redisTemplate.convertAndSend("channel:friend_status", json(msg)); } // 订阅者:收到广播后推送给当前实例的在线用户 @Component public class RedisStatusSubscriber { public void onMessage(String message, String channel) { StatusMessage status = JSON.parseObject(message, StatusMessage.class); sessionManager.broadcastToAllFriends(status.getUserId(), message); } }

十、文件消息:图片与文件传输

聊天中的图片和文件通过MinIO (S3 协议对象存储)处理,不走 WebSocket:

选择文件 → POST /api/chat/upload (multipart) → MinIO 存储 → 返回 fileUrl → WebSocket 发送 chat_message { messageType: "image", fileUrl, ... } → 接收方根据 messageType 渲染图片/文件卡片

文件访问使用 MinIO 预签名 URL(7 天有效期),无需额外鉴权:

public String generatePresignedUrl(String objectKey) { return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder() .bucket(bucketName) .object(objectKey) .expiry(7, TimeUnit.DAYS) .build()); }

十一、完整消息流程图

┌──────┐ ┌──────────────┐ ┌──────┐ │ 发送方 │ │ 信令服务 │ │ 接收方 │ └──┬───┘ └──────┬───────┘ └──┬───┘ │ │ │ │── chat_message ──────────────>│ │ │ │── saveMessage() │ │ │── chat_ack ──────────────────> │ │<── chat_ack ────────────────│ │ │ │ │ │ │── 在线检查 │ │ │ ├── 在线 → 直接转发 ────────>│ │ │ └── 离线 │ │ │ ├── storeOfflineMessage() │ │ │ │ DB(幂等) + Redis │ │ │ │ status=0 │ │ │ │ │ (接收方上线) │ │ │ │<── WebSocket 握手 │ │ │── pushOfflineMessages() │ │ │ ├── Redis 优先拉取 │ │ │ ├── DB status=0 补拉 │ │ │ ├── WebSocket 推送 ────────>│ │ │ │ 带 idempotentKey │ │ │ └── 标记 DB status=1 │ │ │ │ │ (5 分钟后) │ │ │ │── [定时补偿] │ │ │ ├── 扫描 status=0 │ │ │ ├── 用户在线? 重推 ────────>│ │ │ └── 成功→status=1 │ │ │ 失败→status=2,count++ │

十二、总结

本文实现了

功能技术方案
长连接管理Netty Reactor 模型 + Pipeline 编排
用户鉴权JWT 握手时验证
会话映射ConcurrentHashMap 双向索引
消息可靠送达ACK + 10s 超时重试 + clientMsgId
离线消息不丢DB 持久化 + Redis 加速 + 状态机
定时补偿@Scheduled 每 5 分钟扫描 status=0
消息幂等idempotentKey 数据库去重 + 前端 LRU
监控对账REST 端点暴露积压/失败/延迟指标
在线状态Redis TTL + Pub/Sub 跨实例广播
图片/文件MinIO 对象存储 + 预签名 URL
心跳保活30s 客户端心跳 + 60s 服务端 idle

效果

进一步优化的方向

  1. 消息有序性:引入 sequenceId,客户端严格按序渲染

  2. 推送通道:集成 Web Push / APNs / FCM,App 后台时也能收到通知

  3. 读写分离:历史消息归档到只读库,缓解主库压力

  4. 多设备同步:引入 sync 机制,手机和 PC 的消息状态同步

  5. 消息加密:端到端加密(E2EE),服务端无法解密内容


项目技术栈

后端:Java 17 + Spring Boot 3.2 + Spring Cloud 2023 + Netty + Dubbo 3.x 存储:PostgreSQL + Redis + MinIO (S3) 注册:Nacos 2.x 前端:Vue 3 + uni-app + WebRTC
http://www.rkmt.cn/news/1419519.html

相关文章:

  • AI驱动的社交聚合平台:重构信息消费体验,对抗虚假信息
  • 【AI大模型应用开发工程师特训笔记】第04讲(第7章):函数与模块
  • 2026年青岛本地靠谱搬家服务机构推荐:山东臻品老兵搬家有限公司青岛分公司 - 海棠依旧大
  • 高德地图 Flutter 插件:跨 Android / iOS / HarmonyOS 的完整实现
  • 别再死记硬背了!用74LS74和74LS76芯片,手把手教你玩转D、JK、T触发器转换(附波形图分析)
  • Cocos学习笔记:自定义字体、骨骼动画与项目架构
  • 搞定7nm DRC收敛:一份来自Innovus和ICC2实战的避坑清单(附脚本)
  • 告别乱码!实测三款主流Java反编译工具(JD-GUI、Luyten、Jadx)的导出源码对比
  • 用STM32CubeIDE搞定TB6612驱动GB37-520电机:从引脚配置到PWM频率计算全流程
  • fselect:用类SQL语句查找文件
  • AI 告诉你代码安全,它在骗你!
  • PS如何提高照片清晰度?3个方法零基础也能快速搞定高清修图
  • GPT5.5对Gemini3.5对DeepSeekV4编程能力横评
  • 别再死记硬背build.gradle了!用Groovy闭包和DSL思维,5分钟看懂Gradle配置的本质
  • 不只是VMware:开启AMD-V后,你的Win10/Win11还能玩转这些虚拟化工具
  • AI与机器学习驱动的智能运营:从数据到决策的自动化闭环
  • 别再只用洞洞板了!用嘉立创EDA+370电机,低成本搞定POV旋转LED全套硬件
  • 保姆级教空间转录组分析| 01. 绪论
  • 从5篇高温合金文章到16层协议:一个工业AI知识萃取的方法论
  • 用N32G031的TIM1驱动无刷电机:从寄存器配置互补PWM到死区时间实战避坑
  • Elasticsearch聚合分析实战
  • FreeRTOS性能调优利器:用SystemView揪出任务阻塞和中断延迟的元凶
  • 学习导师:从工具模式到感知模式的整合
  • LogAnalyzer实战:除了看系统日志,我这样用它监控Nginx访问和MySQL慢查询
  • AI赋能客户体验:从智能客服到预测性服务的实战指南
  • 别再混淆了!用Python的sklearn手把手教你算多分类的Precision、Recall和Accuracy
  • 164-基于Python的甜点销售数据可视化分析系统
  • ♪苍穹外卖♪Day2 | 项目日记
  • Hermes Agent 完全使用指南:从安装到多平台部署的全流程教程
  • 战略落地难?试试分拆对