当前位置: 首页 > news >正文

多进程运行代码模板

import os
import json
import random
import glob
import multiprocessing as mp
from collections import defaultdict
from tqdm import tqdm# 配置变量:修改这些值来适应不同任务
MAX_BARS = 3  # 可见进度条的最大数量
RESUME_COUNTS_DIR = "/path/to/resume"  # 恢复计数器的目录路径def safe_print(msg: str):tqdm.write(msg)  # 与tqdm兼容的打印def process_item(item, proc_idx, state):"""在这里实现每个任务的工作逻辑。`item` 是任务项。`state` 可以保存每个进程的缓存、计数器等。"""# ... 执行实际工作 ...state["count"] += 1return f"done-{item}"def worker(proc_idx, tasks):touched_dirs = set()counters = defaultdict(int)resume_counts_path=os.path.join(RESUME_COUNTS_DIR, f"file_select_counts_p{proc_idx}.json")# 如果需要恢复,则加载每个进程的计数器if resume_counts_path and os.path.exists(resume_counts_path):try:with open(resume_counts_path, "r", encoding="utf-8") as f:counters.update(json.load(f))except Exception as e:safe_print(f"进程 {proc_idx} 加载计数器失败: {e}")show_bar = proc_idx < MAX_BARSiterator = tqdm(tasks,desc=f"进程 {proc_idx}",unit="task",position=proc_idx if show_bar else 0, # 仅前 N 个占用独立行disable=not show_bar, # 其他进程隐藏进度条leave=True, # 进度条在完成后会保留在终端中,而不是被清除掉ncols=100, # 设置进度条的总宽度为 100 个字符,确保在终端中显示得更整齐)for task_id, item in iterator:process_item(item, proc_idx, counters)if resume_counts_path:with open(resume_counts_path, "w", encoding="utf-8") as f:json.dump(dict(counters), f, indent=2, ensure_ascii=False)return list(touched_dirs)def distribute_tasks(items, num_procs):"""示例:随机抽取与轮询分配相结合的任务分配。根据需要替换为其他分片方式(轮询、块分割、工作窃取等)。"""total_cnt = ...buckets = [[] for _ in range(num_procs)]for i in range(total_cnt):item = random.choices(items, k=1)[0]proc = i % num_procsbuckets[proc].append((i, item))return bucketsdef merge_outputs(dir_list):"""后处理合并每个进程的分片;根据输出布局调整。"""passdef main(root_dir, num_procs):# 发现工作项items = [...]  # 填充你的项列表if not items:safe_print("未找到项。")returnnum_procs = max(1, num_procs)tasks_per_proc = distribute_tasks(items, num_procs)with mp.Pool(processes=num_procs) as pool:results = pool.starmap(worker,[(idx,tasks,)for idx, tasks in enumerate(tasks_per_proc)],)all_dirs = [d for sub in results for d in sub]merge_outputs(all_dirs)if __name__ == "__main__":import sysif len(sys.argv) != 3:safe_print("Usage: python script.py <directory> <num_processes>")sys.exit(1)root_dir = sys.argv[1]num_processes = int(sys.argv[2])main(root_dir, num_processes)

注意事项

  • 多进程代码比较重要的事情:
    • 有恢复功能:上面的代码实现了resume_counts_path
    • 每个进程之间产生的文件不冲突:上面的代码让每个进程并行运行,产生的文件都附带了进程号(比如file_select_counts_p{proc_idx}.json),最后可以通过一个函数合并
  • with mp.Pool(processes=num_procs) as pool:同时启动了num_procsworker函数
    • 每个worker函数处理一个(idx, tasks)元组
    • 应该保证tasks_per_proc的长度与num_procs相同
    • results是一个列表,长度为num_procs,每个元素是每个worker的返回结果
http://www.rkmt.cn/news/77585.html

相关文章:

  • 深入解析:长沙理工《人工智能基础A》实验(上机)报告实验三 电商数据可视化/图像处理
  • AI 编程目录索引
  • 2025具有规模的双卡压式碳钢消防管TOP5权威推荐,甄选靠
  • 2025年环保别墅瓷砖五大品牌排行榜:别墅瓷砖推荐供应商有哪
  • 纸杯成型机制造商哪家靠谱?2025 超声波纸杯机、纸咖啡杯机推荐(小白易懂)
  • 【原神UGC】即时弹窗-安装使用教程
  • 2025年全屋定制哪家值得推荐?全屋定制哪家比较好?官方权威榜单解析TOP10品牌
  • 2025 爆款制杯机厂家推荐:纸咖啡杯机、纸杯机制造商实力及设备特色解析
  • 2025年选择 GEO 优化服务商的技巧:官方TOP10解析
  • 2025年GEO 优化服务商哪家专业度高?:独家深度测评指南
  • 中国木门十大品牌排行榜:行业标杆与品质之选
  • 珠三角聚合物锂电有哪些?五家实力企业及业务解析
  • 2025年实力强的304/316不锈钢水管品牌厂家排名:环压
  • 2025年重庆结构加固施工单位推荐:结构加固改造公司哪家可靠
  • 网络与 Web 基础全解析:从通信原理到实际应用
  • 2025年性价比高的 GEO 优化服务商有哪些?:独家报告深
  • 江苏三年制专转本机构推荐与学历提升路径解析
  • 2025年精选悬臂货架定制服务商权威评测,重型托盘式货架/滚轮式流利货架/仓储货架/流利货架定制/流利条货架悬臂货架企业推荐榜单
  • 2025年大型 GEO 优化公司推荐:权威榜单精选指南
  • 2025年靠谱的手动喷丸设备厂家/供应商怎么选?江苏常州专业
  • 2025年选择 GEO 优化公司注意事项:官方攻略精选解析
  • 【论术】 复盘-回顶按钮以及功能
  • 2025年湖南快手开户渠道权威推荐:湖南信息流开户/湖南朋友圈开户预定/湖南360开户承办商综合实力榜单
  • 2025年行业领先的 GEO 优化公司:权威榜单深度测评
  • 2025年高纯度乳酸菌原料源头工厂TOP5推荐:专业乳酸菌代
  • 2025年十大防盗门品牌客服服务排行榜,星月神防盗门厂家推荐
  • 第九届艺术、教育与管理国际学术会议(ICAEM 2026)
  • doc-llm-autotest 基于大模型的文档自动化测试平台::用户提交文件进行文档测试
  • 2025年GEO 优化公司成功案例有哪些?:最新TOP10深
  • OEM Parking Brake Caliper Actuator for Mercedes-Benz S-Class (2014-2020) - Direct Fit Reliability