Spark RDD基础编程详解(一):创建与转换操作
一、RDD简介
1.1 什么是RDD
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark中最基本的数据抽象。通俗来讲,RDD是一种用于表示分布式计算中数据集合的不可变数据结构。
RDD的核心特征:
| 特征 | 说明 |
|---|---|
| 弹性(Resilient) | 具备容错能力,节点故障时可自动恢复数据和计算 |
| 分布式(Distributed) | 数据分散存储在集群多个节点上,支持并行处理 |
| 数据集(Dataset) | 由一系列记录组成,可看作不可变的、可分区的数据集合 |
| 惰性计算 | 转换操作仅记录轨迹,遇到行动操作才触发真正计算 |
| 可缓存 | 可缓存在内存中,在多次计算中重用 |
1.2 RDD的五大特性
- 一组分区(A list of partitions):RDD由多个分区组成,每个分区是数据集的一个子集
- 计算函数(A function for computing each split):每个分区都有对应的计算函数
- 依赖关系(A list of dependencies on other RDDs):RDD之间通过依赖关系形成血统(Lineage)
- 分区器(Optionally, a Partitioner for key-value RDDs):对于KV型RDD,可指定分区方式
- 优先位置(Optionally, a list of preferred locations):数据本地性,优先在数据所在节点计算
二、RDD的创建
Spark主要通过SparkContext的textFile()和parallelize()方法创建RDD。
2.1 环境准备
importorg.apache.spark.{SparkConf,SparkContext}// 创建SparkConf对象,配置应用名称和运行模式valconf=newSparkConf().setAppName("RDD-Demo").setMaster("local")// local表示本地单线程执行,local[*]表示本地多线程// 创建SparkContext对象,这是Spark程序的入口valsc=newSparkContext(conf)注意:Spark 2.0+推荐使用
SparkSession作为统一入口,但在RDD编程中仍需要SparkContext。
2.2 从文件系统加载数据创建RDD
2.2.1 从本地文件系统加载
objectCreateRddByFileScala{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CreateRddByFileScala").setMaster("local")valsc=newSparkContext(conf)// Windows系统路径valpath="D:\test\data"// Linux/Mac系统路径// val path = "file:///usr/local/test/data/"// textFile(path, minPartitions) 第二个参数指定最小分区数valrdd=sc.textFile(path,2)// 统计文件内数据的总长度vallength=rdd.map(_.length).reduce(_+_)println(s"文件总字符数:$length")sc.stop()}}textFile()方法详解:
| 参数 | 类型 | 说明 |
|---|---|---|
path | String | 文件路径,支持本地文件、HDFS、S3等 |
minPartitions | Int | 最小分区数(可选),默认根据文件大小和块大小计算 |
2.2.2 从HDFS加载数据
// 只需修改路径为HDFS路径valpath="hdfs://hadoop101:9000/test/"valrdd=sc.textFile(path,2)2.2.3 从多个文件加载
// 加载目录下所有文件valrdd=sc.textFile("hdfs://hadoop101:9000/logs/")// 加载匹配通配符的文件valrdd=sc.textFile("hdfs://hadoop101:9000/logs/2024-*.log")// 加载整个目录(包括子目录)valrdd=sc.wholeTextFiles("hdfs://hadoop101:9000/logs/")2.3 通过并行集合(数组)创建RDD
调用SparkContext.parallelize()方法,将已存在的集合转换为RDD。
objectCreateRddByArrayScala{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CreateRddByArrayScala").setMaster("local")valsc=newSparkContext(conf)// 创建Scala集合valarr=Array(1,2,3,4,5)// 基于集合创建RDD,可指定分区数valrdd=sc.parallelize(arr,2)// 查看RDD的分区数println(s"分区数:${rdd.getNumPartitions}")// 查看各分区数据rdd.glom().collect().foreach(partition=>println(partition.mkString(",")))sc.stop()}}parallelize()方法详解:
| 参数 | 类型 | 说明 |
|---|---|---|
seq | Seq[T] | 要并行化的集合 |
numSlices | Int | 分区数(可选),默认根据集群配置计算 |
其他创建方式:
// 从List创建vallistRdd=sc.parallelize(List(1,2,3,4,5))// 从Range创建valrangeRdd=sc.parallelize(1to100)// 从Seq创建valseqRdd=sc.parallelize(Seq("a","b","c"))// makeRDD是parallelize的别名,更语义化valrdd=sc.makeRDD(Array(1,2,3,4,5),2)2.4 创建方式对比
| 创建方式 | 适用场景 | 特点 |
|---|---|---|
textFile() | 读取文件数据 | 支持本地、HDFS、S3等多种文件系统 |
parallelize() | 测试数据、小数据集 | 将内存集合转换为分布式RDD |
wholeTextFiles() | 读取小文件 | 返回(K,V)对,K是文件名,V是文件内容 |
sequenceFile() | 读取SequenceFile | Hadoop生态的二进制键值对文件 |
三、RDD操作概述
RDD的操作分为两大类:
| 操作类型 | 特点 | 返回值 | 示例 |
|---|---|---|---|
| 转换操作(Transformation) | 惰性执行,记录转换轨迹 | 新的RDD | map、filter、flatMap |
| 行动操作(Action) | 触发真正计算 | 值或结果 | count、collect、reduce |
惰性机制图解:
RDD1 → map() → RDD2 → filter() → RDD3 → reduce() → 结果 ↓ ↓ ↓ 记录轨迹 记录轨迹 记录轨迹 (不计算) (不计算) (触发计算)四、核心转换操作详解
4.1 filter() — 过滤操作
功能:对RDD中的每个元素应用过滤函数,保留返回true的元素。
签名:def filter(f: T => Boolean): RDD[T]
示例:筛选包含"Spark"的行
objectFilterDemo{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("filter-test").setMaster("local")valsc=newSparkContext(conf)// 加载数据valrdd:RDD[String]=sc.textFile("data/word.txt")// filter参数是一个函数,返回Boolean类型// true - 保留该元素,false - 过滤掉vallineWithSpark:RDD[String]=rdd.filter(line=>{line.contains("Spark")})// 输出结果lineWithSpark.foreach(println)sc.stop()}}输入数据(word.txt):
Hadoop is good Spark is better Spark is fast运行结果:
Spark is better Spark is fast简写形式:
// 当函数体只有一行时,可省略花括号valresult=rdd.filter(_.contains("Spark"))4.2 map() — 映射操作
功能:对RDD中的每个元素应用映射函数,返回一个新的RDD。
签名:def map[U: ClassTag](f: T => U): RDD[U]
示例1:数值翻倍
valarr=Array(1,2,3,4,5)valrdd1=sc.parallelize(arr)// 每个元素乘以2valrdd2=rdd1.map(num=>num*2)rdd2.foreach(println)// 输出: 2, 4, 6, 8, 10示例2:字符串拆分
valrdd1=sc.textFile("data/word.txt")// 将每行按空格拆分为数组valrdd2=rdd1.map(line=>line.split(" "))rdd2.foreach(arr=>println(arr.mkString(",")))数据转换过程:
输入RDD: "Hadoop is good" "Spark is better" "Spark is fast" map(line => line.split(" "))后: Array("Hadoop", "is", "good") Array("Spark", "is", "better") Array("Spark", "is", "fast")注意:
map()操作后,RDD的元素类型从String变成了Array[String],这是一个二维结构。
4.3 flatMap() — 扁平化映射
功能:先对RDD中的每个元素执行map()操作,再对结果执行flatten()扁平化操作,将多维结构展平为一维。
签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
示例:单词拆分扁平化
valrdd1=sc.textFile("data/word.txt")// flatMap = map + flattenvalrdd2=rdd1.flatMap(line=>line.split(" "))rdd2.foreach(println)数据转换过程:
输入RDD: "Hadoop is good" "Spark is better" "Spark is fast" Step 1 - map(line => line.split(" ")): Array("Hadoop", "is", "good") Array("Spark", "is", "better") Array("Spark", "is", "fast") Step 2 - flatten: "Hadoop" "is" "good" "Spark" "is" "better" "Spark" "is" "fast"map() vs flatMap() 对比:
| 操作 | 输入 | 输出 | 维度变化 |
|---|---|---|---|
map | RDD[String] | RDD[Array[String]] | 1维 → 2维 |
flatMap | RDD[String] | RDD[String] | 1维 → 1维(展平) |
4.4 groupByKey() — 按键分组
功能:将RDD中相同Key的元素分组,返回(Key, Iterable[Value])形式的RDD。
签名:def groupByKey(): RDD[(K, Iterable[V])]
示例:单词分组
valrdd1=sc.textFile("data/word.txt")valrdd2=rdd1.flatMap(_.split(" "))// 将单词映射为 (word, 1) 的键值对valrdd3=rdd2.map(word=>(word,1))// RDD(("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1), ...)// 按Key分组,相同Key的Value放入Iterablevalrdd4=rdd3.groupByKey()// RDD(("Hadoop", Iterable(1)), ("is", Iterable(1,1,1)), ...)// 统计每个单词出现的次数valrdd5=rdd4.map(t=>(t._1,t._2.size))// RDD(("Hadoop",1), ("is",3), ("good",1), ("Spark",2), ...)rdd5.foreach(println)运行结果:
(Spark,2) (is,3) (fast,1) (good,1) (better,1) (Hadoop,1)groupByKey()的数据流转:
rdd3: RDD[(String, Int)] ("Hadoop", 1) ("is", 1) ("good", 1) ("Spark", 1) ("is", 1) ("better", 1) ("Spark", 1) ("is", 1) ("fast", 1) ↓ groupByKey() rdd4: RDD[(String, Iterable[Int])] ("Hadoop", Iterable(1)) ("is", Iterable(1, 1, 1)) ("good", Iterable(1)) ("Spark", Iterable(1, 1)) ("better", Iterable(1)) ("fast", Iterable(1))4.5 reduceByKey() — 按键聚合
功能:将RDD中相同Key的Value进行聚合计算,返回(Key, AggregatedValue)形式的RDD。
签名:def reduceByKey(func: (V, V) => V): RDD[(K, V)]
示例:WordCount优化版
valrdd1=sc.textFile("data/word.txt")valrdd2=rdd1.flatMap(_.split(" "))valrdd3=rdd2.map(word=>(word,1))// reduceByKey在Map端先进行预聚合,再Shuffle,效率高于groupByKeyvalrdd4=rdd3.reduceByKey(_+_)// 等价于: rdd3.reduceByKey((v1, v2) => v1 + v2)rdd4.foreach(println)运行结果:
(Spark,2) (is,3) (fast,1) (good,1) (better,1) (Hadoop,1)reduceByKey() vs groupByKey() 对比:
| 特性 | reduceByKey | groupByKey |
|---|---|---|
| 返回类型 | RDD[(K, V)] | RDD[(K, Iterable[V])] |
| Map端预聚合 | ✅ 有 | ❌ 无 |
| Shuffle数据量 | 少(已预聚合) | 多(全量传输) |
| 内存占用 | 低 | 高(需存储Iterable) |
| 适用场景 | 聚合计算(求和、计数等) | 需要保留所有原始值 |
reduceByKey工作原理:
Map端预聚合: 分区1: ("is",1), ("is",1) → ("is",2) 分区2: ("is",1), ("Spark",1), ("Spark",1) → ("is",1), ("Spark",2) Shuffle后Reduce端: ("is",2) + ("is",1) → ("is",3) ("Spark",2) → ("Spark",2)五、完整实战:WordCount
综合以上转换操作,实现经典的WordCount程序:
importorg.apache.spark.{SparkConf,SparkContext}objectWordCount{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("WordCount").setMaster("local")valsc=newSparkContext(conf)// 1. 读取文件创建RDDvallines=sc.textFile("data/word.txt")// 2. flatMap拆分单词valwords=lines.flatMap(_.split(" "))// 3. map转换为 (word, 1) 键值对valwordPairs=words.map((_,1))// 4. reduceByKey聚合统计valwordCounts=wordPairs.reduceByKey(_+_)// 5. 按词频降序排序(行动操作触发计算)valsorted=wordCounts.sortBy(_._2,ascending=false)// 6. collect收集结果到Driver并打印sorted.collect().foreach(println)sc.stop()}}输出结果:
(is,3) (Spark,2) (Hadoop,1) (good,1) (better,1) (fast,1)六、转换操作速查表
| 转换操作 | 功能 | 输入 → 输出 | 是否触发Shuffle |
|---|---|---|---|
map(f) | 一对一映射 | RDD[T]→RDD[U] | ❌ |
filter(f) | 条件过滤 | RDD[T]→RDD[T] | ❌ |
flatMap(f) | 映射后扁平化 | RDD[T]→RDD[U] | ❌ |
mapPartitions(f) | 按分区映射 | RDD[T]→RDD[U] | ❌ |
sample(fraction) | 随机采样 | RDD[T]→RDD[T] | ❌ |
union(other) | 合并两个RDD | RDD[T]+RDD[T]→RDD[T] | ❌ |
distinct() | 去重 | RDD[T]→RDD[T] | ✅ |
groupByKey() | 按键分组 | RDD[(K,V)]→RDD[(K, Iterable[V])] | ✅ |
reduceByKey(f) | 按键聚合 | RDD[(K,V)]→RDD[(K,V)] | ✅ |
sortByKey() | 按键排序 | RDD[(K,V)]→RDD[(K,V)] | ✅ |
join(other) | 内连接 | RDD[(K,V)]+RDD[(K,W)]→RDD[(K,(V,W))] | ✅ |
coalesce(n) | 减少分区 | RDD[T]→RDD[T] | ❌(窄依赖) |
repartition(n) | 重新分区 | RDD[T]→RDD[T] | ✅ |
七、总结
本文系统讲解了RDD的创建方式和核心转换操作:
- RDD创建:通过
textFile()从文件系统加载,或通过parallelize()从集合创建 - 惰性机制:转换操作记录轨迹不计算,行动操作触发真正执行
- 核心算子:
filter():条件过滤,保留满足条件的元素map():一对一映射,转换元素类型或值flatMap():映射+扁平化,将多维结构展平groupByKey():按键分组,返回(Key, Iterable[Value])reduceByKey():按键聚合,Map端预聚合减少Shuffle
- 最佳实践:聚合场景优先使用
reduceByKey替代groupByKey
