并发与异步编程
让程序同时干多件事——多线程处理 IO 密集,多进程跑 CPU 密集,asyncio 应对高并发网络。
并发模型对比
graph TD
CONCURRENCY[Python 并发] --> THREAD[多线程 threading]
CONCURRENCY --> PROCESS[多进程 multiprocessing]
CONCURRENCY --> ASYNC[异步 asyncio]
THREAD --> |GIL 限制| IO[IO 密集型]
PROCESS --> |真并行| CPU[CPU 密集型]
ASYNC --> |协程| NET[高并发网络]
IO --> T1[文件读写]
IO --> T2[网络请求]
CPU --> P1[数据计算]
CPU --> P2[图像处理]
NET --> A1[Web 服务器]
NET --> A2[爬虫]
style CONCURRENCY fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style THREAD fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style PROCESS fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style ASYNC fill:#fce4ec,stroke:#c62828,stroke-width:2px
多线程
"""
多线程:适合 IO 密集型任务
"""
import threading
from concurrent.futures import ThreadPoolExecutor
import time
# === 基本多线程 ===
def download(url: str) -> str:
    """Pretend to download *url*: announce it, wait ~1s, return a completion marker."""
    print(f"下载 {url}...")
    time.sleep(1)  # stand-in for network latency
    return f"{url} 完成"
# Spawn one thread per URL: total wall time is roughly the slowest
# download (~1s) instead of the ~5s a sequential loop would need.
urls = [f"https://api.example.com/page/{i}" for i in range(5)]
start = time.time()
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until every worker has finished
print(f"多线程耗时: {time.time() - start:.2f}s")  # ~1s
# === 线程池(推荐)===
def fetch_data(url: str) -> dict:
    """Simulated API call: half a second of latency, canned 200 response."""
    time.sleep(0.5)  # fake network round-trip
    return {"url": url, "status": 200}
# submit() hands back Future objects, useful when you need per-task control.
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_data, url) for url in urls]
    results = [f.result() for f in futures]
print(f"获取 {len(results)} 个结果")
# map() is the concise form when inputs map one-to-one onto calls.
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_data, urls))
print(f"获取 {len(results)} 个结果")
# === 线程安全 ===
class SafeCounter:
    """Counter whose reads and writes are serialized by a mutex, safe across threads."""

    def __init__(self):
        # The lock guards every access to `count` so `+=` never races.
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        """Atomically add one to the counter."""
        with self.lock:
            self.count += 1

    def get(self) -> int:
        """Return the current value, read under the lock."""
        with self.lock:
            return self.count
counter = SafeCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"计数: {counter.get()}")  # always 1000 thanks to the lock
多进程
"""
多进程:绕过 GIL,真正的并行计算
"""
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool, cpu_count
import time
# === CPU 密集型任务 ===
def compute_heavy(n: int) -> int:
    """Sum of squares 0..n-1 — a stand-in for CPU-bound work."""
    return sum(i * i for i in range(n))
# 进程池
def benchmark():
    """Time 8 identical CPU-bound jobs run sequentially vs. on a process pool."""
    workload = [10_000_000] * 8

    # Baseline: one process, one job after another.
    t0 = time.time()
    results_single = [compute_heavy(n) for n in workload]
    elapsed_seq = time.time() - t0

    # Parallel: one worker per core; bypasses the GIL via separate processes.
    # NOTE(review): with the spawn start method (Windows/macOS) this needs to
    # run under `if __name__ == "__main__":` — confirm how the script is invoked.
    t0 = time.time()
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
        results_multi = list(pool.map(compute_heavy, workload))
    elapsed_par = time.time() - t0

    print(f"单进程: {elapsed_seq:.2f}s")
    print(f"多进程 ({cpu_count()} 核): {elapsed_par:.2f}s")
    print(f"加速比: {elapsed_seq / elapsed_par:.1f}x")
# benchmark() # 取消注释运行
# === 进程间通信 ===
from multiprocessing import Queue, Process
def producer(queue: Queue, items: list):
    """Push every item onto *queue*, then a None sentinel to signal end-of-stream."""
    for entry in items:
        queue.put(entry)
    queue.put(None)  # sentinel: tells the consumer to stop
def consumer(queue: Queue):
    """Drain *queue*, processing items until the None sentinel arrives."""
    while (item := queue.get()) is not None:
        print(f"处理: {item}")
# Usage example: report how much hardware parallelism is available.
cores = cpu_count()
print(f"CPU 核心数: {cores}")
asyncio 异步编程
"""
asyncio:高并发 IO 利器
"""
import asyncio
# === 基本协程 ===
async def say_hello(name: str, delay: float):
    """Coroutine: wait *delay* seconds without blocking the loop, greet, return a marker."""
    await asyncio.sleep(delay)  # yields control to the event loop while waiting
    print(f"Hello, {name}!")
    return f"{name} done"
async def main_basic():
    """Contrast sequential awaits (~2s) with concurrent gather() (~1s)."""
    # Sequential: each await completes before the next one starts.
    for label in ("A", "B"):
        await say_hello(label, 1)
    # Concurrent: gather schedules all three coroutines at once.
    results = await asyncio.gather(*(say_hello(label, 1) for label in ("C", "D", "E")))
    print(results)
# asyncio.run(main_basic())
# === 实用:并发 HTTP 请求 ===
async def fetch_url(session, url: str) -> dict:
    """Simulated async HTTP GET; returns a canned 200 response.

    The real version (pip install aiohttp) would be:
        async with session.get(url) as response:
            return await response.json()
    """
    await asyncio.sleep(0.1)  # stand-in for network latency
    return {"url": url, "status": 200}
async def fetch_all(urls: list[str]) -> list[dict]:
    """Fire one request per URL and await them all concurrently.

    Real version with aiohttp:
        import aiohttp
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)
    """
    pending = (fetch_url(None, u) for u in urls)
    return await asyncio.gather(*pending)
# === 异步生成器 ===
async def async_range(start: int, stop: int, delay: float = 0.1):
    """Async generator: yield start..stop-1, sleeping *delay* seconds before each value."""
    current = start
    while current < stop:
        await asyncio.sleep(delay)
        yield current
        current += 1
async def consume_async():
    """Demonstrate `async for` over an async generator."""
    async for value in async_range(0, 5):
        print(value, end=" ")
    print()
# asyncio.run(consume_async())
# === 信号量控制并发 ===
async def limited_fetch(sem: asyncio.Semaphore, url: str) -> dict:
    """Run the (simulated) request while holding one slot of *sem*."""
    async with sem:  # at most N of these bodies execute concurrently
        await asyncio.sleep(0.1)
        return {"url": url}
async def main_limited():
    """Issue 100 requests, throttled to at most 10 in flight at any moment."""
    sem = asyncio.Semaphore(10)  # concurrency ceiling
    urls = [f"https://api.example.com/{i}" for i in range(100)]
    results = await asyncio.gather(*(limited_fetch(sem, u) for u in urls))
    print(f"完成 {len(results)} 个请求")
# asyncio.run(main_limited())
print("asyncio 示例准备就绪,取消注释 asyncio.run() 运行")
性能对比
| 方案 | 适用场景 | GIL 影响 | 开销 | 示例 |
|---|---|---|---|---|
| 多线程 | IO 密集 | 受限 | 低 | 文件读写、网络请求 |
| 多进程 | CPU 密集 | 不受 | 高 | 数学计算、图像处理 |
| asyncio | 高并发 IO | 单线程,不受影响 | 最低 | Web 服务、爬虫 |
| 顺序执行 | 简单任务 | - | 无 | 脚本、小工具 |
何时用哪个
"""
选择指南
"""
# Scenario → recommended concurrency tool.
DECISION_GUIDE = {
    "网络爬虫 (100+ URL)": "asyncio + aiohttp",
    "文件批量处理": "ThreadPoolExecutor",
    "图像/视频处理": "ProcessPoolExecutor",
    "Web 服务器": "asyncio (FastAPI/Starlette)",
    "数据科学计算": "ProcessPoolExecutor + numpy",
    "简单脚本": "顺序执行就好",
    "数据库操作": "ThreadPoolExecutor 或 asyncio",
}

for scenario in DECISION_GUIDE:
    print(f"  {scenario} → {DECISION_GUIDE[scenario]}")
下一章:文件与 IO——文件读写、JSON/CSV/YAML 处理。