设计多档位依赖/降级调度体系(T-k 日桶)

一句话速记

大规模数据调度系统中,跑批任务依赖 T-1(昨日)数据,当依赖数据未就绪或质量异常时,系统需要自动降档到 T-2、T-7 或基线,保证任务能够运行(可用性优先于最优数据)。设计重点:数据就绪检测 + 自动档位选择 + DAG 依赖管理 + 告警分级

注:此条目与 多档位降级设计 互补,本条目侧重调度系统架构设计,后者侧重业务降级逻辑实现

系统架构

数据生产层:
  T-1 数据管道(ETL/Spark Job)→ 写入数仓分区 → 写就绪标志
  
就绪检测层:
  DataReadinessChecker → 扫描各日期分区是否就绪
  
档位选择层:
  GradeSelectorService → 根据就绪状态选择最优档位
  
调度执行层:
  Airflow / DolphinScheduler → DAG 调度,传入档位参数执行任务
  
监控告警层:
  Prometheus + Grafana → 档位状态大盘
  PagerDuty → 降级告警

核心组件设计

1)数据就绪标志系统

# 数据生产者写完后调用
class DataReadinessRegistry:
    
    def mark_ready(self, table: str, partition_date: str, 
                    row_count: int, quality_score: float):
        """数据生产完成,注册就绪信息"""
        record = {
            "table": table,
            "partition_date": partition_date,
            "row_count": row_count,
            "quality_score": quality_score,  # 0-1,数据质量分
            "ready_at": datetime.utcnow().isoformat(),
            "status": "READY" if quality_score >= 0.95 else "LOW_QUALITY"
        }
        
        # 写 DB(持久化)
        db.upsert("data_readiness_log", record)
        
        # 写 Redis(快速查询)
        redis_key = f"data:ready:{table}:{partition_date}"
        redis.setex(redis_key, 86400 * 7, json.dumps(record))
    
    def is_ready(self, table: str, partition_date: str, 
                  min_quality: float = 0.95) -> bool:
        """检查数据是否就绪且质量达标"""
        redis_key = f"data:ready:{table}:{partition_date}"
        record_json = redis.get(redis_key)
        
        if not record_json:
            return False
        
        record = json.loads(record_json)
        return (record["status"] in ["READY", "LOW_QUALITY"] and 
                record["quality_score"] >= min_quality)

2)档位选择 DAG 节点

Airflow 实现

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
 
def select_data_grade(**context):
    """档位选择算子,在 DAG 开始时执行"""
    today = context["ds"]  # 调度日期(YYYY-MM-DD)
    
    registry = DataReadinessRegistry()
    
    # 档位配置(从配置中心加载)
    grades = [
        {"level": 0, "offset": 1, "min_quality": 0.95, "desc": "T-1(正常)"},
        {"level": 1, "offset": 2, "min_quality": 0.90, "desc": "T-2(一档降级)"},
        {"level": 2, "offset": 7, "min_quality": 0.80, "desc": "T-7(二档降级)"},
        {"level": 3, "offset": None, "min_quality": 0,  "desc": "固定基线"},
    ]
    
    selected = None
    for grade in grades:
        if grade["offset"] is None:
            # 最后兜底
            selected = {**grade, "data_date": "baseline"}
            break
        
        data_date = (datetime.strptime(today, "%Y-%m-%d") - 
                     timedelta(days=grade["offset"])).strftime("%Y-%m-%d")
        
        if registry.is_ready(table="user_behavior", partition_date=data_date,
                              min_quality=grade["min_quality"]):
            selected = {**grade, "data_date": data_date}
            break
    
    # 写入 XCom(传给后续 Task)
    context["task_instance"].xcom_push(key="data_grade", value=selected)
    
    # 告警
    if selected["level"] > 0:
        send_alert(selected)
    
    return selected
 
 
def run_recommendation_job(**context):
    """实际业务 Task,读取档位参数"""
    ti = context["task_instance"]
    grade = ti.xcom_pull(key="data_grade")
    
    data_date = grade["data_date"]
    grade_level = grade["level"]
    
    print(f"Using data grade {grade_level}: date={data_date}")
    
    # 按档位参数读取对应分区数据
    spark.sql(f"""
        INSERT OVERWRITE TABLE recommend_result PARTITION(dt='{context["ds"]}')
        SELECT user_id, recommend_items
        FROM user_behavior
        WHERE dt = '{data_date}'  -- 使用选定的数据日期
    """)
 
 
with DAG("daily_recommend", start_date=datetime(2024, 1, 1), 
         schedule_interval="0 6 * * *") as dag:
    
    # Task 1:档位选择(5 分钟超时,选不出来则用最后兜底)
    grade_select = PythonOperator(
        task_id="select_data_grade",
        python_callable=select_data_grade,
        execution_timeout=timedelta(minutes=5),
    )
    
    # Task 2:等待数据就绪(超时自动触发降档)
    wait_data = DataReadySensor(
        task_id="wait_data_ready",
        table="user_behavior",
        timeout=3600,  # 最多等 1 小时
        soft_fail=True,  # 超时不失败 DAG,让档位选择兜底
    )
    
    # Task 3:业务任务(使用档位参数)
    run_job = PythonOperator(
        task_id="run_recommendation_job",
        python_callable=run_recommendation_job,
    )
    
    wait_data >> grade_select >> run_job

3)DAG 依赖管理

依赖场景:
  Task A 依赖 T-1 用户行为数据 → 生成推荐结果
  Task B 依赖 Task A 的输出 → 生成推荐报表
  Task C 依赖 T-1 销售数据 → 生成库存预测
  Task D 依赖 Task A + Task C → 综合分析

档位传播规则:
  Task A 选择了 T-2 档位(降级)
  Task B 应该知道上游降级了(数据新鲜度降低)
  → Task B 可以决定:继续执行(接受旧数据)或等待 T-1 数据就绪

实现:XCom 传递档位元数据
  grade_info = { "level": 1, "data_date": "2024-01-08", "source_tasks": ["A"] }
  Task B 收到后记录到报表元数据("本报表使用了 T-2 数据")

4)监控大盘

Prometheus 指标:
  data_grade_level{table="user_behavior"}: 当前档位(0/1/2/3)
  data_grade_change_total{table, from_level, to_level}: 档位变化次数
  data_ready_lag_seconds{table}: 数据就绪延迟(当前时间 - 数据就绪时间)

Grafana 大盘:
  各表当前档位状态(颜色:绿=0档,黄=1档,橙=2档,红=3档)
  过去 7 天档位历史趋势(是否频繁降级)
  数据就绪延迟趋势(P50/P99)

告警规则:
  任意关键表降至 1 档 → Slack 告警(oncall 群)
  任意关键表降至 2 档 → PagerDuty 告警(叫人)
  任意关键表降至 3 档 → 电话告警(最高级别)
  数据就绪延迟 > 2h → 预警(提前告知可能降档)

延伸追问

  • Q:如果所有表都降到 3 档(固定基线),系统还有意义吗? → 有意义——保证系统不崩溃,任务能执行完(哪怕结果质量差)。同时通过告警快速触发人工介入,修复数据管道,重新生成数据后手动回刷(backfill)。3 档通常是极端情况(数据平台整体故障),出现则全力修复数据平台。
  • Q:为什么不直接用 Airflow 的 Sensor 等待数据就绪? → Airflow Sensor 等待超时后会让 DAG 失败(默认行为),而不是自动降档。需要配合 soft_fail=True(超时不失败,而是跳过)+ 后续档位选择逻辑来实现自动降档。纯等待 Sensor 的问题:数据一直不来,DAG 卡住,不如设定超时 + 降档来得优雅。

我的记法

  • 核心思路:可用性 > 最优数据(宁可用旧数据,也不让任务失败)
  • 就绪检测:Redis 快速查 + DB 持久化,含行数和质量分
  • 档位选择:按序检查 T-1→T-2→T-7→基线,第一个就绪胜出
  • Airflow:grade_select Task → xcom_push → 业务 Task 读取参数
  • 监控:档位状态大盘 + 延迟趋势 + 分级告警(Slack/PagerDuty/电话)
  • 一句话:「DAG 开始时选档位,XCom 传参给业务 Task,降档则分级告警」

状态

  • 已背速记
  • 能画调度系统的四层架构
  • 能说 Airflow DAG 的档位传播方式