Java案例怎么发送队列消息?

wen java案例 74

本文目录导读:

Java案例怎么发送队列消息?

  1. 方式一:使用 RabbitMQ (最通用的消息队列)
  2. 方式二:使用 Apache Kafka (适合高吞吐、日志/大数据场景)
  3. 方式三:传统 JMS + ActiveMQ (相对较老,仍在使用)
  4. 总结:实际项目中怎么选?

Java发送队列消息通常指的是使用消息中间件(如 RabbitMQ、Apache ActiveMQ、Kafka 等)向一个队列中生产消息。

最常见的两种场景是:

  1. 使用 JMS 标准:主要针对 ActiveMQ 这类传统消息队列。
  2. 使用 AMQP 协议:主要针对 RabbitMQ 这类高性能消息队列。

下面我就比较常用的 RabbitMQKafka 来演示,因为在实际生产中相比 ActiveMQ 更为普遍,我会提供一个通用、规范的案例。

使用 RabbitMQ (最通用的消息队列)

RabbitMQ 用 Java 发送消息,通常会使用 Spring Boot 集成,非常方便。

添加 Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置连接信息 (application.yml)

spring:
  rabbitmq:
    host: localhost   # 你的 RabbitMQ 服务地址
    port: 5672        # AMQP 端口
    username: guest
    password: guest

配置队列和发送消息 (Java代码)

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
// 1. 定义队列的配置 (通常在配置类中完成)
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "my.java.queue";
    @Bean
    public Queue myQueue() {
        // 创建一个持久化的队列
        return new Queue(QUEUE_NAME, true);
    }
}
// 2. 消息发送者
@RestController
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send")
    public String sendMessage() {
        String message = "Hello, 这是发送到队列的消息! 时间: " + LocalDateTime.now();
        // 关键代码: 发送消息到指定队列
        // 参数: (交换机名称, 路由键, 消息内容)
        // 如果不想使用交换机直接发往队列,路由键写队列名
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
        return "消息已发送: " + message;
    }
}
// 3. 消息消费者 (接收消息)
@Component
public class MessageConsumer {
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("接收到队列消息: " + message);
    }
}

核心步骤解释:

  • Queue:定义队列名。
  • RabbitTemplate:这是 Spring AMQP 提供的发送消息模板,convertAndSend 方法会自动将对象序列化后发送。
  • @RabbitListener:让方法监听队列,一旦有消息就自动执行。

使用 Apache Kafka (适合高吞吐、日志/大数据场景)

Kafka 内部使用的是“主题(Topic)”,但从消费者角度看,它可以扮演队列的角色(多个消费者在同一个消费组中竞争消费)。

添加 Maven 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置连接信息 (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

发送消息 (Java代码)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    private static final String TOPIC_NAME = "my-kafka-queue";
    @GetMapping("/sendKafka")
    public String sendKafkaMessage() {
        String message = "Kafka 队列消息 " + System.currentTimeMillis();
        // 关键代码: 发送消息到 Kafka 主题
        kafkaTemplate.send(TOPIC_NAME, message);
        return "Kafka 消息已发送: " + message;
    }
}
// 消费者
@Component
public class KafkaConsumerService {
    @KafkaListener(topics = "my-kafka-queue", groupId = "my-group")
    public void listen(String message) {
        System.out.println("收到 Kafka 消息: " + message);
    }
}

传统 JMS + ActiveMQ (相对较老,仍在使用)

虽然现在用 RabbitMQ 或 Kafka 更多,但一些遗留系统还在用 ActiveMQ。

// 需要 javax.jms 依赖
// 发送代码
public void sendJMSMessage() throws JMSException {
    // 1. 创建连接工厂
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection connection = factory.createConnection();
    connection.start();
    // 2. 创建会话 (是否开启事务, 应答模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 3. 创建目标队列
    Destination destination = session.createQueue("MyOldQueue");
    // 4. 创建生产者并发送
    MessageProducer producer = session.createProducer(destination);
    TextMessage message = session.createTextMessage("你好,旧时代的消息");
    producer.send(message);
    // 5. 关闭资源
    session.close();
    connection.close();
}

实际项目中怎么选?

场景 推荐方案 原因
微服务异步解耦 RabbitMQ 路由灵活、支持复杂消息模型、生态好
日志收集/大数据 Kafka 高吞吐量、持久化、顺序消费
传统企业内部系统 ActiveMQ 遵循 JMS 标准,但性能相对较低,但部署简单
云原生/新项目 RabbitMQ 或 Kafka 更现代,性能更好

关键点提示:

  • 发送消息最核心的就是调用 template.send()convertAndSend() 方法,参数主要是队列名/主题名和。
  • 可以传字符串、JSON 字符串、或者自定义 Java 对象(需要实现 Serializable 或在序列化器中配置)。
  • 如果消息发送失败,通常会报错或抛出 AmqpException,生产环境需要做好重试和异常处理机制。

抱歉,评论功能暂时关闭!