1|# 并发编程:多线程、协程和asyncio 2| 3|## Python并发编程的特殊性 4| 5|Java程序员写并发很直接——多线程就是真并行。但Python因为GIL的存在,多线程的行为和Java完全不同。 6| 7|## 一、GIL:Python并发最大的坑 8| 9|### 什么是GIL? 10| 11|GIL(Global Interpreter Lock,全局解释器锁)是CPython解释器中的一个互斥锁。同一时刻,只有一个线程能执行Python字节码。 12| 13|
14|Java多线程:
15|线程1: ████ ████ ████ ████ (真并行)
16|线程2: ████ ████ ████ ████
17|
18|Python多线程(CPU密集型):
19|线程1: ████_████_████ (交替执行)
20|线程2: _████████_ (同一时刻只有一个)
21|
22|
23|### 为什么有GIL?
24|
25|1. 引用计数垃圾回收的线程安全:Python用引用计数管理内存,多线程同时修改引用计数会导致内存泄漏
26|2. C扩展兼容性:很多C库(如NumPy底层)不是线程安全的,GIL简化了集成
27|3. 简化CPython实现:不需要在每个对象上加锁
28|
29|### GIL的释放时机
30|
31|GIL不是一直持有,以下情况会释放:
32|- IO操作:文件读写、网络请求、time.sleep()
33|- C扩展执行:NumPy的矩阵运算、Cython代码
34|- 字节码指令计数:每执行一定数量字节码就会检查是否释放
35|
36|### CPU密集型:多线程没用
37|
38| 39|import threading, time
40|
41|def cpu_bound():
42| total = 0
43| for i in range(10_000_000):
44| total += i
45|
46|start = time.time()
47|t1 = threading.Thread(target=cpu_bound)
48|t2 = threading.Thread(target=cpu_bound)
49|t1.start(); t2.start()
50|t1.join(); t2.join()
51|print(f"多线程: {time.time()-start:.2f}秒") # 比单线程还慢!
52|
53|
54|### GIL的未来
55|
56|- Python 3.12+:子解释器可以有独立GIL(PEP 684)
57|- Python 3.13+:实验性free-threaded模式(可选禁用GIL)
58|- PyPy、Jython:没有GIL的Python实现
59|
60|## 二、concurrent.futures:高级并发API
61|
62|Java程序员最熟悉的模型——线程池/进程池。
63|
64| 65|from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
66|
67|def download(url):
68| time.sleep(2)
69| return f"{url} done"
70|
71|# 线程池
72|with ThreadPoolExecutor(max_workers=5) as executor:
73| # map批量提交
74| results = list(executor.map(download, ["url1", "url2", "url3"]))
75|
76| # as_completed:先完成先处理
77| futures = {executor.submit(download, url): url for url in ["url1", "url2"]}
78| for future in as_completed(futures):
79| print(f"{futures[future]} 完成")
80|
81|# 进程池(CPU密集型)
82|with ProcessPoolExecutor(max_workers=4) as executor:
83| results = list(executor.map(cpu_intensive, data))
84|
85|
86|Java对比: ExecutorService + Future,几乎一样的API设计。
87|
88|## 三、asyncio:Python最强大的并发方式
89|
90|### 核心概念
91|
92|- 协程(Coroutine):async def定义的函数,遇到IO时主动让出控制权
93|- 事件循环(Event Loop):调度协程执行的引擎
94|- Task:协程的包装,可以并发执行
95|- await:暂停当前协程,等待结果
96|
97|### 基础用法
98|
99| 100|import asyncio
101|
102|async def fetch_data(url, delay):
103| print(f"开始请求 {url}")
104| await asyncio.sleep(delay) # 异步等待(不阻塞其他协程)
105| print(f"完成 {url}")
106| return {"url": url}
107|
108|async def main():
109| # gather:等待所有完成
110| results = await asyncio.gather(
111| fetch_data("url1", 2),
112| fetch_data("url2", 3),
113| fetch_data("url3", 1),
114| )
115| # 总耗时3秒(不是6秒)
116|
117| # as_completed:先完成先处理
118| tasks = [fetch_data(f"url{i}", i) for i in range(1, 4)]
119| for coro in asyncio.as_completed(tasks):
120| result = await coro
121| print(f"先完成: {result}")
122|
123|asyncio.run(main())
124|
125|
126|### 任务取消与异常处理
127|
128| 129|async def main():
130| task = asyncio.create_task(slow_task())
131| await asyncio.sleep(1)
132| task.cancel() # 取消任务
133|
134| # 异常处理:return_exceptions=True
135| results = await asyncio.gather(
136| risky_task(),
137| safe_task(),
138| return_exceptions=True, # 异常作为结果返回
139| )
140|
141|
142|### 异步迭代器和异步上下文管理器
143|
144| 145|# async for
146|async def main():
147| async for data in async_stream():
148| process(data)
149|
150|# async with
151|async with aiohttp.ClientSession() as session:
152| async with session.get(url) as resp:
153| data = await resp.json()
154|
155|
156|### asyncio与多线程混合
157|
158| 159|async def main():
160| loop = asyncio.get_event_loop()
161| with ThreadPoolExecutor() as pool:
162| # 在线程池中执行阻塞操作
163| result = await loop.run_in_executor(pool, blocking_function)
164|
165|
166|## 四、multiprocessing:绕过GIL
167|
168|### 进程间通信
169|
170| 171|from multiprocessing import Process, Queue, Pipe, Manager
172|
173|# Queue:生产者-消费者
174|def worker(queue):
175| queue.put("结果")
176|
177|q = Queue()
178|Process(target=worker, args=(q,)).start()
179|print(q.get()) # "结果"
180|
181|# Pipe:双向通信
182|parent, child = Pipe()
183|Process(target=sender, args=(child,)).start()
184|print(parent.recv())
185|
186|# Manager:共享对象
187|with Manager() as manager:
188| shared = manager.dict()
189| Process(target=worker, args=(shared,)).start()
190|
191|
192|## 五、选择指南
193|
194|| 场景 | 推荐 | 原因 |
195||------|------|------|
196|| IO密集(网络) | asyncio | 性能最好 |
197|| IO密集(简单) | threading | 容易写 |
198|| 已有同步代码 | concurrent.futures | 最小改动 |
199|| CPU密集 | multiprocessing | 绕过GIL |
200|| 混合场景 | asyncio + ProcessPoolExecutor | 最佳组合 |
201|
202|## 六、gevent/eventlet:其他方案
203|
204| 205|# gevent:基于协程的并发(monkey patch方式)
206|from gevent import monkey; monkey.patch_all()
207|import gevent
208|
209|def download(url):
210| gevent.sleep(2)
211| return f"{url} done"
212|
213|jobs = [gevent.spawn(download, url) for url in ["url1", "url2"]]
214|gevent.joinall(jobs)
215|
216|
217|注意: gevent需要monkey patch,侵入性较强。新项目推荐asyncio。
218|
219|## 总结
220|
221|| 特性 | Python | Java |
222||------|--------|------|
223|| GIL | 有(限制并行) | 没有 |
224|| 多线程 | threading | Thread |
225|| 进程池 | ProcessPoolExecutor | 不需要 |
226|| 异步 | asyncio/await | CompletableFuture |
227|| 高级API | concurrent.futures | ExecutorService |
228|
229|下一篇聊Python标准库精选。
230|
231|---
232|
233|本系列持续更新中,关注不迷路。