并发编程:多线程、协程和asyncio

admin 📖 13 分钟阅读

1|# 并发编程:多线程、协程和asyncio 2| 3|## Python并发编程的特殊性 4| 5|Java程序员写并发很直接——多线程就是真并行。但Python因为GIL的存在,多线程的行为和Java完全不同。 6| 7|## 一、GIL:Python并发最大的坑 8| 9|### 什么是GIL? 10| 11|GIL(Global Interpreter Lock,全局解释器锁)是CPython解释器中的一个互斥锁。同一时刻,只有一个线程能执行Python字节码。 12| 13|

    14|Java多线程:
    15|线程1: ████ ████ ████ ████  (真并行)
    16|线程2: ████ ████ ████ ████
    17|
    18|Python多线程(CPU密集型):
    19|线程1: ████_████_████  (交替执行)
    20|线程2: _████████_  (同一时刻只有一个)
    21|
22| 23|### 为什么有GIL? 24| 25|1. 引用计数垃圾回收的线程安全:Python用引用计数管理内存,多线程同时修改引用计数会导致内存泄漏 26|2. C扩展兼容性:很多C库(如NumPy底层)不是线程安全的,GIL简化了集成 27|3. 简化CPython实现:不需要在每个对象上加锁 28| 29|### GIL的释放时机 30| 31|GIL不是一直持有,以下情况会释放: 32|- IO操作:文件读写、网络请求、time.sleep() 33|- C扩展执行:NumPy的矩阵运算、Cython代码 34|- 字节码指令计数:每执行一定数量字节码就会检查是否释放 35| 36|### CPU密集型:多线程没用 37| 38|
    39|import threading, time
    40|
    41|def cpu_bound():
    42|    total = 0
    43|    for i in range(10_000_000):
    44|        total += i
    45|
    46|start = time.time()
    47|t1 = threading.Thread(target=cpu_bound)
    48|t2 = threading.Thread(target=cpu_bound)
    49|t1.start(); t2.start()
    50|t1.join(); t2.join()
    51|print(f"多线程: {time.time()-start:.2f}秒")  # 比单线程还慢!
    52|
53| 54|### GIL的未来 55| 56|- Python 3.12+:子解释器可以有独立GIL(PEP 684) 57|- Python 3.13+:实验性free-threaded模式(可选禁用GIL) 58|- PyPy、Jython:没有GIL的Python实现 59| 60|## 二、concurrent.futures:高级并发API 61| 62|Java程序员最熟悉的模型——线程池/进程池。 63| 64|
    65|from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
    66|
    67|def download(url):
    68|    time.sleep(2)
    69|    return f"{url} done"
    70|
    71|# 线程池
    72|with ThreadPoolExecutor(max_workers=5) as executor:
    73|    # map批量提交
    74|    results = list(executor.map(download, ["url1", "url2", "url3"]))
    75|  
76| # as_completed:先完成先处理 77| futures = {executor.submit(download, url): url for url in ["url1", "url2"]} 78| for future in as_completed(futures): 79| print(f"{futures[future]} 完成") 80| 81|# 进程池(CPU密集型) 82|with ProcessPoolExecutor(max_workers=4) as executor: 83| results = list(executor.map(cpu_intensive, data)) 84|
85| 86|Java对比: ExecutorService + Future,几乎一样的API设计。 87| 88|## 三、asyncio:Python最强大的并发方式 89| 90|### 核心概念 91| 92|- 协程(Coroutine):async def定义的函数,遇到IO时主动让出控制权 93|- 事件循环(Event Loop):调度协程执行的引擎 94|- Task:协程的包装,可以并发执行 95|- await:暂停当前协程,等待结果 96| 97|### 基础用法 98| 99|
   100|import asyncio
   101|
   102|async def fetch_data(url, delay):
   103|    print(f"开始请求 {url}")
   104|    await asyncio.sleep(delay)  # 异步等待(不阻塞其他协程)
   105|    print(f"完成 {url}")
   106|    return {"url": url}
   107|
   108|async def main():
   109|    # gather:等待所有完成
   110|    results = await asyncio.gather(
   111|        fetch_data("url1", 2),
   112|        fetch_data("url2", 3),
   113|        fetch_data("url3", 1),
   114|    )
   115|    # 总耗时3秒(不是6秒)
   116|  
117| # as_completed:先完成先处理 118| tasks = [fetch_data(f"url{i}", i) for i in range(1, 4)] 119| for coro in asyncio.as_completed(tasks): 120| result = await coro 121| print(f"先完成: {result}") 122| 123|asyncio.run(main()) 124|
125| 126|### 任务取消与异常处理 127| 128|
   129|async def main():
   130|    task = asyncio.create_task(slow_task())
   131|    await asyncio.sleep(1)
   132|    task.cancel()  # 取消任务
   133|  
134| # 异常处理:return_exceptions=True 135| results = await asyncio.gather( 136| risky_task(), 137| safe_task(), 138| return_exceptions=True, # 异常作为结果返回 139| ) 140|
141| 142|### 异步迭代器和异步上下文管理器 143| 144|
   145|# async for
   146|async def main():
   147|    async for data in async_stream():
   148|        process(data)
   149|
   150|# async with
   151|async with aiohttp.ClientSession() as session:
   152|    async with session.get(url) as resp:
   153|        data = await resp.json()
   154|
155| 156|### asyncio与多线程混合 157| 158|
   159|async def main():
   160|    loop = asyncio.get_event_loop()
   161|    with ThreadPoolExecutor() as pool:
   162|        # 在线程池中执行阻塞操作
   163|        result = await loop.run_in_executor(pool, blocking_function)
   164|
165| 166|## 四、multiprocessing:绕过GIL 167| 168|### 进程间通信 169| 170|
   171|from multiprocessing import Process, Queue, Pipe, Manager
   172|
   173|# Queue:生产者-消费者
   174|def worker(queue):
   175|    queue.put("结果")
   176|
   177|q = Queue()
   178|Process(target=worker, args=(q,)).start()
   179|print(q.get())  # "结果"
   180|
   181|# Pipe:双向通信
   182|parent, child = Pipe()
   183|Process(target=sender, args=(child,)).start()
   184|print(parent.recv())
   185|
   186|# Manager:共享对象
   187|with Manager() as manager:
   188|    shared = manager.dict()
   189|    Process(target=worker, args=(shared,)).start()
   190|
191| 192|## 五、选择指南 193| 194|| 场景 | 推荐 | 原因 | 195||------|------|------| 196|| IO密集(网络) | asyncio | 性能最好 | 197|| IO密集(简单) | threading | 容易写 | 198|| 已有同步代码 | concurrent.futures | 最小改动 | 199|| CPU密集 | multiprocessing | 绕过GIL | 200|| 混合场景 | asyncio + ProcessPoolExecutor | 最佳组合 | 201| 202|## 六、gevent/eventlet:其他方案 203| 204|
   205|# gevent:基于协程的并发(monkey patch方式)
   206|from gevent import monkey; monkey.patch_all()
   207|import gevent
   208|
   209|def download(url):
   210|    gevent.sleep(2)
   211|    return f"{url} done"
   212|
   213|jobs = [gevent.spawn(download, url) for url in ["url1", "url2"]]
   214|gevent.joinall(jobs)
   215|
216| 217|注意: gevent需要monkey patch,侵入性较强。新项目推荐asyncio。 218| 219|## 总结 220| 221|| 特性 | Python | Java | 222||------|--------|------| 223|| GIL | 有(限制并行) | 没有 | 224|| 多线程 | threading | Thread | 225|| 进程池 | ProcessPoolExecutor | 不需要 | 226|| 异步 | asyncio/await | CompletableFuture | 227|| 高级API | concurrent.futures | ExecutorService | 228| 229|下一篇聊Python标准库精选。 230| 231|--- 232| 233|本系列持续更新中,关注不迷路。

🤖 本文内容由AI辅助整理生成,仅供参考
← 上一篇 包管理和虚拟环境:告别DLL地狱 下一篇 → Python标准库精选:这些内置工具你一定要会