RabbitMQ 三大关键问题
2025/12/13大约 3 分钟约 951 字
RabbitMQ 三大关键问题
一、可靠性
核心目标:保证消息从发送到消费的全链路不丢失。
从 3 个阶段来保证:
Producer ──→ RabbitMQ ──→ Consumer
① ② ③
发送阶段 存储阶段 消费阶段1. 发送端
开启 Confirm 模式 和 Return 模式,确保消息正确到达 MQ 服务器和队列。
| 模式 | 触发时机 | 说明 |
|---|---|---|
| Confirm | 消息到达 Exchange 后 | Broker 返回 ack/nack,确认消息已收到 |
| Return | 消息路由失败时 | 消息到了 Exchange 但找不到匹配的 Queue |
配合 本地消息表 + 定时任务扫描,防止发送端应用崩溃导致的消息丢失。
2. 存储端
将 Exchange、Queue、Message 全部设置为持久化(durable=true, deliveryMode=2),并搭建 镜像队列 (Mirrored Queue) 集群,防止单机故障。
3. 消费端
将自动 Ack 改为 手动 Ack。只有业务逻辑执行成功(DB 落库)后才提交 Ack。配合 死信队列 (DLQ) 处理长期失败的消息。
二、有序性
核心目标:保证特定业务的一组消息(如 订单创建 → 支付 → 发货)按顺序执行。
乱序原因
- 多消费者争抢:消费者 A 拿了第一条但处理慢,消费者 B 拿了第二条且处理快(先入库了)。
- 异常重试:第一条消息处理失败,被退回队列尾部(或死信),导致后续消息先被处理。
解决方案
| 方案 | 思路 | 适用场景 |
|---|---|---|
| 物理分片 (Queue Hashing) | 拆分队列 + 定向路由。利用 Hash 算法,保证同一 ID 的消息只进同一个 Queue,且该 Queue 只有一个消费者 | 对顺序要求严格 |
| 逻辑排序 (状态机 + 延迟重试) | 允许乱序 + 业务纠错。消费者收到消息先查库,如果前置状态不对,暂存延迟队列稍后再试 | 对顺序要求不那么严格,追求高吞吐 |
三、幂等性
核心目标:无论 MQ 重发多少次(网络抖动、ACK 丢失),消费端的业务逻辑只执行一次,数据不重复、不错乱。
重复原因
- ACK 丢失:消费者做完了,但没告诉 MQ,MQ 以为没做,超时重发。
- 生产者重发:发送端以为没发成功,发了两遍。
三层防重机制
| 层级 | 手段 | 作用 | 特点 |
|---|---|---|---|
| 第一层 | Redis 原子锁 SETNX lock:msg:{id} 10s | 防止多线程/进程同时处理同一条消息 | 抗瞬时并发 |
| 第二层 | Redis 状态标记 GET mark:processed:{id} | 处理完后写入 Redis(有效期 24h),新消息先查 Redis | 抗历史重复,快速过滤 |
| 第三层 | DB 唯一索引 Unique Key (biz_id + action) | 数据库层面兜底,报 DuplicateKeyException 则回滚 | 最终护城河 |
业务执行顺序
- 抢锁 (Redis SETNX) → 失败则退出
- 查标记 (Redis GET) → 有则 ACK
- 落库 (DB Insert) → 报错则 ACK(说明是重复)
- 跑业务 (Service)
- 写标记 (Redis SET)
- ACK (MQ)
四、消费者伪代码
@RabbitListener(queues = "order.queue")
public void onMessage(Message msg, Channel channel) {
String msgId = msg.getMessageProperties().getMessageId();
// 1. 【幂等】Redis 抢锁 & 查重
if (!tryLock(msgId) || isProcessed(msgId)) {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
return;
}
try {
// 2. 【有序】状态机前置校验
if (!checkStatus(msg)) {
// 状态不对(乱序),扔进【延迟队列】稍后再试
sendToDelayQueue(msg);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
return;
}
// 3. 【兜底】消息落库 (Msg Log) - 利用唯一索引防重
saveMsgLog(msgId);
// 4. 【业务】执行业务逻辑
doBusiness(msg);
// 5. 【标记】写入 Redis 已处理标记
markAsProcessed(msgId);
// 6. 【ACK】确认消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (DuplicateKeyException e) {
// 重复消息,直接 ACK
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 异常:重试耗尽 -> nack + 死信队列 + 报警
handleFailure(msg, channel, e);
} finally {
// 7. 【释放锁】无论成功失败都要释放
releaseLock(msgId);
}
}