多档位降级设计(T-k 日依赖的调度或跑批里常见)
一句话速记
调度任务依赖 T-1 数据(昨日数据),当 T-1 数据未就绪或质量异常时,不应让整个任务失败——而是降档到 T-2、T-7 或基线数据继续运行,保证业务不中断,同时告警通知人工介入。核心设计:档位定义 + 数据就绪检查 + 自动降档 + 告警分级 + 手动覆盖。
关键细节
1)典型场景
风控规则更新(依赖昨日用户行为数据):
T-1 数据 = 最优,T-7 数据 = 可接受,固定规则 = 最差但可用
推荐算法每日更新(依赖昨日用户行为):
T-1 = 最新画像,T-2 = 略旧,T-7 = 一周前,固定热榜 = 保底
营销活动配置(依赖昨日销售数据):
T-1 数据失败 → 用 T-2 数据的活动策略继续投放
报表/监控大盘(依赖昨日聚合数据):
数据管道延迟 → 先展示 T-1 的数据(昨日),等今日数据就绪再切换
2)档位定义(以推荐系统为例)
档位 0(正常):使用 T-1 数据(昨日用户行为训练的模型)
档位 1(降级一档):使用 T-2 数据(大后天的模型,稍旧)
档位 2(降级两档):使用 T-7 数据(上周的模型)
档位 3(降级三档):使用固定热榜(预计算的 Top100 商品,写死在配置中)
触发条件:
档位 0 → 1:T-1 数据在 N:00 前未就绪 OR 数据质量检查不通过
档位 1 → 2:T-2 数据也不可用(极少见)
档位 2 → 3:历史数据都不可用(系统性故障)
3)数据就绪检查
def check_data_ready(date: str, table: str) -> bool:
"""检查某日数据是否就绪"""
# 方法 1:检查完成标志(最可靠)
flag_key = f"data_ready:{table}:{date}"
if redis.exists(flag_key):
return True
# 方法 2:检查行数(与历史均值对比)
count = db.query(f"SELECT COUNT(*) FROM {table} WHERE dt='{date}'")
history_avg = get_history_avg(table, date) # 过去 7 天同日均值
if count < history_avg * 0.7:
log.warn(f"Data volume too low: {count} vs expected {history_avg}")
return False # 行数异常,认为数据未就绪或质量差
# 方法 3:检查关键指标(业务层校验)
key_metrics = db.query(f"""
SELECT SUM(uv) as total_uv FROM {table} WHERE dt='{date}'
""")
if key_metrics.total_uv < MIN_ACCEPTABLE_UV:
return False
return True
# 生产者端写完数据后,写完成标志
def mark_data_ready(date, table):
redis.setex(f"data_ready:{table}:{date}", 86400 * 3, "1") # 3天有效4)自动降档逻辑
class GradeSelector:
GRADES = [
{"level": 0, "offset_days": 1, "desc": "T-1数据(正常)"},
{"level": 1, "offset_days": 2, "desc": "T-2数据(一档降级)"},
{"level": 2, "offset_days": 7, "desc": "T-7数据(二档降级)"},
{"level": 3, "offset_days": None, "desc": "固定基线数据(最后兜底)"},
]
def select_grade(self, today: date, table: str) -> dict:
"""自动选择最优可用档位"""
for grade in self.GRADES:
if grade["offset_days"] is None:
# 最后兜底,不检查数据就绪
self.alert(grade["level"], "ALL_DATA_UNAVAILABLE")
return grade
target_date = today - timedelta(days=grade["offset_days"])
if check_data_ready(str(target_date), table):
if grade["level"] > 0:
# 降级了,需要告警
self.alert(grade["level"], f"Using {target_date} data")
return {**grade, "data_date": target_date}
# 不应该走到这里(最后一档是固定数据,一定可用)
raise Exception("No available data grade!")
def alert(self, level: int, reason: str):
severity = {0: "INFO", 1: "WARN", 2: "ERROR", 3: "CRITICAL"}[level]
alert_service.send(
title=f"[数据降级] 降至第{level}档",
body=f"原因:{reason}",
severity=severity,
oncall=level >= 2 # 2档以上叫人
)5)配置化与手动覆盖
# 配置中心(Nacos/Apollo),运营可手动修改
recommend:
data:
auto_grade: true # 是否自动降档(可关闭)
force_grade: null # 手动强制档位(null=自动,0/1/2/3=强制)
ready_check_timeout: 30 # 就绪检查超时时间(分钟)
min_volume_ratio: 0.7 # 最低数据量比例(低于均值70%则认为不就绪)def get_effective_grade():
config = load_config()
# 手动强制档位(最高优先级)
if config.force_grade is not None:
log.info(f"Using forced grade: {config.force_grade}")
return GRADES[config.force_grade]
# 自动降档
if config.auto_grade:
return selector.select_grade(today, TABLE)
# 关闭自动降档(固定用 T-1,失败则任务失败)
return GRADES[0]6)档位状态可视化(监控大盘)
监控指标:
current_grade:当前使用档位(0/1/2/3)
data_date:实际使用的数据日期
grade_change_count:今日档位变化次数
last_grade_change_at:最近一次档位变化时间
告警规则:
current_grade >= 1 → 发 Slack 告警(值班群)
current_grade >= 2 → 发 PagerDuty(叫人)
current_grade == 3 → 发 PagerDuty + 短信(紧急)
延伸追问
- Q:T-7 的数据用于当天,会不会导致推荐结果很差? → 会降低推荐质量,但不影响系统可用性。告警出来后人工快速修复数据问题,通常 T-7 只用几小时。对推荐精准度要求极高的场景(如实时竞价广告),需要更强的数据保障(专属数据管道、双路备份),而不是靠降级兜底。
- Q:调度任务如何知道用哪个档位的数据?
→ 档位选择器在任务启动前运行(作为 pre-check),输出所选档位和数据日期,注入到任务的上下文参数中。任务执行时用该参数读取对应日期的数据表(表名通常带日期分区
table_20240101)。也可以写入配置中心,其他依赖方也能读到当前用哪天的数据。
我的记法
- 档位:T-1(正常)→ T-2(一档降级)→ T-7(二档降级)→ 固定基线(最后兜底)
- 就绪检查:完成标志(Redis)+ 行数校验(vs 历史均值)+ 关键指标校验
- 自动降档:按序检查,第一个就绪的档位胜出
- 手动覆盖:配置中心 force_grade,运营可直接干预
- 告警:降档 ≥ 1 发群告警,降档 ≥ 2 叫人,降档 = 3 叫人+短信
- 一句话:「T-k 日依赖 = 就绪检查 + 自动降档 + 分级告警 + 手动覆盖」
状态
- 已背速记
- 能说档位设计的四个档位
- 能说数据就绪检查的三种方法