《金融支付架构实战指南》一书讨论了分布式事务。这里研究RocketMQ事务消息方案。
在支付系统链路中,支付结果通知、订单状态变更、账户余额扣减、优惠券核销分属不同微服务,跨库操作天然面临分布式一致性难题。本地事务无法跨数据源生效,TCC 开发成本高、侵入业务代码,而 RocketMQ 事务消息凭借半消息 + 回调回查机制,成为支付场景轻量化落地最终一致性的主流选型。但原生事务消息暗藏致命空回滚隐患:半消息持久化 Broker 后服务宕机,本地事务滞后执行极易出现「数据库业务落库、消息被回滚销毁」,导致订单与通知脱节、资损隐患。本文结合线上支付落地经验,基于数据库主键占位思想,落地两套可直接投产的 RocketMQ 事务消息方案,适配支付回调、代付扣款两大业务场景,附带完整源码与异常全链路验证。
一、原生RocketMQ事务消息最大致命漏洞
1. 危险场景(原生代码必崩)
完整致命时序:
- Producer 发送Half半消息→ Broker 持久化成功
- 此时机器线程卡顿
- executeLocalTransaction 完全没执行、一行代码都没跑
- Broker 定时回查checkLocalTransaction
- 系统查不到订单、查不到事务记录 → 代码误判返回ROLLBACK
- Broker 删除消息,事务作废
- 服务重启后,之前卡住的 executeLocalTransaction 继续执行成功
最终致命数据不一致:
2. 问题根源
RocketMQ 回查机制无法区分两种状态:
原生写法统一判定为「事务失败」→ 误删消息 → 数据崩坏。
二、行业标准终极解决方案(无BUG方案)
核心原理
回查发现无事务记录时,主动插入一条事务占位记录!
目的:
完整闭环时序(100%无漏洞)
- 发送半消息成功
- 服务宕机,本地事务未执行
- Broker 触发回查
- 回查发现无事务记录 →主动 insert 占位记录
- 返回 UNKNOWN,继续等待
- 服务恢复,之前的 executeLocalTransaction 开始执行
- 执行插入事务日志 →主键冲突异常
- 本地事务整体回滚,业务不生成订单
- 下次回查发现事务失败 → 真正 ROLLBACK 消息
最终绝对一致:业务失败、消息回滚,完全匹配
三、数据库表结构
sql CREATE TABLE tx_transaction_log ( tx_id VARCHAR(64) NOT NULL COMMENT '全局事务ID(订单号)', create_time DATETIME DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (tx_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='RocketMQ事务状态表(防空回滚占位)'; |
极简设计,只需要主键,不需要状态字段!靠主键唯一冲突解决所有问题。
四、完整生产级代码
1. 事务生产者
java @Service public class SeckillTxProducer {
@Autowired private RocketMQTemplate rocketMQTemplate;
public void sendSeckillTxMessage(String orderNo, String json) { Message<String> message = MessageBuilder.withPayload(json).build(); // 发送半消息 rocketMQTemplate.sendMessageInTransaction( "seckill_tx_topic", message, orderNo ); } } |
2. 核心事务监听器(最终无BUG版)
java @Component @RocketMQTransactionListener(producerGroup = "seckill_tx_group") public class SeckillTxListener implements RocketMQLocalTransactionListener {
@Autowired private OrderService orderService; @Autowired private TxLogMapper txLogMapper;
// ===================== 执行本地事务 ===================== @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderNo = (String) arg; SeckillOrder order = JSON.parseObject(msg.getPayload().toString(), SeckillOrder.class);
try { // 关键:插入事务占位记录 // 如果回查已经抢先插入,这里直接主键冲突 txLogMapper.insertTxLog(orderNo);
// 执行业务:创建订单 + 扣库存(本地事务) orderService.createOrderAndDeductStock(order);
// 业务成功,提交消息 return RocketMQLocalTransactionState.COMMIT; } catch (DuplicateKeyException e) { // 主键冲突 = 回查已经占位,本次事务作废 return RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } }
// ===================== 事务回查(核心防BUG逻辑) ===================== @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { SeckillOrder order = JSON.parseObject(msg.getPayload().toString(), SeckillOrder.class); String orderNo = order.getOrderNo();
// 1. 查询事务记录 Integer count = txLogMapper.countTxLog(orderNo);
if (count == null || count == 0) { // 【最核心代码】无记录 → 主动占位!! try { txLogMapper.insertTxLog(orderNo); } catch (DuplicateKeyException e) { // 并发回查插入,忽略 } // 占位成功,继续回查,不提交、不回滚 return RocketMQLocalTransactionState.UNKNOWN; }
// 2. 有记录 = 本地事务已执行过 boolean orderExist = orderService.isOrderExist(orderNo); if (orderExist) { return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } } } |
3. 业务服务(本地事务)
java @Service public class OrderService {
@Autowired private OrderMapper orderMapper; @Autowired private StockMapper stockMapper;
@Transactional(rollbackFor = Exception.class) public void createOrderAndDeductStock(SeckillOrder order) { // 创建订单 orderMapper.insert(order); // 扣减库存 int rows = stockMapper.deductStock(order.getGoodsId()); if (rows <= 0) { throw new RuntimeException("库存不足"); } }
public boolean isOrderExist(String orderNo) { return orderMapper.selectByOrderNo(orderNo) != null; } } |
4. Mapper
java @Mapper public interface TxLogMapper { @Insert("INSERT INTO tx_transaction_log(tx_id) VALUES(#{txId})") void insertTxLog(@Param("txId") String txId);
@Select("SELECT count(1) FROM tx_transaction_log WHERE tx_id = #{txId}") Integer countTxLog(@Param("txId") String txId); } |
5. 消费者(幂等)
java @Component @RocketMQMessageListener(topic = "seckill_tx_topic", consumerGroup = "seckill_consumer_group") public class SeckillConsumer implements RocketMQListener<String> {
@Autowired private OrderService orderService;
@Override public void onMessage(String message) { SeckillOrder order = JSON.parseObject(message, SeckillOrder.class); // 幂等判断 if (!orderService.isOrderExist(order.getOrderNo())) { return; } // 执行下游业务:通知、积分、物流等 } } |
五、三套异常场景全覆盖
场景1:半消息成功,本地事务完全没执行
场景2:本地事务执行中,回查提前到来
场景3:本地事务执行失败抛出异常
六、总结
RocketMQ事务消息存在空回滚漏洞:半消息发送成功后若服务宕机,本地事务未执行,Broker回查不到数据会误判回滚;后续服务恢复,滞后的本地事务又会正常提交,导致业务成功、消息回滚的数据不一致问题。
生产解决方案:
1.在回查接口中,如果查询不到事务记录,主动插入一条事务占位数据,利用数据库主键唯一约束,让后续滞后执行的本地事务触发主键冲突强制回滚,彻底杜绝空回滚导致的数据不一致,保证分布式事务最终一致性。
2.保守做法:查不到就返回 UNKNOWN
如果业务极度敏感,开发人员不想在本地事务中耦合事务日志表,或者懒得建表,生产上常见的折中方案是:
回查时,如果查不到业务数据(订单不存在),返回 ROLLBACK。让 Broker 继续等,继续回查。
直到超过 RocketMQ 的最大回查次数(默认 15 次),可设置合适的回查次数,Broker 会自动回滚消息。
前提假设:如果本地事务没执行,由于前面分析的“线程不可能死而复生”,它以后也不会执行了。让 Broker 多等几次回查,虽然占用了 Broker 资源,但绝对安全,不会误杀。