消费端单条处理 20s 的坑(心跳 / 重平衡)

一句话速记

Kafka 消费端处理单条消息超过 max.poll.interval.ms(默认 5 分钟),Broker 认为该消费者已死,触发 Rebalance,把分区分配给其他消费者 → 消息被重复消费;还有心跳超时session.timeout.ms,默认 10s)也触发 Rebalance。单条处理 20s 的根本解法:提高 max.poll.interval.ms异步化处理(poll 后投线程池,主线程保持 poll 节奏)。

通俗解释

Kafka 的两个超时机制

1. 心跳超时(session.timeout.ms):
   Consumer 需要定期发送心跳给 Broker
   默认 10s 内没心跳 → Broker 认为 Consumer 挂了 → Rebalance
   心跳由后台线程发送,与 poll() 无关(只要进程存活就会发)

2. poll 间隔超时(max.poll.interval.ms):
   两次 poll() 调用之间的最大间隔
   默认 5 分钟,超过 → Broker 强制踢出 Consumer → Rebalance
   
   "处理 20s" 的问题:
   poll() 取到消息 → 处理每条消息 20s × 50 条 = 1000s
   1000s > 5 分钟 → 下次 poll() 之前已超时 → Rebalance

关键细节

1)问题的完整时间线

Consumer A(max.poll.interval.ms=300s, 单条处理 20s):

t=0s:   poll(),取到 100 条消息(max.poll.records=100)
t=0~20s:处理第 1 条(耗时 20s)
t=20~40s:处理第 2 条
...
t=2000s:处理第 100 条完成

BUT:t=300s 时,Consumer A 的 max.poll.interval.ms 已超时!
  → Broker 触发 Rebalance
  → Consumer A 的分区被分配给 Consumer B
  → Consumer B 从上次提交的 offset 开始消费
  → Consumer A 已处理的消息(但未提交 offset)被 B 重复消费

同时:Consumer A 尝试提交 offset → 失败(CommitFailedException)
  → 因为它已经不在消费组里了

2)解法 1:增大 max.poll.interval.ms

props.put("max.poll.interval.ms", "600000");  // 10 分钟(根据实际处理时间调整)
props.put("max.poll.records", "10");           // 减少每次 poll 的消息数(10 条 × 20s = 200s < 600s)
 
// 适用:处理时间可预期,偶尔慢
// 缺点:Broker 需要等更长时间才能发现消费者真正挂掉

3)解法 2:异步处理(推荐,高吞吐场景)

// 主线程只负责 poll + submit,保持 poll 节奏
// 工作线程处理消息(可以慢)
ExecutorService executor = Executors.newFixedThreadPool(10);
 
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        // 把消息投递到工作线程(不等待完成)
        CompletableFuture.runAsync(() -> {
            processMessage(record);  // 可以耗时 20s
        }, executor);
    }
    // 主线程立即回到 poll(保持心跳 + max.poll.interval.ms)
    
    // 注意:异步处理不能直接 commitSync(顺序不保证)
    // 需要追踪完成情况后才提交 offset
}

异步处理 + offset 管理(复杂但精确)

// 追踪异步任务的完成状态
Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new ConcurrentHashMap<>();
Map<TopicPartition, Long> completedOffsets = new ConcurrentHashMap<>();
 
for (ConsumerRecord<String, String> record : records) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    long offset = record.offset();
    
    CompletableFuture.runAsync(() -> {
        processMessage(record);
        // 记录已完成的 offset(取同一分区内最大的连续完成 offset)
        completedOffsets.merge(tp, offset, Math::max);
    }, executor);
}
 
// 下次 poll 前提交已确认完成的 offset
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : completedOffsets.entrySet()) {
    toCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
}
if (!toCommit.isEmpty()) {
    consumer.commitAsync(toCommit, (offsets, ex) -> {
        if (ex != null) log.error("Commit failed", ex);
    });
}

4)解法 3:减小 max.poll.records

// 每次 poll 只取少量消息,确保处理完成后才 poll 下一批
props.put("max.poll.records", "5");   // 每次 5 条
props.put("max.poll.interval.ms", "300000");  // 5 分钟
 
// 5 条 × 20s = 100s < 300s ✓
// 处理完 5 条,commit offset,再 poll 下一批
 
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record);  // 阻塞处理
    }
    consumer.commitSync();  // 全部处理完再提交
}

5)RocketMQ 的类似问题

// RocketMQ 消费端:ProcessQueue 中的消息超过 consumeTimeout 会触发告警
consumer.setConsumeTimeout(15);  // 默认 15 分钟(比 Kafka 宽松)
 
// RocketMQ 顺序消费:单条处理慢会阻塞后续消息
// 非顺序(并发)消费:慢消息不影响其他消息处理
 
// MessageListenerConcurrently(并发消费,默认)
// 每条消息有独立线程处理,慢消息不阻塞其他消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        processMessage(msg);  // 各自独立处理
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

6)监控和排查

# 查看消费组 lag(积压量)
kafka-consumer-groups.sh --bootstrap-server host:9092 \
  --describe --group my-consumer-group
 
# 输出:
# PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST
# 0          1000            1050            50   consumer-1   /192.168.1.1
# 1          2000            2100            100  consumer-2   /192.168.1.2
 
# lag 持续增大 → 消费速度 < 生产速度,可能是处理慢
# lag 突增 → 可能发生了 Rebalance(消费停止了一段时间)

延伸追问

  • Q:Rebalance 期间消息会丢失吗? → 不会丢失。Rebalance 期间消费者停止消费(STW),Broker 持有消息不删除;Rebalance 完成后,各分区从上次提交的 offset 继续消费。已处理但未提交 offset 的消息会被重复消费(不丢但重复)。
  • Q:心跳线程和 poll 线程是同一个线程吗? → 不是。Kafka Consumer 内部有两个线程:用户线程(调用 poll())和后台心跳线程(HeartbeatThread,定期发送心跳)。只要进程存活,心跳就会发送,不依赖 poll() 的频率。但 max.poll.interval.ms 是 Broker 侧的检测,不依赖心跳——Broker 检测两次 poll 之间的时间间隔。
  • Q:Rebalance 会造成什么业务影响? → Rebalance 期间所有消费者暂停消费(Stop-The-World),持续时间 = 所有消费者的 session.timeout.ms 最大值(通常 30~60s)。业务影响:消息积压暂时增大、延迟升高;如果消费端没有幂等,重复消费可能造成重复扣款等业务问题。

我的记法

  • 两个超时:心跳超时(session.timeout.ms=10s)+ poll 间隔超时(max.poll.interval.ms=5min)
  • 处理 20s 的问题:100 条 × 20s = 2000s > 5min → Rebalance
  • 解法:减少 max.poll.records(如 5)/ 增大 max.poll.interval.ms / 异步处理
  • 异步处理时要手动追踪 offset 提交时机
  • Rebalance = STW,消息重复,不丢
  • 一句话:「poll 完必须在 max.poll.interval.ms 内再次 poll,否则被踢出消费组」

状态

  • 已背速记
  • 能画出 Rebalance 触发的时间线
  • 能说三种解法