本文目录导读:

使用数据库表实现消息队列是一种常见的“轮询”模式,适合低并发、对实时性要求不高的场景,下面我将从表结构设计、生产者入队、消费者出队及并发控制几个核心环节详细说明。
表结构设计
最简单的消息队列表通常包含以下字段:
CREATE TABLE message_queue (
id BIGINT AUTO_INCREMENT PRIMARY KEY, -- 主键,用于排序和唯一标识
topic VARCHAR(64) NOT NULL, -- 队列主题(不同业务分开)
content TEXT NOT NULL, -- 消息内容(JSON 格式更灵活)
status TINYINT NOT NULL DEFAULT 0, -- 状态:0=待处理, 1=处理中, 2=完成, -1=处理失败
retry_count INT NOT NULL DEFAULT 0, -- 重试次数(可选)
created_at DATETIME NOT NULL DEFAULT NOW(), -- 创建时间
updated_at DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW() -- 最后更新时间
);
-- 建议建立的索引
CREATE INDEX idx_topic_status ON message_queue(topic, status, created_at);
生产者:入队操作
生产者只需要向表中插入一条新记录:
-- 插入一条待处理的消息
INSERT INTO message_queue (topic, content)
VALUES ('order_paid', '{"order_id": 1001, "amount": 299}');
消费者:出队操作(核心难点)
数据库作为队列的最大挑战是避免多个消费者同时消费同一条消息,常用的方法有:
方案1:乐观锁 + 原子更新
这是最常用的方式,通过 UPDATE ... WHERE status=0 LIMIT 1 并利用数据库的行级锁:
-- 1. 占用一条消息(原子操作)
UPDATE message_queue
SET status = 1, updated_at = NOW()
WHERE status = 0
AND topic = 'order_paid'
AND id = (
SELECT id FROM (
SELECT id FROM message_queue
WHERE topic = 'order_paid' AND status = 0
ORDER BY created_at ASC
LIMIT 1
) AS tmp
);
改进版本(更简洁,支持更新后返回数据):
-- MySQL 8.0+ 使用 CTE,但更常用的是先 SELECT FOR UPDATE 再 UPDATE BEGIN; -- 2. 锁定并获取一条待处理消息 SELECT id, content FROM message_queue WHERE topic = 'order_paid' AND status = 0 ORDER BY created_at LIMIT 1 FOR UPDATE; -- 3. 更新状态为“处理中” UPDATE message_queue SET status = 1, updated_at = NOW() WHERE id = ?; COMMIT;
方案2:使用唯一业务ID(幂等设计)
如果每条消息有全局唯一的业务ID(如订单号),可以用 INSERT ... ON DUPLICATE KEY UPDATE 避免重复消费,但这是更偏幂等设计的做法。
处理完成后的逻辑
消费者处理完消息后,更新状态:
-- 处理成功 UPDATE message_queue SET status = 2, updated_at = NOW() WHERE id = ?; -- 处理失败(可设置重试) UPDATE message_queue SET status = -1, retry_count = retry_count + 1, updated_at = NOW() WHERE id = ?;
处理超时与死信(生产必备)
如果消费者在处理过程中崩溃,消息会一直卡在 status=1,需要定期任务扫描超时消息:
-- 将超过30秒仍处于“处理中”的消息重置为待处理 UPDATE message_queue SET status = 0, updated_at = NOW() WHERE status = 1 AND updated_at < NOW() - INTERVAL 30 SECOND;
对于超过最大重试次数的消息,可标记为死信并告警:
-- 重试超过3次的消息,标记为死信 UPDATE message_queue SET status = -99, updated_at = NOW() WHERE status = -1 AND retry_count >= 3;
完整使用示例(Python伪代码)
import pymysql
import time
def poll_and_process():
conn = pymysql.connect(...)
while True:
try:
with conn.cursor() as cursor:
# 1. 尝试获取一条消息(原子操作)
sql = """
UPDATE message_queue
SET status = 1, updated_at = NOW()
WHERE status = 0 AND topic = 'order_paid'
ORDER BY created_at ASC
LIMIT 1
"""
cursor.execute(sql)
affected = cursor.rowcount
if affected == 0:
time.sleep(1) # 没有消息,等待
continue
# 2. 获取刚占用的那条消息
cursor.execute("SELECT id, content FROM message_queue WHERE status = 1 ORDER BY updated_at DESC LIMIT 1")
msg = cursor.fetchone()
# 3. 处理业务
try:
process_order(msg['content'])
# 处理成功
cursor.execute("UPDATE message_queue SET status=2 WHERE id=%s", (msg['id'],))
except Exception as e:
# 处理失败
cursor.execute("UPDATE message_queue SET status=-1, retry_count=retry_count+1 WHERE id=%s", (msg['id'],))
conn.commit()
except Exception:
time.sleep(1)
优缺点对比
| 优点 | 缺点 |
|---|---|
| 不需要引入额外中间件(如RabbitMQ、Redis) | 轮询压力大,不适合高吞吐量(>10k/s) |
| 数据持久化、支持事务,无需额外同步机制 | 每条消息都需要数据库行锁,并发能力有限 |
| 可以方便地实现消息重试、死信等高级功能 | 数据库IO是瓶颈,延迟通常几毫秒到几十毫秒 |
何时适用?何时应换用专业队列?
- 适用场景:内部定时任务、数据同步、非关键通知(如邮件发送)、低并发(<1000条/秒)场景。
- 不适用场景:高并发秒杀、支付核心链路、需要严格消息顺序保证的场景。
如果项目逐渐变大,建议尽早迁移到专业队列:Redis List(轻量) → RabbitMQ / Kafka(生产级)。
如果你愿意,我还可以帮你将这个方案扩展到 多消费者并行消费 + 公平调度,或者告诉你如何用 数据库表实现延迟队列。