消息”不丢 + 不重”的完整方案(生产端 / Broker / 消费端)

一句话速记

消息不丢:生产端确认 ACK(同步/异步 confirm)+ Broker 持久化 + 消费端手动 ACK;消息不重(Exactly-Once)本质上极难实现,工程上追求至少一次 + 消费幂等(重复消息不产生错误结果)。三端都有各自的保证机制,任一端出问题都可能丢/重。

通俗解释

消息可靠传递的三段论

生产端 ──────→ Broker ──────→ 消费端
   ↑ 确认机制     ↑ 持久化      ↑ 幂等处理

关键细节

1)生产端:不丢消息

Kafka 生产端

// 关键配置
Properties props = new Properties();
props.put("acks", "all");              // ISR 中所有副本都写入才 ACK(最安全)
// acks=0:不等 ACK(最快,可能丢)
// acks=1:只等 Leader 写入(Leader 宕机可能丢)
// acks=all(-1):等所有 ISR 副本写入(不丢)
 
props.put("retries", Integer.MAX_VALUE);     // 无限重试
props.put("enable.idempotence", "true");     // 开启幂等(防止重试导致重复)
props.put("max.in.flight.requests.per.connection", "5");  // 幂等模式限制
 
// 同步发送(确保投递)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, value);
try {
    RecordMetadata metadata = producer.send(record).get();  // .get() 阻塞等待 ACK
    log.info("Sent to partition {}, offset {}", metadata.partition(), metadata.offset());
} catch (Exception e) {
    // 投递失败,保存到本地消息表,后续补偿
    saveToLocalMessageTable(record);
}

RocketMQ 生产端

// 同步发送(推荐)
SendResult result = producer.send(message);
if (result.getSendStatus() != SendStatus.SEND_OK) {
    // 重试或保存本地
}
 
// 事务消息(最可靠)
// 解决"本地事务 + 发消息"的原子性问题
TransactionMQProducer txProducer = new TransactionMQProducer("group");
txProducer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            doLocalBusiness();
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Broker 定时回查本地事务状态(30 秒一次)
        boolean done = checkLocalBizStatus(msg.getTransactionId());
        return done ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
    }
});

本地消息表方案(通用,任何 MQ 都适用)

-- 本地消息表(和业务数据在同一库)
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY,
    topic VARCHAR(100),
    body TEXT,
    status TINYINT DEFAULT 0,  -- 0=待发送, 1=已发送, 2=发送失败
    created_at DATETIME,
    retry_count INT DEFAULT 0
);
 
-- 业务操作 + 写消息表,同一事务(原子性)
BEGIN;
  INSERT INTO orders ...;               -- 业务操作
  INSERT INTO local_message ...;        -- 记录待发送消息
COMMIT;
 
-- 后台定时任务扫描 status=0 的消息并发送到 MQ
-- 发送成功后更新 status=1

2)Broker:不丢消息

Kafka Broker

# 关闭脏选举(Leader 宕机时,只从 ISR 中选 Leader)
unclean.leader.election.enable=false
 
# ISR 最小同步副本数
min.insync.replicas=2  # 至少 2 个副本同步才可写入(配合 acks=all)
 
# 持久化(默认已持久化到磁盘,无需特殊配置)
# Kafka 的日志文件默认 fsync 由 OS 管理(性能优先)
# 极端情况下(电源故障)可能丢最后几条

RocketMQ Broker

# 同步刷盘(每条消息都 fsync,最安全)
flushDiskType=SYNC_FLUSH  # vs ASYNC_FLUSH(异步,快但可能丢)
 
# 主从同步模式
brokerRole=SYNC_MASTER   # 同步复制到 Slave 才 ACK 给 Producer
# vs ASYNC_MASTER(异步复制,快但 Master 宕机可能丢)

3)消费端:不丢 + 幂等

不丢消息(手动 ACK)

// Kafka 消费端:关闭自动提交,手动 commit offset
props.put("enable.auto.commit", "false");
 
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    try {
        processMessage(record.value());
        // 处理成功才提交 offset(按批次提交)
    } catch (Exception e) {
        // 处理失败,不提交 offset,下次重新消费(重试)
        log.error("Process failed, will retry", e);
        break;  // 跳出循环,不提交
    }
}
consumer.commitSync();  // 手动同步提交
 
// 注意:auto.commit 下,poll 后立即提交 offset
// 如果处理失败(业务异常),offset 已提交 → 消息"丢失"

消费幂等(不重复处理)

// 幂等实现方案 1:数据库唯一键
// 消息 ID 作为唯一键,重复消费时 INSERT 失败(忽略)
try {
    jdbcTemplate.update(
        "INSERT INTO processed_messages(message_id, ...) VALUES(?, ...)",
        messageId, ...
    );
    doBusinessLogic();  // 业务逻辑
} catch (DuplicateKeyException e) {
    // 重复消息,忽略
    log.info("Duplicate message {}, skip", messageId);
}
 
// 幂等实现方案 2:Redis 去重
String processKey = "processed:" + messageId;
if (redis.setnx(processKey, "1")) {  // SET NX
    redis.expire(processKey, 86400);  // 24 小时过期(消息幂等窗口)
    doBusinessLogic();
} else {
    log.info("Duplicate message {}, skip", messageId);
}
 
// 幂等实现方案 3:状态机(业务天然幂等)
// 订单状态:CREATED → PAID → SHIPPED
// 重复收到"支付成功"消息时,检查当前状态
if (order.getStatus() != OrderStatus.CREATED) {
    log.info("Order {} already processed, skip", orderId);
    return;
}
// 只有 CREATED 状态才处理

4)“不重”(Exactly-Once)的工程实现

理论上:Exactly-Once = At-Least-Once + 幂等消费

Kafka Exactly-Once(生产端):
  enable.idempotence=true:单会话内幂等(Producer 重启则失效)
  事务 API(Producer.beginTransaction / commitTransaction):
    跨主题、跨分区的原子写入(流处理场景)

实际工程中的"Effectively-Once":
  - Broker 保证 At-Least-Once(可能重复,但不丢)
  - 消费端保证幂等(重复消费不产生副作用)
  - 结合 = 业务视角的 Exactly-Once

重复消费的场景:
  1. 消费端处理成功,commit offset 前崩溃 → 重启后重新消费
  2. 网络分区导致 Broker 收到重复消息(Producer 重试)
  3. Rebalance 期间,已被处理但 offset 未提交的消息被重新分配

延伸追问

  • Q:事务消息和本地消息表方案怎么选? → 本地消息表:无需 MQ 框架支持,DB 原生事务保证,简单可控,适合大多数业务;事务消息(RocketMQ):不依赖 DB 额外表,框架提供回查机制,适合 MQ 是核心组件的架构。性能上本地消息表因额外 DB 操作稍慢,但大多数场景差距可接受。
  • Q:Kafka 的 at-least-once 在什么情况下变成 at-most-once → 开启 auto.commit=true 时:poll 取到消息后,周期性自动提交 offset(不管消息是否处理成功)。如果消费者在 commit 之后、处理之前崩溃,消息被标记已消费但实际未处理 → at-most-once(最多一次)。所以生产中一定要 enable.auto.commit=false + 手动 commit。
  • Q:消息幂等 ID 用什么来源? → 优先用业务 ID(如 order_id + event_type 的组合),天然有业务语义,易于排查;其次用 MQ 框架提供的 message_id(如 RocketMQ 的 msgId,Kafka 的 offset + partition 组合)。避免用随机 UUID(每次重试 UUID 不同,无法去重)。

我的记法

  • 生产端不丢:acks=all + 重试 + 本地消息表(事务保证)
  • Broker 不丢:持久化 + 副本( min.insync.replicas=2)+ 关闭脏选举
  • 消费端不丢:手动 ACK(auto.commit=false),处理成功再 commit
  • 幂等:唯一键(DB)或 Redis SETNX,基于业务 ID 去重
  • 工程上 = At-Least-Once + 消费幂等 = “Effectively-Once”
  • 一句话:「不丢靠三端确认,不重靠消费幂等,二者结合 = 生产级可靠」

状态

  • 已背速记(三端保证)
  • 能写消费端手动 ACK + 幂等代码
  • 能解释本地消息表方案