设计多档位依赖/降级调度体系(T-k 日桶)
一句话速记
大规模数据调度系统中,跑批任务依赖 T-1(昨日)数据,当依赖数据未就绪或质量异常时,系统需要自动降档到 T-2、T-7 或基线,保证任务能够运行(可用性优先于最优数据)。设计重点:数据就绪检测 + 自动档位选择 + DAG 依赖管理 + 告警分级。
注:此条目与 多档位降级设计 互补,本条目侧重调度系统架构设计,后者侧重业务降级逻辑实现。
系统架构
数据生产层:
T-1 数据管道(ETL/Spark Job)→ 写入数仓分区 → 写就绪标志
就绪检测层:
DataReadinessChecker → 扫描各日期分区是否就绪
档位选择层:
GradeSelectorService → 根据就绪状态选择最优档位
调度执行层:
Airflow / DolphinScheduler → DAG 调度,传入档位参数执行任务
监控告警层:
Prometheus + Grafana → 档位状态大盘
PagerDuty → 降级告警
核心组件设计
1)数据就绪标志系统
# 数据生产者写完后调用
class DataReadinessRegistry:
def mark_ready(self, table: str, partition_date: str,
row_count: int, quality_score: float):
"""数据生产完成,注册就绪信息"""
record = {
"table": table,
"partition_date": partition_date,
"row_count": row_count,
"quality_score": quality_score, # 0-1,数据质量分
"ready_at": datetime.utcnow().isoformat(),
"status": "READY" if quality_score >= 0.95 else "LOW_QUALITY"
}
# 写 DB(持久化)
db.upsert("data_readiness_log", record)
# 写 Redis(快速查询)
redis_key = f"data:ready:{table}:{partition_date}"
redis.setex(redis_key, 86400 * 7, json.dumps(record))
def is_ready(self, table: str, partition_date: str,
min_quality: float = 0.95) -> bool:
"""检查数据是否就绪且质量达标"""
redis_key = f"data:ready:{table}:{partition_date}"
record_json = redis.get(redis_key)
if not record_json:
return False
record = json.loads(record_json)
return (record["status"] in ["READY", "LOW_QUALITY"] and
record["quality_score"] >= min_quality)2)档位选择 DAG 节点
Airflow 实现:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def select_data_grade(**context):
"""档位选择算子,在 DAG 开始时执行"""
today = context["ds"] # 调度日期(YYYY-MM-DD)
registry = DataReadinessRegistry()
# 档位配置(从配置中心加载)
grades = [
{"level": 0, "offset": 1, "min_quality": 0.95, "desc": "T-1(正常)"},
{"level": 1, "offset": 2, "min_quality": 0.90, "desc": "T-2(一档降级)"},
{"level": 2, "offset": 7, "min_quality": 0.80, "desc": "T-7(二档降级)"},
{"level": 3, "offset": None, "min_quality": 0, "desc": "固定基线"},
]
selected = None
for grade in grades:
if grade["offset"] is None:
# 最后兜底
selected = {**grade, "data_date": "baseline"}
break
data_date = (datetime.strptime(today, "%Y-%m-%d") -
timedelta(days=grade["offset"])).strftime("%Y-%m-%d")
if registry.is_ready(table="user_behavior", partition_date=data_date,
min_quality=grade["min_quality"]):
selected = {**grade, "data_date": data_date}
break
# 写入 XCom(传给后续 Task)
context["task_instance"].xcom_push(key="data_grade", value=selected)
# 告警
if selected["level"] > 0:
send_alert(selected)
return selected
def run_recommendation_job(**context):
"""实际业务 Task,读取档位参数"""
ti = context["task_instance"]
grade = ti.xcom_pull(key="data_grade")
data_date = grade["data_date"]
grade_level = grade["level"]
print(f"Using data grade {grade_level}: date={data_date}")
# 按档位参数读取对应分区数据
spark.sql(f"""
INSERT OVERWRITE TABLE recommend_result PARTITION(dt='{context["ds"]}')
SELECT user_id, recommend_items
FROM user_behavior
WHERE dt = '{data_date}' -- 使用选定的数据日期
""")
with DAG("daily_recommend", start_date=datetime(2024, 1, 1),
schedule_interval="0 6 * * *") as dag:
# Task 1:档位选择(5 分钟超时,选不出来则用最后兜底)
grade_select = PythonOperator(
task_id="select_data_grade",
python_callable=select_data_grade,
execution_timeout=timedelta(minutes=5),
)
# Task 2:等待数据就绪(超时自动触发降档)
wait_data = DataReadySensor(
task_id="wait_data_ready",
table="user_behavior",
timeout=3600, # 最多等 1 小时
soft_fail=True, # 超时不失败 DAG,让档位选择兜底
)
# Task 3:业务任务(使用档位参数)
run_job = PythonOperator(
task_id="run_recommendation_job",
python_callable=run_recommendation_job,
)
wait_data >> grade_select >> run_job3)DAG 依赖管理
依赖场景:
Task A 依赖 T-1 用户行为数据 → 生成推荐结果
Task B 依赖 Task A 的输出 → 生成推荐报表
Task C 依赖 T-1 销售数据 → 生成库存预测
Task D 依赖 Task A + Task C → 综合分析
档位传播规则:
Task A 选择了 T-2 档位(降级)
Task B 应该知道上游降级了(数据新鲜度降低)
→ Task B 可以决定:继续执行(接受旧数据)或等待 T-1 数据就绪
实现:XCom 传递档位元数据
grade_info = { "level": 1, "data_date": "2024-01-08", "source_tasks": ["A"] }
Task B 收到后记录到报表元数据("本报表使用了 T-2 数据")
4)监控大盘
Prometheus 指标:
data_grade_level{table="user_behavior"}: 当前档位(0/1/2/3)
data_grade_change_total{table, from_level, to_level}: 档位变化次数
data_ready_lag_seconds{table}: 数据就绪延迟(当前时间 - 数据就绪时间)
Grafana 大盘:
各表当前档位状态(颜色:绿=0档,黄=1档,橙=2档,红=3档)
过去 7 天档位历史趋势(是否频繁降级)
数据就绪延迟趋势(P50/P99)
告警规则:
任意关键表降至 1 档 → Slack 告警(oncall 群)
任意关键表降至 2 档 → PagerDuty 告警(叫人)
任意关键表降至 3 档 → 电话告警(最高级别)
数据就绪延迟 > 2h → 预警(提前告知可能降档)
延伸追问
- Q:如果所有表都降到 3 档(固定基线),系统还有意义吗? → 有意义——保证系统不崩溃,任务能执行完(哪怕结果质量差)。同时通过告警快速触发人工介入,修复数据管道,重新生成数据后手动回刷(backfill)。3 档通常是极端情况(数据平台整体故障),出现则全力修复数据平台。
- Q:为什么不直接用 Airflow 的 Sensor 等待数据就绪?
→ Airflow Sensor 等待超时后会让 DAG 失败(默认行为),而不是自动降档。需要配合
soft_fail=True(超时不失败,而是跳过)+ 后续档位选择逻辑来实现自动降档。纯等待 Sensor 的问题:数据一直不来,DAG 卡住,不如设定超时 + 降档来得优雅。
我的记法
- 核心思路:可用性 > 最优数据(宁可用旧数据,也不让任务失败)
- 就绪检测:Redis 快速查 + DB 持久化,含行数和质量分
- 档位选择:按序检查 T-1→T-2→T-7→基线,第一个就绪胜出
- Airflow:grade_select Task → xcom_push → 业务 Task 读取参数
- 监控:档位状态大盘 + 延迟趋势 + 分级告警(Slack/PagerDuty/电话)
- 一句话:「DAG 开始时选档位,XCom 传参给业务 Task,降档则分级告警」
状态
- 已背速记
- 能画调度系统的四层架构
- 能说 Airflow DAG 的档位传播方式