asyncio 深入与实战模式
掌握 asyncio 的高级用法——任务编排、超时控制、信号量限流,构建高性能异步应用。
asyncio 核心架构
graph TD
LOOP[事件循环] --> CORO[协程 coroutine]
LOOP --> TASK[任务 Task]
LOOP --> FUT[Future]
CORO --> AWAIT[await 挂起]
CORO --> YIELD[释放控制权]
TASK --> GATHER["gather() 并发"]
TASK --> GROUP["TaskGroup 结构化"]
TASK --> WAIT["wait() 条件等待"]
FUT --> CB[回调 callback]
FUT --> RES[结果 result]
style LOOP fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style TASK fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
任务编排
"""
asyncio 任务编排模式
"""
import asyncio
from dataclasses import dataclass
@dataclass
class APIResponse:
    """Lightweight container for a simulated HTTP response."""

    # Requested URL, echoed back so results can be matched to requests.
    url: str
    # HTTP-style status code (always 200 in these demos).
    status: int
    # Response payload as plain text.
    data: str
async def fetch(url: str) -> APIResponse:
    """Simulate an HTTP GET against *url* with a fixed half-second latency."""
    await asyncio.sleep(0.5)  # stand-in for the network round-trip
    response = APIResponse(url=url, status=200, data=f"data from {url}")
    return response
# === gather: wait for all tasks concurrently ===
async def fetch_all():
    """Fire five requests at once and wait until every one has finished."""
    pending = []
    for i in range(5):
        pending.append(fetch(f"https://api.example.com/item/{i}"))
    results = await asyncio.gather(*pending)
    for response in results:
        print(f"{response.url} → {response.status}")
    return results
# === TaskGroup (Python 3.11+, recommended) ===
async def fetch_with_group():
    """Structured concurrency: a failing task cancels its siblings."""
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch(f"https://api.example.com/{i}"))
            for i in range(5)
        ]
    # Leaving the async-with guarantees every task has completed.
    return [task.result() for task in tasks]
# === as_completed: handle results in completion order ===
async def stream_results():
    """Consume responses as they finish instead of waiting for all of them."""
    pending = [fetch(f"https://api.example.com/{i}") for i in range(5)]
    for next_done in asyncio.as_completed(pending):
        response = await next_done
        print(f"完成: {response.url}")  # whichever finishes first prints first
# Script entry point for this section: drives the gather-based demo.
asyncio.run(fetch_all())
超时与取消
"""
超时控制与任务取消
"""
import asyncio
async def slow_operation(delay: float = 10.0) -> str:
    """Simulate a long-running operation.

    Args:
        delay: Seconds to sleep before returning. Defaults to the original
            hard-coded 10 seconds, so existing callers behave unchanged;
            parameterized so tests/demos can use a short delay.

    Returns:
        The literal string "done".
    """
    await asyncio.sleep(delay)
    return "done"
# === deadline handling ===
async def with_timeout():
    """Demonstrate the two standard ways to put a deadline on an await."""
    # Block-scoped deadline (Python 3.11+): asyncio.timeout() wraps a region.
    try:
        async with asyncio.timeout(2.0):
            outcome = await slow_operation()
    except TimeoutError:
        print("操作超时!")
    # Older-compatible form: wait_for() wraps a single awaitable.
    try:
        outcome = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("操作超时!")
# === task cancellation ===
async def cancellable_task():
    """Loop forever, but shut down gracefully when cancelled."""
    try:
        while True:
            print("工作中...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("任务被取消,正在清理...")
        # Cleanup work would go here.
        raise  # re-raise so the awaiter observes the cancellation
async def cancel_demo():
    """Start a worker, let it run briefly, then cancel it and await the result."""
    worker = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        print("主程序:任务已取消")
# Script entry point for this section: each timeout variant fires after ~2 s.
asyncio.run(with_timeout())
信号量与限流
"""
控制并发数量:信号量
"""
import asyncio
from dataclasses import dataclass, field
@dataclass
class RateLimiter:
"""速率限制器"""
max_concurrent: int = 10
_semaphore: asyncio.Semaphore = field(init=False)
def __post_init__(self):
self._semaphore = asyncio.Semaphore(self.max_concurrent)
async def execute(self, coro):
async with self._semaphore:
return await coro
# Module-level limiter shared by the demo: at most 5 requests in flight.
limiter = RateLimiter(max_concurrent=5)
async def api_call(url: str, delay: float = 1.0) -> str:
    """Simulate an API call against *url*.

    Args:
        url: Target endpoint; only echoed back in the fake result.
        delay: Simulated latency in seconds. Defaults to the original
            fixed 1 second, so existing callers behave unchanged.

    Returns:
        A fake result string mentioning the URL.
    """
    await asyncio.sleep(delay)
    return f"result from {url}"
async def rate_limited_fetch():
    """Launch 20 calls; the shared limiter keeps only 5 running at once."""
    pending = [
        limiter.execute(api_call(f"https://api.example.com/{i}"))
        for i in range(20)
    ]
    results = await asyncio.gather(*pending)
    print(f"完成 {len(results)} 个请求")
# === async producer/consumer ===
async def producer(queue: asyncio.Queue, items: list):
    """Feed every item into the queue, then enqueue a None sentinel."""
    for item in items:
        await queue.put(item)
        print(f"生产: {item}")
    await queue.put(None)  # end-of-stream marker for the consumers
async def consumer(queue: asyncio.Queue, name: str):
    """Drain the queue until the None sentinel arrives.

    Bug fix: the original only called ``task_done()`` for real items, never
    for the sentinel, so every retrieved ``None`` stayed "unfinished" and
    ``queue.join()`` would block forever. Every ``get()`` is now paired
    with exactly one ``task_done()``.

    Args:
        queue: Work queue shared with the producer; ``None`` means stop.
        name: Label used in log output.
    """
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # re-broadcast so sibling consumers also stop
            queue.task_done()  # acknowledge the sentinel we just consumed
            break
        print(f"[{name}] 消费: {item}")
        await asyncio.sleep(0.5)  # simulate per-item processing time
        queue.task_done()
async def producer_consumer_demo():
    """Run one producer against two competing consumers on a bounded queue."""
    work_queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    payload = [f"task_{i}" for i in range(10)]
    participants = [
        producer(work_queue, payload),
        consumer(work_queue, "worker-1"),
        consumer(work_queue, "worker-2"),
    ]
    await asyncio.gather(*participants)
# Script entry point for this section: drives the semaphore-limited demo.
asyncio.run(rate_limited_fetch())
异步上下文管理
"""
异步上下文管理器 & 异步迭代器
"""
import asyncio
from contextlib import asynccontextmanager
# Async context manager
@asynccontextmanager
async def async_db_connection(url: str):
    """Yield a fake database connection, always closing it on exit."""
    print(f"连接到 {url}...")
    await asyncio.sleep(0.1)  # pretend to establish the connection
    conn = {"url": url, "connected": True}
    try:
        yield conn
    finally:
        # Runs even if the with-body raises.
        print("关闭连接")
        await asyncio.sleep(0.05)
async def db_demo():
    """Hold a connection open for exactly one block of work."""
    async with async_db_connection("postgres://localhost/mydb") as conn:
        message = f"使用连接: {conn}"
        print(message)
# Async iterator
class AsyncPageIterator:
    """Asynchronously yields page dicts, simulating a paginated API."""

    def __init__(self, total_pages: int):
        # Total number of pages this iterator will serve.
        self.total_pages = total_pages
        # Page number of the most recently served page (0 = none yet).
        self.current = 0

    def __aiter__(self):
        # The iterator is its own async iterable.
        return self

    async def __anext__(self):
        # Guard clause: stop once every page has been served.
        if self.current >= self.total_pages:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(0.2)  # simulated network round-trip
        page_items = [f"item_{i}" for i in range(10)]
        return {"page": self.current, "items": page_items}
async def paginate():
    """Walk every page of a 5-page source and report each page's size."""
    async for page in AsyncPageIterator(5):
        count = len(page["items"])
        print(f"第 {page['page']} 页: {count} 条")
# Script entry point for this section: iterates all five pages.
asyncio.run(paginate())
并发模式对比
| 模式 | 适用场景 | 并发控制 | Python 版本 |
|---|---|---|---|
| gather | 等待所有任务完成 | 无限制 | 3.4+ |
| TaskGroup | 结构化并发 | 自动取消 | 3.11+ |
| as_completed | 先完成先处理 | 无限制 | 3.4+ |
| Semaphore | 限制并发数 | 最大 N 个 | 3.4+ |
| Queue | 生产者-消费者 | 队列大小 | 3.4+ |
| wait_for | 单任务超时 | 超时取消 | 3.4+ |
| timeout | 代码块超时 | 超时取消 | 3.11+ |
本章小结
| 知识点 | 要点 |
|---|---|
| TaskGroup | 3.11+ 推荐的结构化并发 |
| 超时 | asyncio.timeout() 或 wait_for() |
| 信号量 | Semaphore 控制并发数量 |
| 队列 | asyncio.Queue 生产者-消费者 |
| 异步上下文 | async with + @asynccontextmanager |
下一章:文件与 IO——文件读写与数据格式处理。