您的位置:首页 >如何在 Python/FastAPI 中监控事件循环中所有待执行的异步任务数量
发布于2026-05-03 阅读(0)
扫一扫,手机访问
本文详解如何使用 asyncio.all_tasks() 获取当前事件循环中所有未完成(pending 或 running)的 task 对象,并通过实际代码演示任务统计、日志记录与堆栈分析,助力 fastapi 应用性能排查。
在构建高并发 FastAPI 服务时,你是否遇到过响应莫名变慢,或者资源消耗悄然攀升的情况?很多时候,问题的早期信号就藏在异步任务的积压里。不过,这里有个关键点需要先厘清:await 这个操作本身并不会自动创建新任务。它仅仅是挂起当前的协程,等待被 await 的对象(比如一次 asyncio.sleep()、一个 HTTP 请求或数据库查询)完成。真正向事件循环提交独立、可调度任务的,是像 asyncio.create_task()、asyncio.ensure_future() 或者 FastAPI 内置的 BackgroundTasks 这类显式调度行为。所以,我们所说的“待执行任务数”,实际上指的是所有已经提交给事件循环、但尚未完成(即 .done() 返回 False)的 Task 实例总数,这里面包括了正在运行的、暂停等待的、阻塞于 I/O 的,以及还在队列里排队的任务。

最直接、最核心的方法就是调用 asyncio.all_tasks()。它会返回一个集合(set),里面包含了当前运行的事件循环中所有未完成的 Task 对象。统计一下长度,数量就出来了:
import asyncio
# 在任意协程内(如 FastAPI 路由处理函数中)
async def monitor_tasks():
pending_tasks = asyncio.all_tasks()
count = len(pending_tasks)
print(f"当前待执行/运行中的任务总数: {count}")
return count
⚠️ 注意:
asyncio.all_tasks()默认会使用asyncio.get_running_loop()来获取当前的事件循环,通常不需要手动传入 loop 参数。除非你在显式管理多个事件循环,但这在标准的 FastAPI 应用场景中非常少见。
光知道总数,有时候还不够。如果想知道哪些任务还在活跃,甚至想揪出可能卡住的任务,就需要更进一步的分析。下面的代码展示了如何过滤状态、记录日志,甚至打印调用堆栈:
立即学习“Python免费学习笔记(深入)”;
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
async def diagnose_task_backlog():
tasks = asyncio.all_tasks()
# 1. 统计:总任务数 vs 活跃任务数(未完成)
total = len(tasks)
active = len([t for t in tasks if not t.done()])
print(f"[监控] 总任务: {total}, 活跃中: {active}")
# 2. 日志所有任务基本信息(推荐用于 Prometheus / Grafana 集成)
for task in tasks:
logging.debug(
f"Task {task.get_name() or 'unnamed'} | "
f"State: {'DONE' if task.done() else 'PENDING'} | "
f"Coro: {task.get_coro().__qualname__ if task.get_coro() else 'N/A'}"
)
# 3. (谨慎使用)打印卡住任务的完整调用栈(定位阻塞点)
for task in tasks:
if not task.done() and task._coro.cr_await is None: # 粗略判断“疑似卡住”
print(f"\n⚠️ 可能卡住的任务 {task.get_name()} 堆栈:")
task.print_stack(limit=10)
# 在 FastAPI 中作为依赖或中间件调用示例
from fastapi import Depends, APIRouter
router = APIRouter()
@router.get("/health/tasks")
async def get_task_status():
await diagnose_task_backlog()
return {"status": "ok"}
使用这些方法时,有几个细节必须留意,否则可能会得到误导性的结果:
create_task()、ensure_future() 或框架(如 FastAPI)显式调度了的协程才会被计入。直接 await coro() 不会新增任务计数。all_tasks() 返回的是一个瞬时快照。在你遍历集合的时候,有些任务可能已经完成了。因此,建议在关键路径(比如请求的入口或出口)或者定期的健康检查中调用它,数据才更有参考价值。print_stack),可以结合 logging.debug 和条件采样来降低开销;asyncio.current_task().get_name() 可以获取当前任务名,创建时也可以指定:task = asyncio.create_task(some_coro(), name="db-query-user-profile")
len(asyncio.all_tasks()) 就是开销最小的方式。不必要的遍历操作能免则免。说到底,掌握 asyncio.all_tasks() 是构建异步应用可观测性的基础。它本身并不直接解决性能瓶颈,但却能为你提供第一手的证据,精准地告诉你“到底是谁在占用事件循环”。当你发现 pending_tasks 的数量持续增长且迟迟无法回落时,这就是一个明确的信号,提醒你应该深入对应的协程,去分析其 I/O 效率、是否存在锁竞争,或者是否混入了 CPU 密集型的操作。这才是优化 FastAPI 服务吞吐量和延迟表现的正确路径。
售后无忧
立即购买>office旗舰店
售后无忧
立即购买>office旗舰店
售后无忧
立即购买>office旗舰店
售后无忧
立即购买>office旗舰店
正版软件
正版软件
正版软件
正版软件
正版软件
1
2
3
7
9