当前位置: 首页 > news >正文

Flink数据流分布式写入文件实战

目录

1. 代码结构

代码解析

(1) 主程序入口

(2) 配置文件 Sink

(3) 添加 Sink 到数据流

(4) 执行任务

3. 输出结果


这段代码展示了如何使用 Apache Flink 将数据流以文本形式写入文件,并配置了文件的滚动策略。以下是对代码的详细解析和说明:

1. 代码结构

  • 包声明package sink
    定义了代码所在的包。

  • 导入依赖
    导入了必要的 Java 和 Flink 相关类库,包括:

    • java.util.concurrent.TimeUnit:用于时间单位转换。
    • org.apache.flink:Flink 的核心类库。
    • org.apache.flink.streaming.api.functions.sink.filesystem:Flink 的文件 Sink 相关类。
  • sinkToFile对象
    主程序入口,包含 Flink 流处理逻辑和文件 Sink 的配置。

package sink import java.util.concurrent.TimeUnit import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala._ /** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: sink * @author: 赵嘉盟-HONOR * @data: 2023-11-19 23:26 * @DESCRIPTION * */ object sinkToFile { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) ) //TODO 直接一文本形式分布式写到文件中 val fileSink = StreamingFileSink .forRowFormat( new Path("src/main/resources/output/fileSink"), new SimpleStringEncoder[String]("UTF-8") ) .withRollingPolicy( //指定滚动策略 DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15)) //十五分钟混动 .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5)) //五分钟无数据滚动 .withMaxPartSize(1024*1024*1024) //最大文件大小 .build() ) .build() data.map(_.toString).addSink(fileSink) env.execute("SinkFile") } }

基于scala使用flink将读取到的数据分布式写入到文件中

可以指定滚动策略(滚动时间、滚动方式、最大文件数量等)

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) )
  • 创建 Flink 流处理环境StreamExecutionEnvironment
  • 使用fromElements方法生成一个包含 4 个Event对象的流。
(2) 配置文件 Sink
val fileSink = StreamingFileSink .forRowFormat( new Path("src/main/resources/output/fileSink"), new SimpleStringEncoder[String]("UTF-8") ) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15)) // 15 分钟滚动 .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5)) // 5 分钟无数据滚动 .withMaxPartSize(1024 * 1024 * 1024) // 最大文件大小 1GB .build() ) .build()
  • 使用StreamingFileSink.forRowFormat创建一个文件 Sink:
    • 指定输出路径为src/main/resources/output/fileSink
    • 使用SimpleStringEncoder将数据编码为 UTF-8 格式的字符串。
  • 配置滚动策略DefaultRollingPolicy
    • withRolloverInterval(15):每 15 分钟滚动一次文件。
    • withInactivityInterval(5):如果 5 分钟内没有新数据,则滚动文件。
    • withMaxPartSize(1024 * 1024 * 1024):当文件大小达到 1GB 时滚动文件。
(3) 添加 Sink 到数据流
data.map(_.toString).addSink(fileSink)
  • Event对象转换为字符串。
  • 将文件 Sink 添加到数据流中。
(4) 执行任务
env.execute("SinkFile")
  • 启动 Flink 流处理任务,任务名称为SinkFile

3. 输出结果

程序运行后,数据会以文本形式写入src/main/resources/output/fileSink目录下的文件中。文件会根据滚动策略进行分割,例如:

  • 每 15 分钟生成一个新文件。
  • 如果 5 分钟内没有新数据,则生成一个新文件。
  • 当文件大小达到 1GB 时生成一个新文件。

文件内容示例:

Event(Mary,./home,100) Event(Sum,./cart,500) Event(King,./prod,1000) Event(King,./root,200)
http://www.rkmt.cn/news/1387361.html

相关文章:

  • KouShare-dl终极指南:10个高效下载蔻享学术视频的实用技巧
  • 嵌入式开发避坑指南:eMMC通信协议中Data Strobe信号到底怎么用?
  • Unity AndroidWebView模块:安卓原生WebView深度接管指南
  • 《流畅的Python》读书笔记10(补充02): 装饰器和闭包 - 闭包并发安全解决方案
  • NumPy 2.0 迁移指南:ABI断裂、标量规则与StringDType实战
  • 强化学习在并行机构人形机器人控制中的应用
  • 为Chromebook和树莓派打造的VS Code社区构建版本完全指南:终极安装与使用教程
  • Jetson Orin Nano 升级jetpack5.1.2刷机过程记录
  • PICO4帧时间抖动根因与稳帧工程实践
  • 保姆级教程:在Ubuntu 20.04上从零配置UR5机械臂的ROS Noetic驱动与MoveIt仿真环境
  • 如何实现多平台Charting Library集成:从Web到移动端的完整指南
  • 上海亚卡黎实业有限公司2026作业设备优选:专业车载高空作业平台厂家/剪式平台厂家推荐上海亚卡黎实业 - 栗子测评
  • IPFS去中心化存储实战指南:黑马程序员音乐播放器项目开发完整教程
  • ZjDroid命令大全:从DEX内存dump到Lua脚本注入的完整教程
  • 美国签证预约自动提醒工具终极指南:告别手动刷新的智能解决方案
  • 【实战系列整合】《从 0 到 1 打造鸿蒙原生应用:会议随记 Pro 开发实战合集》
  • SocialR1-8B-i1-GGUF:终极社交推理AI模型完全指南
  • everfu/hexo-theme-solitude主题用户行为分析:热力图与转化路径追踪配置
  • 如何使用SQLite Viewer快速加载和分析本地SQLite数据库文件?完整操作指南
  • MuJoCo物理仿真终极指南:深度解析接触动力学与7个实战调优技巧
  • 保姆级教程:在ArcGIS Pro插件中集成你的自定义工具箱(以‘消除重复要素’为例)
  • Visual Studio 项目属性页开发完全教程:从基础到高级
  • MinIO + Docker 快速搭建 S3 兼容对象存储
  • 如何用AOT-GAN实现高分辨率图像修复:从原理到实践
  • 保姆级教程:手把手带你走通UDS Bootloader刷写全流程(附报文解析)
  • 含分布式风力发电的微电网系统优化控制【附代码】
  • 从Bert到Ernie:百度文心大模型是如何通过‘知识融合’解决中文分词难题的?
  • QuickBMS终极指南:如何快速提取和修改游戏资源文件
  • InsForge与Cursor集成:AI代码编辑器的完美后端平台指南
  • MedGemma与Hugging Face集成:如何在医疗AI项目中无缝使用预训练模型