尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

Python 并发编程:concurrent.futures

Python 并发编程:concurrent.futures
📅 发布时间:2026/6/18 16:50:56

一、模块简介

concurrent.futures 是 Python 标准库提供的 高级并发接口,用来执行多线程或多进程任务。
特点:

  • 简化线程/进程管理;

  • 提供统一接口 Executor;

  • 支持异步结果 Future 对象;

  • 支持任务异常捕获和超时控制。

核心执行器类:

执行器类 说明
ThreadPoolExecutor 使用线程池并发执行,适合 I/O 密集型任务(如网络请求、文件操作)
ProcessPoolExecutor 使用进程池并发执行,适合 CPU 密集型任务(如数据计算、图像处理)

二、核心组件

1️⃣ Executor(执行器)

Executor 是一个抽象类,不能直接使用。
它有两个主要子类:

类名 说明 适用场景
ThreadPoolExecutor 线程池执行器 I/O 密集型任务(网络请求、文件操作)
ProcessPoolExecutor 进程池执行器 CPU 密集型任务(数据计算、图像处理)

Executor 方法

方法 说明
submit(fn, *args, **kwargs) 提交函数异步执行,返回 Future 对象
map(func, *iterables) 并发执行多个任务,返回结果迭代器
shutdown(wait=True) 关闭池,释放资源,通常用 with 自动调用

2️⃣ Future(未来对象)

Future 对象代表一个异步执行的任务结果。当任务提交后立即返回一个 Future,结果可能还没准备好。我们可以用它来查询、等待或获取任务执行的结果。

常用方法:

方法 说明
.result(timeout=None) 获取结果(阻塞直到完成)
.done() 检查任务是否完成
.running() 检查任务是否正在执行
.cancel() 尝试取消任务
.add_done_callback(fn) 任务完成后自动调用回调函数
exception() 获取任务执行中抛出的异常

3️⃣ 辅助函数

函数 说明
as_completed(fs) 按任务完成顺序返回 Future 对象
wait(fs, timeout=None) 等待所有或部分 Future 完成

三、ThreadPoolExecutor 示例(多线程)

🌟 示例 1:简单线程池执行

from concurrent.futures import ThreadPoolExecutor
import timedef download_file(name):print(f"{name} 开始下载...")time.sleep(2)print(f"{name} 下载完成!")return f"{name} 完成"# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(download_file, f"文件{i}") for i in range(5)]# 等待任务结果for f in futures:print(f.result())

输出:

文件0 开始下载...
文件1 开始下载...
文件2 开始下载...
文件3 开始下载...
文件4 开始下载...
文件0 下载完成!
文件1 下载完成!
...

🧩 说明:

  • max_workers:同时运行的最大线程数。

  • submit(func, *args):提交任务,立即返回一个 Future。

  • result():阻塞直到任务完成,返回函数的返回值。


🌟 示例 2:使用 map() 简化写法

from concurrent.futures import ThreadPoolExecutor
import timedef square(n):time.sleep(1)return n * nwith ThreadPoolExecutor(max_workers=4) as executor:results = executor.map(square, [1, 2, 3, 4, 5])for r in results:print(r)

🧩 特点:

  • map() 自动调度任务,无需手动管理 Future。

  • 返回一个迭代器,按任务提交顺序返回结果。

🌟 示例 3:线程池并发计算

from concurrent.futures import ThreadPoolExecutor, as_completed
import timedef compute_square(n):time.sleep(1)  # 模拟耗时任务return n * nnums = [1, 2, 3, 4, 5]
results = {}with ThreadPoolExecutor(max_workers=3) as executor:# 提交任务future_to_num = {executor.submit(compute_square, n): n for n in nums}# 按完成顺序获取结果for future in as_completed(future_to_num):num = future_to_num[future]              # 找到当前任务对应的输入值try:results[num] = future.result()      # 获取任务返回值并保存到字典except Exception as e:print(f'Task {num} generated an exception: {e}')print(results)

🔍 关键点解析

1️⃣ future_to_num[future]
  • future_to_num 是一个字典:
{Future1: 1, Future2: 2, Future3: 3, ...}
  • key:提交任务的 Future 对象

  • value:任务对应的标识(这里是输入数字)

  • 用途:按任务完成顺序识别是哪一个任务完成。

2️⃣ results[num] = future.result()
  • future.result():阻塞等待当前任务完成,获取返回值

  • 将结果保存到 results 字典中,用任务输入作为 key。

  • 如果任务执行失败,会抛出异常;通过 try/except 捕获。

3️⃣ as_completed(future_to_num)
  • 按任务 完成顺序 返回 Future 对象,而不是提交顺序

  • 可实现 最快先处理 的策略,提高响应效率


🔁 逻辑流程图

提交任务:
Future1 → compute_square(1)
Future2 → compute_square(2)
Future3 → compute_square(3)as_completed 遍历顺序(可能不同于提交顺序):
Future2 → Future1 → Future3循环:
future = Future2
num = 2
results[2] = Future2.result()  # 4最终 results:
{1: 1, 2: 4, 3: 9}

四、ProcessPoolExecutor 示例(多进程)

用于CPU 密集型任务(例如:图像处理、大数据计算、加密解密)。

from concurrent.futures import ProcessPoolExecutor
import math, timedef intensive_task(n):print(f"计算 {n} 的阶乘")return math.factorial(n)if __name__ == '__main__':with ProcessPoolExecutor(max_workers=4) as executor:results = executor.map(intensive_task, [100000, 120000, 150000, 200000])for r in results:print(len(str(r)))  # 输出结果长度

🧩 注意:

  • 必须放在 if __name__ == '__main__': 保护下(Windows 系统必须,否则进程会无限递归创建)。

  • 每个子进程是独立的 Python 进程,不共享内存。


五、回调函数使用(add_done_callback)

from concurrent.futures import ThreadPoolExecutor
import timedef task(n):time.sleep(1)return n * 2def callback(future):print(f"任务完成,结果:{future.result()}")with ThreadPoolExecutor() as executor:future = executor.submit(task, 10)future.add_done_callback(callback)

🧩 回调函数常用于:

  • 异步通知任务完成

  • 自动写日志、更新UI、触发后续操作等


六、等待任务:as_completed() 与 wait()

1️⃣ as_completed(谁先完成谁先返回)

from concurrent.futures import ThreadPoolExecutor, as_completed
import time, randomdef work(x):time.sleep(random.randint(1, 3))return x * xwith ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(work, i) for i in range(5)]for f in as_completed(futures):print("完成:", f.result())

🧩 输出顺序不固定,取决于任务完成时间。


2️⃣ wait(等待所有或部分完成)

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import timedef task(x):time.sleep(x)return xwith ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(task, i) for i in [1, 2, 3]]done, not_done = wait(futures, return_when=FIRST_COMPLETED)print("已完成任务数:", len(done))

七、实战案例:多接口数据抓取(模拟网络请求)

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import randomdef fetch_api_data(api_name):"""模拟 API 请求"""time.sleep(random.uniform(0.5, 2))if random.random() < 0.2:  # 模拟请求失败raise ValueError("API Error")return f"{api_name} data"api_list = ["users", "posts", "comments"]
results = {}with ThreadPoolExecutor(max_workers=5) as executor:future_to_api = {executor.submit(fetch_api_data, api): api for api in api_list}for future in as_completed(future_to_api):api_name = future_to_api[future]try:results[api_name] = future.result()print(f"{api_name} 请求成功")except Exception as e:print(f"{api_name} 请求失败: {e}")results[api_name] = Noneprint(results)
  • future_to_api = 任务映射表

  • future.result() = 获取返回值或抛出异常

  • 通过字典存储结果,保证每个接口都能被记录,即使失败也不会中断程序


八、注意事项与优化建议

问题 原因 解决方案
任务执行顺序与提交顺序不一致 as_completed 按完成顺序返回 正常现象,非错误
CPU 密集型任务低效 使用线程池,受 GIL 限制 改用 ProcessPoolExecutor
任务异常未捕获 future.result() 抛出异常 使用 try/except 捕获
任务过多导致系统负荷 max_workers 过大 控制线程/进程数,合理设置池大小
结果管理混乱 没有映射 Future → 任务标识 使用字典映射 (future_to_func)

九、总结

  • concurrent.futures 提供了 线程池/进程池 + Future 的统一接口

  • 核心思想:

    1. 提交任务 → executor.submit()

    2. 获取 Future → future

    3. 按完成顺序获取结果 → as_completed()

    4. 异常处理 + 保存结果 → future.result() + try/except

  • 小技巧:

    • future_to_func / future_to_api 映射字典是管理多个异步任务的最佳实践

    • 遇到 I/O 密集型用线程池,CPU 密集型用进程池

    • 异常捕获保证程序健壮性


相关新闻

  • 2025/10/14 模拟赛总结 - sb
  • 记一次因对象构造顺序引发的踩内存问题
  • 恒流电路的震荡问题

最新新闻

  • Claude Opus-4.7 实测:视觉语义理解与分步推理协作新范式
  • ATmega329系列MCU选型、硬件设计与中断编程实战指南
  • 2026华南GEO榜单TOP5横向对比 - 热点速览
  • 如何快速掌握Azure Data Studio:面向新手的跨平台数据库管理完整指南
  • 2026兰州物流仓库快速堆积门生产厂 - 精选优质企业推荐官
  • 2026安徽省淮北市中考500分左右怎么办?冲刺高中补充方案最新发布 - 小张zc

日新闻

  • 2026年不锈钢卷板厂家推荐排行榜:冷轧热轧/304/201不锈钢卷板,高颜值耐腐蚀源头厂家实力精选 - 企业推荐官【官方】
  • FLUX.1-dev FP8模型实战指南:24GB以下显卡高效部署方案
  • 2026佛山长途搬家价目表:跨省跨市搬家费用完整计算指南 - 从来都是英雄出少年

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号