尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

消息队列--消息顺序性保障

消息队列--消息顺序性保障
📅 发布时间:2026/6/19 6:49:36

目录

参考美团技术团队博客

项目中实现思路


参考美团技术团队博客

两种通用的解决方案: 1. 版本号。 2. 状态机。

版本号

举个简单的例子,一个产品的状态有上线/下线状态。如果消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,如果不做重复性判断,显然最终状态是错误的。 但是,如果每个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。如果再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。 每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以接收,同时更新版本号为2.当另一条下线消息到来时,如果版本号是3.则是真实的下线消息。如果是1,则是重复投递的消息。 如果业务方只关心消息重复不重复,那么问题就已经解决了。但很多时候另一个头疼的问题来了,就是消息顺序如果和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则最后会发生状态错误。 参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。 如果到来的顺序是21,则先把2存起来,待1到来后,先处理1,再处理2,这样重复性和顺序性要求就都达到了。

。。。

基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:

  1. 对发送方必须要求消息带业务版本号。
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

https://tech.meituan.com/2016/07/01/mq-design.htmlhttps://tech.meituan.com/2016/07/01/mq-design.html其中参考主要思想:

参考TCP/IP协议,如果想让乱序的消息最后能够正确的被组织,那么就应该只接收比当前版本号大一的消息。并且在一个session周期内要一直保存各个消息的版本号。 如果到来的顺序是21,则先把2存起来,待1到来后,先处理1,再处理2,这样重复性和顺序性要求就都达到了。

项目中实现思路

消息的顺序性保障不靠中间件去实施,在消费者这一侧来实现消息的顺序性。

一个broker的topic=xxxx 是专门用来发顺序消息的

消费者这边,收到消息之后全部落库不处理,落库成功之后返回ack。利用mysql存消息

数据库这边,business_key + version有唯一索引。重复消息被唯一索引幂等掉。

假设发的时 2 1 3 1 4,数据库真实存储的是2 1 3 4。

消费者这边处理的逻辑(伪代码)

while(ture) { // 查询所有未处理的消息,按bussiness_key 和 version 升序排列 List<Message> messages = DB.query("SELECT * FROM messages WHERE processed = FALSE ORDER BY business_key, version ASC"); for(Message msg : messages) { String businessKey = msg.getBusinessKey(); int version = msg.getVersion(); // 检查是否有上一版本号未处理 boolean preVersionProcessed = DB.esists( "SELECT 1 FROM messages WHERE business_key = ? AND version = ? AND processed = TRUE", businessKey, version - 1 ); // 如果是version=1,直接处理;或者上一个版本已经处理了,也可以处理当前版本 if (version == 1 || preVersionProcessed) { processMessage(msg); // 标记消息已处理 DB.update("UPDATE message SET processed = TRUE WHERE id = ?", msg.getId()); } else { continue; } } sleep(5000); }

优化点:避免重复读取,可以使用分布式锁,或者加行级锁,FOR UPDATE。加上事务机制。

List<Message> messages = DB.query("SELECT * FROM messages WHERE processed = FALSE ORDER BY business_key, version ASC");

相关新闻

  • 2025最新!8个AI论文平台测评:继续教育写论文不再难
  • 实力强的腾讯广告专业服务商推荐,为品牌营销保驾护航
  • 算力饱和打击:无人机蜂群的实时三维重建如何碾碎战场迷雾 - 品牌2025

最新新闻

  • LLM前摄干扰缺陷:为什么大模型无法准确追踪最新数据
  • Narou.rb:日本网络小说下载与管理的终极解决方案
  • 2026专业奢侈品回收综合实力榜 透明报价与口碑双优 - 工业品牌热点
  • Apkmod安全注意事项:合法使用APK逆向工程工具的道德和法律边界
  • HDPE双壁波纹管行业实力风云榜,2026口碑供应商横评 - mypinpai
  • Wox终极指南:如何用跨平台启动器提升10倍工作效率

日新闻

  • 5分钟掌握Python进化算法:Geatpy高性能优化工具完全指南
  • Microchip 24AA044 EEPROM选型与应用全指南:从参数解析到实战编程
  • 华为的鸿蒙到底有多牛?为什么称作遥遥领先?

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号