skip to content
Liu Yang's Blog

【asyncio】Python 协程(Coroutine)与 Process / Thread 对比

/ 6 min read

Table of Contents

在高并发程序中,我们通常会在 进程(Process)线程(Thread)协程(Coroutine) 之间做取舍。它们各自解决的问题不同,也各有适用场景。

image-20251220125605200

适用场景速览

  • Coroutine(协程) 👉 适合 大量等待的任务:I/O、网络请求、数据库访问等
  • Process(进程) 👉 适合 CPU 密集型任务:计算、压缩、模型推理等
  • Thread(线程) 👉 适合 需要共享内存、但 CPU 使用较低 的任务

asyncio 的核心思想

asyncio 基于 事件循环(Event Loop) 来调度协程:

  • 同一时刻只运行 一个协程
  • 当协程遇到 await(I/O 等待)时:
    • 主动让出执行权
    • 事件循环立即切换到下一个可运行的协程
  • 不涉及内核态切换,调度成本极低

👉 本质上是 单线程 + 协作式调度,用“并发”来最大化 CPU 利用率。


asyncio 基础示例

1️⃣ 最简协程并发示例

task1task2 会并发执行,task3 在它们完成之后才执行:

import asyncio
async def fetch_data(delay, id):
print("Fetching data... id:", id)
await asyncio.sleep(delay)
print("data fetched id:", id)
return {"data": "Some data", "id": id}
async def main():
task1 = fetch_data(2, 1)
task2 = fetch_data(2, 2)
result = await asyncio.gather(task1, task2)
task3 = fetch_data(2, 3)
print(result)
print(await task3)
asyncio.run(main())

2️⃣ TaskGroup(Python 3.11+)

TaskGroup 提供了结构化并发(Structured Concurrency):

  • async with 块内的任务 必须全部完成
  • 出现异常会自动取消其他任务
import asyncio
async def fetch_data(delay, id):
print("Fetching data... id:", id)
await asyncio.sleep(delay)
print("data fetched id:", id)
return {"data": "Some data", "id": id}
async def main():
tasks = []
async with asyncio.TaskGroup() as tg:
for i, sleep_time in enumerate([2, 1, 3], start=1):
task = tg.create_task(fetch_data(sleep_time, i))
tasks.append(task)
print([task.result() for task in tasks])
asyncio.run(main())

协程间通信(Synchronization & Communication)

Future:一次性结果通知

Future 表示 “将来某个时刻才会有的值”

  • 只能 set_result() 一次
  • 错误通过 set_exception(e) 传递
import asyncio
async def set_future_value(future, value):
await asyncio.sleep(2)
future.set_result(value)
print("Future result set")
async def main():
loop = asyncio.get_running_loop()
future = loop.create_future()
asyncio.create_task(set_future_value(future, "result is ready"))
await future
print("Received future result")
asyncio.run(main())

Lock:协程锁

虽然协程是单线程的,但 共享状态 + await 仍然可能产生竞态条件:

import asyncio
shared_resource = 0
lock = asyncio.Lock()
async def modify_shared_resource():
global shared_resource
async with lock:
print(f"Before: {shared_resource}")
shared_resource += 1
await asyncio.sleep(1)
print(f"After: {shared_resource}")
async def main():
await asyncio.gather(*(modify_shared_resource() for _ in range(5)))
asyncio.run(main())

Semaphore:并发度限制

即信号量,控制“同时访问资源的协程数量”:

import asyncio
async def access_resource(semaphore, resource_id):
async with semaphore:
print(f"Accessing resource {resource_id}")
await asyncio.sleep(1)
print(f"Releasing resource {resource_id}")
async def main():
semaphore = asyncio.Semaphore(2)
await asyncio.gather(*(access_resource(semaphore, i) for i in range(10)))
asyncio.run(main())

Event:状态通知

Event 是一个 布尔标志

  • set() 后,所有等待者都会被唤醒
  • 之后调用 wait() 也会立即返回(直到 clear()
import asyncio
async def waiter(event):
print("Waiting for event")
await event.wait()
print("Event received")
async def setter(event):
await asyncio.sleep(2)
event.set()
print("Event set")
async def main():
event = asyncio.Event()
await asyncio.gather(waiter(event), setter(event))
asyncio.run(main())

并发(Concurrency) vs 并行(Parallelism)

  • 并发(Concurrency) 👉 在同一时间段内处理多件事情(逻辑同时)
  • 并行(Parallelism) 👉 在同一时刻真正执行多件事情(物理同时)

并行是实现并发的一种方式,但并发不一定是并行。


Process / Thread / Coroutine 对比

进程(Process)

  • 操作系统 资源分配的最小单位
  • 独立地址空间,互不影响
  • IPC 成本高(Pipe / Socket / 共享内存)
  • 天然支持多核并行

线程(Thread)

  • 操作系统 CPU 调度的最小单位
  • 同一进程内共享内存
  • 上下文切换需要内核参与
  • 需要锁来保护共享数据

协程(Coroutine)

  • 运行在 线程内部
  • 用户态调度(无系统调用)
  • 协作式切换(await 显式让出)
  • 更适合高并发 I/O

协程 vs 线程

维度线程协程
调度者操作系统用户态
调度方式抢占式协作式
切换成本极低
是否并行
是否需要锁必须通常不需要

一句话总结

进程解决资源隔离,线程解决 CPU 并行,协程解决高并发下的调度成本。


协程间通信选型建议

  • Queue
    • 生产者 / 消费者模型
    • 支持多条消息
    • 常用 sentinel(如 None)通知退出
  • Future
    • 一次性结果
    • 只能完成一次
  • Event
    • 状态通知
    • 不适合逐条消息

参考文献

Asyncio in Python - Full Tutorial

进程和线程 - Python教程 - 廖雪峰的官方网站