Celery / gevent 在异步任务与 I/O 密集服务中的常见用法
一句话速记
Celery = 分布式任务队列,把耗时任务(AI 推理、邮件发送、批量处理)异步化,通过 Redis/RabbitMQ 作 broker,默认多进程 Worker 执行(CPU 密集友好);gevent = 协程库,通过 monkey-patch 把同步 Python 代码(requests、psycopg2)变成非阻塞协程,让 Flask/Django 服务不改代码就支持高并发(I/O 密集友好)。两者定位不同:Celery 管理任务调度,gevent 改变I/O 执行模型。
关键细节
1)Celery 核心概念
组件:
Broker(消息中间件):Redis / RabbitMQ
- 存放任务队列(task queue)
- Producer(Web 应用)把任务推进来
- Consumer(Worker)从队列取任务执行
Worker:执行任务的进程
- 默认:prefork(每个任务一个进程,CPU 密集型)
- 可选:gevent/eventlet(每个任务一个协程,I/O 密集型)
- 可选:solo(单线程,调试用)
Result Backend(结果存储):Redis / 数据库
- 存放任务执行结果(可选)
- 主程序通过 task.get() 查询结果
2)Celery 典型用法
# tasks.py
from celery import Celery
import time
app = Celery(
"myapp",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1"
)
# 定义任务
@app.task(bind=True, max_retries=3)
def process_document(self, doc_id: str, model: str = "gpt-4"):
"""处理文档(耗时 AI 任务)"""
try:
doc = fetch_document(doc_id)
result = call_llm(doc.content, model)
save_result(doc_id, result)
return {"doc_id": doc_id, "status": "done"}
except Exception as exc:
# 失败自动重试(指数退避)
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
# 在 Web 服务里调用(异步,不等结果):
@app.post("/process")
async def process_endpoint(doc_id: str):
task = process_document.delay(doc_id) # 推入队列,立即返回
return {"task_id": task.id, "status": "queued"}
# 查询结果:
@app.get("/task/{task_id}")
async def get_task_result(task_id: str):
task = process_document.AsyncResult(task_id)
return {
"status": task.status, # PENDING / STARTED / SUCCESS / FAILURE
"result": task.result if task.ready() else None
}Worker 启动:
# 启动 Worker(默认 prefork,CPU 核数个进程)
celery -A tasks worker --loglevel=info
# I/O 密集任务:gevent 模式(单进程,多协程)
celery -A tasks worker --pool=gevent --concurrency=100
# 定时任务(Celery Beat)
celery -A tasks beat --loglevel=info3)Celery 在 AI 场景的应用
# AI 推理队列化(避免 API 超时)
@app.task(queue="inference", time_limit=120)
def run_inference(request_id: str, prompt: str, model_config: dict):
"""
GPU 推理任务队列化:
- 用户请求 → API 立即返回 task_id
- 后台 Worker 调用 GPU 推理(可能几十秒)
- 前端轮询 /task/{task_id} 查询状态
"""
model = load_model(model_config)
result = model.generate(prompt)
return result
# Celery 优先级队列
@app.task(queue="high_priority")
def premium_inference(prompt): ...
@app.task(queue="default")
def standard_inference(prompt): ...
# Worker 配置优先级:
# celery -A tasks worker -Q high_priority,default --concurrency=44)gevent:monkey-patch 魔法
# gevent 的核心:在程序启动时 monkey-patch 标准库
# 把同步 socket 操作替换为非阻塞协程版本
import gevent.monkey
gevent.monkey.patch_all() # 必须在所有 import 之前!
# patch_all() 做了什么:
# - socket → gevent socket(非阻塞)
# - threading → gevent greenlet(协程)
# - time.sleep → gevent.sleep(协程让出)
# - ssl, select, os.fork 等也被 patch
# patch 之后,原本阻塞的代码变成协程:
import requests # import 在 patch 后,requests 的 socket 已被替换
def fetch(url):
return requests.get(url) # 看起来是同步,实际是协程
# 可以并发运行:
from gevent import spawn, joinall
jobs = [spawn(fetch, url) for url in urls] # 创建 100 个协程
joinall(jobs) # 并发等待所有完成(事实上是 gevent 事件循环调度)gevent + Flask/Gunicorn(常见生产配置):
# 用 gevent worker 运行 Flask
gunicorn -w 4 -k gevent app:app
# -w 4:4 个进程(根据 CPU 核数)
# -k gevent:每个进程用 gevent 协程处理请求
# 相当于:4 进程 × N 协程(并发)= 高吞吐量# gunicorn.conf.py
workers = 4
worker_class = "gevent"
worker_connections = 1000 # 每个 worker 最多 1000 个并发协程5)gevent vs asyncio 的核心区别
维度 gevent asyncio
────────────────────────────────────────────────────────────
并发模型 协作式协程(greenlet) 协作式协程(native coroutine)
代码改动 零改动(monkey-patch) 需要 async/await 重写
Python 版本 Python 2/3 Python 3.4+(3.7+ 推荐)
框架支持 Flask/Django(同步框架) FastAPI/Starlette(async框架)
性能 略低(greenlet 开销) 略高(native coroutine)
调试难度 高(隐式切换,难 trace) 较低(显式 await)
AI 友好度 中(旧项目迁移) 高(新项目首选)
6)什么时候选 Celery,什么时候选 asyncio
用 Celery:
✅ 任务需要排队(不能立即处理)
✅ 任务需要重试、持久化
✅ 需要跨服务、跨机器分发任务
✅ 耗时任务(>1s)不想阻塞 HTTP 响应
✅ 定时任务(Celery Beat)
示例:AI 批量推理、报告生成、邮件发送
用 asyncio(FastAPI + aiohttp):
✅ 单次请求内需要并发调用多个 API
✅ 实时流式响应(SSE/WebSocket)
✅ 高并发 I/O 密集服务
示例:LLM API 代理、实时推理服务、聊天机器人
两者结合(常见架构):
FastAPI(接受请求)→ Celery(任务队列)→ GPU Worker(执行推理)→ Redis(结果缓存)
延伸追问
- Q:Celery 的 prefork 和 gevent 模式怎么选? → 任务是CPU 密集型(压缩、哈希计算、纯 Python 计算)→ prefork(进程级隔离,真并行);任务是I/O 密集型(调用外部 API、等待数据库)→ gevent(每个进程管理 N 个协程,节省内存,减少切换)。AI 推理(GPU)特殊:GPU 计算在 C 层,用 prefork(进程隔离,GPU 显存隔离更安全)。
- Q:Celery 的 task 里能用 async/await 吗?
→ 不直接支持(Celery worker 是同步的)。可以用
asyncio.run(async_func())包装,或用第三方库celery-pool-asyncio。但推荐分层:async 层处理协议,Celery 处理任务调度,task 函数保持同步。 - Q:gevent monkey-patch 和 Celery 能一起用吗?
→ 可以,
celery -A tasks worker --pool=gevent就是这样。但monkey.patch_all()必须在 Celery 导入前调用,否则部分 patch 失效。注意 gevent 的 patch 可能与某些 C 扩展(如 numpy fork 安全问题)冲突。
我的记法
- Celery = 任务队列(异步、持久化、分布式),broker=Redis,worker=进程/协程
- gevent = monkey-patch 让同步代码变协程,零改动适配旧框架
- AI 场景:FastAPI 接请求 → Celery 任务队列 → GPU Worker 执行
- prefork(CPU 密集)vs gevent(I/O 密集)选 pool 类型
- 一句话:「Celery 管任务的去哪跑,gevent 管代码怎么跑,两者可以叠用」
状态
- 已背速记
- 能写 Celery 任务 + delay 调用 + 结果查询
- 能解释 gevent monkey-patch 原理