当前位置: 首页 > news >正文

Spark RDD基础编程详解(一):创建与转换操作

一、RDD简介

1.1 什么是RDD

RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark中最基本的数据抽象。通俗来讲,RDD是一种用于表示分布式计算中数据集合的不可变数据结构。

RDD的核心特征:

特征说明
弹性(Resilient)具备容错能力,节点故障时可自动恢复数据和计算
分布式(Distributed)数据分散存储在集群多个节点上,支持并行处理
数据集(Dataset)由一系列记录组成,可看作不可变的、可分区的数据集合
惰性计算转换操作仅记录轨迹,遇到行动操作才触发真正计算
可缓存可缓存在内存中,在多次计算中重用

1.2 RDD的五大特性

  1. 一组分区(A list of partitions):RDD由多个分区组成,每个分区是数据集的一个子集
  2. 计算函数(A function for computing each split):每个分区都有对应的计算函数
  3. 依赖关系(A list of dependencies on other RDDs):RDD之间通过依赖关系形成血统(Lineage)
  4. 分区器(Optionally, a Partitioner for key-value RDDs):对于KV型RDD,可指定分区方式
  5. 优先位置(Optionally, a list of preferred locations):数据本地性,优先在数据所在节点计算

二、RDD的创建

Spark主要通过SparkContexttextFile()parallelize()方法创建RDD。

2.1 环境准备

importorg.apache.spark.{SparkConf,SparkContext}// 创建SparkConf对象,配置应用名称和运行模式valconf=newSparkConf().setAppName("RDD-Demo").setMaster("local")// local表示本地单线程执行,local[*]表示本地多线程// 创建SparkContext对象,这是Spark程序的入口valsc=newSparkContext(conf)

注意:Spark 2.0+推荐使用SparkSession作为统一入口,但在RDD编程中仍需要SparkContext

2.2 从文件系统加载数据创建RDD

2.2.1 从本地文件系统加载
objectCreateRddByFileScala{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CreateRddByFileScala").setMaster("local")valsc=newSparkContext(conf)// Windows系统路径valpath="D:\test\data"// Linux/Mac系统路径// val path = "file:///usr/local/test/data/"// textFile(path, minPartitions) 第二个参数指定最小分区数valrdd=sc.textFile(path,2)// 统计文件内数据的总长度vallength=rdd.map(_.length).reduce(_+_)println(s"文件总字符数:$length")sc.stop()}}

textFile()方法详解:

参数类型说明
pathString文件路径,支持本地文件、HDFS、S3等
minPartitionsInt最小分区数(可选),默认根据文件大小和块大小计算
2.2.2 从HDFS加载数据
// 只需修改路径为HDFS路径valpath="hdfs://hadoop101:9000/test/"valrdd=sc.textFile(path,2)
2.2.3 从多个文件加载
// 加载目录下所有文件valrdd=sc.textFile("hdfs://hadoop101:9000/logs/")// 加载匹配通配符的文件valrdd=sc.textFile("hdfs://hadoop101:9000/logs/2024-*.log")// 加载整个目录(包括子目录)valrdd=sc.wholeTextFiles("hdfs://hadoop101:9000/logs/")

2.3 通过并行集合(数组)创建RDD

调用SparkContext.parallelize()方法,将已存在的集合转换为RDD。

objectCreateRddByArrayScala{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CreateRddByArrayScala").setMaster("local")valsc=newSparkContext(conf)// 创建Scala集合valarr=Array(1,2,3,4,5)// 基于集合创建RDD,可指定分区数valrdd=sc.parallelize(arr,2)// 查看RDD的分区数println(s"分区数:${rdd.getNumPartitions}")// 查看各分区数据rdd.glom().collect().foreach(partition=>println(partition.mkString(",")))sc.stop()}}

parallelize()方法详解:

参数类型说明
seqSeq[T]要并行化的集合
numSlicesInt分区数(可选),默认根据集群配置计算

其他创建方式:

// 从List创建vallistRdd=sc.parallelize(List(1,2,3,4,5))// 从Range创建valrangeRdd=sc.parallelize(1to100)// 从Seq创建valseqRdd=sc.parallelize(Seq("a","b","c"))// makeRDD是parallelize的别名,更语义化valrdd=sc.makeRDD(Array(1,2,3,4,5),2)

2.4 创建方式对比

创建方式适用场景特点
textFile()读取文件数据支持本地、HDFS、S3等多种文件系统
parallelize()测试数据、小数据集将内存集合转换为分布式RDD
wholeTextFiles()读取小文件返回(K,V)对,K是文件名,V是文件内容
sequenceFile()读取SequenceFileHadoop生态的二进制键值对文件

三、RDD操作概述

RDD的操作分为两大类:

操作类型特点返回值示例
转换操作(Transformation)惰性执行,记录转换轨迹新的RDDmapfilterflatMap
行动操作(Action)触发真正计算值或结果countcollectreduce

惰性机制图解:

RDD1 → map() → RDD2 → filter() → RDD3 → reduce() → 结果 ↓ ↓ ↓ 记录轨迹 记录轨迹 记录轨迹 (不计算) (不计算) (触发计算)

四、核心转换操作详解

4.1 filter() — 过滤操作

功能:对RDD中的每个元素应用过滤函数,保留返回true的元素。

签名def filter(f: T => Boolean): RDD[T]

示例:筛选包含"Spark"的行

objectFilterDemo{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("filter-test").setMaster("local")valsc=newSparkContext(conf)// 加载数据valrdd:RDD[String]=sc.textFile("data/word.txt")// filter参数是一个函数,返回Boolean类型// true - 保留该元素,false - 过滤掉vallineWithSpark:RDD[String]=rdd.filter(line=>{line.contains("Spark")})// 输出结果lineWithSpark.foreach(println)sc.stop()}}

输入数据(word.txt):

Hadoop is good Spark is better Spark is fast

运行结果:

Spark is better Spark is fast

简写形式:

// 当函数体只有一行时,可省略花括号valresult=rdd.filter(_.contains("Spark"))

4.2 map() — 映射操作

功能:对RDD中的每个元素应用映射函数,返回一个新的RDD。

签名def map[U: ClassTag](f: T => U): RDD[U]

示例1:数值翻倍
valarr=Array(1,2,3,4,5)valrdd1=sc.parallelize(arr)// 每个元素乘以2valrdd2=rdd1.map(num=>num*2)rdd2.foreach(println)// 输出: 2, 4, 6, 8, 10
示例2:字符串拆分
valrdd1=sc.textFile("data/word.txt")// 将每行按空格拆分为数组valrdd2=rdd1.map(line=>line.split(" "))rdd2.foreach(arr=>println(arr.mkString(",")))

数据转换过程:

输入RDD: "Hadoop is good" "Spark is better" "Spark is fast" map(line => line.split(" "))后: Array("Hadoop", "is", "good") Array("Spark", "is", "better") Array("Spark", "is", "fast")

注意map()操作后,RDD的元素类型从String变成了Array[String],这是一个二维结构。

4.3 flatMap() — 扁平化映射

功能:先对RDD中的每个元素执行map()操作,再对结果执行flatten()扁平化操作,将多维结构展平为一维。

签名def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

示例:单词拆分扁平化

valrdd1=sc.textFile("data/word.txt")// flatMap = map + flattenvalrdd2=rdd1.flatMap(line=>line.split(" "))rdd2.foreach(println)

数据转换过程:

输入RDD: "Hadoop is good" "Spark is better" "Spark is fast" Step 1 - map(line => line.split(" ")): Array("Hadoop", "is", "good") Array("Spark", "is", "better") Array("Spark", "is", "fast") Step 2 - flatten: "Hadoop" "is" "good" "Spark" "is" "better" "Spark" "is" "fast"

map() vs flatMap() 对比:

操作输入输出维度变化
mapRDD[String]RDD[Array[String]]1维 → 2维
flatMapRDD[String]RDD[String]1维 → 1维(展平)

4.4 groupByKey() — 按键分组

功能:将RDD中相同Key的元素分组,返回(Key, Iterable[Value])形式的RDD。

签名def groupByKey(): RDD[(K, Iterable[V])]

示例:单词分组

valrdd1=sc.textFile("data/word.txt")valrdd2=rdd1.flatMap(_.split(" "))// 将单词映射为 (word, 1) 的键值对valrdd3=rdd2.map(word=>(word,1))// RDD(("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1), ...)// 按Key分组,相同Key的Value放入Iterablevalrdd4=rdd3.groupByKey()// RDD(("Hadoop", Iterable(1)), ("is", Iterable(1,1,1)), ...)// 统计每个单词出现的次数valrdd5=rdd4.map(t=>(t._1,t._2.size))// RDD(("Hadoop",1), ("is",3), ("good",1), ("Spark",2), ...)rdd5.foreach(println)

运行结果:

(Spark,2) (is,3) (fast,1) (good,1) (better,1) (Hadoop,1)

groupByKey()的数据流转:

rdd3: RDD[(String, Int)] ("Hadoop", 1) ("is", 1) ("good", 1) ("Spark", 1) ("is", 1) ("better", 1) ("Spark", 1) ("is", 1) ("fast", 1) ↓ groupByKey() rdd4: RDD[(String, Iterable[Int])] ("Hadoop", Iterable(1)) ("is", Iterable(1, 1, 1)) ("good", Iterable(1)) ("Spark", Iterable(1, 1)) ("better", Iterable(1)) ("fast", Iterable(1))

4.5 reduceByKey() — 按键聚合

功能:将RDD中相同Key的Value进行聚合计算,返回(Key, AggregatedValue)形式的RDD。

签名def reduceByKey(func: (V, V) => V): RDD[(K, V)]

示例:WordCount优化版

valrdd1=sc.textFile("data/word.txt")valrdd2=rdd1.flatMap(_.split(" "))valrdd3=rdd2.map(word=>(word,1))// reduceByKey在Map端先进行预聚合,再Shuffle,效率高于groupByKeyvalrdd4=rdd3.reduceByKey(_+_)// 等价于: rdd3.reduceByKey((v1, v2) => v1 + v2)rdd4.foreach(println)

运行结果:

(Spark,2) (is,3) (fast,1) (good,1) (better,1) (Hadoop,1)

reduceByKey() vs groupByKey() 对比:

特性reduceByKeygroupByKey
返回类型RDD[(K, V)]RDD[(K, Iterable[V])]
Map端预聚合✅ 有❌ 无
Shuffle数据量少(已预聚合)多(全量传输)
内存占用高(需存储Iterable)
适用场景聚合计算(求和、计数等)需要保留所有原始值

reduceByKey工作原理:

Map端预聚合: 分区1: ("is",1), ("is",1) → ("is",2) 分区2: ("is",1), ("Spark",1), ("Spark",1) → ("is",1), ("Spark",2) Shuffle后Reduce端: ("is",2) + ("is",1) → ("is",3) ("Spark",2) → ("Spark",2)

五、完整实战:WordCount

综合以上转换操作,实现经典的WordCount程序:

importorg.apache.spark.{SparkConf,SparkContext}objectWordCount{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("WordCount").setMaster("local")valsc=newSparkContext(conf)// 1. 读取文件创建RDDvallines=sc.textFile("data/word.txt")// 2. flatMap拆分单词valwords=lines.flatMap(_.split(" "))// 3. map转换为 (word, 1) 键值对valwordPairs=words.map((_,1))// 4. reduceByKey聚合统计valwordCounts=wordPairs.reduceByKey(_+_)// 5. 按词频降序排序(行动操作触发计算)valsorted=wordCounts.sortBy(_._2,ascending=false)// 6. collect收集结果到Driver并打印sorted.collect().foreach(println)sc.stop()}}

输出结果:

(is,3) (Spark,2) (Hadoop,1) (good,1) (better,1) (fast,1)

六、转换操作速查表

转换操作功能输入 → 输出是否触发Shuffle
map(f)一对一映射RDD[T]RDD[U]
filter(f)条件过滤RDD[T]RDD[T]
flatMap(f)映射后扁平化RDD[T]RDD[U]
mapPartitions(f)按分区映射RDD[T]RDD[U]
sample(fraction)随机采样RDD[T]RDD[T]
union(other)合并两个RDDRDD[T]+RDD[T]RDD[T]
distinct()去重RDD[T]RDD[T]
groupByKey()按键分组RDD[(K,V)]RDD[(K, Iterable[V])]
reduceByKey(f)按键聚合RDD[(K,V)]RDD[(K,V)]
sortByKey()按键排序RDD[(K,V)]RDD[(K,V)]
join(other)内连接RDD[(K,V)]+RDD[(K,W)]RDD[(K,(V,W))]
coalesce(n)减少分区RDD[T]RDD[T]❌(窄依赖)
repartition(n)重新分区RDD[T]RDD[T]

七、总结

本文系统讲解了RDD的创建方式和核心转换操作:

  1. RDD创建:通过textFile()从文件系统加载,或通过parallelize()从集合创建
  2. 惰性机制:转换操作记录轨迹不计算,行动操作触发真正执行
  3. 核心算子
    • filter():条件过滤,保留满足条件的元素
    • map():一对一映射,转换元素类型或值
    • flatMap():映射+扁平化,将多维结构展平
    • groupByKey():按键分组,返回(Key, Iterable[Value])
    • reduceByKey():按键聚合,Map端预聚合减少Shuffle
  4. 最佳实践:聚合场景优先使用reduceByKey替代groupByKey
http://www.rkmt.cn/news/1423201.html

相关文章:

  • 别再只盯着Arduino了!用IPM模块驱动三相电机,手把手教你从硬件选型到PCB布局(附士兰微/英飞凌型号对比)
  • 精准攻克污水治理难题 科净环保多元化设备赋能多行业绿色发展 - 资讯纵览
  • 告别繁琐密码!9大渠道服崩坏3一键扫码登录神器详解
  • 2026年高压清洗机厂家推荐榜:工业级/380V/220V/移动式/管道疏通/推车式品牌深度解析 - 品牌企业推荐师(官方)
  • AI与自动化如何重塑智慧物流:从数据感知到自动化执行的全链路解析
  • 深度学习在MRI重建中的挑战与优化实践
  • 安阳高考志愿填报推荐:安阳高途志愿川儿老师如何服务河南考生和家长 - 行业深度观察
  • Windows系统快速安装苹果USB网络共享驱动:告别iTunes臃肿安装
  • 2026年AI论文平台推荐
  • 从AMS1117到MP1584:手把手教你用立创EDA搞定两种稳压电源的PCB布局布线(避坑指南)
  • 2026 年 AI 写论文工具排行 TOP6:一键生成 + 真实文献 + 降 AIGC,全网最硬核横向横评
  • 2026年,东营老牌装修团队,实力保障,让您家装无忧! - 资讯快报
  • Windows挂载WebDAV总失败?除了改注册表,试试RaiDrive一键映射(避坑指南)
  • 2026年面试复盘神器:用智在记录精准还原与AI智能总结
  • 2026AI漫剧制作平台口碑排行头部玩家盘点 - 资讯纵览
  • 义乌汽车音响哪家靠谱?2026年亲测义乌繁声汽车音响旗舰店 - 资讯纵览
  • 众智商学院的考试通过率参考 - 众智商学院官方
  • 别再只会用公式了!手把手教你用MATLAB实现一阶数字低通滤波器(附完整代码)
  • Hermes 智能体完整安装教程:环境配置 + 依赖解决 + 验证测试
  • 终极指南:如何用Ice快速打造清爽高效的Mac菜单栏
  • 2026年华药优牧肥满星厂家揭秘:养殖户为何争相引进? - 资讯快报
  • 2026东莞二手房翻新改造靠谱企业盘点 本土专业品牌引领品质焕新 - 资讯纵览
  • 一文看懂: 行空板 M10 + 扩展板 DFR1216
  • 大语言模型在全球健康领域的基准测试与选型指南
  • 应用自动化实践:从CI/CD到GitOps的完整技术栈解析
  • 保姆级教程:用EasyExcel 3.0.2和Hutool搞定带复杂表格和图片的周报自动生成
  • 5.29 构建之法阅读笔记05 - GENGAR
  • 2026局域网即时通讯横评:3款私有化部署IM对比 - 小天互连即时通讯
  • 基于合成数据与混合检索的生物医学语义搜索系统构建实践
  • 保姆级教程:用熵简FinBERT-Base模型快速搞定金融文本分类(附代码)