并发与异步编程
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read · 163 words

并发与异步编程

让程序同时干多件事——多线程处理 IO 密集,多进程跑 CPU 密集,asyncio 应对高并发网络。

并发模型对比

graph TD
    CONCURRENCY[Python 并发] --> THREAD[多线程 threading]
    CONCURRENCY --> PROCESS[多进程 multiprocessing]
    CONCURRENCY --> ASYNC[异步 asyncio]
    THREAD -->|GIL 限制| IO[IO 密集型]
    PROCESS -->|真并行| CPU[CPU 密集型]
    ASYNC -->|协程| NET[高并发网络]
    IO --> T1[文件读写]
    IO --> T2[网络请求]
    CPU --> P1[数据计算]
    CPU --> P2[图像处理]
    NET --> A1[Web 服务器]
    NET --> A2[爬虫]
    style CONCURRENCY fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style THREAD fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style PROCESS fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
    style ASYNC fill:#fce4ec,stroke:#c62828,stroke-width:2px

多线程

"""
多线程:适合 IO 密集型任务
"""
import threading
from concurrent.futures import ThreadPoolExecutor
import time
# === 基本多线程 ===
def download(url: str) -> str:
    """Pretend to download *url*: block for ~1 second, then report completion."""
    print(f"下载 {url}...")
    time.sleep(1)  # stand-in for real network latency
    return f"{url} 完成"
# Spawn one worker thread per URL, then wait for every one of them.
urls = [f"https://api.example.com/page/{i}" for i in range(5)]
start = time.time()
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until all downloads have finished
print(f"多线程耗时: {time.time() - start:.2f}s")  # ~1s, since the sleeps overlap
# === 线程池(推荐)===
def fetch_data(url: str) -> dict:
    """Simulate an API call: wait half a second, return a fake 200 response."""
    time.sleep(0.5)
    return dict(url=url, status=200)
# Thread pool, explicit submit/result pairs.
with ThreadPoolExecutor(max_workers=10) as executor:
    pending = [executor.submit(fetch_data, u) for u in urls]
    results = [job.result() for job in pending]
print(f"获取 {len(results)} 个结果")
# Same effect, more compactly, via executor.map().
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_data, urls))
print(f"获取 {len(results)} 个结果")
# === 线程安全 ===
class SafeCounter:
    """Counter whose reads and writes are serialized by a mutex."""

    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        """Add one while holding the lock, so concurrent calls never race."""
        with self.lock:
            self.count += 1

    def get(self) -> int:
        """Return the current value, read under the lock."""
        with self.lock:
            return self.count
# Hammer the counter from 1000 threads; the lock keeps the total exact.
counter = SafeCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"计数: {counter.get()}")  # 1000(线程安全)

多进程

"""
多进程:绕过 GIL,真正的并行计算
"""
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool, cpu_count
import time
# === CPU 密集型任务 ===
def compute_heavy(n: int) -> int:
    """Sum of squares 0..n-1 — pure CPU work with no I/O, for benchmarking."""
    return sum(i * i for i in range(n))
# 进程池
def benchmark():
    """Time the same CPU-bound workload sequentially vs. on a process pool."""
    workload = [10_000_000] * 8

    # Sequential baseline.
    t0 = time.time()
    results_single = [compute_heavy(n) for n in workload]
    elapsed_seq = time.time() - t0

    # One worker per CPU core — bypasses the GIL via separate processes.
    t0 = time.time()
    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        results_multi = list(executor.map(compute_heavy, workload))
    elapsed_par = time.time() - t0

    print(f"单进程: {elapsed_seq:.2f}s")
    print(f"多进程 ({cpu_count()} 核): {elapsed_par:.2f}s")
    print(f"加速比: {elapsed_seq / elapsed_par:.1f}x")
# benchmark()  # uncomment to run
# === 进程间通信 ===
from multiprocessing import Queue, Process
def producer(queue: Queue, items: list):
    """Feed every element of *items* into *queue*, terminated by a None sentinel."""
    for payload in items:
        queue.put(payload)
    queue.put(None)  # end-of-stream marker the consumer waits for
def consumer(queue: Queue):
    """Drain *queue*, printing each item, until the None sentinel arrives."""
    while (item := queue.get()) is not None:
        print(f"处理: {item}")
# Usage example: report how many CPU cores are available for worker processes.
print(f"CPU 核心数: {cpu_count()}")

asyncio 异步编程

"""
asyncio:高并发 IO 利器
"""
import asyncio
# === 基本协程 ===
async def say_hello(name: str, delay: float):
    """Coroutine: wait *delay* seconds without blocking the loop, then greet."""
    await asyncio.sleep(delay)  # yields control back to the event loop
    print(f"Hello, {name}!")
    return f"{name} done"
async def main_basic():
    """Contrast sequential awaits with gather()-based concurrency."""
    # One after the other: the delays add up (~2s total).
    await say_hello("A", 1)
    await say_hello("B", 1)
    # All three at once: the delays overlap (~1s total).
    results = await asyncio.gather(
        say_hello("C", 1),
        say_hello("D", 1),
        say_hello("E", 1),
    )
    print(results)
# asyncio.run(main_basic())
# === 实用:并发 HTTP 请求 ===
async def fetch_url(session, url: str) -> dict:
    """Fake async HTTP GET; the commented lines show the real aiohttp version."""
    # Real implementation (pip install aiohttp):
    # async with session.get(url) as response:
    #     return await response.json()
    await asyncio.sleep(0.1)  # simulated network latency
    return {"url": url, "status": 200}
async def fetch_all(urls: list[str]) -> list[dict]:
    """Start one fetch_url task per URL and collect all results concurrently."""
    # With aiohttp, share a single ClientSession across every request:
    # import aiohttp
    # async with aiohttp.ClientSession() as session:
    #     tasks = [fetch_url(session, url) for url in urls]
    #     return await asyncio.gather(*tasks)
    coros = (fetch_url(None, u) for u in urls)
    return await asyncio.gather(*coros)
# === 异步生成器 ===
async def async_range(start: int, stop: int, delay: float = 0.1):
    """Async generator yielding start..stop-1, pausing *delay* seconds per item."""
    for value in range(start, stop):
        await asyncio.sleep(delay)
        yield value
async def consume_async():
    """Print the numbers produced by async_range on a single line."""
    async for value in async_range(0, 5):
        print(value, end=" ")
    print()
# asyncio.run(consume_async())
# === 信号量控制并发 ===
async def limited_fetch(sem: asyncio.Semaphore, url: str) -> dict:
    """Perform the (simulated) fetch while holding one semaphore slot."""
    async with sem:  # caps how many fetches run at the same time
        await asyncio.sleep(0.1)
        return {"url": url}
async def main_limited():
    """Fetch 100 URLs while never running more than 10 concurrently."""
    sem = asyncio.Semaphore(10)  # concurrency cap shared by all tasks
    urls = [f"https://api.example.com/{i}" for i in range(100)]
    results = await asyncio.gather(*(limited_fetch(sem, u) for u in urls))
    print(f"完成 {len(results)} 个请求")
# asyncio.run(main_limited())
# The asyncio demos above are defined but deliberately not executed on import.
print("asyncio 示例准备就绪,取消注释 asyncio.run() 运行")

性能对比

| 方案 | 适用场景 | GIL 影响 | 开销 | 示例 |
| --- | --- | --- | --- | --- |
| 多线程 | IO 密集 | 受限 | 中 | 文件读写、网络请求 |
| 多进程 | CPU 密集 | 不受 | 高 | 数学计算、图像处理 |
| asyncio | 高并发 IO | 不受 | 最低 | Web 服务、爬虫 |
| 顺序执行 | 简单任务 | - | 最低 | 脚本、小工具 |

何时用哪个

"""
选择指南
"""
DECISION_GUIDE = {
"网络爬虫 (100+ URL)": "asyncio + aiohttp",
"文件批量处理": "ThreadPoolExecutor",
"图像/视频处理": "ProcessPoolExecutor",
"Web 服务器": "asyncio (FastAPI/Starlette)",
"数据科学计算": "ProcessPoolExecutor + numpy",
"简单脚本": "顺序执行就好",
"数据库操作": "ThreadPoolExecutor 或 asyncio",
}
for scenario, solution in DECISION_GUIDE.items():
print(f"  {scenario} → {solution}")

下一章:文件与 IO——文件读写、JSON/CSV/YAML 处理。