当前位置: 首页 > news >正文

Python 爬虫分布式实战:Redis + 多进程爬虫实现分布式数据采集与任务分片

前言

单机爬虫受 CPU、网络带宽、IP 资源限制,在海量站点全量抓取场景中存在采集速度慢、任务无法拆分、故障难以续爬等短板,分布式爬虫通过任务拆解、多机器协同调度突破单机性能瓶颈。Redis 凭借高性能内存读写、数据结构丰富的特性成为分布式爬虫主流中间件,依托 List、Set、SortedSet 数据结构实现任务队列分发、去重过滤、采集状态标记,结合 Python 多进程、多线程实现单机资源最大化利用,搭配任务分片策略将海量目标 URL 拆分至多台爬虫节点并行消费。本文从分布式爬虫架构设计、Redis 各类数据结构落地场景、任务生产消费模型、URL 布隆去重、多进程任务隔离、宕机续爬机制、分布式锁防重复抓取全维度落地实战,配套完整可运行工程化代码与底层原理剖析,覆盖中小型分布式爬虫从开发到部署全流程。

本文所需依赖官方文档超链接:

  1. Redis-Py 官方开发文档
  2. Requests 官方文档
  3. BeautifulSoup4 官方文档
  4. SQLAlchemy 官方文档

一、分布式爬虫整体架构与 Redis 核心选型原理

1.1 三层分布式爬虫架构组成

分布式爬虫划分为任务生产端、分布式任务中间件、爬虫消费节点三层结构,三层解耦可独立部署扩容:

  1. 任务生产端:负责解析站点首页、分类页,批量生成待采集商品 / 资讯 URL,统一推送至 Redis 任务队列,仅做任务产出不参与数据抓取,可独立部署在轻量服务器;
  2. Redis 中间件层:作为全集群共享存储中心,拆分四种数据存储区域:待抓取任务队列、已抓取 URL 去重集合、失败重试任务池、分布式锁存储键,所有爬虫节点共享同一份 Redis 数据,实现跨机器任务互通;
  3. 爬虫消费节点:多台物理 / 云服务器作为消费端,从 Redis 拉取待采集 URL 执行页面请求、数据解析、入库操作,单节点内部通过多进程拆分子任务,最大化利用单机硬件资源。

1.2 Redis 数据结构在爬虫中的分工对照表

表格

Redis 数据结构存储内容爬虫业务用途
List (链表)待采集原始 URL左侧入队生产任务,右侧阻塞弹出消费,实现 FIFO 先进先出任务队列
Set (无序集合)已完成抓取 URL全集群 URL 去重,入队前校验集合,规避重复入队造成重复采集
SortedSet (有序集合)抓取失败 URL依据失败次数设置分数,分数递增实现延迟重试,避免故障 URL 频繁占用资源
String (字符串)分布式锁标识单 URL 全局锁,防止多节点同一时刻抓取同一个链接造成重复入库

1.3 分布式爬虫核心运行逻辑

生产者依据站点分页规则批量构造目标 URL,校验 URL 未存入去重 Set 后写入 List 任务队列;所有爬虫消费节点启动后持续阻塞从 List 尾部取出任务,获取 URL 后抢占分布式锁,加锁成功执行页面采集,抓取成功则将 URL 写入已抓取 Set,抓取失败则存入 SortedSet 并累加失败分数;定时巡检 SortedSet,将达到重试阈值的失败链接重新放回主任务队列实现二次抓取,节点宕机未完成的任务仍留存 Redis,新节点启动后可继续消费剩余任务,天然支持断点续爬。

二、环境依赖安装与 Redis 基础连接配置

2.1 依赖批量安装指令

bash

运行

# Redis连接驱动 pip install redis==5.0.8 # 爬虫基础采集库 pip install requests==2.31.0 beautifulsoup4==4.12.3 lxml==5.3.0 # 数据入库ORM库 pip install sqlalchemy==2.0.35 pymysql==1.1.0 # 多进程进程池依赖(Python内置无需额外安装)

2.2 Redis 客户端全局连接初始化

python

运行

import redis # Redis连接配置,分布式集群所有节点共用同一Redis实例 REDIS_CONFIG = { "host": "127.0.0.1", "port": 6379, "db": 0, "password": "", "decode_responses": True # 自动解码bytes为字符串,省去编码转换 } # 初始化全局Redis连接对象 redis_client = redis.Redis(**REDIS_CONFIG) # Redis爬虫业务Key常量定义,统一管理避免硬编码 TASK_QUEUE_KEY = "dist:crawl:task_queue" DONE_URL_SET_KEY = "dist:crawl:done_url_set" FAIL_URL_ZSET_KEY = "dist:crawl:fail_url_zset" DISTRIBUTE_LOCK_PREFIX = "dist:crawl:lock:"
代码原理

decode_responses 参数开启后,Redis 返回数据自动转为 str 类型,适配 URL 字符串存储场景;业务 Key 统一常量定义便于后期批量修改 Redis 存储标识,区分不同爬虫项目缓存键避免数据混淆。

三、数据库 ORM 模型复用与入库逻辑

沿用前文商品数据表结构,实现爬虫数据统一落地 MySQL,全集群所有节点共用同一数据库实例:

python

运行

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime from sqlalchemy.orm import declarative_base, sessionmaker from datetime import datetime DB_CONF = {"user": "root", "password": "123456", "host": "127.0.0.1", "port": 3306, "db_name": "crawl_data", "charset": "utf8mb4"} DB_URL = f"mysql+pymysql://{DB_CONF['user']}:{DB_CONF['password']}@{DB_CONF['host']}:{DB_CONF['port']}/{DB_CONF['db_name']}?charset={DB_CONF['charset']}" engine = create_engine(DB_URL, pool_size=8, max_overflow=16, pool_recycle=3600, echo=False) Base = declarative_base() SessionDb = sessionmaker(bind=engine, autoflush=False, autocommit=False) class GoodsInfo(Base): __tablename__ = "dist_crawl_goods" id = Column(Integer, primary_key=True, autoincrement=True) goods_url = Column(String(512), index=True, comment="商品源链接") goods_name = Column(String(255), comment="商品名称") price = Column(Float, comment="售价") source_node = Column(String(64), comment="采集节点标识,记录哪台机器抓取") crawl_time = Column(DateTime, default=datetime.now) Base.metadata.create_all(bind=engine) def batch_save_db(data_list: list): """分布式节点统一批量入库函数""" sess = SessionDb() obj_list = [GoodsInfo(**item) for item in data_list] try: sess.add_all(obj_list) sess.commit() return len(obj_list) except Exception as e: sess.rollback() print(f"入库异常:{str(e)}") return 0 finally: sess.close()

四、模块一:分布式任务生产者实现(URL 生产 + 入队 + 前置去重)

生产者独立脚本运行,批量生成分页 URL,完成去重校验后写入 Redis 任务队列,可单独部署在一台服务器持续产出任务:

python

运行

from urllib.parse import urljoin import requests HEADERS = {"User-Agent": "Mozilla/5.0 Chrome/123.0.0.0 Safari/537.36"} BASE_DOMAIN = "https://demo-dist.com" def produce_crawl_task(start_page: int, end_page: int): """批量生产分页任务URL,入队Redis任务队列""" print(f"开始生产{start_page}~{end_page}页任务") for page in range(start_page, end_page + 1): page_url = urljoin(BASE_DOMAIN, f"/goods/list?page={page}") # 前置去重:已抓取URL不再重复入队 if redis_client.sismember(DONE_URL_SET_KEY, page_url): continue # 左侧LPUSH写入任务队列 redis_client.lpush(TASK_QUEUE_KEY, page_url) print(f"任务入队:{page_url}") print("本轮任务生产完毕") # 生产者启动入口 if __name__ == "__main__": produce_crawl_task(1, 200)
底层原理

sismember校验 URL 是否存在已抓取集合,存在直接跳过实现生产者层面去重;lpush从 List 链表左侧写入数据,消费端rpop从右侧取出,天然 FIFO 队列;生产者与消费者完全隔离,可随时新增生产者脚本加速任务产出。

五、模块二:分布式消费者实现(阻塞拉取 + 分布式锁 + 失败重试)

消费端分为两层:主进程循环阻塞从 Redis 拉取任务,内部开启多进程池并发抓取页面,搭配 Redis 分布式锁防止多节点抢占同一 URL 重复采集,抓取失败自动写入失败任务有序集合。

python

运行

from multiprocessing import Pool import random import time # 当前节点标识,入库时标记来源机器 NODE_TAG = "crawl_node_01" PROCESS_NUM = 4 # 单节点开启4个采集子进程 def get_dist_lock(url: str, expire: int = 30): """Redis分布式锁:SET key value EX expire NX,加锁成功返回True""" lock_key = DISTRIBUTE_LOCK_PREFIX + url # NX不存在才创建,EX设置锁过期时间防止死锁 lock_res = redis_client.set(lock_key, NODE_TAG, ex=expire, nx=True) return lock_res is not None def release_dist_lock(url: str): """释放分布式锁""" lock_key = DISTRIBUTE_LOCK_PREFIX + url redis_client.delete(lock_key) def single_goods_crawl(target_url: str): """单个URL采集逻辑:页面请求、数据解析、入库""" save_data = [] try: resp = requests.get(target_url, headers=HEADERS, timeout=10) resp.raise_for_status() # 模拟页面解析逻辑 for idx in range(random.randint(15,30)): item = { "goods_url": f"{target_url}#{idx}", "goods_name": f"分布式采集商品{random.randint(1000,9999)}", "price": round(random.uniform(9.9,2999),2), "source_node": NODE_TAG } save_data.append(item) # 抓取成功,URL存入已完成集合 redis_client.sadd(DONE_URL_SET_KEY, target_url) # 批量入库 succ_count = batch_save_db(save_data) print(f"[{NODE_TAG}]成功抓取:{target_url},入库{succ_count}条") return True except Exception as e: print(f"[{NODE_TAG}]抓取失败{target_url}:{str(e)}") # 抓取失败写入ZSET,默认失败分数1,后续重试分数+1 old_score = redis_client.zscore(FAIL_URL_ZSET_KEY, target_url) or 0 new_score = old_score + 1 redis_client.zadd(FAIL_URL_ZSET_KEY, {target_url: new_score}) return False finally: release_dist_lock(target_url) def pool_crawl_worker(url_list: list): """进程池批量消费一批URL""" with Pool(processes=PROCESS_NUM) as pool: pool.map(single_goods_crawl, url_list) def consumer_run(): """消费端主循环:阻塞拉取任务""" print(f"分布式爬虫节点{NODE_TAG}启动,进程数:{PROCESS_NUM}") while True: # brpop阻塞弹出,无任务时阻塞等待10秒释放资源,阻塞式消费 pop_res = redis_client.brpop(TASK_QUEUE_KEY, timeout=10) if not pop_res: continue _, task_url = pop_res # 抢占分布式锁,加锁失败说明其他节点正在抓取,丢弃本次任务 if not get_dist_lock(task_url): continue # 单条URL入进程池抓取 pool_crawl_worker([task_url]) # 消费节点启动 if __name__ == "__main__": consumer_run()
代码原理拆解
  1. 分布式锁原理set nx ex原子指令实现加锁,NX 保证仅首个请求节点创建锁,EX 设置过期时间规避爬虫异常崩溃导致死锁;
  2. brpop 阻塞消费:Redis 阻塞弹出指令,队列无任务时进程休眠,相比轮询无限空查节省服务器 CPU;
  3. SortedSet 失败任务:失败链接分数随报错次数递增,分数越高代表故障次数越多,便于定时过滤高失败链接人工排查。

六、模块三:失败任务定时重试调度器

独立定时脚本,定时从 SortedSet 筛选失败分数小于 5 的 URL(失败≤4 次),重新放回主任务队列,超过 5 次判定为失效链接永久沉淀不再重试:

python

运行

from apscheduler.schedulers.background import BackgroundScheduler def retry_fail_task(): """定时取回失败任务,重新入队主队列""" # zrange筛选分数0~4的失败链接 fail_url_list = redis_client.zrangebyscore(FAIL_URL_ZSET_KEY, min=0, max=4) if not fail_url_list: print("暂无需要重试的失败任务") return for url in fail_url_list: # 再次校验是否已被成功抓取 if redis_client.sismember(DONE_URL_SET_KEY, url): redis_client.zrem(FAIL_URL_ZSET_KEY, url) continue redis_client.lpush(TASK_QUEUE_KEY, url) print(f"本轮重试入队{len(fail_url_list)}个失败URL") def start_retry_scheduler(): scheduler = BackgroundScheduler() # 每30分钟执行一次失败任务重试 scheduler.add_job(retry_fail_task, "interval", minutes=30, id="fail_retry_job", replace_existing=True) scheduler.start() print("失败任务重试调度器启动成功") try: while True: time.sleep(3600) except KeyboardInterrupt: scheduler.shutdown() if __name__ == "__main__": start_retry_scheduler()

七、分布式扩容与任务分片落地方案

7.1 多节点横向扩容部署

  1. 多台服务器安装 Python、Redis 依赖,统一指向同一远端 Redis 服务(修改 REDIS_CONFIG 的 host 为公网 RedisIP);
  2. 每台机器单独启动消费端脚本,修改 NODE_TAG 区分不同节点标识;
  3. 生产者可部署多实例,调高分页生产范围,Redis 自动均衡分发任务至空闲节点。

7.2 海量任务分片生产优化

超十万 URL 场景下拆分生产者分片,多生产者分工生产不同页码区间:

  • 节点 A 生产者:produce_crawl_task (1,5000)
  • 节点 B 生产者:produce_crawl_task (5001,10000) 实现任务分片生产,避免单生产者性能瓶颈。

八、分布式爬虫故障排查与优化配置表

表格

故障现象诱因优化方案
多节点重复抓取同一个 URL未加分布式锁,网络延迟并发入队启用 Redis SET NX 分布式锁,设置合理锁过期时长
失败任务无限堆积 ZSET失效链接持续重试累加分数设置最大重试阈值,超阈值 ZREM 移除链接落地异常表
Redis 内存占用持续飙升大量已抓取 URL 长期留存 Set定时清理历史已抓取 URL,按月归档至 MySQL
消费节点空闲但队列堆积任务单进程消费效率不足调高单节点 PROCESS_NUM 进程数或新增消费节点

九、全链路综合启动部署规范

  1. 启动顺序:启动 Redis 服务→启动失败重试调度器→启动多台消费节点脚本→启动生产者脚本;
  2. 部署方式:Linux 环境使用 nohup 分别后台常驻生产者、消费者、重试调度三类脚本;
  3. 数据监控:通过 redis-cli 查看 LLEN 任务队列长度、SCARD 已抓取集合数量实时监控采集进度。
http://www.rkmt.cn/news/1458442.html

相关文章:

  • 从‘nvidia-smi’到跑通第一个CUDA核函数:给Python开发者的CentOS服务器GPU编程初体验
  • 自制Digispark开发板:从ATtiny85芯片到USB可编程硬件的完整实践
  • 别再只盯着GPS了!手把手教你用Arduino解析北斗/GPS模块的NMEA 0183数据(附完整代码)
  • 3步搞定Mac鼠标指针个性化:Mousecape完整使用指南
  • 告别玄学:给你的STM32 Bootloader跳转函数加个‘安全检查清单’(含代码详解)
  • 智能客服响应延迟骤降92%,企业AI工具整合避坑清单,仅剩最后87份内部文档模板
  • C++编写的BMP条形码定位与数字解码工具集(含预处理、频域增强与形态学操作)
  • Fan Control实战:3个技巧解决Windows风扇控制难题
  • 避坑指南:在RH850上发送超过16位SPI数据包,EDL位和CS信号时序你配对了吗?
  • Arxiv上传前必读:从专利风险到源码政策,这些“隐形坑”可能毁了你的工作
  • OV摄像头SCCB协议实战:用Arduino UNO配置OV7670图像传感器(附完整代码)
  • 深入PSINS工具箱:从`glvf`的全局变量设计,看严恭敏老师的编程哲学与工程考量
  • 2026年6月成都全屋定制品牌推荐:十大排名专业评测价格注意事项 - 品牌推荐
  • STM32期末救命指南(一):嵌入式系统概述与开发流程
  • WinCC自动化备份不求人:用VBS脚本让OnlineTableControl定时导出CSV(附完整代码)
  • 【限时开放】2024智能客服AI集成成熟度评估模型(含12维度打分表+行业基准值)
  • 告别CH340!用STM32F103C8T6的USB虚拟串口,实现免驱动调试(附完整工程)
  • Android微信客户端UI组件与本地交互逻辑完整实现(Java+Eclipse兼容)
  • 零基础可跑的Python网页数据抓取练习包:含完整项目结构、环境配置指南与实战笔记
  • Mac/Win双平台保姆级教程:手把手带你搞定DevEco Studio 2.0.12.201安装与首次启动
  • 别再只懂AM了!用Python+Matplotlib手把手模拟FM调频信号(附完整代码)
  • 2025-2026年成都全屋定制品牌推荐:TOP5评测专业价格适用场景注意事项 - 品牌推荐
  • 拒绝生成虚假AI技术博文的底线与原则
  • 基于NodeMCU与IFTTT的Google Assistant语音控制智能开关实现
  • 计算机顶尖奖学金申请指南:从研究提案到职业规划
  • 别再只玩瘦AP了!用Cisco Fat AP在家搭建小型无线实验室(附Packet Tracer配置)
  • 保姆级教程:用JD-GUI和JAD反编译JimuReport 1.7.0源码并成功运行(附常见错误修复)
  • Transformers Pipeline:NLP 任务的全面指南
  • FX3U软元件实战笔记:如何用M8020标志位和高速计数器C235优化设备控制程序
  • WebSocket、HTTPS 与浏览器访问网页全过程