菜单

Administrator
发布于 2026-05-18 / 2 阅读
0
0

Python 进程池

Python 进程池完全指南:从原理到实战

一、为什么需要进程池?

在 Python 中,当我们处理 CPU 密集型任务时,进程池(Process Pool) 是绕不开的核心工具。要理解它的价值,得先从 Python 的 GIL(全局解释器锁)说起。

1.1 GIL 与线程池的局限

Python 的 GIL 确保同一时刻只有一个线程在执行 Python 字节码。这意味着:

  • 线程适合 I/O 密集型任务(文件读写、网络请求等等待场景)
  • 线程对 CPU 密集型任务几乎无效,多个线程抢一把锁,实际是"伪并行"
# 演示 GIL 对 CPU 密集任务的限制
import time
from threading import Thread

def cpu_hog(n):
    """纯 CPU 计算"""
    while n > 0:
        n -= 1

# 单线程耗时
start = time.time()
cpu_hog(50_000_000)
print(f"单线程: {time.time() - start:.2f}s")

# 多线程(受 GIL 限制,几乎无加速)
start = time.time()
t1 = Thread(target=cpu_hog, args=(25_000_000,))
t2 = Thread(target=cpu_hog, args=(25_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"双线程: {time.time() - start:.2f}s")

输出大致为:

单线程: 2.15s
双线程: 2.20s  ← 几乎没有加速

1.2 进程池的优势

进程池使用 多进程 而非多线程,每个进程拥有独立的 Python 解释器和内存空间,彻底绕开 GIL

对比维度 线程池 (ThreadPool) 进程池 (ProcessPool)
适用场景 I/O 密集型 CPU 密集型
内存共享 共享(需加锁) 独立(需序列化)
GIL 限制 受限制 无限制
创建开销 较小 较大
通信成本 低(直接读内存) 高(pickle 序列化)

二、multiprocessing.Pool 经典方案

multiprocessing 是 Python 标准库提供的多进程模块,其中的 Pool 类提供了最经典的进程池实现。

2.1 基本使用

from multiprocessing import Pool
import os

def square(x):
    return x * x

# 创建进程池(默认使用 CPU 核心数)
with Pool() as pool:
    results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.2 核心方法详解

pool.map() —— 批量同步映射

类似内置 map(),将可迭代对象中的每个元素分配给进程池处理,阻塞等待全部结果返回

import time
from multiprocessing import Pool

def slow_square(x):
    time.sleep(0.5)
    return x ** 2

start = time.time()
with Pool(4) as pool:
    # 并行处理 8 个任务(4 个 worker 各处理 2 个)
    results = pool.map(slow_square, range(8))
print(f"耗时: {time.time() - start:.2f}s")  # ~1.0s(顺序需 4s)
print(results)

map 的变体:starmap
如果函数接收多个参数,使用 starmap

def add(a, b):
    return a + b
with Pool() as pool:
    print(pool.starmap(add, [(1,2), (3,4)]))  # [3, 7]

pool.apply() —— 同步执行单个任务

apply 在某个 worker 进程上执行一次函数调用,阻塞等待结果

from multiprocessing import Pool

def factorial(n):
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

with Pool(2) as pool:
    # 同步执行单个任务
    result = pool.apply(factorial, args=(10,))
    print(result)  # 3628800

pool.apply_async() —— 异步执行的核心

apply_async异步非阻塞版本,立即返回一个 AsyncResult 对象,主进程可继续做其他事,稍后通过 .get() 获取结果。

from multiprocessing import Pool
import time

def slow_fib(n):
    if n <= 1:
        return n
    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return b

with Pool(4) as pool:
    # 异步提交任务
    results = [pool.apply_async(slow_fib, args=(i,)) for i in range(10)]
    
    # 主进程可以在这里做其他事情
    print("任务已提交,主进程做其他事...")
    time.sleep(0.5)
    
    # 获取结果
    fibs = [r.get() for r in results]
    print(fibs)  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

pool.map_async() —— 异步版本的 map

from multiprocessing import Pool

with Pool(4) as pool:
    # 异步 map,返回 MapResult 对象
    result_obj = pool.map_async(slow_square, range(16))
    
    # 检查是否完成
    if not result_obj.ready():
        print("计算中...")
    
    # 等待并获取结果
    squares = result_obj.get()
    print(squares)

2.3 Callback 回调函数

apply_asyncmap_async 支持回调,任务完成后自动执行回调函数,不需要手动 .get()

from multiprocessing import Pool
import time

def task(x):
    time.sleep(0.5)
    return x * 10

def on_success(result):
    print(f"任务完成,结果: {result}")

def on_error(exc):
    print(f"任务失败,异常: {exc}")

with Pool(3) as pool:
    for i in range(6):
        pool.apply_async(
            task, 
            args=(i,),
            callback=on_success,       # 成功时回调
            error_callback=on_error    # 失败时回调
        )
    pool.close()
    pool.join()

回调的实际应用场景

import requests
from multiprocessing import Pool

def fetch_url(url):
    """下载网页内容"""
    resp = requests.get(url, timeout=10)
    return (url, resp.status_code, len(resp.text))

def save_result(result):
    """下载完成后写入日志(回调函数)"""
    url, status, size = result
    with open("download_log.txt", "a") as f:
        f.write(f"[{status}] {url} ({size} bytes)\n")
    print(f"✓ {url} 已记录")

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
]

with Pool(3) as pool:
    for url in urls:
        pool.apply_async(fetch_url, args=(url,), callback=save_result)
    pool.close()
    pool.join()
print("所有任务完成")

2.4 进程池生命周期管理

from multiprocessing import Pool

pool = Pool(4)

# 提交若干任务
for _ in range(100):
    pool.apply_async(do_work)

# 关闭:不再接受新任务
pool.close()

# 等待所有 worker 结束
pool.join()

# 强制终止所有 worker(不等待)
pool.terminate()

三、concurrent.futures.ProcessPoolExecutor 现代方案

Python 3.2+ 引入的 concurrent.futures 模块提供了更现代、更统一的接口。ProcessPoolExecutorThreadPoolExecutor 使用相同的 API,切换成本极低。

3.1 基本使用

from concurrent.futures import ProcessPoolExecutor
import os

def square(x):
    return x * x

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(square, i) for i in range(10)]
    results = [f.result() for f in futures]
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

3.2 核心方法

executor.submit() —— 提交单个任务

返回 Future 对象,通过 .result() 获取结果(可设置超时)。

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def heavy_task(n):
    time.sleep(n)
    return f"任务 {n} 完成"

with ProcessPoolExecutor(max_workers=4) as executor:
    # 提交任务
    future_map = {executor.submit(heavy_task, i): i for i in [3, 1, 2]}
    
    # as_completed 按完成顺序遍历
    for future in as_completed(future_map):
        task_id = future_map[future]
        try:
            result = future.result()
            print(f"任务 {task_id}: {result}")
        except Exception as e:
            print(f"任务 {task_id} 出错: {e}")

executor.map() —— 批量提交

类似 Pool.map(),但更简洁。

from concurrent.futures import ProcessPoolExecutor

def cube(x):
    return x ** 3

with ProcessPoolExecutor(4) as executor:
    # map 返回迭代器,按输入顺序产出结果
    results = executor.map(cube, range(20))
    print(list(results))
    
    # 支持多参数——使用 starmap 风格
    def add(a, b, c):
        return a + b + c
    # 多个可迭代对象对应多个参数
    results = executor.map(add, [1,2,3], [4,5,6], [7,8,9])
    print(list(results))  # [12, 15, 18]

executor.map() 支持 timeoutchunksize 参数:

results = executor.map(
    heavy_func, 
    range(1000), 
    chunksize=50,   # 将任务切块分发,减少通信开销
    timeout=30      # 每个任务超时 30 秒
)

3.3 Callback 回调

concurrent.futures 通过 Future.add_done_callback() 实现回调:

from concurrent.futures import ProcessPoolExecutor
import time

def work(n):
    time.sleep(0.5)
    return n * 2

def on_done(future):
    try:
        result = future.result()
        print(f"回调: 结果 = {result}")
    except Exception as e:
        print(f"回调: 出错 = {e}")

with ProcessPoolExecutor(4) as executor:
    for i in range(6):
        future = executor.submit(work, i)
        future.add_done_callback(on_done)
    # 与 multiprocessing 不同,这里退出上下文管理器会自动 join

3.4 上下文管理器

ProcessPoolExecutor 支持 with 语句,退出时自动调用 shutdown(wait=True)

# 等价写法
executor = ProcessPoolExecutor(4)
try:
    futures = [executor.submit(task, i) for i in range(10)]
    results = [f.result() for f in futures]
finally:
    executor.shutdown(wait=True)

四、map / apply / apply_async 对比总结

方法 所属模块 是否阻塞 批量 回调支持 适用场景
Pool.map() multiprocessing ✅ 阻塞 ✅ 是 批量同步处理,结果顺序相关
Pool.apply() multiprocessing ✅ 阻塞 ❌ 单次 执行单个函数等待结果
Pool.apply_async() multiprocessing ❌ 非阻塞 ❌ 单次 ✅ callback/error_callback 大量独立异步任务
Pool.map_async() multiprocessing ❌ 非阻塞 ✅ 是 ✅ callback 批量异步,需要回调
executor.submit() concurrent.futures ❌ 非阻塞 ❌ 单次 ✅ add_done_callback 现代异步任务,控制灵活
executor.map() concurrent.futures ✅ 阻塞(内部) ✅ 是 ❌(但可配合 as_completed) 批量任务,简洁语法

五、实战:爬虫 + 数据处理完整示例

import time
import math
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
import random

# ------------------------------------------------------------
# 场景:模拟爬取 URL,然后对数据进行 CPU 密集型处理
# ------------------------------------------------------------

def fetch_url(url_id):
    """模拟网络请求(I/O 等待)"""
    delay = random.uniform(0.2, 0.8)
    time.sleep(delay)
    return url_id, delay  # 模拟返回数据

def process_data(data):
    """CPU 密集型数据处理——计算质数"""
    url_id, delay = data
    # 找大量质数模拟计算
    primes = []
    num = 2
    while len(primes) < 2000:
        is_prime = True
        for i in range(2, int(math.sqrt(num)) + 1):
            if num % i == 0:
                is_prime = False
                break
        if is_prime:
            primes.append(num)
        num += 1
    return url_id, len(primes), sum(primes)

def log_callback(result):
    """回调:日志记录"""
    url_id, count, total = result
    print(f"[完成] URL #{url_id}: 找到 {count} 个质数, 和={total}")

def demo_multiprocessing_pool():
    """使用 multiprocessing.Pool 的版本"""
    print("=" * 50)
    print("方案一: multiprocessing.Pool")
    print("=" * 50)
    start = time.time()
    
    with Pool(4) as pool:
        # 阶段 1: 模拟爬取
        fetch_results = pool.map(fetch_url, range(10))
        
        # 阶段 2: 数据处理 + 回调
        for data in fetch_results:
            pool.apply_async(process_data, args=(data,), callback=log_callback)
        
        pool.close()
        pool.join()
    
    print(f"总耗时: {time.time() - start:.2f}s\n")

def demo_concurrent_futures():
    """使用 concurrent.futures.ProcessPoolExecutor 的版本"""
    print("=" * 50)
    print("方案二: ProcessPoolExecutor")
    print("=" * 50)
    start = time.time()
    
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 阶段 1: 模拟爬取
        fetch_futures = {executor.submit(fetch_url, i): i for i in range(10)}
        fetch_results = []
        for future in as_completed(fetch_futures):
            fetch_results.append(future.result())
        
        # 阶段 2: 数据处理
        for data in fetch_results:
            future = executor.submit(process_data, data)
            future.add_done_callback(
                lambda f: log_callback(f.result())
            )
    
    print(f"总耗时: {time.time() - start:.2f}s\n")

if __name__ == "__main__":
    demo_multiprocessing_pool()
    demo_concurrent_futures()

六、最佳实践与注意事项

6.1 何时选择哪个?

场景 推荐方案
需要 error_callback Pool.apply_async
希望与现代 async/await 风格一致 ProcessPoolExecutor
需要在进程池和线程池之间轻松切换 concurrent.futures
需要用 chunksize 优化大量小任务 Pool.map / ProcessPoolExecutor.map
需要按完成顺序处理结果 as_completed + ProcessPoolExecutor

6.2 注意事项

  1. 任务函数必须可 pickle 序列化,即不能是 lambda、类内方法等
  2. 避免在 worker 进程内访问全局状态——各进程内存独立
  3. 使用 if __name__ == "__main__" 保护主模块,尤其在 Windows 上
  4. 合理设置 chunksize:任务数远大于进程数时,使用 chunksize 减少进程间通信次数
  5. 大数据量传递开销大:传递给工作进程的数据会通过 pickle 序列化/反序列化
  6. 子进程中的 print 可能乱序,建议使用日志模块
# chunksize 调节示例
import math
from multiprocessing import Pool

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

# 100 万个候选数,4 个进程
with Pool(4) as pool:
    # chunksize 自动计算为 task_count / workers / 4
    result = pool.map(is_prime, range(2, 1000002), chunksize=62500)
    print(f"质数个数: {sum(result)}")

七、总结

  • GIL 导致 Python 线程无法并行执行 CPU 密集任务,进程池是真正的并行方案
  • multiprocessing.Pool 经典可靠,提供 map / apply / apply_async 等丰富方法,回调灵活
  • concurrent.futures.ProcessPoolExecutor 现代统一 API,与线程池可互换,Future 模式更易管理
  • apply_async + callback 是构建异步工作流的核心模式
  • 选择哪种方案取决于场景偏好,但核心思路一致:将 CPU 密集型任务分发给多个独立进程,绕开 GIL,榨干多核 CPU 的性能

进程池是 Python 高并发编程的基石之一,掌握了它,你就拥有了有效利用多核 CPU 的能力。


评论