本文目录导读:

基于消息的最终一致性事务,是分布式系统中解决跨服务数据一致性问题的一种常见模式,它的核心思想是:通过可靠的异步消息传递,结合本地事务和补偿机制,保证数据在最终时刻是一致的。
以下是实现该模式的完整架构、核心步骤、关键问题及代码示例。
核心架构
通常包含三个主要角色:
- 主动方服务:发起事务的服务,它先执行本地事务并发送消息。
- 消息中间件:存储和路由消息,如 RocketMQ、Kafka 或 RabbitMQ。
- 被动方服务:消费消息并执行自身业务逻辑的服务,如果执行失败,需要触发补偿。
实现步骤(以最常见的方法为例)
本地消息表(最经典、可靠)
这种方法需要一个额外的“本地消息表”来记录事务状态,保证消息一定被发送。
流程:
-
开启本地事务: 主动方服务在自己的数据库中,同时执行:
- 更新业务数据(如“扣减库存”)。
- 向“本地消息表”插入一条状态为“待发送”的消息记录。 注意: 这两个操作必须在同一个数据库本地事务中完成(ACID)。
-
发送消息: 主动方服务中的定时任务或后台线程,轮询“本地消息表”中状态为“待发送”的消息,并发送到消息队列。
-
被动方消费消息并执行: 被动方服务从队列接收到消息后,执行自己的业务逻辑(如“新增订单”)。 关键: 需要保证消费的幂等性(通过唯一业务ID去重)。
-
确认与更新状态:
- 如果被动方执行成功,主动方收到回调或被动方通过消息队列的自动确认机制,主动方更新“本地消息表”状态为“已发送”。
- 如果被动方执行失败(如业务异常、服务崩溃),消息会重试,如果多次重试后依然失败,主动方可将消息状态改为“需人工处理”或“回滚”,并触发补偿。
RocketMQ 事务消息(推荐)
RocketMQ 原生支持事务消息,省去了自行实现本地消息表的轮询逻辑。
流程:
-
发送“半消息”(prepare): 主动方服务向 RocketMQ 发送一条“半消息”,此时消息处于“暂不可见”状态,消费者无法消费。
-
执行本地事务: 主动方服务执行本地业务操作。
-
提交/回滚消息:
- 如果本地事务执行成功,向 RocketMQ 发送
commit指令,消息变为“可见”,消费者可消费。 - 如果本地事务执行失败,发送
rollback指令,消息被丢弃。
- 如果本地事务执行成功,向 RocketMQ 发送
-
回查机制: RocketMQ 长时间未收到 commit/rollback,会主动回调主动方服务的
checkLocalTransaction方法,询问本地事务最终状态(成功、失败、未知),这是保证消息100%不丢失的关键。
核心难点及解决方案
-
消息的可靠性(必须发出去):
- 方案:本地消息表 + 定时重试 + 幂等性。
- 原理:只要本地事务提交成功,消息记录就在数据库里,后续无论发送失败多少次,定时任务都会尝试重发。
-
消费的幂等性(必须只执行一次):
- 方案:使用业务唯一键(如订单号、事务ID)做去重表或数据库主键冲突。
- 原理:被动方在执行业务前,先查一下去重表中是否有该ID,如果有则跳过。
-
被动方执行失败(需补偿):
- 方案:提供补偿接口。
- 原理:如果被动手变更失败(如库存不足),主动方需要调用被动方的“回滚接口”(如释放已锁定的库存),通常是异步的。
代码示例(简化版)
主动方:本地消息表实现
// 主动方服务
@Service
public class OrderService {
@Autowired
private LocalMessageMapper localMessageMapper; // 数据库
@Transactional
public void createOrder(Order order) {
// 1. 业务操作:扣库存,生成订单
inventoryService.deduct(order.getProductId());
orderDao.insert(order);
// 2. 插入本地消息表(状态:待发送)
LocalMessage msg = new LocalMessage();
msg.setId(UUID.randomUUID().toString());
msg.setBusinessType("order");
msg.setMsgBody(JSON.toJSONString(order));
msg.setStatus(0); // 0-待发送
localMessageMapper.insert(msg);
}
// 定时任务:扫描并发送待发送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> pendingMsgs = localMessageMapper.selectByStatus(0);
for (LocalMessage msg : pendingMsgs) {
try {
// 发送到MQ
messageQueue.send(msg.getBusinessType(), msg.getMsgBody());
// 发送成功,更新状态
msg.setStatus(1); // 1-已发送
localMessageMapper.updateById(msg);
} catch (Exception e) {
// 记录失败,下次再重试
log.error("Send msg failed, msgId:{}", msg.getId());
}
}
}
}
被动方:幂等消费 + 补偿
@Component
@RocketMQMessageListener(topic = "order", consumerGroup = "order_group")
public class OrderConsumer implements RocketMQListener<String> {
@Transactional
public void onMessage(String message) {
// 1. 获取业务ID(如订单号)
String orderId = extractOrderId(message);
// 2. 幂等性检查(去重表或唯一索引)
if (dedupDao.exists(orderId)) {
log.info("重复消息,直接ack: {}", orderId);
return; // 如果RocketMQ自动ack,此时会返回成功
}
// 3. 执行业务逻辑(如新增订单)
Order order = JSON.parseObject(message, Order.class);
orderDao.insert(order);
// 4. 插入去重记录
dedupDao.insert(new DedupRecord(orderId));
}
// 如果业务失败,触发补偿
public void compensate(String orderId) {
// 调用主动方或被动方自身的回滚接口
inventoryService.addBack(orderId);
}
}
缺点与适用场景
- 优点:高可用、高可靠,适用于对最终一致性要求严格的场景(如支付、订单、库存)。
- 缺点:
- 实现复杂度高(需考虑重试、去重、回查)。
- 存在秒级延迟(消息投递、定时轮询)。
- 不支持强一致性,只保证最终一致性。
- 适用场景:转账、下单(库存扣减与订单生成)、用户注册(积分发放)等。
基于消息的最终一致性事务,本质上是用消息队列的可靠投递 + 本地事务的原子性 + 消费的幂等性,来确保分布式系统下数据最终一致,它不保证实时一致,但保证最终一致。
如果你的业务对强一致性要求高(如余额扣减与订单同步),可能需要考虑 Seata 等分布式事务框架;但如果允许秒级延迟,基于消息的最终一致性模式是最常见且成熟的选择。