顺序消息的实现 / 延迟消息的实现(RocketMQ 的 18 级 / Kafka 的时间轮)

一句话速记

顺序消息:通过将同一业务 key 的消息路由到同一分区/队列(Kafka 指定 partition key,RocketMQ 指定 MessageQueueSelector)+ 单线程消费来保证顺序;延迟消息:RocketMQ 原生支持 18 个固定延迟级别(1s~2h),Kafka 原生不支持(需要时间轮插件或业务实现)。

关键细节

1)顺序消息的实现

Kafka 顺序消息

Kafka 顺序保证的层级:
  Partition 内:消息严格顺序(offset 递增)
  Topic 内:不保证(跨 Partition 无序)

实现顺序消息:
  Step 1:生产时按业务 key(如 order_id)路由到同一 Partition
  Step 2:消费时,同一 Partition 只有一个消费者处理(Partition 级别的串行)
// 生产端:指定 partition key,相同 key 的消息进同一 Partition
producer.send(new ProducerRecord<>(
    "orders",       // topic
    "order-12345",  // key → hash(key) % partitionCount 决定 partition
    orderEvent      // value
));
// 同一 order_id 的所有消息(创建、支付、发货)都在同一 Partition
 
// 消费端:默认 1 Partition = 1 Consumer(ConsumerGroup 内)
// 每个 Consumer 串行处理分配到的 Partition → 分区内有序

RocketMQ 顺序消息

// 生产端:指定消息队列选择器
producer.send(orderMessage, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        long orderId = (Long) arg;
        int index = (int)(orderId % mqs.size());  // 相同 orderId → 同一 Queue
        return mqs.get(index);
    }
}, orderId);
 
// 消费端:顺序消费监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext ctx) {
        // 同一 Queue 内串行执行,保证顺序
        for (MessageExt msg : msgs) {
            processOrderEvent(msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
// MessageListenerOrderly:同一 Queue 只有一个线程处理,严格顺序
// MessageListenerConcurrently:多线程并发,不保证顺序

顺序消息的代价

1. 吞吐量受限:一个 Partition/Queue 串行 → 并发度 = Partition/Queue 数量
2. 消费者扩展受限:消费者数量不能超过 Partition 数(多余的消费者空闲)
3. 局部有序:只在 Partition/Queue 内有序,不是全局有序
4. 失败处理复杂:顺序消费中,一条失败会阻塞后续消息(需要有重试上限)

2)延迟消息的实现

RocketMQ 固定级别延迟(原生支持)

// 18 个延迟级别(不可自定义):
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 
Message msg = new Message("OrderTopic", body);
msg.setDelayTimeLevel(3);  // 级别 3 = 10s 延迟
 
// 发送后,消息在 SCHEDULE_TOPIC_XXXX 内部主题暂存
// 定时任务(ScheduleMessageService)检查到期的消息
// 到期后移到目标 Topic,消费者才能消费

RocketMQ 5.0 任意时间延迟(Timer Wheel)

// RocketMQ 5.0 支持精确延迟(任意时间)
Message msg = new Message("OrderTopic", body);
msg.setDeliverTimeMs(System.currentTimeMillis() + 30 * 60 * 1000);  // 30分钟后
producer.send(msg);

Kafka 原生不支持延迟消息的实现方案

方案 1:多级 Topic 模拟(RocketMQ 的 18 级思路)
  delay_1s, delay_5s, delay_10s ... 等中转 Topic
  Consumer 从延迟 Topic 消费 → 检查到期 → 转发到真正的目标 Topic
  代价:Topic 数量多,运维复杂

方案 2:时间轮(Kafka Stream / 应用层)
  内存时间轮(HashedWheelTimer)
  消息存入时间轮 → 到期触发 → 发送到目标 Topic
  Kafka + Flink/KStreams 实现

方案 3:DB + 定时扫描(简单但有延迟)
  消息先存 DB(含投递时间)
  定时任务(每秒)扫描到期消息 → 发送到 Kafka
  适用:延迟精度要求不高(秒级)

方案 4:专用延迟消息服务(推荐)
  DDMQ(滴滴)、QMQ 延迟消息(携程)、Pulsar(原生支持)
  Pulsar: msg.deliverAfter(30, TimeUnit.MINUTES)

时间轮算法(HashedWheelTimer)

时间轮 = 一个环形数组,每个槽代表一个时间刻度(如 100ms)
一圈 = 360 个槽 = 36s(每槽 100ms)

新任务(delay=5s):
  当前 tick=10 → 目标 tick=10+50=60(5s / 100ms = 50 个刻度)
  挂在第 60 个槽(链表)

时间轮推进:
  每 100ms 推进一个槽,检查槽上的任务是否到期
  到期 → 执行(如转发到目标 Topic)
  未到期(多圈任务)→ rounds-1(跨圈计数)

优点:O(1) 复杂度添加和触发,适合大量延迟任务
缺点:精度受槽大小限制(100ms 精度),时钟回拨问题

3)实际场景

订单超时取消(30 分钟未支付取消):
  RocketMQ:setDelayTimeLevel(16) = 30分钟
  Kafka:DB 定时扫描 + 发送
  Pulsar:deliverAfter(30, TimeUnit.MINUTES)

优惠券到期提醒(提前 1 天提醒):
  提前 1 天 = 精确到分钟级别
  → RocketMQ 5.0 精确延迟 or 定时任务
  
定时推送(如每天早 9 点):
  → 不适合 MQ 延迟消息(需要大量消息在同一时刻触发)
  → 定时任务(Quartz/XXL-Job)更合适

延伸追问

  • Q:RocketMQ 18 级延迟为什么不能自定义时间? → 设计权衡:固定级别简化实现(每个级别一个内部队列),避免每条消息的过期时间不同导致的排序开销(否则需要堆/时间轮)。RocketMQ 5.0 通过时间轮(Timer Wheel)支持任意精确延迟,但 4.x 保持向后兼容的固定级别。
  • Q:Kafka 的时间轮是指什么? → Kafka 内部(kafka.utils.timer)用时间轮管理网络请求超时(如 Produce/Fetch 请求的超时),不是用于消息延迟。外部的延迟消息通常是社区实现(如 kafka-delayed-message-queue)或使用 Pulsar 替代。
  • Q:顺序消息失败了怎么处理(不能跳过)? → RocketMQ OrderlyConsumer 失败时,可以返回 SUSPEND_CURRENT_QUEUE_A_MOMENT(暂停当前队列,等待重试),重试间隔逐步增加。需要设置最大重试次数(consumer.setMaxReconsumeTimes(3)),超过后发送到死信队列(DLQ)人工处理,避免永久阻塞。

我的记法

  • 顺序消息:同 key → 同 Partition/Queue + 单线程消费
  • RocketMQ 延迟:18 固定级别(1s~2h)/ 5.0+ 任意精确延迟
  • Kafka 延迟:原生不支持,用多级 Topic / DB 定时扫描 / 换 Pulsar
  • 时间轮:环形数组,O(1) 添加,100ms 精度,Kafka 内部用于超时管理
  • 一句话:「顺序靠同 key 同队列,延迟靠 RocketMQ 18 级或 Pulsar 任意」

状态

  • 已背速记
  • 能写 Kafka 顺序消息的生产端和消费端
  • 能说 Kafka 实现延迟消息的方案