消费端单条处理 20s 的坑(心跳 / 重平衡)
一句话速记
Kafka 消费端处理单条消息超过 max.poll.interval.ms(默认 5 分钟),Broker 认为该消费者已死,触发 Rebalance,把分区分配给其他消费者 → 消息被重复消费;还有心跳超时(session.timeout.ms,默认 10s)也触发 Rebalance。单条处理 20s 的根本解法:提高 max.poll.interval.ms 或 异步化处理(poll 后投线程池,主线程保持 poll 节奏)。
通俗解释
Kafka 的两个超时机制:
1. 心跳超时(session.timeout.ms):
Consumer 需要定期发送心跳给 Broker
默认 10s 内没心跳 → Broker 认为 Consumer 挂了 → Rebalance
心跳由后台线程发送,与 poll() 无关(只要进程存活就会发)
2. poll 间隔超时(max.poll.interval.ms):
两次 poll() 调用之间的最大间隔
默认 5 分钟,超过 → Broker 强制踢出 Consumer → Rebalance
"处理 20s" 的问题:
poll() 取到消息 → 处理每条消息 20s × 50 条 = 1000s
1000s > 5 分钟 → 下次 poll() 之前已超时 → Rebalance
关键细节
1)问题的完整时间线
Consumer A(max.poll.interval.ms=300s, 单条处理 20s):
t=0s: poll(),取到 100 条消息(max.poll.records=100)
t=0~20s:处理第 1 条(耗时 20s)
t=20~40s:处理第 2 条
...
t=2000s:处理第 100 条完成
BUT:t=300s 时,Consumer A 的 max.poll.interval.ms 已超时!
→ Broker 触发 Rebalance
→ Consumer A 的分区被分配给 Consumer B
→ Consumer B 从上次提交的 offset 开始消费
→ Consumer A 已处理的消息(但未提交 offset)被 B 重复消费
同时:Consumer A 尝试提交 offset → 失败(CommitFailedException)
→ 因为它已经不在消费组里了
2)解法 1:增大 max.poll.interval.ms
props.put("max.poll.interval.ms", "600000"); // 10 分钟(根据实际处理时间调整)
props.put("max.poll.records", "10"); // 减少每次 poll 的消息数(10 条 × 20s = 200s < 600s)
// 适用:处理时间可预期,偶尔慢
// 缺点:Broker 需要等更长时间才能发现消费者真正挂掉3)解法 2:异步处理(推荐,高吞吐场景)
// 主线程只负责 poll + submit,保持 poll 节奏
// 工作线程处理消息(可以慢)
ExecutorService executor = Executors.newFixedThreadPool(10);
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 把消息投递到工作线程(不等待完成)
CompletableFuture.runAsync(() -> {
processMessage(record); // 可以耗时 20s
}, executor);
}
// 主线程立即回到 poll(保持心跳 + max.poll.interval.ms)
// 注意:异步处理不能直接 commitSync(顺序不保证)
// 需要追踪完成情况后才提交 offset
}异步处理 + offset 管理(复杂但精确):
// 追踪异步任务的完成状态
Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new ConcurrentHashMap<>();
Map<TopicPartition, Long> completedOffsets = new ConcurrentHashMap<>();
for (ConsumerRecord<String, String> record : records) {
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
long offset = record.offset();
CompletableFuture.runAsync(() -> {
processMessage(record);
// 记录已完成的 offset(取同一分区内最大的连续完成 offset)
completedOffsets.merge(tp, offset, Math::max);
}, executor);
}
// 下次 poll 前提交已确认完成的 offset
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : completedOffsets.entrySet()) {
toCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
}
if (!toCommit.isEmpty()) {
consumer.commitAsync(toCommit, (offsets, ex) -> {
if (ex != null) log.error("Commit failed", ex);
});
}4)解法 3:减小 max.poll.records
// 每次 poll 只取少量消息,确保处理完成后才 poll 下一批
props.put("max.poll.records", "5"); // 每次 5 条
props.put("max.poll.interval.ms", "300000"); // 5 分钟
// 5 条 × 20s = 100s < 300s ✓
// 处理完 5 条,commit offset,再 poll 下一批
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // 阻塞处理
}
consumer.commitSync(); // 全部处理完再提交
}5)RocketMQ 的类似问题
// RocketMQ 消费端:ProcessQueue 中的消息超过 consumeTimeout 会触发告警
consumer.setConsumeTimeout(15); // 默认 15 分钟(比 Kafka 宽松)
// RocketMQ 顺序消费:单条处理慢会阻塞后续消息
// 非顺序(并发)消费:慢消息不影响其他消息处理
// MessageListenerConcurrently(并发消费,默认)
// 每条消息有独立线程处理,慢消息不阻塞其他消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
processMessage(msg); // 各自独立处理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});6)监控和排查
# 查看消费组 lag(积压量)
kafka-consumer-groups.sh --bootstrap-server host:9092 \
--describe --group my-consumer-group
# 输出:
# PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
# 0 1000 1050 50 consumer-1 /192.168.1.1
# 1 2000 2100 100 consumer-2 /192.168.1.2
# lag 持续增大 → 消费速度 < 生产速度,可能是处理慢
# lag 突增 → 可能发生了 Rebalance(消费停止了一段时间)延伸追问
- Q:Rebalance 期间消息会丢失吗? → 不会丢失。Rebalance 期间消费者停止消费(STW),Broker 持有消息不删除;Rebalance 完成后,各分区从上次提交的 offset 继续消费。已处理但未提交 offset 的消息会被重复消费(不丢但重复)。
- Q:心跳线程和 poll 线程是同一个线程吗?
→ 不是。Kafka Consumer 内部有两个线程:用户线程(调用 poll())和后台心跳线程(HeartbeatThread,定期发送心跳)。只要进程存活,心跳就会发送,不依赖 poll() 的频率。但
max.poll.interval.ms是 Broker 侧的检测,不依赖心跳——Broker 检测两次 poll 之间的时间间隔。 - Q:Rebalance 会造成什么业务影响? → Rebalance 期间所有消费者暂停消费(Stop-The-World),持续时间 = 所有消费者的 session.timeout.ms 最大值(通常 30~60s)。业务影响:消息积压暂时增大、延迟升高;如果消费端没有幂等,重复消费可能造成重复扣款等业务问题。
我的记法
- 两个超时:心跳超时(
session.timeout.ms=10s)+ poll 间隔超时(max.poll.interval.ms=5min) - 处理 20s 的问题:100 条 × 20s = 2000s > 5min → Rebalance
- 解法:减少
max.poll.records(如 5)/ 增大max.poll.interval.ms/ 异步处理 - 异步处理时要手动追踪 offset 提交时机
- Rebalance = STW,消息重复,不丢
- 一句话:「poll 完必须在 max.poll.interval.ms 内再次 poll,否则被踢出消费组」
状态
- 已背速记
- 能画出 Rebalance 触发的时间线
- 能说三种解法