Flink 2.1.1: Writing Kafka Data into Elasticsearch 7

📅 Published: 2026/6/19 13:23:35

Installation

Flink 2.1.1 Docker installation

Java code example

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.flink</groupId>
    <artifactId>flink-kafka-es7-sql</artifactId>
    <version>1.0.0</version>
    <name>flink-kafka-es7-sql</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>2.1.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <log4j.version>2.24.3</log4j.version>
        <commons-math3.version>3.6.1</commons-math3.version>
        <lombok.version>1.18.26</lombok.version>
    </properties>

    <dependencies>
        <!-- Flink Streaming core dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client dependency, used for local execution -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Needed only when running locally -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>4.0.1-2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>4.0.0-2.0</version>
        </dependency>
        <!-- Flink JSON format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>${commons-math3.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- Build a single fat JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <transformers>
                                <!-- Merge META-INF/services files to avoid conflicts -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.flink.KafkaEs7Sql</mainClass>
                                </transformer>
                            </transformers>
                            <!-- Optional: strip signature files so the shaded JAR is not rejected as invalid -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <!-- These would conflict with the Kryo/Objenesis bundled in the Flink cluster -->
                                        <exclude>com/esotericsoftware/kryo/**</exclude>
                                        <exclude>org/objenesis/**</exclude>
                                        <exclude>META-INF/versions/9/org/objenesis/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Code

package com.example.flink;

import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL application that consumes from Kafka and counts files per userId.
 */
public class KafkaEs7Sql {

    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 5 seconds
        env.enableCheckpointing(5000);
        // Exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        // Abort a checkpoint that takes longer than 60 s
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // No concurrent checkpoints, to flatten I/O peaks
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // At least 500 ms between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Keep checkpoints after the job is cancelled so it can be restored manually
        env.getCheckpointConfig().setExternalizedCheckpointRetention(
                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);

        // 2. Create the table environment
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 3. Kafka source table (a processing-time window needs no event-time column)
        tableEnv.executeSql("CREATE TABLE test_file_source (" +
                "  userId STRING," +
                "  type STRING," +
                "  fileType STRING," +
                "  fileUrl STRING," +
                "  rlsFileList ARRAY<ROW<fileUrl STRING, filePath STRING, fileType STRING>>," +
                "  shootTime BIGINT," +
                "  uploadTime BIGINT," +
                "  location STRING," +
                "  duration BIGINT," +
                "  pt AS PROCTIME()" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'user_file_topic'," +
                "  'properties.bootstrap.servers' = 'kk.kk.kk.kk:9092,kk.kk.kk.kk:9092,kk.kk.kk.kk:9092'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'earliest-offset'" +
                ")");

        // 4. Elasticsearch 7 sink table; the document _id comes from the primary key
        tableEnv.executeSql("CREATE TABLE es_sink (" +
                "  id STRING," +
                "  userId STRING," +
                "  total BIGINT," +
                "  proTime TIMESTAMP(3)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'elasticsearch-7'," +
                "  'hosts' = 'http://es.es.es.es:9200'," +
                "  'index' = 'test_file_index7'," +
                "  'document-id.key-delimiter' = '_'," +
                "  'format' = 'json'," +
                // Must be a value the Elasticsearch sink accepts: at-least-once
                "  'sink.delivery-guarantee' = 'at-least-once'" +
                ")");

        // 5. Count rows per userId in 30-second windows and use the current time as the timestamp
        TableResult tableResult = tableEnv.executeSql("INSERT INTO es_sink " +
                "SELECT " +
                "  UUID() AS id, " +
                "  userId, " +
                "  COUNT(*) AS total, " +
                "  CURRENT_TIMESTAMP AS proTime " +
                "FROM test_file_source " +
                "GROUP BY " +
                "  userId, " +
                "  TUMBLE(pt, INTERVAL '30' SECOND)");
        // Block until the (unbounded) streaming job terminates
        tableResult.await();
    }
}
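The INSERT statement groups rows by `userId` and by a 30-second processing-time tumbling window, then counts them. The plain-Java sketch below (it does not use Flink) illustrates the bucketing that `TUMBLE(pt, INTERVAL '30' SECOND)` performs, assuming windows aligned to multiples of 30 s with no offset: each timestamp falls into the window that starts at `ts - ts mod 30000`. The class name `TumbleCountSketch`, the `userId@windowStart` key format and the sample timestamps are hypothetical and only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of COUNT(*) per userId over 30 s tumbling windows (not Flink code). */
public class TumbleCountSketch {
    // Window size, corresponding to INTERVAL '30' SECOND
    static final long WINDOW_MS = 30_000L;

    /** Start of the tumbling window containing tsMillis (zero offset assumed). */
    static long windowStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, WINDOW_MS);
    }

    /** Counts events per "userId@windowStart", mirroring GROUP BY userId, TUMBLE(pt, ...). */
    static Map<String, Long> countPerUserAndWindow(String[] userIds, long[] timestamps) {
        Map<String, Long> counts = new TreeMap<>(); // sorted keys for stable output
        for (int i = 0; i < userIds.length; i++) {
            String key = userIds[i] + "@" + windowStart(timestamps[i]);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] users = {"u1", "u1", "u2", "u1"};
        long[] ts = {1_000L, 29_999L, 15_000L, 30_000L}; // milliseconds
        System.out.println(countPerUserAndWindow(users, ts));
    }
}
```

Running `main` prints `{u1@0=2, u1@30000=1, u2@0=1}`: the event at 30 000 ms starts a new window for `u1`. In the Flink job each such (userId, window) result is one row, and because the INSERT assigns `UUID()` as the primary key `id`, every row is written as a new Elasticsearch document rather than updating an earlier one.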

Running the example

Upload the KafkaEs7Sql JAR to Flink

[Screenshot: upload_kafka_es7_jar]

Data written to ES

[Screenshot: test_file_index7_search]

Inspect the ES index mapping

{
  "test_file_index7" : {
    "mappings" : {
      "properties" : {
        "id" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "proTime" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "total" : {
          "type" : "long"
        },
        "userId" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

Notes

Viewing the ES data

# View the data
http://es.es.es.es:9200/test_file_index7/_search?pretty
# View the mapping
http://es.es.es.es:9200/test_file_index7/_mapping?pretty
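The same two endpoints can be queried from Java, which is handy for checking the sink from a script or test. This is a minimal sketch using the JDK 11 `java.net.http.HttpClient` (matching the `maven.compiler.source` of 11 above), assuming the placeholder host `es.es.es.es:9200` is replaced with a reachable Elasticsearch node that has no authentication enabled. The class and method names `EsInspectSketch`, `endpoint` and `get` are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that fetches _search and _mapping for an index (no auth assumed). */
public class EsInspectSketch {

    /** Builds e.g. http://host:9200/test_file_index7/_search?pretty, tolerating a trailing slash. */
    static URI endpoint(String baseUrl, String index, String api) {
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        return URI.create(base + "/" + index + "/" + api + "?pretty");
    }

    /** Sends a GET request and returns the body, failing on any non-200 status. */
    static String get(HttpClient client, URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("HTTP " + response.statusCode() + " from " + uri);
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder host from the article; pass the real base URL as the first argument
        String baseUrl = args.length > 0 ? args[0] : "http://es.es.es.es:9200";
        HttpClient client = HttpClient.newHttpClient();
        System.out.println(get(client, endpoint(baseUrl, "test_file_index7", "_search")));  // documents
        System.out.println(get(client, endpoint(baseUrl, "test_file_index7", "_mapping"))); // mapping
    }
}
```

For example, `java EsInspectSketch http://localhost:9200` prints the search hits and then the mapping shown above.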
