Flink数据流分布式写入文件实战
目录
1. 代码结构
代码解析
(1) 主程序入口
(2) 配置文件 Sink
(3) 添加 Sink 到数据流
(4) 执行任务
3. 输出结果
这段代码展示了如何使用 Apache Flink 将数据流以文本形式写入文件,并配置了文件的滚动策略。以下是对代码的详细解析和说明:
1. 代码结构
包声明:
package sink
定义了代码所在的包。导入依赖:
导入了必要的 Java 和 Flink 相关类库,包括:java.util.concurrent.TimeUnit:用于时间单位转换。org.apache.flink:Flink 的核心类库。org.apache.flink.streaming.api.functions.sink.filesystem:Flink 的文件 Sink 相关类。
sinkToFile对象:
主程序入口,包含 Flink 流处理逻辑和文件 Sink 的配置。
package sink import java.util.concurrent.TimeUnit import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala._ /** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: sink * @author: 赵嘉盟-HONOR * @data: 2023-11-19 23:26 * @DESCRIPTION * */ object sinkToFile { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) ) //TODO 直接一文本形式分布式写到文件中 val fileSink = StreamingFileSink .forRowFormat( new Path("src/main/resources/output/fileSink"), new SimpleStringEncoder[String]("UTF-8") ) .withRollingPolicy( //指定滚动策略 DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15)) //十五分钟混动 .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5)) //五分钟无数据滚动 .withMaxPartSize(1024*1024*1024) //最大文件大小 .build() ) .build() data.map(_.toString).addSink(fileSink) env.execute("SinkFile") } }基于scala使用flink将读取到的数据分布式写入到文件中
可以指定滚动策略(滚动时间、滚动方式、最大文件数量等)
代码解析
(1) 主程序入口
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) )- 创建 Flink 流处理环境
StreamExecutionEnvironment。 - 使用
fromElements方法生成一个包含 4 个Event对象的流。
(2) 配置文件 Sink
val fileSink = StreamingFileSink .forRowFormat( new Path("src/main/resources/output/fileSink"), new SimpleStringEncoder[String]("UTF-8") ) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15)) // 15 分钟滚动 .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5)) // 5 分钟无数据滚动 .withMaxPartSize(1024 * 1024 * 1024) // 最大文件大小 1GB .build() ) .build()- 使用
StreamingFileSink.forRowFormat创建一个文件 Sink:- 指定输出路径为
src/main/resources/output/fileSink。 - 使用
SimpleStringEncoder将数据编码为 UTF-8 格式的字符串。
- 指定输出路径为
- 配置滚动策略
DefaultRollingPolicy:withRolloverInterval(15):每 15 分钟滚动一次文件。withInactivityInterval(5):如果 5 分钟内没有新数据,则滚动文件。withMaxPartSize(1024 * 1024 * 1024):当文件大小达到 1GB 时滚动文件。
(3) 添加 Sink 到数据流
data.map(_.toString).addSink(fileSink)- 将
Event对象转换为字符串。 - 将文件 Sink 添加到数据流中。
(4) 执行任务
env.execute("SinkFile")- 启动 Flink 流处理任务,任务名称为
SinkFile。
3. 输出结果
程序运行后,数据会以文本形式写入src/main/resources/output/fileSink目录下的文件中。文件会根据滚动策略进行分割,例如:
- 每 15 分钟生成一个新文件。
- 如果 5 分钟内没有新数据,则生成一个新文件。
- 当文件大小达到 1GB 时生成一个新文件。
文件内容示例:
Event(Mary,./home,100) Event(Sum,./cart,500) Event(King,./prod,1000) Event(King,./root,200)