双写期间的一致性校验与差异修复

一句话速记

双写期间两个库会出现差异(新库写失败、延迟、顺序问题),需要持续校验 + 自动修复保证新库最终与老库一致,只有校验通过才能推进切读比例。校验手段:实时抽样校验(每次双写成功后随机抽 1%)+ 全量周期校验(每天凌晨扫描全量)+ 差异修复(补偿任务从老库重新同步差异 ID)。

关键细节

1)差异产生的原因

原因 1:双写中新库写失败
  老库写成功,新库写失败(网络超时、容量不足等)
  → 新库缺少该记录

原因 2:顺序问题(Canal 旁路同步场景)
  老库写操作:先 INSERT(v1),再 UPDATE(v2)
  Canal 消费延迟,或消费乱序(MQ 没保证顺序)
  → 新库可能只有 v1 没有 v2,或两次操作顺序反了

原因 3:类型转换差异
  老库 datetime 类型 → 新库 timestamp 类型(精度差异)
  老库 decimal(10,2) → 新库 float(浮点精度问题)
  → 内容相同但序列化后不同

原因 4:业务代码 bug
  双写代码写错字段映射
  新老库的字段名或含义稍有差异

2)实时抽样校验

双写后立即随机抽验

@Service
public class DoubleWriteService {
    
    @Autowired private CompareService compareService;
    
    public void saveOrder(Order order) {
        // 老库写
        oldDao.save(order);
        
        // 新库写
        try {
            newDao.save(order);
        } catch (Exception e) {
            failedIds.add(order.getId());
            return;
        }
        
        // 随机抽样校验(1% 的双写操作触发)
        if (ThreadLocalRandom.current().nextInt(100) == 0) {
            asyncExecutor.execute(() -> compareService.compareById(order.getId()));
        }
    }
}
 
@Service
public class CompareService {
    
    public void compareById(Long orderId) {
        Order oldOrder = oldDao.findById(orderId);
        Order newOrder = newDao.findById(orderId);
        
        if (!equals(oldOrder, newOrder)) {
            log.warn("Data mismatch: orderId={}", orderId);
            metrics.increment("double_write.mismatch");
            // 记录到 mismatch_log 表
            mismatchLog.save(orderId, serialize(oldOrder), serialize(newOrder));
        }
    }
    
    private boolean equals(Order old, Order newObj) {
        if (old == null && newObj == null) return true;
        if (old == null || newObj == null) return false;
        // 比较关键字段(忽略时间精度差异)
        return Objects.equals(old.getAmount(), newObj.getAmount())
            && Objects.equals(old.getStatus(), newObj.getStatus())
            && Objects.equals(old.getUserId(), newObj.getUserId());
    }
}

3)全量周期校验

分页扫描,分批对比(每天凌晨低峰期)

# 全量校验脚本
class FullVerifier:
    BATCH_SIZE = 5000
    
    def run(self, start_id=0):
        total_mismatch = 0
        current_id = start_id
        
        while True:
            # 从老库分批读取(按 ID 范围,减少全表扫描压力)
            old_records = old_db.query("""
                SELECT id, amount, status, user_id, updated_at
                FROM orders WHERE id > %s ORDER BY id LIMIT %s
            """, current_id, self.BATCH_SIZE)
            
            if not old_records:
                break
            
            ids = [r['id'] for r in old_records]
            
            # 从新库读取相同 ID
            new_records = new_db.query("""
                SELECT id, amount, status, user_id, updated_at
                FROM orders WHERE id IN %s
            """, tuple(ids))
            
            new_dict = {r['id']: r for r in new_records}
            
            for old in old_records:
                new = new_dict.get(old['id'])
                if not new:
                    # 新库缺失
                    self.record_missing(old['id'])
                    total_mismatch += 1
                elif not self.is_equal(old, new):
                    # 内容不一致
                    self.record_mismatch(old['id'], old, new)
                    total_mismatch += 1
            
            current_id = ids[-1]  # 继续下一批
            time.sleep(0.05)      # 限速(防止打满 DB)
        
        return total_mismatch
    
    def is_equal(self, old, new):
        return (old['amount'] == new['amount'] and
                old['status'] == new['status'] and
                old['user_id'] == new['user_id'])

4)差异修复(Repair Job)

@Scheduled(fixedDelay = 60_000)  // 每分钟执行
public void repairMismatch() {
    // 查询最近 1 小时内的差异记录
    List<Long> mismatchIds = mismatchLog.findUnrepaired(Duration.ofHours(1));
    
    for (Long orderId : mismatchIds) {
        try {
            // 以老库为准,同步到新库
            Order oldOrder = oldDao.findById(orderId);
            if (oldOrder == null) {
                // 老库也没有(已删除)→ 新库也删除
                newDao.deleteById(orderId);
            } else {
                // 用老库数据覆盖新库(幂等 UPSERT)
                newDao.upsert(oldOrder);
            }
            // 标记已修复
            mismatchLog.markRepaired(orderId);
        } catch (Exception e) {
            log.error("Repair failed for orderId: {}", orderId, e);
        }
    }
}

5)校验指标与切读门槛

校验指标:
  mismatch_rate = 差异记录数 / 总校验记录数
  repair_success_rate = 修复成功 / 总差异数
  
切读门槛(经验值):
  mismatch_rate < 0.01%(万分之一)且持续 24h → 可以切 10% 读
  mismatch_rate < 0.001% 且持续 48h → 可以切 50% 读
  全量校验通过(差异 = 0)且持续 72h → 可以切 100% 读
  
自动阻断:
  mismatch_rate 突然上升 → 告警 + 暂停切读进度
  repair_success_rate < 90% → 告警(可能有系统性问题,需人工介入)

6)差异类型分类处理

类型 1:新库缺失(MISSING)
  原因:双写失败 / Canal 延迟未到达
  修复:从老库读出 → 写入新库
  
类型 2:新库多余(EXTRA)
  原因:老库已删除,新库未同步删除(删除 Binlog 未到达)
  修复:从新库删除(谨慎,先确认老库确实没有)
  
类型 3:内容不一致(MISMATCH)
  原因:写入顺序问题 / 类型转换差异 / Bug
  修复:以老库为准,UPSERT 到新库
  特殊:如果是字段精度问题(float vs decimal)→ 修改新库字段类型 → 重新迁移

延伸追问

  • Q:校验时新老库都在被写入,如何保证校验的”快照一致性”? → 严格来说做不到实时一致(双写延迟 + Canal 延迟)。工程实践:校验时给老库记录加 updated_at 范围过滤(只校验 N 分钟前的数据,确保 Canal 延迟已过),或接受”最终一致”的校验(允许当前正在写的数据短暂不一致)。全量校验选凌晨低峰(写入量最小)。
  • Q:差异修复时,以老库为准,但老库数据可能也过时了(刚被更新),怎么办? → 修复任务不需要保证实时,下次校验任务会再次检测。修复+校验是一个循环:差异 → 修复 → 再校验 → 无差异 → 完成。允许修复后立即触发一次对该 ID 的快速校验确认。

我的记法

  • 差异来源:新库写失败 / Canal 乱序 / 类型转换 / Bug
  • 校验:实时抽样(1%双写后校验)+ 全量周期(凌晨低峰扫描)
  • 修复:以老库为准,UPSERT 到新库(幂等),标记已修复
  • 门槛:mismatch_rate < 0.01% 持续 24h → 开始切读
  • 一句话:「校验 = 实时抽样 + 全量扫描,修复 = 以老库为准 UPSERT,通过才切读」

状态

  • 已背速记
  • 能说三类差异的来源和修复方式
  • 能说切读的门槛指标