Python 进程池完全指南:从原理到实战
一、为什么需要进程池?
在 Python 中,当我们处理 CPU 密集型任务时,进程池(Process Pool) 是绕不开的核心工具。要理解它的价值,得先从 Python 的 GIL(全局解释器锁)说起。
1.1 GIL 与线程池的局限
Python 的 GIL 确保同一时刻只有一个线程在执行 Python 字节码。这意味着:
- 线程适合 I/O 密集型任务(文件读写、网络请求等等待场景)
- 线程对 CPU 密集型任务几乎无效,多个线程抢一把锁,实际是"伪并行"
# 演示 GIL 对 CPU 密集任务的限制
import time
from threading import Thread
def cpu_hog(n):
"""纯 CPU 计算"""
while n > 0:
n -= 1
# 单线程耗时
start = time.time()
cpu_hog(50_000_000)
print(f"单线程: {time.time() - start:.2f}s")
# 多线程(受 GIL 限制,几乎无加速)
start = time.time()
t1 = Thread(target=cpu_hog, args=(25_000_000,))
t2 = Thread(target=cpu_hog, args=(25_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"双线程: {time.time() - start:.2f}s")
输出大致为:
单线程: 2.15s
双线程: 2.20s ← 几乎没有加速
1.2 进程池的优势
进程池使用 多进程 而非多线程,每个进程拥有独立的 Python 解释器和内存空间,彻底绕开 GIL。
| 对比维度 | 线程池 (ThreadPool) | 进程池 (ProcessPool) |
|---|---|---|
| 适用场景 | I/O 密集型 | CPU 密集型 |
| 内存共享 | 共享(需加锁) | 独立(需序列化) |
| GIL 限制 | 受限制 | 无限制 |
| 创建开销 | 较小 | 较大 |
| 通信成本 | 低(直接读内存) | 高(pickle 序列化) |
二、multiprocessing.Pool 经典方案
multiprocessing 是 Python 标准库提供的多进程模块,其中的 Pool 类提供了最经典的进程池实现。
2.1 基本使用
from multiprocessing import Pool
import os
def square(x):
return x * x
# 创建进程池(默认使用 CPU 核心数)
with Pool() as pool:
results = pool.map(square, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.2 核心方法详解
pool.map() —— 批量同步映射
类似内置 map(),将可迭代对象中的每个元素分配给进程池处理,阻塞等待全部结果返回。
import time
from multiprocessing import Pool
def slow_square(x):
time.sleep(0.5)
return x ** 2
start = time.time()
with Pool(4) as pool:
# 并行处理 8 个任务(4 个 worker 各处理 2 个)
results = pool.map(slow_square, range(8))
print(f"耗时: {time.time() - start:.2f}s") # ~1.0s(顺序需 4s)
print(results)
map的变体:starmap
如果函数接收多个参数,使用starmap:def add(a, b): return a + b with Pool() as pool: print(pool.starmap(add, [(1,2), (3,4)])) # [3, 7]
pool.apply() —— 同步执行单个任务
apply 在某个 worker 进程上执行一次函数调用,阻塞等待结果。
from multiprocessing import Pool
def factorial(n):
result = 1
for i in range(2, n + 1):
result *= i
return result
with Pool(2) as pool:
# 同步执行单个任务
result = pool.apply(factorial, args=(10,))
print(result) # 3628800
pool.apply_async() —— 异步执行的核心
apply_async 是异步非阻塞版本,立即返回一个 AsyncResult 对象,主进程可继续做其他事,稍后通过 .get() 获取结果。
from multiprocessing import Pool
import time
def slow_fib(n):
if n <= 1:
return n
a, b = 0, 1
for _ in range(n - 1):
a, b = b, a + b
return b
with Pool(4) as pool:
# 异步提交任务
results = [pool.apply_async(slow_fib, args=(i,)) for i in range(10)]
# 主进程可以在这里做其他事情
print("任务已提交,主进程做其他事...")
time.sleep(0.5)
# 获取结果
fibs = [r.get() for r in results]
print(fibs) # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
pool.map_async() —— 异步版本的 map
from multiprocessing import Pool
with Pool(4) as pool:
# 异步 map,返回 MapResult 对象
result_obj = pool.map_async(slow_square, range(16))
# 检查是否完成
if not result_obj.ready():
print("计算中...")
# 等待并获取结果
squares = result_obj.get()
print(squares)
2.3 Callback 回调函数
apply_async 和 map_async 支持回调,任务完成后自动执行回调函数,不需要手动 .get()。
from multiprocessing import Pool
import time
def task(x):
time.sleep(0.5)
return x * 10
def on_success(result):
print(f"任务完成,结果: {result}")
def on_error(exc):
print(f"任务失败,异常: {exc}")
with Pool(3) as pool:
for i in range(6):
pool.apply_async(
task,
args=(i,),
callback=on_success, # 成功时回调
error_callback=on_error # 失败时回调
)
pool.close()
pool.join()
回调的实际应用场景:
import requests
from multiprocessing import Pool
def fetch_url(url):
"""下载网页内容"""
resp = requests.get(url, timeout=10)
return (url, resp.status_code, len(resp.text))
def save_result(result):
"""下载完成后写入日志(回调函数)"""
url, status, size = result
with open("download_log.txt", "a") as f:
f.write(f"[{status}] {url} ({size} bytes)\n")
print(f"✓ {url} 已记录")
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
]
with Pool(3) as pool:
for url in urls:
pool.apply_async(fetch_url, args=(url,), callback=save_result)
pool.close()
pool.join()
print("所有任务完成")
2.4 进程池生命周期管理
from multiprocessing import Pool
pool = Pool(4)
# 提交若干任务
for _ in range(100):
pool.apply_async(do_work)
# 关闭:不再接受新任务
pool.close()
# 等待所有 worker 结束
pool.join()
# 强制终止所有 worker(不等待)
pool.terminate()
三、concurrent.futures.ProcessPoolExecutor 现代方案
Python 3.2+ 引入的 concurrent.futures 模块提供了更现代、更统一的接口。ProcessPoolExecutor 与 ThreadPoolExecutor 使用相同的 API,切换成本极低。
3.1 基本使用
from concurrent.futures import ProcessPoolExecutor
import os
def square(x):
return x * x
with ProcessPoolExecutor() as executor:
futures = [executor.submit(square, i) for i in range(10)]
results = [f.result() for f in futures]
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
3.2 核心方法
executor.submit() —— 提交单个任务
返回 Future 对象,通过 .result() 获取结果(可设置超时)。
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def heavy_task(n):
time.sleep(n)
return f"任务 {n} 完成"
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交任务
future_map = {executor.submit(heavy_task, i): i for i in [3, 1, 2]}
# as_completed 按完成顺序遍历
for future in as_completed(future_map):
task_id = future_map[future]
try:
result = future.result()
print(f"任务 {task_id}: {result}")
except Exception as e:
print(f"任务 {task_id} 出错: {e}")
executor.map() —— 批量提交
类似 Pool.map(),但更简洁。
from concurrent.futures import ProcessPoolExecutor
def cube(x):
return x ** 3
with ProcessPoolExecutor(4) as executor:
# map 返回迭代器,按输入顺序产出结果
results = executor.map(cube, range(20))
print(list(results))
# 支持多参数——使用 starmap 风格
def add(a, b, c):
return a + b + c
# 多个可迭代对象对应多个参数
results = executor.map(add, [1,2,3], [4,5,6], [7,8,9])
print(list(results)) # [12, 15, 18]
executor.map()支持timeout和chunksize参数:results = executor.map( heavy_func, range(1000), chunksize=50, # 将任务切块分发,减少通信开销 timeout=30 # 每个任务超时 30 秒 )
3.3 Callback 回调
concurrent.futures 通过 Future.add_done_callback() 实现回调:
from concurrent.futures import ProcessPoolExecutor
import time
def work(n):
time.sleep(0.5)
return n * 2
def on_done(future):
try:
result = future.result()
print(f"回调: 结果 = {result}")
except Exception as e:
print(f"回调: 出错 = {e}")
with ProcessPoolExecutor(4) as executor:
for i in range(6):
future = executor.submit(work, i)
future.add_done_callback(on_done)
# 与 multiprocessing 不同,这里退出上下文管理器会自动 join
3.4 上下文管理器
ProcessPoolExecutor 支持 with 语句,退出时自动调用 shutdown(wait=True)。
# 等价写法
executor = ProcessPoolExecutor(4)
try:
futures = [executor.submit(task, i) for i in range(10)]
results = [f.result() for f in futures]
finally:
executor.shutdown(wait=True)
四、map / apply / apply_async 对比总结
| 方法 | 所属模块 | 是否阻塞 | 批量 | 回调支持 | 适用场景 |
|---|---|---|---|---|---|
Pool.map() |
multiprocessing | ✅ 阻塞 | ✅ 是 | ❌ | 批量同步处理,结果顺序相关 |
Pool.apply() |
multiprocessing | ✅ 阻塞 | ❌ 单次 | ❌ | 执行单个函数等待结果 |
Pool.apply_async() |
multiprocessing | ❌ 非阻塞 | ❌ 单次 | ✅ callback/error_callback | 大量独立异步任务 |
Pool.map_async() |
multiprocessing | ❌ 非阻塞 | ✅ 是 | ✅ callback | 批量异步,需要回调 |
executor.submit() |
concurrent.futures | ❌ 非阻塞 | ❌ 单次 | ✅ add_done_callback | 现代异步任务,控制灵活 |
executor.map() |
concurrent.futures | ✅ 阻塞(内部) | ✅ 是 | ❌(但可配合 as_completed) | 批量任务,简洁语法 |
五、实战:爬虫 + 数据处理完整示例
import time
import math
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
import random
# ------------------------------------------------------------
# 场景:模拟爬取 URL,然后对数据进行 CPU 密集型处理
# ------------------------------------------------------------
def fetch_url(url_id):
"""模拟网络请求(I/O 等待)"""
delay = random.uniform(0.2, 0.8)
time.sleep(delay)
return url_id, delay # 模拟返回数据
def process_data(data):
"""CPU 密集型数据处理——计算质数"""
url_id, delay = data
# 找大量质数模拟计算
primes = []
num = 2
while len(primes) < 2000:
is_prime = True
for i in range(2, int(math.sqrt(num)) + 1):
if num % i == 0:
is_prime = False
break
if is_prime:
primes.append(num)
num += 1
return url_id, len(primes), sum(primes)
def log_callback(result):
"""回调:日志记录"""
url_id, count, total = result
print(f"[完成] URL #{url_id}: 找到 {count} 个质数, 和={total}")
def demo_multiprocessing_pool():
"""使用 multiprocessing.Pool 的版本"""
print("=" * 50)
print("方案一: multiprocessing.Pool")
print("=" * 50)
start = time.time()
with Pool(4) as pool:
# 阶段 1: 模拟爬取
fetch_results = pool.map(fetch_url, range(10))
# 阶段 2: 数据处理 + 回调
for data in fetch_results:
pool.apply_async(process_data, args=(data,), callback=log_callback)
pool.close()
pool.join()
print(f"总耗时: {time.time() - start:.2f}s\n")
def demo_concurrent_futures():
"""使用 concurrent.futures.ProcessPoolExecutor 的版本"""
print("=" * 50)
print("方案二: ProcessPoolExecutor")
print("=" * 50)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
# 阶段 1: 模拟爬取
fetch_futures = {executor.submit(fetch_url, i): i for i in range(10)}
fetch_results = []
for future in as_completed(fetch_futures):
fetch_results.append(future.result())
# 阶段 2: 数据处理
for data in fetch_results:
future = executor.submit(process_data, data)
future.add_done_callback(
lambda f: log_callback(f.result())
)
print(f"总耗时: {time.time() - start:.2f}s\n")
if __name__ == "__main__":
demo_multiprocessing_pool()
demo_concurrent_futures()
六、最佳实践与注意事项
6.1 何时选择哪个?
| 场景 | 推荐方案 |
|---|---|
需要 error_callback |
Pool.apply_async |
| 希望与现代 async/await 风格一致 | ProcessPoolExecutor |
| 需要在进程池和线程池之间轻松切换 | concurrent.futures |
需要用 chunksize 优化大量小任务 |
Pool.map / ProcessPoolExecutor.map |
| 需要按完成顺序处理结果 | as_completed + ProcessPoolExecutor |
6.2 注意事项
- 任务函数必须可 pickle 序列化,即不能是 lambda、类内方法等
- 避免在 worker 进程内访问全局状态——各进程内存独立
- 使用
if __name__ == "__main__"保护主模块,尤其在 Windows 上 - 合理设置
chunksize:任务数远大于进程数时,使用 chunksize 减少进程间通信次数 - 大数据量传递开销大:传递给工作进程的数据会通过 pickle 序列化/反序列化
- 子进程中的 print 可能乱序,建议使用日志模块
# chunksize 调节示例
import math
from multiprocessing import Pool
def is_prime(n):
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
# 100 万个候选数,4 个进程
with Pool(4) as pool:
# chunksize 自动计算为 task_count / workers / 4
result = pool.map(is_prime, range(2, 1000002), chunksize=62500)
print(f"质数个数: {sum(result)}")
七、总结
- GIL 导致 Python 线程无法并行执行 CPU 密集任务,进程池是真正的并行方案
multiprocessing.Pool经典可靠,提供 map / apply / apply_async 等丰富方法,回调灵活concurrent.futures.ProcessPoolExecutor现代统一 API,与线程池可互换,Future 模式更易管理apply_async+ callback 是构建异步工作流的核心模式- 选择哪种方案取决于场景偏好,但核心思路一致:将 CPU 密集型任务分发给多个独立进程,绕开 GIL,榨干多核 CPU 的性能
进程池是 Python 高并发编程的基石之一,掌握了它,你就拥有了有效利用多核 CPU 的能力。