本文目录导读:

在Java中确认消息消费(即消息确认机制,Message Acknowledgement),主要取决于你使用的消息中间件(如RabbitMQ、Kafka、RocketMQ、ActiveMQ等)。
以下针对最主流的 RabbitMQ 和 Apache Kafka 提供详细的确认方式及Java代码案例。
RabbitMQ 消息确认
RabbitMQ 提供了三种确认模式:自动确认、手动确认 和 批量确认。
核心概念:
- Basic.Ack:确认消息已成功处理,可以删除。
- Basic.Nack:拒绝消息(可以要求重新入队)。
- Basic.Reject:拒绝单条消息(是Nack的简化版,不支持批量)。
案例:手动确认(推荐用于生产环境)
依赖: spring-boot-starter-amqp 或 com.rabbitmq:amqp-client
方式1:使用 Spring Boot @RabbitListener(最常用)
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = "myQueue")
public void handleMessage(String payload, Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 业务处理
System.out.println("收到消息: " + payload);
// ... 你的业务逻辑 ...
doBusiness(payload);
// 2. 处理成功 -> 确认消息(从队列删除)
channel.basicAck(deliveryTag, false); // false: 只确认当前消息
System.out.println("消息已确认");
} catch (Exception e) {
// 3. 处理失败 -> 拒绝消息并重试(重新入队)
// requeue = true: 重新放回队列(可能导致死循环,通常配合重试次数限制)
// requeue = false: 丢弃或进入死信队列
channel.basicNack(deliveryTag, false, true);
// 或 channel.basicReject(deliveryTag, true);
System.err.println("消息处理失败,已重新入队");
}
}
private void doBusiness(String payload) {
// 模拟业务
if (payload.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
}
}
配置文件 (application.yml):
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认模式
retry:
enabled: true # 开启重试
max-attempts: 3 # 最大重试次数
方式2:使用原生 RabbitMQ Client
import com.rabbitmq.client.*;
public class NativeConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume("myQueue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
try {
String message = new String(body, "UTF-8");
System.out.println("收到: " + message);
// 业务处理
process(message);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 失败时重新入队
channel.basicNack(deliveryTag, false, true);
}
}
});
}
}
Apache Kafka 消息确认
Kafka 的确认机制与 RabbitMQ 不同,它通过 偏移量(Offset)提交 来实现“确认消费”。
核心概念:
- 自动提交(默认):每隔一段时间自动提交偏移量(可能丢失数据)。
- 手动提交:在业务处理成功后手动提交偏移量(更可靠)。
- 同步提交 (
commitSync):阻塞直到提交成功。 - 异步提交 (
commitAsync):非阻塞,配合回调。
案例:手动提交偏移量(至少一次语义)
方式1:Spring Kafka(推荐)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
// 1. 业务处理
System.out.printf("收到消息: offset=%d, value=%s%n", record.offset(), record.value());
processBusiness(record.value());
// 2. 手动确认(提交偏移量)
ack.acknowledge();
System.out.println("偏移量已提交");
} catch (Exception e) {
// 失败处理:记录日志、发送告警等
System.err.println("处理失败,偏移量未提交");
// 注意:不提交偏移量,消费者重启时会重新消费这条消息
}
}
private void processBusiness(String value) {
// 模拟业务
}
}
配置文件:
spring:
kafka:
consumer:
group-id: myGroup
enable-auto-commit: false # 关闭自动提交,使用手动确认
listener:
ack-mode: manual_immediate # 手动立即确认
方式2:原生 Kafka Consumer
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class NativeKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "myGroup");
props.put("enable.auto.commit", "false"); // 手动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("myTopic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
processBusiness(record.value());
// 手动同步提交偏移量(每处理一条提交一次)
consumer.commitSync(Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
} catch (Exception e) {
System.err.println("处理失败,等待下次消费");
}
}
}
} finally {
consumer.close();
}
}
}
两种消息中间件的确认机制对比
| 特性 | RabbitMQ(AMQP) | Apache Kafka |
|---|---|---|
| 确认对象 | 消息 (Delivery Tag) | 偏移量 (Offset) |
| 确认时机 | 消息处理完成后 | 消息处理完成后(或批量处理完) |
| 重试机制 | 拒绝消息 + requeue=true |
不提交偏移量,下次poll自动重试 |
| 批量确认 | basicAck(tag, true) |
commitSync(Map) 或 commitAsync() |
| 死信处理 | 拒绝 + requeue=false 进死信队列 |
一般记录日志或发到死信主题 |
最佳实践建议
- 始终使用手动确认:在关键业务中,避免使用自动确认,防止消息丢失。
- 幂等性设计:消息可能重复消费(尤其是在Kafka),你的业务逻辑应该支持幂等(例如使用唯一ID去重)。
- 重试策略:
- RabbitMQ:结合
basicNack(requeue=true)和重试次数限制,避免死循环。 - Kafka:可以设计一个重试队列,将失败的消息写入另一个主题,消费重试。
- RabbitMQ:结合
- 监控与告警:对长时间未确认的消息进行监控,排查消费阻塞。
需要针对某个特定中间件(如RocketMQ、Redis Stream等)的确认机制吗?我可以进一步为你展开。