如何用数据库表实现一个简单的消息队列?

wen java案例 51

本文目录导读:

如何用数据库表实现一个简单的消息队列?

  1. 表结构设计
  2. 生产者:入队操作
  3. 消费者:出队操作(核心难点)
  4. 处理完成后的逻辑
  5. 处理超时与死信(生产必备)
  6. 完整使用示例(Python伪代码)
  7. 优缺点对比
  8. 何时适用?何时应换用专业队列?

使用数据库表实现消息队列是一种常见的“轮询”模式,适合低并发、对实时性要求不高的场景,下面我将从表结构设计生产者入队消费者出队并发控制几个核心环节详细说明。


表结构设计

最简单的消息队列表通常包含以下字段:

CREATE TABLE message_queue (
    id          BIGINT AUTO_INCREMENT PRIMARY KEY,  -- 主键,用于排序和唯一标识
    topic       VARCHAR(64) NOT NULL,               -- 队列主题(不同业务分开)
    content     TEXT NOT NULL,                       -- 消息内容(JSON 格式更灵活)
    status      TINYINT NOT NULL DEFAULT 0,         -- 状态:0=待处理, 1=处理中, 2=完成, -1=处理失败
    retry_count INT NOT NULL DEFAULT 0,             -- 重试次数(可选)
    created_at  DATETIME NOT NULL DEFAULT NOW(),    -- 创建时间
    updated_at  DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW() -- 最后更新时间
);
-- 建议建立的索引
CREATE INDEX idx_topic_status ON message_queue(topic, status, created_at);

生产者:入队操作

生产者只需要向表中插入一条新记录:

-- 插入一条待处理的消息
INSERT INTO message_queue (topic, content) 
VALUES ('order_paid', '{"order_id": 1001, "amount": 299}');

消费者:出队操作(核心难点)

数据库作为队列的最大挑战是避免多个消费者同时消费同一条消息,常用的方法有:

方案1:乐观锁 + 原子更新

这是最常用的方式,通过 UPDATE ... WHERE status=0 LIMIT 1 并利用数据库的行级锁:

-- 1. 占用一条消息(原子操作)
UPDATE message_queue
SET    status = 1, updated_at = NOW()
WHERE  status = 0
  AND  topic = 'order_paid'
  AND  id = (
    SELECT id FROM (
        SELECT id FROM message_queue
        WHERE topic = 'order_paid' AND status = 0
        ORDER BY created_at ASC
        LIMIT 1
    ) AS tmp
);

改进版本(更简洁,支持更新后返回数据):

-- MySQL 8.0+ 使用 CTE,但更常用的是先 SELECT FOR UPDATE 再 UPDATE
BEGIN;
-- 2. 锁定并获取一条待处理消息
SELECT id, content 
FROM   message_queue  
WHERE  topic = 'order_paid' AND status = 0  
ORDER BY created_at  
LIMIT 1  
FOR UPDATE;
-- 3. 更新状态为“处理中”
UPDATE message_queue  
SET    status = 1, updated_at = NOW()  
WHERE  id = ?;
COMMIT;

方案2:使用唯一业务ID(幂等设计)

如果每条消息有全局唯一的业务ID(如订单号),可以用 INSERT ... ON DUPLICATE KEY UPDATE 避免重复消费,但这是更偏幂等设计的做法。


处理完成后的逻辑

消费者处理完消息后,更新状态:

-- 处理成功
UPDATE message_queue
SET    status = 2, updated_at = NOW()
WHERE  id = ?;
-- 处理失败(可设置重试)
UPDATE message_queue
SET    status = -1, retry_count = retry_count + 1, updated_at = NOW()
WHERE  id = ?;

处理超时与死信(生产必备)

如果消费者在处理过程中崩溃,消息会一直卡在 status=1,需要定期任务扫描超时消息:

-- 将超过30秒仍处于“处理中”的消息重置为待处理
UPDATE message_queue
SET    status = 0, updated_at = NOW()
WHERE  status = 1
  AND  updated_at < NOW() - INTERVAL 30 SECOND;

对于超过最大重试次数的消息,可标记为死信并告警:

-- 重试超过3次的消息,标记为死信
UPDATE message_queue
SET    status = -99, updated_at = NOW()
WHERE  status = -1 AND retry_count >= 3;

完整使用示例(Python伪代码)

import pymysql
import time
def poll_and_process():
    conn = pymysql.connect(...)
    while True:
        try:
            with conn.cursor() as cursor:
                # 1. 尝试获取一条消息(原子操作)
                sql = """
                    UPDATE message_queue
                    SET status = 1, updated_at = NOW()
                    WHERE status = 0 AND topic = 'order_paid'
                    ORDER BY created_at ASC
                    LIMIT 1
                """
                cursor.execute(sql)
                affected = cursor.rowcount
                if affected == 0:
                    time.sleep(1)  # 没有消息,等待
                    continue
                # 2. 获取刚占用的那条消息
                cursor.execute("SELECT id, content FROM message_queue WHERE status = 1 ORDER BY updated_at DESC LIMIT 1")
                msg = cursor.fetchone()
                # 3. 处理业务
                try:
                    process_order(msg['content'])
                    # 处理成功
                    cursor.execute("UPDATE message_queue SET status=2 WHERE id=%s", (msg['id'],))
                except Exception as e:
                    # 处理失败
                    cursor.execute("UPDATE message_queue SET status=-1, retry_count=retry_count+1 WHERE id=%s", (msg['id'],))
                conn.commit()
        except Exception:
            time.sleep(1)

优缺点对比

优点 缺点
不需要引入额外中间件(如RabbitMQ、Redis) 轮询压力大,不适合高吞吐量(>10k/s)
数据持久化、支持事务,无需额外同步机制 每条消息都需要数据库行锁,并发能力有限
可以方便地实现消息重试、死信等高级功能 数据库IO是瓶颈,延迟通常几毫秒到几十毫秒

何时适用?何时应换用专业队列?

  • 适用场景:内部定时任务、数据同步、非关键通知(如邮件发送)、低并发(<1000条/秒)场景。
  • 不适用场景:高并发秒杀、支付核心链路、需要严格消息顺序保证的场景。

如果项目逐渐变大,建议尽早迁移到专业队列:Redis List(轻量) → RabbitMQ / Kafka(生产级)


如果你愿意,我还可以帮你将这个方案扩展到 多消费者并行消费 + 公平调度,或者告诉你如何用 数据库表实现延迟队列

抱歉,评论功能暂时关闭!