消息”不丢 + 不重”的完整方案(生产端 / Broker / 消费端)
一句话速记
消息不丢:生产端确认 ACK(同步/异步 confirm)+ Broker 持久化 + 消费端手动 ACK;消息不重(Exactly-Once)本质上极难实现,工程上追求至少一次 + 消费幂等(重复消息不产生错误结果)。三端都有各自的保证机制,任一端出问题都可能丢/重。
通俗解释
消息可靠传递的三段论:
生产端 ──────→ Broker ──────→ 消费端
↑ 确认机制 ↑ 持久化 ↑ 幂等处理
关键细节
1)生产端:不丢消息
Kafka 生产端:
// 关键配置
Properties props = new Properties();
props.put("acks", "all"); // ISR 中所有副本都写入才 ACK(最安全)
// acks=0:不等 ACK(最快,可能丢)
// acks=1:只等 Leader 写入(Leader 宕机可能丢)
// acks=all(-1):等所有 ISR 副本写入(不丢)
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("enable.idempotence", "true"); // 开启幂等(防止重试导致重复)
props.put("max.in.flight.requests.per.connection", "5"); // 幂等模式限制
// 同步发送(确保投递)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, value);
try {
RecordMetadata metadata = producer.send(record).get(); // .get() 阻塞等待 ACK
log.info("Sent to partition {}, offset {}", metadata.partition(), metadata.offset());
} catch (Exception e) {
// 投递失败,保存到本地消息表,后续补偿
saveToLocalMessageTable(record);
}RocketMQ 生产端:
// 同步发送(推荐)
SendResult result = producer.send(message);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 重试或保存本地
}
// 事务消息(最可靠)
// 解决"本地事务 + 发消息"的原子性问题
TransactionMQProducer txProducer = new TransactionMQProducer("group");
txProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
doLocalBusiness();
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// Broker 定时回查本地事务状态(30 秒一次)
boolean done = checkLocalBizStatus(msg.getTransactionId());
return done ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
}
});本地消息表方案(通用,任何 MQ 都适用):
-- 本地消息表(和业务数据在同一库)
CREATE TABLE local_message (
id BIGINT PRIMARY KEY,
topic VARCHAR(100),
body TEXT,
status TINYINT DEFAULT 0, -- 0=待发送, 1=已发送, 2=发送失败
created_at DATETIME,
retry_count INT DEFAULT 0
);
-- 业务操作 + 写消息表,同一事务(原子性)
BEGIN;
INSERT INTO orders ...; -- 业务操作
INSERT INTO local_message ...; -- 记录待发送消息
COMMIT;
-- 后台定时任务扫描 status=0 的消息并发送到 MQ
-- 发送成功后更新 status=12)Broker:不丢消息
Kafka Broker:
# 关闭脏选举(Leader 宕机时,只从 ISR 中选 Leader)
unclean.leader.election.enable=false
# ISR 最小同步副本数
min.insync.replicas=2 # 至少 2 个副本同步才可写入(配合 acks=all)
# 持久化(默认已持久化到磁盘,无需特殊配置)
# Kafka 的日志文件默认 fsync 由 OS 管理(性能优先)
# 极端情况下(电源故障)可能丢最后几条RocketMQ Broker:
# 同步刷盘(每条消息都 fsync,最安全)
flushDiskType=SYNC_FLUSH # vs ASYNC_FLUSH(异步,快但可能丢)
# 主从同步模式
brokerRole=SYNC_MASTER # 同步复制到 Slave 才 ACK 给 Producer
# vs ASYNC_MASTER(异步复制,快但 Master 宕机可能丢)3)消费端:不丢 + 幂等
不丢消息(手动 ACK):
// Kafka 消费端:关闭自动提交,手动 commit offset
props.put("enable.auto.commit", "false");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record.value());
// 处理成功才提交 offset(按批次提交)
} catch (Exception e) {
// 处理失败,不提交 offset,下次重新消费(重试)
log.error("Process failed, will retry", e);
break; // 跳出循环,不提交
}
}
consumer.commitSync(); // 手动同步提交
// 注意:auto.commit 下,poll 后立即提交 offset
// 如果处理失败(业务异常),offset 已提交 → 消息"丢失"消费幂等(不重复处理):
// 幂等实现方案 1:数据库唯一键
// 消息 ID 作为唯一键,重复消费时 INSERT 失败(忽略)
try {
jdbcTemplate.update(
"INSERT INTO processed_messages(message_id, ...) VALUES(?, ...)",
messageId, ...
);
doBusinessLogic(); // 业务逻辑
} catch (DuplicateKeyException e) {
// 重复消息,忽略
log.info("Duplicate message {}, skip", messageId);
}
// 幂等实现方案 2:Redis 去重
String processKey = "processed:" + messageId;
if (redis.setnx(processKey, "1")) { // SET NX
redis.expire(processKey, 86400); // 24 小时过期(消息幂等窗口)
doBusinessLogic();
} else {
log.info("Duplicate message {}, skip", messageId);
}
// 幂等实现方案 3:状态机(业务天然幂等)
// 订单状态:CREATED → PAID → SHIPPED
// 重复收到"支付成功"消息时,检查当前状态
if (order.getStatus() != OrderStatus.CREATED) {
log.info("Order {} already processed, skip", orderId);
return;
}
// 只有 CREATED 状态才处理4)“不重”(Exactly-Once)的工程实现
理论上:Exactly-Once = At-Least-Once + 幂等消费
Kafka Exactly-Once(生产端):
enable.idempotence=true:单会话内幂等(Producer 重启则失效)
事务 API(Producer.beginTransaction / commitTransaction):
跨主题、跨分区的原子写入(流处理场景)
实际工程中的"Effectively-Once":
- Broker 保证 At-Least-Once(可能重复,但不丢)
- 消费端保证幂等(重复消费不产生副作用)
- 结合 = 业务视角的 Exactly-Once
重复消费的场景:
1. 消费端处理成功,commit offset 前崩溃 → 重启后重新消费
2. 网络分区导致 Broker 收到重复消息(Producer 重试)
3. Rebalance 期间,已被处理但 offset 未提交的消息被重新分配
延伸追问
- Q:事务消息和本地消息表方案怎么选? → 本地消息表:无需 MQ 框架支持,DB 原生事务保证,简单可控,适合大多数业务;事务消息(RocketMQ):不依赖 DB 额外表,框架提供回查机制,适合 MQ 是核心组件的架构。性能上本地消息表因额外 DB 操作稍慢,但大多数场景差距可接受。
- Q:Kafka 的
at-least-once在什么情况下变成at-most-once? → 开启auto.commit=true时:poll 取到消息后,周期性自动提交 offset(不管消息是否处理成功)。如果消费者在 commit 之后、处理之前崩溃,消息被标记已消费但实际未处理 →at-most-once(最多一次)。所以生产中一定要enable.auto.commit=false+ 手动 commit。 - Q:消息幂等 ID 用什么来源? → 优先用业务 ID(如 order_id + event_type 的组合),天然有业务语义,易于排查;其次用 MQ 框架提供的 message_id(如 RocketMQ 的 msgId,Kafka 的 offset + partition 组合)。避免用随机 UUID(每次重试 UUID 不同,无法去重)。
我的记法
- 生产端不丢:acks=all + 重试 + 本地消息表(事务保证)
- Broker 不丢:持久化 + 副本( min.insync.replicas=2)+ 关闭脏选举
- 消费端不丢:手动 ACK(auto.commit=false),处理成功再 commit
- 幂等:唯一键(DB)或 Redis SETNX,基于业务 ID 去重
- 工程上 = At-Least-Once + 消费幂等 = “Effectively-Once”
- 一句话:「不丢靠三端确认,不重靠消费幂等,二者结合 = 生产级可靠」
状态
- 已背速记(三端保证)
- 能写消费端手动 ACK + 幂等代码
- 能解释本地消息表方案