设计优先级任务队列
一句话速记
优先级队列核心问题:高优先级任务先执行,同优先级 FIFO,低优先级任务不饿死。实现方案:多优先级独立 Topic(最简单,N 个 Topic 对应 N 个优先级,消费者按优先级顺序 poll);或单 Topic + 优先级字段(消费者排序,适合优先级粒度细的场景);复杂场景用多消费者组 + 权重分配(高优先级消费者多,低优先级消费者少)。
系统需求
场景举例:
AI 平台的任务调度:VIP 用户的推理任务 > 普通用户 > 批量任务
风控系统:实时风控决策 > 异步策略更新 > 历史数据回刷
数据管道:核心数据处理 > 非核心数据 > 冷数据归档
需求:
任务有优先级(如 P0/P1/P2,或 1-10 分)
高优先级任务比低优先级任务先消费
同优先级任务保持 FIFO 顺序
低优先级任务不永远等待(避免饥饿)
支持动态提升优先级(VIP 用户付费升级)
方案对比
方案 1:多 Topic(最常见,推荐)
设计:
创建 N 个 Topic,每个对应一个优先级:
task.priority.high (P0 高优先级)
task.priority.medium (P1 中优先级)
task.priority.low (P2 低优先级)
消费者:
严格消费顺序:先消费 high,high 空了才消费 medium,以此类推
while True:
# 按优先级顺序 poll
records = consumer.poll(high_topic, timeout=0) # 非阻塞
if not records:
records = consumer.poll(medium_topic, timeout=0)
if not records:
records = consumer.poll(low_topic, timeout=100) # 低优先级等待 100ms
process(records)
优点:
简单,MQ 原生支持(Kafka/RocketMQ 都可)
高优先级任务延迟低
缺点:
低优先级任务可能饥饿(高优先级持续有消息时,低优先级永远不被消费)
多 Topic 管理复杂度增加
饥饿防护:
class PriorityConsumer:
def __init__(self):
self.high_consumer = create_consumer("task.priority.high")
self.medium_consumer = create_consumer("task.priority.medium")
self.low_consumer = create_consumer("task.priority.low")
self.low_priority_count = 0 # 本轮高优先级处理的数量
self.LOW_PRIORITY_THRESHOLD = 100 # 每处理 100 个高优先级,强制处理 1 个低优先级
def poll_next(self) -> Optional[Task]:
# 强制低优先级保底(避免饥饿)
if self.low_priority_count >= self.LOW_PRIORITY_THRESHOLD:
low_task = self.low_consumer.poll(timeout=0)
if low_task:
self.low_priority_count = 0
return low_task
# 按优先级顺序
task = (self.high_consumer.poll(timeout=0) or
self.medium_consumer.poll(timeout=0) or
self.low_consumer.poll(timeout=100))
if task and task.priority == "high":
self.low_priority_count += 1
return task方案 2:多消费者组 + 权重(按资源分配)
设计:
所有任务进同一 Topic(或少量 Topic)
部署多个消费者组,资源权重不同:
high_group: 10 个 Consumer(70% 的处理能力)
medium_group:5 个 Consumer(20% 的处理能力)
low_group: 2 个 Consumer(10% 的处理能力)
消费者按任务优先级字段路由:
接收任务 → 检查 priority 字段
priority=HIGH → 正常处理
priority=LOW → 判断当前 high 队列是否积压
有积压 → 暂停(sleep 1s 后重试)
无积压 → 处理
适用:优先级粒度细(1-10 分),或任务来源混合进同一队列
方案 3:Redis Sorted Set(内存队列,最灵活)
import redis
import time
class PriorityQueue:
def __init__(self, redis_client, name: str):
self.redis = redis_client
self.key = f"pq:{name}"
def push(self, task_id: str, priority: int, deadline: float = None):
"""
score = 优先级分数(越小越先消费)
同优先级时,用 enqueue_time 保证 FIFO
"""
# score 设计:priority * 1e13 + enqueue_timestamp(毫秒)
# 确保高优先级(priority=0)的 score 最小(最先弹出)
score = priority * 1e13 + time.time() * 1000
# 如果有 deadline,即将过期的任务提升优先级
if deadline and deadline < time.time() + 60:
score = score * 0.1 # 快要过期的任务提优先级
self.redis.zadd(self.key, {task_id: score})
def pop(self) -> Optional[str]:
"""原子弹出最高优先级任务"""
result = self.redis.zpopmin(self.key, count=1)
if result:
task_id, score = result[0]
return task_id
return None
def upgrade_priority(self, task_id: str, new_priority: int):
"""动态提升优先级(VIP 用户付费升级)"""
new_score = new_priority * 1e13 + time.time() * 1000
self.redis.zadd(self.key, {task_id: new_score}, xx=True) # xx=只更新存在的
# 优点:支持动态修改优先级,Redis 单线程无锁竞争
# 缺点:Redis 内存限制,不如 MQ 可靠(重启丢数据,需要持久化配合)方案 4:数据库 + 优先级字段(低吞吐场景)
CREATE TABLE task_queue (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
task_type VARCHAR(64),
payload JSON,
priority TINYINT DEFAULT 1, -- 0=最高, 1=中, 2=低
status VARCHAR(16) DEFAULT 'PENDING',
created_at DATETIME,
execute_at DATETIME DEFAULT NOW()
);
-- 创建联合索引(优先级 + 时间)
CREATE INDEX idx_priority_time ON task_queue (status, priority, execute_at);
-- 消费者拉取(按优先级+时间排序)
SELECT * FROM task_queue
WHERE status='PENDING' AND execute_at <= NOW()
ORDER BY priority ASC, created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED; -- SKIP LOCKED 防止多消费者锁竞争
-- 适用:任务量小(每秒几十个),无需高性能 MQ场景选型指南
场景 推荐方案
─────────────────────────────────────────────────────────
简单的 3 档优先级,任务量大 多 Topic(Kafka/RocketMQ)
需要动态修改优先级 Redis Sorted Set
任务量小,避免额外组件 数据库 + SKIP LOCKED
精细优先级(1-100分) Redis Sorted Set
需要持久化保证,任务量大 多 Topic + DB 幂等
延伸追问
- Q:Kafka 本身支持优先级队列吗?
→ Kafka 原生不支持消息优先级(FIFO 队列)。RocketMQ 也不支持(但支持多 Topic 模拟)。RabbitMQ 支持优先级队列(
x-max-priority,1-255 级别)。实际上多 Topic 方案是 Kafka 场景下最常见的选择。 - Q:动态提升优先级有什么业务场景? → 1) VIP 用户购买了”加速服务”→ 任务优先级从 2 提升到 0;2) 任务即将超过 SLA 时间 → 自动提升优先级(避免违约);3) 运营紧急推送 → 手动将某批任务提为最高优先级。Redis Sorted Set 可以 O(log n) 修改某个任务的 score,支持实时提权。
我的记法
- 多 Topic:每个优先级一个 Topic,消费者按序 poll,最简单
- 饥饿防护:每处理 N 个高优先级,强制处理 1 个低优先级
- Redis Sorted Set:score = priority × 1e13 + timestamp(支持动态修改)
- DB SKIP LOCKED:低吞吐场景,简单无额外组件
- 一句话:「高吞吐用多 Topic 按序 poll,需动态改优先级用 Redis ZSet」
状态
- 已背速记
- 能说多 Topic 方案的饥饿防护
- 能写 Redis ZSet 的优先级 score 设计