当前位置: 首页 > news >正文

Spark SQL详解(三):Dataset深度解析与RDD、DataFrame、Dataset互转实战

摘要:Dataset是Spark SQL中最高级的数据抽象,兼具RDD的类型安全和DataFrame的Catalyst优化能力。本文深入讲解Dataset的创建方式(createDataset、toDS、DataFrame转换),系统梳理RDD、DataFrame、Dataset三者之间的相互转换方法,并通过Dataset实现WordCount的实战案例,帮助读者建立完整的Spark SQL数据抽象体系认知。


一、Dataset概述

1.1 为什么需要Dataset

DataFrame的出现让Spark可以更好地处理结构化数据的计算,但存在一个关键问题:编译时的类型安全

DataFrame的类型安全问题:

// DataFrame是无类型的,编译时无法检查字段名和类型valdf=spark.read.json("people.json")df.select("nmae")// 字段名拼写错误,编译通过,运行时才报错!df("age")+"abc"// 类型不匹配,编译通过,运行时才报错!

Dataset的解决方案:

// Dataset是强类型的,编译时即可发现错误caseclassPerson(name:String,age:Long,sex:String)valds=spark.read.json("people.json").as[Person]ds.map(_.nmae)// 编译报错!字段名拼写错误被提前发现ds.map(_.age+"abc")// 编译报错!类型不匹配被提前发现

1.2 Dataset的核心特性

特性RDDDataFrameDataset
类型安全✅ 编译时❌ 运行时✅ 编译时
Catalyst优化
Tungsten优化
API风格函数式SQL + DSL函数式 + SQL
序列化Java/KryoTungsten二进制Tungsten二进制
适用场景非结构化数据结构化数据查询复杂类型数据处理

1.3 Spark 2.0+中的关系

在Spark 2.0中,DataFrame和Dataset被合并为统一的Dataset API:

Dataset[T] // 泛型Dataset,T可以是任意类型 ├── Dataset[Row] // DataFrame的本质,Row是无类型的行记录 └── Dataset[Person] // 强类型的Dataset,Person是case class 结论:DataFrame = Dataset[Row],即DataFrame是Dataset的子集

1.4 三种API的选择策略

需求推荐API原因
需要精确控制执行细节RDD直接操作分区、血统、缓存
编译时类型安全Dataset强类型约束,IDE自动补全
统一简化APIDataFrame/Dataset一套API处理所有结构化数据
非结构化数据处理RDD文本流、不规则数据
SQL风格查询DataFrameDSL和SQL语法直观
复杂对象处理Dataset支持嵌套case class

二、Dataset的创建

2.1 方式一:使用createDataset()方法

通过SparkSession.createDataset()方法,从集合或RDD创建Dataset。

完整代码:

importorg.apache.spark.sql.SparkSessionobjectCreateDatasetMethod1{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("CreateDataset-Method1").master("local[*]").getOrCreate()// 必须导入隐式转换,否则无法创建Datasetimportspark.implicits._// 从Range创建Dataset[Int]valds1=spark.createDataset(1to5)println("=== Dataset[Int] ===")ds1.show()// 从RDD[String]创建Dataset[String]valds2=spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))println("=== Dataset[String] ===")ds2.show()spark.stop()}}

运行结果:

=== Dataset[Int] === +-----+ |value| +-----+ | 1| | 2| | 3| | 4| | 5| +-----+ === Dataset[String] === +--------+ | value| +--------+ | Tom, 21| |Mike, 25| |Andy, 18| +--------+

关键点:

  • import spark.implicits._必须导入,提供基本类型的Encoder
  • createDataset支持Scala集合(List、Array、Range等)和RDD
  • 基本类型(Int、String、Long等)的Encoder由Spark自动提供

2.2 方式二:通过toDS()方法

将Scala集合或RDD[CaseClass]通过toDS()隐式转换为Dataset。

完整代码:

importorg.apache.spark.sql.{Dataset,SparkSession}objectCreateDatasetMethod2{// case class必须定义在main方法之外caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("CreateDataset-Method2").master("local[*]").getOrCreate()// 导入SparkSession对象下的implicitsimportspark.implicits._// 从List[Person]创建Dataset[Person]valdata=List(Person("Tom",21),Person("Andy",22))valds:Dataset[Person]=data.toDS()ds.show()spark.stop()}}

运行结果:

+----+---+ |name|age| +----+---+ | Tom| 21| |Andy| 22| +----+---+

toDS() vs toDF() 对比:

方法输入输出类型
toDS()List[Person]Dataset[Person]强类型
toDF()List[Person]DataFrame(即Dataset[Row]无类型

2.3 方式三:通过DataFrame转换

将DataFrame通过as[CaseClass]方法转换为强类型的Dataset。

完整代码:

importorg.apache.spark.sql.SparkSessionobjectCreateDatasetMethod3{// 注意:JSON中数值默认推断为Long类型,case class字段类型需匹配caseclassPerson(name:String,age:Long,sex:String)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("CreateDataset-Method3").master("local[*]").getOrCreate()importspark.implicits._// 从JSON文件读取DataFramevaldf=spark.read.json("data/sql/people.json")// DataFrame转Dataset[Person]valds=df.as[Person]ds.show()spark.stop()}}

运行结果:

+---+----------+---+ |age| name|sex| +---+----------+---+ | 30| Michael| 男| | 19| Andy| 女| | 19| Justin| 男| | 20|Bernadette| 女| | 23| Gretchen| 女| | 27| David| 男| | 33| Joseph| 女| | 27| Trish| 女| | 33| Alex| 女| | 25| Ben| 男| +---+----------+---+

重要注意事项:

注意点说明错误示例
字段名匹配case class字段名必须与DataFrame列名一致case class Person(nmae: String)→ 报错
字段类型匹配case class字段类型必须与DataFrame推断类型一致JSON中age推断为Long,用Int会报错
字段顺序case class字段顺序不影响匹配(按名匹配)顺序不同不影响
可空字段数据库字段可为null时,case class用Optionage: Option[Long]

JSON字段类型推断规则:

JSON值Spark推断类型case class建议类型
"Tom"StringTypeString
30LongTypeLong
30.5DoubleTypeDouble
trueBooleanTypeBoolean

建议:处理JSON数据时,case class的数值字段优先使用LongDouble,避免类型不匹配。


三、RDD、DataFrame、Dataset相互转换

3.1 RDD <=> DataFrame

RDD转DataFrame

方法1:反射推断模式(推荐)

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Row,SparkSession}objectRDDToDataFrame{caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("RDD-To-DataFrame").master("local[*]").getOrCreate()importspark.implicits._// RDD[Person] -> DataFramevalrdd:RDD[Person]=spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr=>Person(attr(0).trim,attr(1).trim.toInt))valdf=rdd.toDF()df.show()spark.stop()}}

方法2:编程式定义模式

importorg.apache.spark.sql.types._importorg.apache.spark.sql.Row// 定义Schemavalschema=StructType(Array(StructField("name",StringType,true),StructField("age",IntegerType,true)))// 创建Row RDDvalrowRDD=spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr=>Row(attr(0).trim,attr(1).trim.toInt))// 合并为DataFramevaldf=spark.createDataFrame(rowRDD,schema)

方法3:元组RDD直接转DataFrame

// RDD[(String, Int)] 可直接toDF,因为元组类型已知valtupleRDD=spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr=>(attr(0).trim,attr(1).trim.toInt))valdf=tupleRDD.toDF("name","age")
DataFrame转RDD
// DataFrame -> RDD[Row]valrdd:RDD[Row]=df.rdd// 注意:转换后为RDD[Row],不再是原始的RDD[Person]rdd.foreach(println)// 输出: [Andy,18] [Tom,21] [Mike,25]

类型变化图解:

RDD[Person] DataFrame (Dataset[Row]) Person("Tom",21) Row("Tom",21) Person("Mike",25) <-> Row("Mike",25) Person("Andy",18) Row("Andy",18) 转换特点: RDD -> DataFrame: 类型信息丢失,变为无类型的Row DataFrame -> RDD: 只能得到RDD[Row],无法恢复原始类型

3.2 RDD <=> Dataset

RDD转Dataset
importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Dataset,SparkSession}objectRDDToDataset{caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("RDD-To-Dataset").master("local[*]").getOrCreate()importspark.implicits._// 创建RDD[Person]valrdd:RDD[Person]=spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr=>Person(attr(0).trim,attr(1).trim.toInt))// RDD[Person] -> Dataset[Person]valds:Dataset[Person]=rdd.toDS()ds.show()// Dataset[Person] -> RDD[Person]valresRDD:RDD[Person]=ds.rdd resRDD.foreach(println)spark.stop()}}

运行结果:

+----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| |Andy| 18| +----+---+ Person(Andy,18) Person(Tom,21) Person(Mike,25)

关键发现:

转换方向类型变化特点
RDD[Person] -> Dataset[Person]类型不变强类型保留,安全
Dataset[Person] -> RDD[Person]类型不变强类型保留,安全

对比RDD<->DataFrame:RDD和Dataset互转过程中,数据类型不会丢失;而DataFrame转RDD时,case class会被转为Row对象。


3.3 DataFrame <=> Dataset

DataFrame转Dataset
importorg.apache.spark.sql.{DataFrame,Dataset,SparkSession}objectDataFrameToDataset{// JSON中数值默认推断为LongcaseclassPerson(name:String,age:Long,sex:String)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("DataFrame-To-Dataset").master("local[*]").getOrCreate()importspark.implicits._// 创建DataFramevaldf:DataFrame=spark.read.json("data/sql/people.json")df.show()// DataFrame -> Dataset[Person]valds:Dataset[Person]=df.as[Person]ds.show()// Dataset[Person] -> DataFramevalresDF:DataFrame=ds.toDF()resDF.show()spark.stop()}}

运行结果:

// DataFrame +---+----------+---+ |age| name|sex| +---+----------+---+ | 30| Michael| 男| | 19| Andy| 女| ... // Dataset[Person](显示效果相同,但底层是强类型) +---+----------+---+ |age| name|sex| +---+----------+---+ | 30| Michael| 男| | 19| Andy| 女| ... // 转回DataFrame +---+----------+---+ |age| name|sex| +---+----------+---+ | 30| Michael| 男| | 19| Andy| 女| ...

3.4 三种抽象互转总览图

┌─────────────────┐ │ RDD[Person] │ │ (分布式集合) │ └────────┬────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │ toDF() │ │ toDS() │ │ 保持不变 │ └────┬─────┘ └────┬─────┘ └──────────────┘ │ │ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ DataFrame │ │ Dataset[Person]│ │ (Dataset[Row]) │◄─┤ (强类型) │ │ 无类型 │ │ │ └────────┬────────┘ └────────┬────────┘ │ │ │ ┌─────────────────┘ │ │ ▼ ▼ ┌─────────────────┐ │ .rdd │ │ │ ▼ ▼ ┌──────────────┐ ┌──────────────┐ │ RDD[Row] │ │ RDD[Person] │ │ (类型丢失) │ │ (类型保留) │ └──────────────┘ └──────────────┘

3.5 转换方法速查表

转换方向方法类型变化说明
RDD -> DataFramerdd.toDF()RDD[T]DataFrame需导入implicits
RDD -> Datasetrdd.toDS()RDD[T]Dataset[T]类型保留
DataFrame -> RDDdf.rddDataFrameRDD[Row]类型丢失
DataFrame -> Datasetdf.as[T]DataFrameDataset[T]需case class
Dataset -> RDDds.rddDataset[T]RDD[T]类型保留
Dataset -> DataFrameds.toDF()Dataset[T]DataFrame类型丢失

四、Dataset实现WordCount

利用Dataset的强类型特性和函数式API,可以写出更简洁优雅的WordCount。

4.1 完整代码

importorg.apache.spark.sql.{Dataset,SparkSession}objectDatasetWordCount{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().appName("Dataset-WordCount").master("local[*]").getOrCreate()importspark.implicits._// Dataset实现WordCountvalres:Dataset[(String,Long)]=spark.read.text("data/word.txt")// 读取文本文件,得到DataFrame.as[String]// DataFrame -> Dataset[String].flatMap(_.split(" "))// 拆分单词.groupByKey(_.toLowerCase)// 按单词分组(转小写统一).count()// 统计每组数量res.show()spark.stop()}}

4.2 代码解析

步骤代码说明类型变化
1spark.read.text("data/word.txt")读取文本文件DataFrame(单列value)
2.as[String]转为Dataset[String]Dataset[String]
3.flatMap(_.split(" "))拆分单词Dataset[String](单词流)
4.groupByKey(_.toLowerCase)按小写单词分组KeyValueGroupedDataset
5.count()统计每组数量Dataset[(String, Long)]

4.3 运行结果

+------+--------+ | key|count(1)| +------+--------+ | fast| 1| | is| 3| | spark| 2| |better| 1| | good| 1| |hadoop| 1| +------+--------+

4.4 Dataset WordCount vs RDD WordCount

RDD版本:

valrdd=sc.textFile("data/word.txt")valresult=rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

Dataset版本:

valds=spark.read.text("data/word.txt").as[String]valresult=ds.flatMap(_.split(" ")).groupByKey(_.toLowerCase).count()

对比分析:

特性RDD版本Dataset版本
代码量较少更少
类型安全有(编译时检查)
性能优化手动Catalyst自动优化
API风格函数式函数式 + SQL
ShufflereduceByKeygroupByKey + count
序列化Java/KryoTungsten(更高效)

五、Dataset高级操作

5.1 强类型map操作

caseclassPerson(name:String,age:Long)valds=spark.read.json("people.json").as[Person]// 编译时检查字段名和类型ds.map(_.name.toUpperCase).show()// ✅ 正确ds.map(_.nmae.toUpperCase).show()// ❌ 编译报错!字段名错误ds.map(_.age+"years").show()// ❌ 编译报错!类型不匹配

5.2 类型安全的聚合

caseclassScore(name:String,subject:String,score:Int)valds=spark.read.json("scores.json").as[Score]// 按学生分组,计算平均分ds.groupByKey(_.name).mapGroups{case(name,scores)=>valscoreList=scores.toList(name,scoreList.map(_.score).sum/scoreList.size.toDouble)}.show()

5.3 Dataset与SQL混用

caseclassPerson(name:String,age:Long,sex:String)valds=spark.read.json("people.json").as[Person]// 注册临时视图,使用SQL查询ds.createOrReplaceTempView("people")spark.sql("SELECT * FROM people WHERE age > 25").as[Person].show()

六、总结

本文系统讲解了Spark SQL中Dataset的核心知识:

核心知识点回顾

  1. Dataset的定位

    • 兼具RDD的类型安全和DataFrame的Catalyst优化
    • DataFrame = Dataset[Row],是Dataset的子集
    • Spark 2.0+中三者统一为Dataset API
  2. Dataset的三种创建方式

    • spark.createDataset(集合/RDD):从数据源创建
    • 集合.toDS():隐式转换
    • df.as[CaseClass]:从DataFrame转换
  3. 三种抽象的互转

    • RDD <-> DataFrame:类型会丢失(转为Row)
    • RDD <-> Dataset:类型保留(安全)
    • DataFrame <-> Dataset:as[T]toDF()互转
  4. 选择策略

    • 非结构化数据 → RDD
    • SQL查询、简单ETL → DataFrame
    • 复杂类型处理、编译时安全 → Dataset
http://www.rkmt.cn/news/1459315.html

相关文章:

  • 来杭州返程伴手礼怎么选?本地人从不乱买,这款非遗糕点包揽送礼刚需 - 玖叁鹿
  • 2026 年 6 月贵港防水维修机构甄选指南:卫生间免砸砖、屋顶阳台外墙地下室漏水检修与避坑全攻略 - 吉修匠
  • 杭州防水市场价参考全攻略:避开低价转包隐形陷阱,2026 年业主必看指南 - 玖叁鹿
  • 合肥卖金避坑|5家黄金回收实地横评,底价清单 + 防宰攻略收好 - 奢侈品回收评测
  • 别再傻拧了!SX1308升压模块调压失败?实测教你用万用表快速定位问题(附5V安全供电指南)
  • 无人机低空安防巡检AI落地方案|航拍小目标人员入侵检测、多场景跨领域目标检测数据集与YOLO算法工程实战
  • 游杭州收尾别乱买!藏在市井里的非遗糕点,才是值得带走的江南印记 - 玖叁鹿
  • 2026 深圳小规模一般纳税人代账收费标准详解,深圳老牌代理记账公司排名,各区优质代账机构精选汇总 - 品牌智鉴榜
  • 【架构实战】API版本管理:让接口平滑演进
  • Servlet 到 Spring MVC 架构演进:Java Web 开发二十年技术变迁史
  • Telegram 机器人安全审计
  • 自然语言修图:混元图像3.0如何实现一句话修图
  • 随时随地管设备!聚英云免费APP+电脑端,多端数据无缝同步
  • STM32F407用ADC实时采样信号,通过UART直驱串口屏动态画波形
  • 100个免配置HTML模板:电商/教育/企业站源码,双击即看效果
  • 2026年泉州装修设计公司优选指南:从别墅私宅到酒店办公,谁能真正实现“效果图落地”? - 资讯快报
  • Android 11.0 webview 加载https白屏,忽略Https证书校验不当弹窗提醒功能实现
  • 从Java字节码到十六进制:手把手教你破解一个密码管理器的试用限制
  • 想考PMP不知道怎么选机构?PMP主流培训机构通过率实力与购买性价比分析 - 资讯焦点
  • 2026最新肇庆市本地黄金铂金白银彩金回收服务 五大黄金靠谱回收门店汇总,正规渠道对比推荐及联系方式 - 前途无量YY
  • 避坑指南:ABB机器人PC SDK开发中,网络扫描与连接的那些‘坑’(C#/.NET实战)
  • 用VBScript和批处理文件模拟恶意网页攻击:一个信息安全新手的实验笔记(附完整代码)
  • 购物卡回收高价技巧,天猫卡轻松变现! - 团团收购物卡回收
  • Gemini为何不开源?解析大模型闭源背后的商业与工程逻辑
  • 保姆级教程:用sendmsg/recvmsg在Linux多进程间传递文件描述符(附完整C代码)
  • Python之ya-direct-api包语法、参数和实际应用案例
  • Chrome扩展集成Gemma-2B:WebGPU+WASM本地AI实践
  • 免费AIGC降重工具指南:轻松降低AI查重率 学生党必备 - 仙仙学姐测评
  • 实战演练:在快马平台部署一个集成libopus的WebRTC语音聊天室
  • 长春靠谱的专业不锈钢零售制造商,究竟哪家才是你的理想之选? - GrowthUME