Java案例如何实现消息延时发送?

wen java案例 77

本文目录导读:

Java案例如何实现消息延时发送?

  1. 方案一:Java 内置定时器 (简单/单机/不推荐生产)
  2. 方案二:Redis 的 Key 过期通知 (轻量/分布式/有丢消息风险)
  3. 方案三:消息队列 (MQ) 的延迟消息 (推荐生产/可靠)
  4. 方案四:数据库轮询 (传统/稳定/高延迟)
  5. 我的建议

实现消息延时发送(延迟消息/定时消息)在Java中有多种方案,具体选择取决于你的业务场景、数据量、可靠性要求以及是否依赖中间件。

下面我会从简单到复杂,列举几种主流的实现方案,并提供核心代码示例。


Java 内置定时器 (简单/单机/不推荐生产)

利用 java.util.TimerScheduledExecutorService

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SimpleDelayDemo {
    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(5); // 线程池大小
    public static void sendDelayedMessage(String message, long delayInSeconds) {
        scheduler.schedule(() -> {
            // 这里执行真正的消息发送逻辑
            System.out.println("发送消息: " + message + " (延迟 " + delayInSeconds + " 秒后)");
        }, delayInSeconds, TimeUnit.SECONDS);
    }
    public static void main(String[] args) {
        System.out.println("开始...");
        sendDelayedMessage("Hello, World!", 5);
        System.out.println("立即继续执行...");
        // 程序不要立即退出,否则看不到输出
    }
}
  • 优点:简单,无需外部依赖。
  • 缺点
    • 宕机丢失:JVM重启后所有未执行的任务全部丢失。
    • 不支持分布式:多实例部署时会有重复或丢失。
    • 阻塞风险:任务执行时间过长会阻塞其他任务(如果线程池设置不当)。
  • 适用场景:本地测试、非关键功能的短延迟(几秒到几分钟)。

Redis 的 Key 过期通知 (轻量/分布式/有丢消息风险)

利用 Redis 的 expire + Keyspace notifications。

步骤:

  1. 开启 Redis 通知CONFIG SET notify-keyspace-events Ex
  2. Java 代码
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisDelayDemo {
    private static final String CHANNEL = "__keyevent@0__:expired";
    public static void main(String[] args) {
        // 1. 创建监听线程(订阅过期事件)
        Thread listener = new Thread(() -> {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                jedis.psubscribe(new JedisPubSub() {
                    @Override
                    public void onPMessage(String pattern, String channel, String message) {
                        // message 就是过期的 key 名
                        System.out.println("收到延迟消息: " + message);
                    }
                }, CHANNEL);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        listener.setDaemon(true);
        listener.start();
        // 2. 模拟发送延迟消息(把消息内容作为 key 的一部分或单独存储)
        String messageKey = "delay:msg:HelloWorld";
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // 存入实际消息内容
            jedis.set(messageKey, "Hello, World After 5s!");
            // 设置过期时间(秒)
            jedis.expire(messageKey, 5);
            System.out.println("设置延迟消息完成,5秒后触发...");
        }
        // 为了演示保持主线程存活
        try { Thread.sleep(10000); } catch (InterruptedException e) {}
    }
}
  • 优点:轻量,简单,分布式可用(所有实例都监听相同 Redis)。
  • 缺点
    • 消息可靠性低:过期通知不保证送达,Redis 可能丢弃(尤其是大量 Key 同时过期)。
    • 粒度粗:只能精确到秒(Redis 过期精度有限)。
    • 消息体大小受限:Key 本身建议不超过几百字节。
  • 适用场景:非核心、允许少量丢失、秒级延迟的业务(如定时优惠券提醒)。

消息队列 (MQ) 的延迟消息 (推荐生产/可靠)

这是目前工业界最常用的方案,主流 MQ 都支持:

1 RabbitMQ (通过死信队列 + TTL 实现)

原理:消息先发送到“死信队列”,设置 TTL(Time To Live),TTL 到期后消息自动流转到“真实队列”,消费者消费该队列。

优点:成熟稳定,支持可靠投递(ACK)。
缺点:需要维护死信交换器和队列,代码稍复杂。

2 RocketMQ (原生支持延迟消息) - 强烈推荐

RocketMQ 从 4.x 开始内置延迟消息,不需要额外配置。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class RocketMQDelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message msg = new Message("TopicTest", "TagA", "Hello Delay".getBytes());
        // 设置延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        // 对应 level: 1  2  3   4   5  6  7  8  9  10  11  12  13  14  15  16  17  18
        msg.setDelayTimeLevel(3); // 延迟10秒
        producer.send(msg);
        System.out.println("延迟消息已发送,10秒后消费");
        producer.shutdown();
    }
}

3 Kafka (无原生延迟,需设计轮询或时间戳)

Kafka 没有直接支持延迟消息,通常做法是:

  • 消息带一个 deliverTime 字段。
  • 消费者轮询时判断当前时间是否 >= deliverTime,如果是则处理,否则重新丢入队列(或放入本地延迟队列)。

数据库轮询 (传统/稳定/高延迟)

原理

  • 建一张 delay_message 表,包含 message, execute_time, status 字段。
  • 启动一个定时任务(如 Spring @Scheduled),每秒扫描表,查询 execute_time <= now() AND status = 'pending' 的记录。
  • 处理成功后更新 status = 'done'
CREATE TABLE delay_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    content TEXT,
    execute_time DATETIME NOT NULL,
    status VARCHAR(20) DEFAULT 'pending', -- pending, done, failed
    created_at DATETIME
);

优点:极度可靠,支持 ACID,数据不丢失。
缺点:延迟到秒级(不适合毫秒级),对数据库有一定压力(需合理建索引),实时性较差。


方案 可靠性 延迟精度 分布式支持 复杂度 推荐场景
Timer/Scheduler 高(毫秒) 极低 本地测试、小型应用
Redis 过期通知 秒级 允许少量丢消息
RabbitMQ+TTL 毫秒~秒 传统企业、需要稳定性
RocketMQ 原生延时 秒级(可配置) 强烈推荐,新项目首选
数据库轮询 秒级 数据绝对不能丢、已有数据库

我的建议

  1. 如果是新项目,团队有运维能力:直接上 RocketMQ 的延迟消息,最省心、最专业。
  2. 如果项目已使用 RabbitMQ:用死信队列实现。
  3. 如果预算有限、微服务规模小Redis 过期通知 快速解决问题。
  4. 如果对可靠性要求最高(金融机构)数据库轮询 + 定时任务,配合分布式锁。

从你问这个问题来看,我猜测你可能在做一个需要可靠、可控的延时任务系统,如果让我推荐一个最实用、面试也常考的方案,我会选择 RocketMQ 的延迟消息

抱歉,评论功能暂时关闭!