怎么发现和定位这种”隐形阻塞”
一句话速记
隐形阻塞 = 在 async 函数里调用了同步阻塞操作(requests、time.sleep、同步 ORM、重计算),事件循环被卡住但没有错误信息。定位方法:① PYTHONASYNCIODEBUG=1 开启调试(自动报告慢回调);② 代码搜索常见同步调用;③ 在关键路径加 timing 日志;④ APM 工具(Datadog、Sentry)的异步 trace。修复:换 async 库或 asyncio.to_thread 包装。
关键细节
1)工具箱:5 种定位方法
方法 1:asyncio 调试模式(开发环境首选)
# 方式 A:环境变量
PYTHONASYNCIODEBUG=1 python app.py
# 方式 B:Python 代码
import asyncio, logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05 # 超过 50ms 打印告警
# 输出示例:
# WARNING:asyncio:Executing <Task...> took 0.532 seconds
# Traceback of where task was scheduled:
# File "app.py", line 23, in main
# result = await handler(request)方法 2:代码搜索(最快,CI 里加规则)
# 搜索项目里的同步阻塞调用
rg "requests\.(get|post|put|delete|patch|head)" --type py
rg "import requests" --type py
rg "time\.sleep\b" --type py # 而非 asyncio.sleep
rg "open\(" --type py | grep -v aiofiles # 同步文件读写
rg "\.execute\b" --type py # 同步 DB execute(sqlalchemy)
rg "\.query\b" --type py # sqlalchemy 同步查询
# 在 async def 函数里出现这些,就是嫌疑方法 3:自定义中间件 + 日志
import time
import asyncio
from fastapi import Request
@app.middleware("http")
async def detect_slow_handlers(request: Request, call_next):
start = time.monotonic()
# 检测事件循环是否被阻塞:记录开始时的循环迭代次数
loop = asyncio.get_event_loop()
response = await call_next(request)
elapsed = time.monotonic() - start
if elapsed > 0.1: # 100ms 阈值
# 注意:这个时间包含了 await(正常等待),要结合其他信息判断
logger.warning(
f"SLOW_ENDPOINT path={request.url.path} "
f"elapsed={elapsed:.3f}s"
)
return response方法 4:blocking-detector / watchdog
# pip install anyio[asyncio] (包含调试工具)
# 或使用 asyncio-watchdog
# 原理:后台线程每 N ms 检查事件循环是否还在运转
# 如果事件循环没有在预期时间内 tick,说明被阻塞了
import asyncio
import threading
import time
class EventLoopWatchdog(threading.Thread):
"""后台线程监控事件循环健康状态"""
def __init__(self, loop, timeout=0.1):
super().__init__(daemon=True)
self.loop = loop
self.timeout = timeout
self._last_tick = time.monotonic()
def run(self):
while True:
time.sleep(self.timeout)
elapsed = time.monotonic() - self._last_tick
if elapsed > self.timeout * 3:
print(f"⚠️ 事件循环被阻塞!{elapsed:.2f}s 未 tick")
async def tick(self):
"""在事件循环里周期性更新时间戳"""
while True:
self._last_tick = time.monotonic()
await asyncio.sleep(0.01) # 每 10ms tick 一次方法 5:APM / Profiling 工具
# py-spy:无侵入性采样 profiler(生产可用)
pip install py-spy
py-spy top --pid <PID> # 实时看哪些函数在占 CPU
py-spy record -o profile.svg --pid <PID> # 生成火焰图
# pyinstrument:asyncio 友好的 profiler
pip install pyinstrument
python -m pyinstrument app.py
# Austin:C 扩展级别的采样
pip install austin-python
austin -p <PID>2)常见隐形阻塞清单及修复
阻塞来源 检测方式 修复方案
──────────────────────────────────────────────────────────────────
requests.get/post 代码搜索 → aiohttp 或 httpx.AsyncClient
time.sleep() 代码搜索 → await asyncio.sleep()
open() 文件读写 代码搜索 → aiofiles.open()
SQLAlchemy 同步查询 代码搜索 → SQLAlchemy async
pymysql/psycopg2 代码搜索 → asyncpg / aiomysql
redis-py 同步 代码搜索 → aioredis / redis.asyncio
Celery.apply()同步等结果 代码搜索 → Celery.apply_async + callback
纯 Python 重计算 py-spy 火焰图 → asyncio.to_thread()
numpy/torch(极大矩阵) py-spy 火焰图 → to_thread 或独立进程
boto3 S3 操作 代码搜索 → aioboto3
subprocess.run() 代码搜索 → asyncio.create_subprocess_exec
3)生产环境定位策略
Step 1:监控告警(P99 延迟突增,CPU 使用率低但吞吐量低)
↓
Step 2:PYTHONASYNCIODEBUG + slow_callback_duration 在 staging 复现
↓
Step 3:py-spy 采样,看 CPU 时间花在哪里
↓
Step 4:定位到具体函数,搜索同步阻塞调用
↓
Step 5:换 async 库或 to_thread 包装,验证 P99 恢复
4)特别注意:logging 可能是阻塞源
# 同步日志(写磁盘/网络)可能是 async 服务的隐形阻塞
import logging
# ❌ 在 async 路由里大量同步写日志
@app.get("/")
async def handler():
logging.info("request started") # 如果 handler 是 RotatingFileHandler,可能阻塞
...
# ✅ 使用 QueueHandler(异步日志)
import logging.handlers, queue
log_queue = queue.Queue(-1)
queue_handler = logging.handlers.QueueHandler(log_queue)
listener = logging.handlers.QueueListener(
log_queue, logging.FileHandler("app.log")
)
listener.start() # 后台线程写日志
# 或用 structlog 的 async-safe 配置延伸追问
- Q:为什么
PYTHONASYNCIODEBUG=1只在开发环境用? → 调试模式会记录每个协程的调用栈(traceback),大量 Task 时内存和性能开销显著;slow_callback_duration的检查也有额外开销。生产环境用 py-spy 或 APM 代替,按需采样。 - Q:
asyncio.to_thread和loop.run_in_executor的区别? →asyncio.to_thread是 Python 3.9+ 的高级封装,底层调用loop.run_in_executor(None, func, *args),使用默认的ThreadPoolExecutor;run_in_executor允许自定义线程池(限制并发数、自定义线程名等)。生产服务推荐自定义线程池:executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix='io')。 - Q:如果整个项目都是同步的(Flask + SQLAlchemy),迁移到 async 值得吗?
→ 看场景:如果服务是 CPU 密集型或每个请求处理时间很短(< 10ms),asyncio 收益有限;如果服务有大量外部 API 调用、数据库等待(高 I/O 比),asyncio 可显著提升吞吐量。迁移成本高,可以考虑先
gevent.monkey.patch_all()(用协程 patch 同步库,无需改代码),然后逐步迁移关键路径。
我的记法
- 调试:
PYTHONASYNCIODEBUG=1+slow_callback_duration=0.05 - 代码搜索:
rg "requests\." --type py,找 async def 里的同步调用 - 运行时:
py-spy top --pid看 CPU 火焰图 - 修复链:能换 async 库就换,不能就
to_thread包装 - 日志也可能是隐形阻塞——生产用
QueueHandler - 一句话:「先 asyncio debug 看告警,再 rg 搜代码,再 py-spy 看火焰图」
状态
- 已背速记(5 种方法)
- 能写 middleware 检测慢接口
- 能说常见阻塞来源和修复方案