本文目录导读:

防止消息重复消费是分布式系统中的一个经典问题(通常称为“幂等性”问题),在 Java 案例中,常见的解决方案可以从业务层、中间件层和数据库层三个维度来设计。
以下是几种主流且实用的防止消息重复消费的方案,附有 Java 代码示例和适用场景分析。
核心思路:幂等性设计
重复消费往往无法在消息队列层面(如 Kafka、RocketMQ、RabbitMQ)完全杜绝,因此消费者端自己实现幂等是根本解决办法。
基于唯一业务标识 + 去重表(最常用)
这是最推荐的方案,利用数据库的唯一索引或 Redis 的原子性,确保同一条业务消息只被执行一次。
原理: 每条消息都携带一个全局唯一的业务ID(如订单号、流水号),在处理消息前,先检查这个ID是否已经被处理过。
使用数据库唯一索引(强一致性)
适用场景: 金融、交易等对数据一致性要求极高的场景。
Java 伪代码示例:
// 1. 消息结构体
public class OrderMessage {
private String messageId; // 全局唯一ID (业务唯一键)
private String orderId;
private BigDecimal amount;
}
// 2. 数据库去重表 (DDL)
// CREATE TABLE msg_de_dup (
// id BIGINT AUTO_INCREMENT,
// message_id VARCHAR(64) NOT NULL COMMENT '业务去重关键值',
// status TINYINT COMMENT '处理状态 0-处理中 1-成功',
// PRIMARY KEY (id),
// UNIQUE KEY uk_message_id (message_id) // 唯一索引是关键
// );
// 3. 消费者逻辑
@Service
public class OrderConsumer {
@Autowired
private JdbcTemplate jdbcTemplate;
public void handleOrder(OrderMessage msg) {
// 尝试插入去重记录(利用数据库唯一索引防重)
try {
jdbcTemplate.update(
"INSERT INTO msg_de_dup (message_id, status) VALUES (?, ?)",
msg.getMessageId(), 0
);
} catch (DuplicateKeyException e) {
// 插入失败,说明已经处理过或正在处理,直接跳过
log.warn("重复消息已忽略: {}", msg.getMessageId());
return;
}
// 执行真正的业务逻辑(需要保证事务)
try {
// 更新订单状态,扣减库存等
orderService.payOrder(msg.getOrderId());
// 成功后更新状态
jdbcTemplate.update(
"UPDATE msg_de_dup SET status = 1 WHERE message_id = ? AND status = 0",
msg.getMessageId()
);
} catch (Exception e) {
// 业务失败,删除去重记录或标记为失败(允许重试机制重试)
jdbcTemplate.update("DELETE FROM msg_de_dup WHERE message_id = ?", msg.getMessageId());
throw e; // 抛出异常触发 MQ 重试
}
}
}
使用 Redis 的 SETNX 命令(高性能)
适用场景: 高并发、对一致性要求稍低一些的场景(可通过 TTL 兜底)。
Java 伪代码示例:
@Service
public class OrderConsumerRedis {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void handleOrder(OrderMessage msg) {
String key = "msg:de-dup:" + msg.getMessageId();
// 1. 尝试设置键(SETNX),如果存在则返回false
// 注意:需要设置过期时间,避免Redis内存泄漏;TTL应大于最大重试间隔
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "processing", Duration.ofMinutes(30));
if (Boolean.FALSE.equals(success)) {
log.warn("重复消息已忽略: {}", msg.getMessageId());
return;
}
// 2. 执行业务逻辑
try {
orderService.payOrder(msg.getOrderId());
// 3. 业务成功,更新状态(可选)
redisTemplate.opsForValue().set(key, "success", Duration.ofMinutes(30));
} catch (Exception e) {
// 业务失败,删除key,允许下次重试
redisTemplate.delete(key);
throw e;
}
}
}
基于消息中间件自带机制
某些消息队列提供了“消息不重复”或“恰好一次”的语义,但通常需要配合使用。
RocketMQ:消息去重机制
RocketMQ 支持在生产端设置 setKeys 或在 Broker 层面开启消息去重(需要配合配置,且有一定局限性)。
Kafka:幂等生产者 + 事务
Kafka 可以保证生产端不重复,但消费端需要自己处理,如果使用 Kafka 的事务 API 并配合 read_committed 隔离级别,可以一定程度避免消费端读到重复数据,但仍然无法完全避免因消费者重启或 rebalance 导致的重复。
利用业务逻辑本身的幂等性(最优雅)
设计业务接口时,使其天然支持“多次调用与一次调用效果相同”。
常见实现:
- 数据库状态机: 通过
update table set status=2 where status=1 and id=?来判断。- 如果返回
rows == 1,说明执行成功。 - 如果返回
rows == 0,说明状态已变化(可能是重复消费),直接忽略。
- 如果返回
- 全局流水号: 接口必须携带唯一流水号,后端根据流水号自然去重。
- 版本号乐观锁: 带版本号更新,只有版本匹配才更新。
Java 示例:基于数据库状态机的幂等处理
public void processPayment(String orderId, int currentStatus, int targetStatus) {
// SQL 语句本身有判断条件,保证只更新一次
int rows = paymentMapper.updateStatusByOrderId(orderId, currentStatus, targetStatus);
if (rows == 0) {
// 说明已经处理过,直接返回成功
return;
}
// 继续后续操作...
}
结合定时任务 + 状态标记(兜底)
对于高可用系统,往往不依赖单一机制,而是采用“主方案 + 兜底”策略。
- 主方案: 使用 Redis SETNX 或数据库唯一索引进行实时拦截。
- 兜底: 记录所有消息的处理状态(如
message_id+status),定时任务扫描是否存在 状态为“处理中”但实际已超时 的记录,或者通过人工补偿机制处理。
总结对比与选择建议
| 方案 | 实现复杂度 | 性能 | 一致性 | 推荐场景 |
|---|---|---|---|---|
| 数据库唯一索引 | 低 | 中 | 强 | 金融、交易、核心系统 |
| Redis SETNX | 低 | 高 | 高(需关注TTL) | 高并发、推送、通知 |
| 业务幂等设计 | 高(需设计) | 最高 | 取决于设计 | 具备强业务唯一键的场景 |
| MQ 自带机制 | 依赖 MQ | 中 | 有限 | 特定 MQ 的高级用法(如 RocketMQ) |
最佳实践建议:
- 首选方案: 业务逻辑幂等性(方案三)+ Redis 或数据库去重表(方案一)。
- 小技巧: 生产者端可以自行生成全局唯一 ID(如 UUID、雪花算法),并把它放入消息体中。
- 治标不治本: 单纯依赖“消息不重试”是不靠谱的,因为 MQ 可能因为各种底层原因出现重复投递。
- 关键原则: 消费端的处理逻辑必须设计成幂等的,这是最终的防线。