【asyncio】Python 协程(Coroutine)与 Process / Thread 对比
/ 6 min read
Table of Contents
在高并发程序中,我们通常会在 进程(Process)、线程(Thread) 和 协程(Coroutine) 之间做取舍。它们各自解决的问题不同,也各有适用场景。
适用场景速览
- Coroutine(协程) 👉 适合 大量等待的任务:I/O、网络请求、数据库访问等
- Process(进程) 👉 适合 CPU 密集型任务:计算、压缩、模型推理等
- Thread(线程) 👉 适合 需要共享内存、但 CPU 使用较低 的任务
asyncio 的核心思想
asyncio 基于 事件循环(Event Loop) 来调度协程:
- 同一时刻只运行 一个协程
- 当协程遇到
await(I/O 等待)时:- 主动让出执行权
- 事件循环立即切换到下一个可运行的协程
- 不涉及内核态切换,调度成本极低
👉 本质上是 单线程 + 协作式调度,用“并发”来最大化 CPU 利用率。
asyncio 基础示例
1️⃣ 最简协程并发示例
task1 和 task2 会并发执行,task3 在它们完成之后才执行:
import asyncio
async def fetch_data(delay, id): print("Fetching data... id:", id) await asyncio.sleep(delay) print("data fetched id:", id) return {"data": "Some data", "id": id}
async def main(): task1 = fetch_data(2, 1) task2 = fetch_data(2, 2)
result = await asyncio.gather(task1, task2)
task3 = fetch_data(2, 3)
print(result) print(await task3)
asyncio.run(main())2️⃣ TaskGroup(Python 3.11+)
TaskGroup 提供了结构化并发(Structured Concurrency):
async with块内的任务 必须全部完成- 出现异常会自动取消其他任务
import asyncio
async def fetch_data(delay, id): print("Fetching data... id:", id) await asyncio.sleep(delay) print("data fetched id:", id) return {"data": "Some data", "id": id}
async def main(): tasks = []
async with asyncio.TaskGroup() as tg: for i, sleep_time in enumerate([2, 1, 3], start=1): task = tg.create_task(fetch_data(sleep_time, i)) tasks.append(task)
print([task.result() for task in tasks])
asyncio.run(main())协程间通信(Synchronization & Communication)
Future:一次性结果通知
Future 表示 “将来某个时刻才会有的值”:
- 只能
set_result()一次 - 错误通过
set_exception(e)传递
import asyncio
async def set_future_value(future, value): await asyncio.sleep(2) future.set_result(value) print("Future result set")
async def main(): loop = asyncio.get_running_loop() future = loop.create_future()
asyncio.create_task(set_future_value(future, "result is ready")) await future
print("Received future result")
asyncio.run(main())Lock:协程锁
虽然协程是单线程的,但 共享状态 + await 仍然可能产生竞态条件:
import asyncio
shared_resource = 0lock = asyncio.Lock()
async def modify_shared_resource(): global shared_resource async with lock: print(f"Before: {shared_resource}") shared_resource += 1 await asyncio.sleep(1) print(f"After: {shared_resource}")
async def main(): await asyncio.gather(*(modify_shared_resource() for _ in range(5)))
asyncio.run(main())Semaphore:并发度限制
即信号量,控制“同时访问资源的协程数量”:
import asyncio
async def access_resource(semaphore, resource_id): async with semaphore: print(f"Accessing resource {resource_id}") await asyncio.sleep(1) print(f"Releasing resource {resource_id}")
async def main(): semaphore = asyncio.Semaphore(2) await asyncio.gather(*(access_resource(semaphore, i) for i in range(10)))
asyncio.run(main())Event:状态通知
Event 是一个 布尔标志:
set()后,所有等待者都会被唤醒- 之后调用
wait()也会立即返回(直到clear())
import asyncio
async def waiter(event): print("Waiting for event") await event.wait() print("Event received")
async def setter(event): await asyncio.sleep(2) event.set() print("Event set")
async def main(): event = asyncio.Event() await asyncio.gather(waiter(event), setter(event))
asyncio.run(main())并发(Concurrency) vs 并行(Parallelism)
- 并发(Concurrency) 👉 在同一时间段内处理多件事情(逻辑同时)
- 并行(Parallelism) 👉 在同一时刻真正执行多件事情(物理同时)
并行是实现并发的一种方式,但并发不一定是并行。
Process / Thread / Coroutine 对比
进程(Process)
- 操作系统 资源分配的最小单位
- 独立地址空间,互不影响
- IPC 成本高(Pipe / Socket / 共享内存)
- 天然支持多核并行
线程(Thread)
- 操作系统 CPU 调度的最小单位
- 同一进程内共享内存
- 上下文切换需要内核参与
- 需要锁来保护共享数据
协程(Coroutine)
- 运行在 线程内部
- 用户态调度(无系统调用)
- 协作式切换(
await显式让出) - 更适合高并发 I/O
协程 vs 线程
| 维度 | 线程 | 协程 |
|---|---|---|
| 调度者 | 操作系统 | 用户态 |
| 调度方式 | 抢占式 | 协作式 |
| 切换成本 | 中 | 极低 |
| 是否并行 | ✅ | ❌ |
| 是否需要锁 | 必须 | 通常不需要 |
一句话总结
进程解决资源隔离,线程解决 CPU 并行,协程解决高并发下的调度成本。
协程间通信选型建议
- Queue
- 生产者 / 消费者模型
- 支持多条消息
- 常用 sentinel(如
None)通知退出
- Future
- 一次性结果
- 只能完成一次
- Event
- 状态通知
- 不适合逐条消息