Java案例怎么确认消息消费?

wen java案例 78

本文目录导读:

Java案例怎么确认消息消费?

  1. RabbitMQ 消息确认
  2. Apache Kafka 消息确认
  3. 两种消息中间件的确认机制对比
  4. 最佳实践建议

在Java中确认消息消费(即消息确认机制,Message Acknowledgement),主要取决于你使用的消息中间件(如RabbitMQ、Kafka、RocketMQ、ActiveMQ等)。

以下针对最主流的 RabbitMQApache Kafka 提供详细的确认方式及Java代码案例。


RabbitMQ 消息确认

RabbitMQ 提供了三种确认模式:自动确认手动确认批量确认

核心概念:

  • Basic.Ack:确认消息已成功处理,可以删除。
  • Basic.Nack:拒绝消息(可以要求重新入队)。
  • Basic.Reject:拒绝单条消息(是Nack的简化版,不支持批量)。

案例:手动确认(推荐用于生产环境)

依赖: spring-boot-starter-amqpcom.rabbitmq:amqp-client

方式1:使用 Spring Boot @RabbitListener(最常用)

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
    @RabbitListener(queues = "myQueue")
    public void handleMessage(String payload, Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 业务处理
            System.out.println("收到消息: " + payload);
            // ... 你的业务逻辑 ...
            doBusiness(payload);
            // 2. 处理成功 -> 确认消息(从队列删除)
            channel.basicAck(deliveryTag, false); // false: 只确认当前消息
            System.out.println("消息已确认");
        } catch (Exception e) {
            // 3. 处理失败 -> 拒绝消息并重试(重新入队)
            // requeue = true: 重新放回队列(可能导致死循环,通常配合重试次数限制)
            // requeue = false: 丢弃或进入死信队列
            channel.basicNack(deliveryTag, false, true); 
            // 或 channel.basicReject(deliveryTag, true);
            System.err.println("消息处理失败,已重新入队");
        }
    }
    private void doBusiness(String payload) {
        // 模拟业务
        if (payload.contains("error")) {
            throw new RuntimeException("模拟业务异常");
        }
    }
}

配置文件 (application.yml):

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual   # 手动确认模式
        retry:
          enabled: true            # 开启重试
          max-attempts: 3          # 最大重试次数

方式2:使用原生 RabbitMQ Client

import com.rabbitmq.client.*;
public class NativeConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume("myQueue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, 
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                try {
                    String message = new String(body, "UTF-8");
                    System.out.println("收到: " + message);
                    // 业务处理
                    process(message);
                    // 手动确认
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    // 失败时重新入队
                    channel.basicNack(deliveryTag, false, true);
                }
            }
        });
    }
}

Apache Kafka 消息确认

Kafka 的确认机制与 RabbitMQ 不同,它通过 偏移量(Offset)提交 来实现“确认消费”。

核心概念:

  • 自动提交(默认):每隔一段时间自动提交偏移量(可能丢失数据)。
  • 手动提交:在业务处理成功后手动提交偏移量(更可靠)。
  • 同步提交 (commitSync):阻塞直到提交成功。
  • 异步提交 (commitAsync):非阻塞,配合回调。

案例:手动提交偏移量(至少一次语义)

方式1:Spring Kafka(推荐)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessageConsumer {
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            // 1. 业务处理
            System.out.printf("收到消息: offset=%d, value=%s%n", record.offset(), record.value());
            processBusiness(record.value());
            // 2. 手动确认(提交偏移量)
            ack.acknowledge();
            System.out.println("偏移量已提交");
        } catch (Exception e) {
            // 失败处理:记录日志、发送告警等
            System.err.println("处理失败,偏移量未提交");
            // 注意:不提交偏移量,消费者重启时会重新消费这条消息
        }
    }
    private void processBusiness(String value) {
        // 模拟业务
    }
}

配置文件:

spring:
  kafka:
    consumer:
      group-id: myGroup
      enable-auto-commit: false   # 关闭自动提交,使用手动确认
    listener:
      ack-mode: manual_immediate  # 手动立即确认

方式2:原生 Kafka Consumer

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class NativeKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "myGroup");
        props.put("enable.auto.commit", "false"); // 手动提交
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("myTopic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                        processBusiness(record.value());
                        // 手动同步提交偏移量(每处理一条提交一次)
                        consumer.commitSync(Map.of(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)
                        ));
                    } catch (Exception e) {
                        System.err.println("处理失败,等待下次消费");
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

两种消息中间件的确认机制对比

特性 RabbitMQ(AMQP) Apache Kafka
确认对象 消息 (Delivery Tag) 偏移量 (Offset)
确认时机 消息处理完成后 消息处理完成后(或批量处理完)
重试机制 拒绝消息 + requeue=true 不提交偏移量,下次poll自动重试
批量确认 basicAck(tag, true) commitSync(Map)commitAsync()
死信处理 拒绝 + requeue=false 进死信队列 一般记录日志或发到死信主题

最佳实践建议

  1. 始终使用手动确认:在关键业务中,避免使用自动确认,防止消息丢失。
  2. 幂等性设计:消息可能重复消费(尤其是在Kafka),你的业务逻辑应该支持幂等(例如使用唯一ID去重)。
  3. 重试策略
    • RabbitMQ:结合 basicNack(requeue=true) 和重试次数限制,避免死循环。
    • Kafka:可以设计一个重试队列,将失败的消息写入另一个主题,消费重试。
  4. 监控与告警:对长时间未确认的消息进行监控,排查消费阻塞。

需要针对某个特定中间件(如RocketMQ、Redis Stream等)的确认机制吗?我可以进一步为你展开。

抱歉,评论功能暂时关闭!