怎么发现和定位这种”隐形阻塞”

一句话速记

隐形阻塞 = 在 async 函数里调用了同步阻塞操作(requests、time.sleep、同步 ORM、重计算),事件循环被卡住但没有错误信息。定位方法:① PYTHONASYNCIODEBUG=1 开启调试(自动报告慢回调);② 代码搜索常见同步调用③ 在关键路径加 timing 日志④ APM 工具(Datadog、Sentry)的异步 trace。修复:换 async 库或 asyncio.to_thread 包装。

关键细节

1)工具箱:5 种定位方法

方法 1:asyncio 调试模式(开发环境首选)

# 方式 A:环境变量
PYTHONASYNCIODEBUG=1 python app.py
 
# 方式 B:Python 代码
import asyncio, logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05  # 超过 50ms 打印告警
 
# 输出示例:
# WARNING:asyncio:Executing <Task...> took 0.532 seconds
# Traceback of where task was scheduled:
#   File "app.py", line 23, in main
#     result = await handler(request)

方法 2:代码搜索(最快,CI 里加规则)

# 搜索项目里的同步阻塞调用
rg "requests\.(get|post|put|delete|patch|head)" --type py
rg "import requests" --type py
rg "time\.sleep\b" --type py            # 而非 asyncio.sleep
rg "open\(" --type py | grep -v aiofiles # 同步文件读写
rg "\.execute\b" --type py              # 同步 DB execute(sqlalchemy)
rg "\.query\b" --type py                # sqlalchemy 同步查询
 
# 在 async def 函数里出现这些,就是嫌疑

方法 3:自定义中间件 + 日志

import time
import asyncio
from fastapi import Request
 
@app.middleware("http")
async def detect_slow_handlers(request: Request, call_next):
    start = time.monotonic()
    
    # 检测事件循环是否被阻塞:记录开始时的循环迭代次数
    loop = asyncio.get_event_loop()
    
    response = await call_next(request)
    elapsed = time.monotonic() - start
    
    if elapsed > 0.1:  # 100ms 阈值
        # 注意:这个时间包含了 await(正常等待),要结合其他信息判断
        logger.warning(
            f"SLOW_ENDPOINT path={request.url.path} "
            f"elapsed={elapsed:.3f}s"
        )
    return response

方法 4:blocking-detector / watchdog

# pip install anyio[asyncio]  (包含调试工具)
# 或使用 asyncio-watchdog
 
# 原理:后台线程每 N ms 检查事件循环是否还在运转
# 如果事件循环没有在预期时间内 tick,说明被阻塞了
import asyncio
import threading
import time
 
class EventLoopWatchdog(threading.Thread):
    """后台线程监控事件循环健康状态"""
    def __init__(self, loop, timeout=0.1):
        super().__init__(daemon=True)
        self.loop = loop
        self.timeout = timeout
        self._last_tick = time.monotonic()
    
    def run(self):
        while True:
            time.sleep(self.timeout)
            elapsed = time.monotonic() - self._last_tick
            if elapsed > self.timeout * 3:
                print(f"⚠️ 事件循环被阻塞!{elapsed:.2f}s 未 tick")
    
    async def tick(self):
        """在事件循环里周期性更新时间戳"""
        while True:
            self._last_tick = time.monotonic()
            await asyncio.sleep(0.01)  # 每 10ms tick 一次

方法 5:APM / Profiling 工具

# py-spy:无侵入性采样 profiler(生产可用)
pip install py-spy
py-spy top --pid <PID>               # 实时看哪些函数在占 CPU
py-spy record -o profile.svg --pid <PID>  # 生成火焰图
 
# pyinstrument:asyncio 友好的 profiler
pip install pyinstrument
python -m pyinstrument app.py
 
# Austin:C 扩展级别的采样
pip install austin-python
austin -p <PID>

2)常见隐形阻塞清单及修复

阻塞来源                    检测方式              修复方案
──────────────────────────────────────────────────────────────────
requests.get/post           代码搜索              → aiohttp 或 httpx.AsyncClient
time.sleep()                代码搜索              → await asyncio.sleep()
open() 文件读写              代码搜索              → aiofiles.open()
SQLAlchemy 同步查询          代码搜索              → SQLAlchemy async
pymysql/psycopg2             代码搜索              → asyncpg / aiomysql
redis-py 同步                代码搜索              → aioredis / redis.asyncio
Celery.apply()同步等结果    代码搜索              → Celery.apply_async + callback
纯 Python 重计算             py-spy 火焰图         → asyncio.to_thread()
numpy/torch(极大矩阵)      py-spy 火焰图         → to_thread 或独立进程
boto3 S3 操作                代码搜索              → aioboto3
subprocess.run()            代码搜索              → asyncio.create_subprocess_exec

3)生产环境定位策略

Step 1:监控告警(P99 延迟突增,CPU 使用率低但吞吐量低)
  ↓
Step 2:PYTHONASYNCIODEBUG + slow_callback_duration 在 staging 复现
  ↓
Step 3:py-spy 采样,看 CPU 时间花在哪里
  ↓
Step 4:定位到具体函数,搜索同步阻塞调用
  ↓
Step 5:换 async 库或 to_thread 包装,验证 P99 恢复

4)特别注意:logging 可能是阻塞源

# 同步日志(写磁盘/网络)可能是 async 服务的隐形阻塞
import logging
 
# ❌ 在 async 路由里大量同步写日志
@app.get("/")
async def handler():
    logging.info("request started")  # 如果 handler 是 RotatingFileHandler,可能阻塞
    ...
 
# ✅ 使用 QueueHandler(异步日志)
import logging.handlers, queue
 
log_queue = queue.Queue(-1)
queue_handler = logging.handlers.QueueHandler(log_queue)
listener = logging.handlers.QueueListener(
    log_queue, logging.FileHandler("app.log")
)
listener.start()  # 后台线程写日志
 
# 或用 structlog 的 async-safe 配置

延伸追问

  • Q:为什么 PYTHONASYNCIODEBUG=1 只在开发环境用? → 调试模式会记录每个协程的调用栈(traceback),大量 Task 时内存和性能开销显著;slow_callback_duration 的检查也有额外开销。生产环境用 py-spy 或 APM 代替,按需采样。
  • Q:asyncio.to_threadloop.run_in_executor 的区别?asyncio.to_thread 是 Python 3.9+ 的高级封装,底层调用 loop.run_in_executor(None, func, *args),使用默认的 ThreadPoolExecutorrun_in_executor 允许自定义线程池(限制并发数、自定义线程名等)。生产服务推荐自定义线程池:executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix='io')
  • Q:如果整个项目都是同步的(Flask + SQLAlchemy),迁移到 async 值得吗? → 看场景:如果服务是 CPU 密集型或每个请求处理时间很短(< 10ms),asyncio 收益有限;如果服务有大量外部 API 调用、数据库等待(高 I/O 比),asyncio 可显著提升吞吐量。迁移成本高,可以考虑先 gevent.monkey.patch_all()(用协程 patch 同步库,无需改代码),然后逐步迁移关键路径。

我的记法

  • 调试PYTHONASYNCIODEBUG=1 + slow_callback_duration=0.05
  • 代码搜索rg "requests\." --type py,找 async def 里的同步调用
  • 运行时py-spy top --pid 看 CPU 火焰图
  • 修复链:能换 async 库就换,不能就 to_thread 包装
  • 日志也可能是隐形阻塞——生产用 QueueHandler
  • 一句话:「先 asyncio debug 看告警,再 rg 搜代码,再 py-spy 看火焰图」

状态

  • 已背速记(5 种方法)
  • 能写 middleware 检测慢接口
  • 能说常见阻塞来源和修复方案