本文目录导读:

- 方式一:使用 RabbitMQ (最通用的消息队列)
- 方式二:使用 Apache Kafka (适合高吞吐、日志/大数据场景)
- 方式三:传统 JMS + ActiveMQ (相对较老,仍在使用)
- 总结:实际项目中怎么选?
Java发送队列消息通常指的是使用消息中间件(如 RabbitMQ、Apache ActiveMQ、Kafka 等)向一个队列中生产消息。
最常见的两种场景是:
- 使用 JMS 标准:主要针对 ActiveMQ 这类传统消息队列。
- 使用 AMQP 协议:主要针对 RabbitMQ 这类高性能消息队列。
下面我就比较常用的 RabbitMQ 和 Kafka 来演示,因为在实际生产中相比 ActiveMQ 更为普遍,我会提供一个通用、规范的案例。
使用 RabbitMQ (最通用的消息队列)
RabbitMQ 用 Java 发送消息,通常会使用 Spring Boot 集成,非常方便。
添加 Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置连接信息 (application.yml)
spring:
rabbitmq:
host: localhost # 你的 RabbitMQ 服务地址
port: 5672 # AMQP 端口
username: guest
password: guest
配置队列和发送消息 (Java代码)
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
// 1. 定义队列的配置 (通常在配置类中完成)
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "my.java.queue";
@Bean
public Queue myQueue() {
// 创建一个持久化的队列
return new Queue(QUEUE_NAME, true);
}
}
// 2. 消息发送者
@RestController
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String sendMessage() {
String message = "Hello, 这是发送到队列的消息! 时间: " + LocalDateTime.now();
// 关键代码: 发送消息到指定队列
// 参数: (交换机名称, 路由键, 消息内容)
// 如果不想使用交换机直接发往队列,路由键写队列名
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
return "消息已发送: " + message;
}
}
// 3. 消息消费者 (接收消息)
@Component
public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("接收到队列消息: " + message);
}
}
核心步骤解释:
Queue:定义队列名。RabbitTemplate:这是 Spring AMQP 提供的发送消息模板,convertAndSend方法会自动将对象序列化后发送。@RabbitListener:让方法监听队列,一旦有消息就自动执行。
使用 Apache Kafka (适合高吞吐、日志/大数据场景)
Kafka 内部使用的是“主题(Topic)”,但从消费者角度看,它可以扮演队列的角色(多个消费者在同一个消费组中竞争消费)。
添加 Maven 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置连接信息 (application.yml)
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
发送消息 (Java代码)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC_NAME = "my-kafka-queue";
@GetMapping("/sendKafka")
public String sendKafkaMessage() {
String message = "Kafka 队列消息 " + System.currentTimeMillis();
// 关键代码: 发送消息到 Kafka 主题
kafkaTemplate.send(TOPIC_NAME, message);
return "Kafka 消息已发送: " + message;
}
}
// 消费者
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "my-kafka-queue", groupId = "my-group")
public void listen(String message) {
System.out.println("收到 Kafka 消息: " + message);
}
}
传统 JMS + ActiveMQ (相对较老,仍在使用)
虽然现在用 RabbitMQ 或 Kafka 更多,但一些遗留系统还在用 ActiveMQ。
// 需要 javax.jms 依赖
// 发送代码
public void sendJMSMessage() throws JMSException {
// 1. 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
// 2. 创建会话 (是否开启事务, 应答模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 3. 创建目标队列
Destination destination = session.createQueue("MyOldQueue");
// 4. 创建生产者并发送
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("你好,旧时代的消息");
producer.send(message);
// 5. 关闭资源
session.close();
connection.close();
}
实际项目中怎么选?
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 微服务异步解耦 | RabbitMQ | 路由灵活、支持复杂消息模型、生态好 |
| 日志收集/大数据 | Kafka | 高吞吐量、持久化、顺序消费 |
| 传统企业内部系统 | ActiveMQ | 遵循 JMS 标准,但性能相对较低,但部署简单 |
| 云原生/新项目 | RabbitMQ 或 Kafka | 更现代,性能更好 |
关键点提示:
- 发送消息最核心的就是调用
template.send()或convertAndSend()方法,参数主要是队列名/主题名和。 - 可以传字符串、JSON 字符串、或者自定义 Java 对象(需要实现
Serializable或在序列化器中配置)。 - 如果消息发送失败,通常会报错或抛出 AmqpException,生产环境需要做好重试和异常处理机制。