设计优先级任务队列

一句话速记

优先级队列核心问题:高优先级任务先执行,同优先级 FIFO,低优先级任务不饿死。实现方案:多优先级独立 Topic(最简单,N 个 Topic 对应 N 个优先级,消费者按优先级顺序 poll);或单 Topic + 优先级字段(消费者排序,适合优先级粒度细的场景);复杂场景用多消费者组 + 权重分配(高优先级消费者多,低优先级消费者少)。

系统需求

场景举例:
  AI 平台的任务调度:VIP 用户的推理任务 > 普通用户 > 批量任务
  风控系统:实时风控决策 > 异步策略更新 > 历史数据回刷
  数据管道:核心数据处理 > 非核心数据 > 冷数据归档

需求:
  任务有优先级(如 P0/P1/P2,或 1-10 分)
  高优先级任务比低优先级任务先消费
  同优先级任务保持 FIFO 顺序
  低优先级任务不永远等待(避免饥饿)
  支持动态提升优先级(VIP 用户付费升级)

方案对比

方案 1:多 Topic(最常见,推荐)

设计:
  创建 N 个 Topic,每个对应一个优先级:
  task.priority.high    (P0 高优先级)
  task.priority.medium  (P1 中优先级)
  task.priority.low     (P2 低优先级)

消费者:
  严格消费顺序:先消费 high,high 空了才消费 medium,以此类推
  
  while True:
    # 按优先级顺序 poll
    records = consumer.poll(high_topic, timeout=0)    # 非阻塞
    if not records:
      records = consumer.poll(medium_topic, timeout=0)
    if not records:
      records = consumer.poll(low_topic, timeout=100)  # 低优先级等待 100ms
    process(records)

优点:
  简单,MQ 原生支持(Kafka/RocketMQ 都可)
  高优先级任务延迟低
  
缺点:
  低优先级任务可能饥饿(高优先级持续有消息时,低优先级永远不被消费)
  多 Topic 管理复杂度增加

饥饿防护

class PriorityConsumer:
    def __init__(self):
        self.high_consumer = create_consumer("task.priority.high")
        self.medium_consumer = create_consumer("task.priority.medium")
        self.low_consumer = create_consumer("task.priority.low")
        
        self.low_priority_count = 0  # 本轮高优先级处理的数量
        self.LOW_PRIORITY_THRESHOLD = 100  # 每处理 100 个高优先级,强制处理 1 个低优先级
    
    def poll_next(self) -> Optional[Task]:
        # 强制低优先级保底(避免饥饿)
        if self.low_priority_count >= self.LOW_PRIORITY_THRESHOLD:
            low_task = self.low_consumer.poll(timeout=0)
            if low_task:
                self.low_priority_count = 0
                return low_task
        
        # 按优先级顺序
        task = (self.high_consumer.poll(timeout=0) or 
                self.medium_consumer.poll(timeout=0) or 
                self.low_consumer.poll(timeout=100))
        
        if task and task.priority == "high":
            self.low_priority_count += 1
        
        return task

方案 2:多消费者组 + 权重(按资源分配)

设计:
  所有任务进同一 Topic(或少量 Topic)
  部署多个消费者组,资源权重不同:
  
  high_group:  10 个 Consumer(70% 的处理能力)
  medium_group:5 个 Consumer(20% 的处理能力)
  low_group:   2 个 Consumer(10% 的处理能力)
  
  消费者按任务优先级字段路由:
    接收任务 → 检查 priority 字段
    priority=HIGH → 正常处理
    priority=LOW  → 判断当前 high 队列是否积压
      有积压 → 暂停(sleep 1s 后重试)
      无积压 → 处理

适用:优先级粒度细(1-10 分),或任务来源混合进同一队列

方案 3:Redis Sorted Set(内存队列,最灵活)

import redis
import time
 
class PriorityQueue:
    def __init__(self, redis_client, name: str):
        self.redis = redis_client
        self.key = f"pq:{name}"
    
    def push(self, task_id: str, priority: int, deadline: float = None):
        """
        score = 优先级分数(越小越先消费)
        同优先级时,用 enqueue_time 保证 FIFO
        """
        # score 设计:priority * 1e13 + enqueue_timestamp(毫秒)
        # 确保高优先级(priority=0)的 score 最小(最先弹出)
        score = priority * 1e13 + time.time() * 1000
        
        # 如果有 deadline,即将过期的任务提升优先级
        if deadline and deadline < time.time() + 60:
            score = score * 0.1  # 快要过期的任务提优先级
        
        self.redis.zadd(self.key, {task_id: score})
    
    def pop(self) -> Optional[str]:
        """原子弹出最高优先级任务"""
        result = self.redis.zpopmin(self.key, count=1)
        if result:
            task_id, score = result[0]
            return task_id
        return None
    
    def upgrade_priority(self, task_id: str, new_priority: int):
        """动态提升优先级(VIP 用户付费升级)"""
        new_score = new_priority * 1e13 + time.time() * 1000
        self.redis.zadd(self.key, {task_id: new_score}, xx=True)  # xx=只更新存在的
 
# 优点:支持动态修改优先级,Redis 单线程无锁竞争
# 缺点:Redis 内存限制,不如 MQ 可靠(重启丢数据,需要持久化配合)

方案 4:数据库 + 优先级字段(低吞吐场景)

CREATE TABLE task_queue (
    id          BIGINT      PRIMARY KEY AUTO_INCREMENT,
    task_type   VARCHAR(64),
    payload     JSON,
    priority    TINYINT     DEFAULT 1,   -- 0=最高, 1=中, 2=低
    status      VARCHAR(16) DEFAULT 'PENDING',
    created_at  DATETIME,
    execute_at  DATETIME    DEFAULT NOW()
);
 
-- 创建联合索引(优先级 + 时间)
CREATE INDEX idx_priority_time ON task_queue (status, priority, execute_at);
 
-- 消费者拉取(按优先级+时间排序)
SELECT * FROM task_queue 
WHERE status='PENDING' AND execute_at <= NOW()
ORDER BY priority ASC, created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED;  -- SKIP LOCKED 防止多消费者锁竞争
 
-- 适用:任务量小(每秒几十个),无需高性能 MQ

场景选型指南

场景                          推荐方案
─────────────────────────────────────────────────────────
简单的 3 档优先级,任务量大   多 Topic(Kafka/RocketMQ)
需要动态修改优先级            Redis Sorted Set
任务量小,避免额外组件        数据库 + SKIP LOCKED
精细优先级(1-100分)         Redis Sorted Set
需要持久化保证,任务量大      多 Topic + DB 幂等

延伸追问

  • Q:Kafka 本身支持优先级队列吗? → Kafka 原生不支持消息优先级(FIFO 队列)。RocketMQ 也不支持(但支持多 Topic 模拟)。RabbitMQ 支持优先级队列(x-max-priority,1-255 级别)。实际上多 Topic 方案是 Kafka 场景下最常见的选择。
  • Q:动态提升优先级有什么业务场景? → 1) VIP 用户购买了”加速服务”→ 任务优先级从 2 提升到 0;2) 任务即将超过 SLA 时间 → 自动提升优先级(避免违约);3) 运营紧急推送 → 手动将某批任务提为最高优先级。Redis Sorted Set 可以 O(log n) 修改某个任务的 score,支持实时提权。

我的记法

  • 多 Topic:每个优先级一个 Topic,消费者按序 poll,最简单
  • 饥饿防护:每处理 N 个高优先级,强制处理 1 个低优先级
  • Redis Sorted Set:score = priority × 1e13 + timestamp(支持动态修改)
  • DB SKIP LOCKED:低吞吐场景,简单无额外组件
  • 一句话:「高吞吐用多 Topic 按序 poll,需动态改优先级用 Redis ZSet」

状态

  • 已背速记
  • 能说多 Topic 方案的饥饿防护
  • 能写 Redis ZSet 的优先级 score 设计