顺序消息的实现 / 延迟消息的实现(RocketMQ 的 18 级 / Kafka 的时间轮)
一句话速记
顺序消息:通过将同一业务 key 的消息路由到同一分区/队列(Kafka 指定 partition key,RocketMQ 指定 MessageQueueSelector)+ 单线程消费来保证顺序;延迟消息:RocketMQ 原生支持 18 个固定延迟级别(1s~2h),Kafka 原生不支持(需要时间轮插件或业务实现)。
关键细节
1)顺序消息的实现
Kafka 顺序消息:
Kafka 顺序保证的层级:
Partition 内:消息严格顺序(offset 递增)
Topic 内:不保证(跨 Partition 无序)
实现顺序消息:
Step 1:生产时按业务 key(如 order_id)路由到同一 Partition
Step 2:消费时,同一 Partition 只有一个消费者处理(Partition 级别的串行)
// 生产端:指定 partition key,相同 key 的消息进同一 Partition
producer.send(new ProducerRecord<>(
"orders", // topic
"order-12345", // key → hash(key) % partitionCount 决定 partition
orderEvent // value
));
// 同一 order_id 的所有消息(创建、支付、发货)都在同一 Partition
// 消费端:默认 1 Partition = 1 Consumer(ConsumerGroup 内)
// 每个 Consumer 串行处理分配到的 Partition → 分区内有序RocketMQ 顺序消息:
// 生产端:指定消息队列选择器
producer.send(orderMessage, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (Long) arg;
int index = (int)(orderId % mqs.size()); // 相同 orderId → 同一 Queue
return mqs.get(index);
}
}, orderId);
// 消费端:顺序消费监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext ctx) {
// 同一 Queue 内串行执行,保证顺序
for (MessageExt msg : msgs) {
processOrderEvent(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// MessageListenerOrderly:同一 Queue 只有一个线程处理,严格顺序
// MessageListenerConcurrently:多线程并发,不保证顺序顺序消息的代价:
1. 吞吐量受限:一个 Partition/Queue 串行 → 并发度 = Partition/Queue 数量
2. 消费者扩展受限:消费者数量不能超过 Partition 数(多余的消费者空闲)
3. 局部有序:只在 Partition/Queue 内有序,不是全局有序
4. 失败处理复杂:顺序消费中,一条失败会阻塞后续消息(需要有重试上限)
2)延迟消息的实现
RocketMQ 固定级别延迟(原生支持):
// 18 个延迟级别(不可自定义):
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message msg = new Message("OrderTopic", body);
msg.setDelayTimeLevel(3); // 级别 3 = 10s 延迟
// 发送后,消息在 SCHEDULE_TOPIC_XXXX 内部主题暂存
// 定时任务(ScheduleMessageService)检查到期的消息
// 到期后移到目标 Topic,消费者才能消费RocketMQ 5.0 任意时间延迟(Timer Wheel):
// RocketMQ 5.0 支持精确延迟(任意时间)
Message msg = new Message("OrderTopic", body);
msg.setDeliverTimeMs(System.currentTimeMillis() + 30 * 60 * 1000); // 30分钟后
producer.send(msg);Kafka 原生不支持延迟消息的实现方案:
方案 1:多级 Topic 模拟(RocketMQ 的 18 级思路)
delay_1s, delay_5s, delay_10s ... 等中转 Topic
Consumer 从延迟 Topic 消费 → 检查到期 → 转发到真正的目标 Topic
代价:Topic 数量多,运维复杂
方案 2:时间轮(Kafka Stream / 应用层)
内存时间轮(HashedWheelTimer)
消息存入时间轮 → 到期触发 → 发送到目标 Topic
Kafka + Flink/KStreams 实现
方案 3:DB + 定时扫描(简单但有延迟)
消息先存 DB(含投递时间)
定时任务(每秒)扫描到期消息 → 发送到 Kafka
适用:延迟精度要求不高(秒级)
方案 4:专用延迟消息服务(推荐)
DDMQ(滴滴)、QMQ 延迟消息(携程)、Pulsar(原生支持)
Pulsar: msg.deliverAfter(30, TimeUnit.MINUTES)
时间轮算法(HashedWheelTimer):
时间轮 = 一个环形数组,每个槽代表一个时间刻度(如 100ms)
一圈 = 360 个槽 = 36s(每槽 100ms)
新任务(delay=5s):
当前 tick=10 → 目标 tick=10+50=60(5s / 100ms = 50 个刻度)
挂在第 60 个槽(链表)
时间轮推进:
每 100ms 推进一个槽,检查槽上的任务是否到期
到期 → 执行(如转发到目标 Topic)
未到期(多圈任务)→ rounds-1(跨圈计数)
优点:O(1) 复杂度添加和触发,适合大量延迟任务
缺点:精度受槽大小限制(100ms 精度),时钟回拨问题
3)实际场景
订单超时取消(30 分钟未支付取消):
RocketMQ:setDelayTimeLevel(16) = 30分钟
Kafka:DB 定时扫描 + 发送
Pulsar:deliverAfter(30, TimeUnit.MINUTES)
优惠券到期提醒(提前 1 天提醒):
提前 1 天 = 精确到分钟级别
→ RocketMQ 5.0 精确延迟 or 定时任务
定时推送(如每天早 9 点):
→ 不适合 MQ 延迟消息(需要大量消息在同一时刻触发)
→ 定时任务(Quartz/XXL-Job)更合适
延伸追问
- Q:RocketMQ 18 级延迟为什么不能自定义时间? → 设计权衡:固定级别简化实现(每个级别一个内部队列),避免每条消息的过期时间不同导致的排序开销(否则需要堆/时间轮)。RocketMQ 5.0 通过时间轮(Timer Wheel)支持任意精确延迟,但 4.x 保持向后兼容的固定级别。
- Q:Kafka 的时间轮是指什么?
→ Kafka 内部(
kafka.utils.timer)用时间轮管理网络请求超时(如 Produce/Fetch 请求的超时),不是用于消息延迟。外部的延迟消息通常是社区实现(如 kafka-delayed-message-queue)或使用 Pulsar 替代。 - Q:顺序消息失败了怎么处理(不能跳过)?
→ RocketMQ OrderlyConsumer 失败时,可以返回
SUSPEND_CURRENT_QUEUE_A_MOMENT(暂停当前队列,等待重试),重试间隔逐步增加。需要设置最大重试次数(consumer.setMaxReconsumeTimes(3)),超过后发送到死信队列(DLQ)人工处理,避免永久阻塞。
我的记法
- 顺序消息:同 key → 同 Partition/Queue + 单线程消费
- RocketMQ 延迟:18 固定级别(1s~2h)/ 5.0+ 任意精确延迟
- Kafka 延迟:原生不支持,用多级 Topic / DB 定时扫描 / 换 Pulsar
- 时间轮:环形数组,O(1) 添加,100ms 精度,Kafka 内部用于超时管理
- 一句话:「顺序靠同 key 同队列,延迟靠 RocketMQ 18 级或 Pulsar 任意」
状态
- 已背速记
- 能写 Kafka 顺序消息的生产端和消费端
- 能说 Kafka 实现延迟消息的方案