
A Practical Guide to SeaTunnel Data Collection (K8S / Docker)


Overview

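This document is a complete guide to deploying and using SeaTunnel (V2.3.8) for data collection, aimed at synchronizing data from MongoDB and MySQL (RDS). It walks through building an automated collection system that runs the synchronization jobs once a day.
Other versions work in much the same way; for the details of each sync option, the official documentation is authoritative.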

I. Environment Preparation

1.1 System Requirements

| Component | Requirement |
|---|---|
| Operating system | Linux (x86_64 / ARM64) |
| Docker | 20.10+ |
| Kubernetes | 1.20+ (optional) |
| SeaTunnel | 2.3.8 |

1.2 Directory Layout

```
seatunnel/
├── bin/
│   ├── mongo_start.sh          # MongoDB job start script
│   ├── rds_start.sh            # RDS job start script
│   ├── mongoshell-linux-amd64
│   └── mongoshell-linux-arm64
├── config/
│   ├── mongo_dynamic.template  # MongoDB dynamic job config
│   ├── mongo_static.template   # MongoDB static job config
│   └── rds.template            # RDS job config
├── seatunnel_mongo.yaml        # K8s deployment file for the MongoDB job
└── seatunnel_rds.yaml          # K8s deployment file for the RDS job
```

II. Configuration Files in Detail

2.1 Template Mechanism

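The configuration templates use `$(VARIABLE_NAME)` as placeholders. When the start script runs, it replaces each placeholder with the value of the corresponding environment variable. The job configurations are written in HOCON.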

Supported placeholders:

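| Placeholder | Description |
|---|---|
| `$(MONGODB_URI)` | MongoDB connection string |
| `$(MONGODB_DATABASE)` | MongoDB database name |
| `$(RDS_URI)` | MySQL JDBC URL prefix without a database name, e.g. `jdbc:mysql://mysql-host:3306` |
| `$(RDS_USERNAME)` | MySQL user name |
| `$(RDS_PASSWORD)` | MySQL password |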
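Substituting a value with a plain `sed "s#\$(NAME)#${VALUE}#g"` inserts it verbatim, so a `&` (which sed expands to the matched placeholder), a `#` (the delimiter) or a `\` inside a URI or password corrupts the rendered config. A minimal sketch of a rendering helper, assuming bash and GNU `sed -i`: it discovers the placeholders in the template itself, refuses to render when a matching environment variable is unset, and escapes each value before substituting. `render_template` is a hypothetical name, not part of SeaTunnel.

```shell
#!/bin/bash
# Sketch (hypothetical helper): render a template by discovering every $(NAME)
# placeholder in it and substituting the value of the environment variable NAME.
# render_template <template> <output>
render_template() {
  local template=$1 output=$2 names name value
  # Unique placeholder names used in the template, e.g. MONGODB_URI
  names=$(grep -o '\$([A-Za-z_][A-Za-z0-9_]*)' "$template" \
    | sed 's/^\$(\(.*\))$/\1/' | sort -u)
  cp "$template" "$output" || return 1
  for name in $names; do
    # ${!name+x} is empty only when the variable named by $name is unset
    if [ -z "${!name+x}" ]; then
      echo "Error: placeholder \$(${name}) has no environment variable ${name}" >&2
      return 1
    fi
    # Escape \ & and the # delimiter so the value is inserted literally
    value=$(printf '%s' "${!name}" | sed 's/[\\&#]/\\&/g')
    sed -i "s#\$(${name})#${value}#g" "$output"
  done
}
```

For example, `render_template /sea/config/rds.template /sea/config/rds.conf` would stand in for the three `sed` calls of the RDS job. Newlines inside a value are not handled.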

2.2 MongoDB Static Job Configuration

Synchronizes one fixed MongoDB collection:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  MongoDB {
    uri = "$(MONGODB_URI)"
    database = "$(MONGODB_DATABASE)"
    collection = "skyladder_flowline_logs"
    result_table_name = "skyladder_flowline_logs"
    # Fields to read from each document and their types
    schema = {
      columns = [
        { name = "_id", type = STRING, nullable = true },
        { name = "projectId", type = STRING, nullable = true }
      ]
    }
  }
}

transform {
  sql {
    source_table_name = ["skyladder_flowline_logs"]
    result_table_name = "sub_skyladder_flowline_logs"
    # Rename projectId to the snake_case column used in MySQL
    query = "select projectId as project_id, _id as _id from skyladder_flowline_logs;"
  }
}

sink {
  jdbc {
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
    password = "$(RDS_PASSWORD)"
    source_table_name = ["sub_skyladder_flowline_logs"]
    # Generate the write SQL automatically, keyed on _id
    generate_sink_sql = true
    database = "metric"
    table = "metric.sub_skyladder_flowline_logs"
    primary_keys = ["_id"]
  }
}
```

2.3 MongoDB Dynamic Job Configuration

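Iterates over multiple collections by project ID. Each collection is named `<project ID>.npm_allItemData`; the template below is filled in with a single sample ID: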

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  MongoDB {
    uri = "$(MONGODB_URI)"
    database = "$(MONGODB_DATABASE)"
    # <project ID>.npm_allItemData, shown here with one sample project ID
    collection = "3e9d762f34d944c782876ef07723e3ac.npm_allItemData"
    result_table_name = "npm_allItemData"
    schema = {
      columns = [
        { name = "_id", type = STRING, nullable = true },
        { name = "projectId", type = STRING, nullable = true }
      ]
    }
  }
}

transform {
  sql {
    source_table_name = ["npm_allItemData"]
    result_table_name = "sub_kb_workbench"
    query = "select _id as _id, projectId as project_id from npm_allItemData;"
  }
}

sink {
  jdbc {
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
    password = "$(RDS_PASSWORD)"
    source_table_name = ["sub_kb_workbench"]
    generate_sink_sql = true
    database = "metric"
    table = "metric.sub_kb_workbench"
    primary_keys = ["_id"]
  }
}
```
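The start script in section 3.1 renders this template but never loops over projects, and the template hard-codes one sample project ID. A minimal sketch of the per-project expansion, assuming the collection line is changed to use a hypothetical extra placeholder, `collection = "$(PROJECT_ID).npm_allItemData"`, and that the list of project IDs is obtained separately (this article does not show how). `render_per_project` and the `mongo_dynamic_<id>.conf` file names are illustrative only.

```shell
#!/bin/bash
# Sketch (hypothetical helper): expand a rendered dynamic config once per project ID.
# ASSUMPTION: the config contains a $(PROJECT_ID) placeholder in its collection name.
# render_per_project <rendered_dynamic_conf> <out_dir> <project_id>...
render_per_project() {
  local conf=$1 out_dir=$2 id
  local id_re='^[A-Za-z0-9_-]+$'
  shift 2
  mkdir -p "$out_dir" || return 1
  for id in "$@"; do
    # Accept only plain identifiers, so an ID cannot inject sed or HOCON syntax
    if [[ ! $id =~ $id_re ]]; then
      echo "Skipping invalid project ID: $id" >&2
      continue
    fi
    sed "s#\$(PROJECT_ID)#${id}#g" "$conf" > "${out_dir}/mongo_dynamic_${id}.conf"
  done
}
```

Each generated file can then be run in turn with `/opt/seatunnel/bin/seatunnel.sh --config <file> -e local`, the same command the start scripts use.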

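2.4 RDS Job Configuration

Copies the `portal.pms_unit_info` table into `metric.a_pms_unit_info`, emptying the target table before each load: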

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    result_table_name = "pms_unit_info"
    table_path = "portal.pms_unit_info"
    url = "$(RDS_URI)/portal?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "$(RDS_USERNAME)"
    password = "$(RDS_PASSWORD)"
  }
}

# No transform block: the sink reads pms_unit_info directly.
# (A sql transform here would need source_table_name = ["pms_unit_info"].)

sink {
  Jdbc {
    source_table_name = ["pms_unit_info"]
    generate_sink_sql = true
    database = "metric"
    table = "a_pms_unit_info"
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    password = "$(RDS_PASSWORD)"
    # Keep the existing table structure and empty the table before each load
    schema_save_mode = IGNORE
    data_save_mode = CUSTOM_PROCESSING
    custom_sql = "truncate table a_pms_unit_info"
  }
}
```

III. Writing the Start Scripts

3.1 MongoDB Start Script

```bash
#!/bin/bash
# Abort with a message if any required environment variable is missing
: "${ARCH:?Error: ARCH not set, use 'x86' or 'arm'}"
: "${MONGODB_URI:?Error: MONGODB_URI not set}"
: "${MONGODB_DATABASE:?Error: MONGODB_DATABASE not set}"
: "${RDS_URI:?Error: RDS_URI not set}"
: "${RDS_USERNAME:?Error: RDS_USERNAME not set}"
: "${RDS_PASSWORD:?Error: RDS_PASSWORD not set}"

home="/sea"

# Pick the mongo shell binary for the CPU architecture
# (selected here but not used further in this script)
case "$ARCH" in
  x86) mongoshell="${home}/bin/mongoshell-linux-amd64" ;;
  arm) mongoshell="${home}/bin/mongoshell-linux-arm64" ;;
  *)   echo "Error: Unsupported ARCH: $ARCH"; exit 1 ;;
esac

# replace_placeholder <file> <NAME> <value>: substitute $(NAME) with value,
# escaping \ & and the # delimiter, which sed would otherwise interpret
replace_placeholder() {
  value=$(printf '%s' "$3" | sed 's/[\\&#]/\\&/g')
  sed -i "s#\$($2)#${value}#g" "$1"
}

# Render both templates into .conf files
for name in mongo_dynamic mongo_static; do
  conf="${home}/config/${name}.conf"
  \cp "${home}/config/${name}.template" "$conf"
  replace_placeholder "$conf" MONGODB_URI "$MONGODB_URI"
  replace_placeholder "$conf" MONGODB_DATABASE "$MONGODB_DATABASE"
  replace_placeholder "$conf" RDS_URI "$RDS_URI"
  replace_placeholder "$conf" RDS_USERNAME "$RDS_USERNAME"
  replace_placeholder "$conf" RDS_PASSWORD "$RDS_PASSWORD"
done

mkdir -p "${home}/logs"

# Run the static job now, then once a day at midnight (container time zone)
while true; do
  echo "Starting daily task: $(date)"
  /opt/seatunnel/bin/seatunnel.sh --config "${home}/config/mongo_static.conf" -e local \
    >> "${home}/logs/mongo_static-$(date +%Y%m%d).log" 2>&1

  # Compute the wait after the run, so the run time is not added on top of it
  now=$(date +%s)
  tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
  # Fallback: next midnight UTC
  [ -z "$tomorrow" ] && tomorrow=$((now - now % 86400 + 86400))
  sleep_seconds=$((tomorrow - now))
  echo "Waiting ${sleep_seconds} seconds for next run..."
  sleep "$sleep_seconds"
done
```
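Both start scripts wait for midnight as seen by the container's clock. `date -d "tomorrow 00:00:00"` uses the container's time zone, which is often UTC, and the arithmetic fallback `now - now % 86400 + 86400` always targets midnight UTC; in a UTC+8 region either can mean the job starts at 08:00 local time. A minimal sketch that computes the wait from an explicit `+HHMM` UTC offset instead, assuming bash; `seconds_until_midnight` is a hypothetical helper, and it ignores a daylight-saving change that falls inside the wait.

```shell
#!/bin/bash
# Sketch (hypothetical helper): seconds from <now> until the next local midnight,
# where local time is defined by a UTC offset in +HHMM / -HHMM form.
# seconds_until_midnight [now_epoch] [utc_offset]
seconds_until_midnight() {
  local now=${1:-$(date +%s)}
  local offset=${2:-$(date +%z)}
  local sign=1
  [[ $offset == -* ]] && sign=-1
  local digits=${offset#[+-]}
  # 10# forces base 10, so "08" is not parsed as an invalid octal number
  local offset_s=$(( sign * (10#${digits:0:2} * 3600 + 10#${digits:2:2} * 60) ))
  # Shift the epoch into local time, then measure the rest of the local day
  echo $(( 86400 - (now + offset_s) % 86400 ))
}
```

For example, `sleep_seconds=$(seconds_until_midnight "$(date +%s)" +0800)` would schedule the next run at midnight UTC+8 regardless of the container's time zone setting.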

3.2 RDS Start Script

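```bash
#!/bin/bash
# Abort with a message if any required environment variable is missing
: "${RDS_URI:?Error: environment variable RDS_URI is not set}"
: "${RDS_USERNAME:?Error: environment variable RDS_USERNAME is not set}"
: "${RDS_PASSWORD:?Error: environment variable RDS_PASSWORD is not set}"

# Paths (the root directory is the /sea mount)
home="/sea"
rds_config_file="${home}/config/rds.conf"
log_dir="${home}/logs"

# replace_placeholder <file> <NAME> <value>: substitute $(NAME) with value,
# escaping \ & and the # delimiter, which sed would otherwise interpret
replace_placeholder() {
  value=$(printf '%s' "$3" | sed 's/[\\&#]/\\&/g')
  sed -i "s#\$($2)#${value}#g" "$1"
}

# Render the template into rds.conf
\cp "${home}/config/rds.template" "$rds_config_file"
replace_placeholder "$rds_config_file" RDS_URI "$RDS_URI"
replace_placeholder "$rds_config_file" RDS_USERNAME "$RDS_USERNAME"
replace_placeholder "$rds_config_file" RDS_PASSWORD "$RDS_PASSWORD"

# Make sure the log directory exists
mkdir -p "$log_dir"

# Note: the original chmod +x /config/* probably had the wrong path; corrected to ${home}/config/*
chmod +x "${home}"/config/* 2>/dev/null

# Infinite loop: run the job, then wait until the next midnight
while true; do
  echo "Starting daily task: $(date)"

  # One log file per day, named after the current date
  today=$(date +%Y%m%d)
  static_log="${log_dir}/rds-${today}.log"

  # Record the start time of this run in the log (append)
  echo "===== Task started: $(date) =====" >> "$static_log"

  # Run the RDS job once, appending SeaTunnel's output to today's log
  echo "Running job with config file: $rds_config_file"
  /opt/seatunnel/bin/seatunnel.sh --config "$rds_config_file" -e local >> "$static_log" 2>&1
  echo "Daily task finished: $(date)"

  # Seconds until the next midnight, computed after the run so the run time is not added to the wait
  now=$(date +%s)
  tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
  if [ -z "$tomorrow" ]; then
    # Fallback: next midnight UTC
    seconds_today=$((now % 86400))
    sleep_seconds=$((86400 - seconds_today))
  else
    sleep_seconds=$((tomorrow - now))
  fi

  echo "Current time: $(date), waiting ${sleep_seconds} seconds until the next midnight..."
  sleep "$sleep_seconds"
done
```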
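Neither start script checks SeaTunnel's exit status, so a failed night looks the same as a successful one unless the log is read in full. A minimal sketch of a wrapper that appends a command's output to the daily log and records its exit status, assuming `seatunnel.sh` exits non-zero when the job fails; `run_task` is a hypothetical name.

```shell
#!/bin/bash
# Sketch (hypothetical helper): run one pass of a task, append its output to a
# daily log file and record the exit status, which is also returned to the caller.
# run_task <log_dir> <log_prefix> <command> [args...]
run_task() {
  local log_dir=$1 prefix=$2 status
  shift 2
  local log="${log_dir}/${prefix}-$(date +%Y%m%d).log"
  mkdir -p "$log_dir" || return 1
  echo "===== Task started: $(date) =====" >> "$log"
  "$@" >> "$log" 2>&1
  status=$?
  echo "===== Task finished: $(date), exit status ${status} =====" >> "$log"
  return "$status"
}
```

Inside the RDS loop this could read `run_task "$log_dir" rds /opt/seatunnel/bin/seatunnel.sh --config "$rds_config_file" -e local || echo "RDS task failed: $(date)"`, so the failure also shows up in `docker logs` or `kubectl logs`.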

IV. Docker Deployment

4.1 Prepare the Directories

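```bash
mkdir -p /opt/data/seatunnel/{bin,config}
```

Copy the files from the `bin/` and `config/` directories of section 1.2 into these directories; the whole of `/opt/data/seatunnel` is mounted into the container at `/sea`.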
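The host directory must contain the scripts and templates before the container starts. A minimal sketch of that copy step, assuming the layout of section 1.2 sits in a local source directory; it also strips Windows line endings (the fix listed in the FAQ) and marks everything in `bin/` executable. `install_seatunnel_files` is a hypothetical name.

```shell
#!/bin/bash
# Sketch (hypothetical helper): copy bin/ and config/*.template from a local
# checkout into the host directory that is mounted at /sea.
# install_seatunnel_files <source_dir> <target_dir>
install_seatunnel_files() {
  local src=$1 dst=$2 f
  mkdir -p "$dst/bin" "$dst/config" || return 1
  cp "$src"/bin/* "$dst/bin/" || return 1
  cp "$src"/config/*.template "$dst/config/" || return 1
  # Files edited on Windows end lines with \r\n, which breaks bash; strip the \r
  # from text files only (the mongoshell binaries are left untouched)
  for f in "$dst"/bin/*.sh "$dst"/config/*.template; do
    [ -f "$f" ] && sed -i 's/\r$//' "$f"
  done
  # Start scripts and mongoshell binaries must be executable
  chmod +x "$dst"/bin/*
}
```

For example: `install_seatunnel_files ./seatunnel /opt/data/seatunnel`.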

4.2 Start the RDS Job

```bash
docker run --rm -d \
  -v /opt/data/seatunnel/:/sea \
  -e RDS_URI="jdbc:mysql://mysql-host:3306" \
  -e RDS_USERNAME="user" \
  -e RDS_PASSWORD="password" \
  apache/seatunnel:2.3.8 \
  sh /sea/bin/rds_start.sh
```

4.3 Start the MongoDB Job

```bash
docker run --rm -d \
  -v /opt/data/seatunnel/:/sea \
  -e ARCH="x86" \
  -e MONGODB_URI="mongodb://user:pass@mongo-host:27017" \
  -e MONGODB_DATABASE="dbname" \
  -e RDS_URI="jdbc:mysql://mysql-host:3306" \
  -e RDS_USERNAME="user" \
  -e RDS_PASSWORD="password" \
  apache/seatunnel:2.3.8 \
  sh /sea/bin/mongo_start.sh
```
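Passing passwords with `-e` leaves them in the shell history and on the docker client's command line. Docker's `--env-file` option reads `VAR=value` lines from a file instead, taking each value literally without quote handling. A minimal sketch that writes such a file with owner-only permissions from variables set in the current shell; `write_env_file` and the file path in the usage line are hypothetical.

```shell
#!/bin/bash
# Sketch (hypothetical helper): write the named shell variables to a docker
# --env-file, one VAR=value per line, readable only by the file's owner.
# write_env_file <path> <VAR>...
write_env_file() {
  local path=$1 var
  shift
  # Create or truncate the file under a restrictive umask, then enforce 600
  ( umask 077 && : > "$path" ) || return 1
  chmod 600 "$path" || return 1
  for var in "$@"; do
    # Values are written as-is; docker does not strip quotes from env-file values
    printf '%s=%s\n' "$var" "${!var}" >> "$path"
  done
}
```

Usage, with the variables set in the current shell: `write_env_file /opt/data/rds.env RDS_URI RDS_USERNAME RDS_PASSWORD`, then `docker run --rm -d -v /opt/data/seatunnel/:/sea --env-file /opt/data/rds.env apache/seatunnel:2.3.8 sh /sea/bin/rds_start.sh`.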

V. Kubernetes Deployment

5.1 Deployment Example: MongoDB

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-mongo
  namespace: seatunnel
spec:
  replicas: 1
  selector:
    matchLabels:
      app: seatunnel-mongo
  template:
    metadata:
      labels:
        app: seatunnel-mongo
    spec:
      volumes:
        - name: seatunnel-config
          hostPath:
            path: /data/seatunnel
            type: DirectoryOrCreate
      containers:
        - name: seatunnel
          image: apache/seatunnel:2.3.8
          env:
            - name: ARCH
              value: "x86"
            - name: MONGODB_URI
              value: "mongodb://user:pass@mongo-host:27017"
            - name: MONGODB_DATABASE
              value: "dbname"
            - name: RDS_URI
              value: "jdbc:mysql://mysql-host:3306"
            - name: RDS_USERNAME
              value: "user"
            - name: RDS_PASSWORD
              value: "password"
          command:
            - /bin/sh
            - -c
            - |
              chmod +x /sea/bin/mongo_start.sh && bash /sea/bin/mongo_start.sh
          volumeMounts:
            - name: seatunnel-config
              mountPath: /sea
```

5.2 Deployment Example: RDS

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-rds
  namespace: seatunnel
spec:
  replicas: 1
  selector:
    matchLabels:
      app: seatunnel-rds
  template:
    metadata:
      labels:
        app: seatunnel-rds
    spec:
      volumes:
        - name: seatunnel-config
          hostPath:
            path: /data/seatunnel
            type: DirectoryOrCreate
      containers:
        - name: seatunnel
          image: apache/seatunnel:2.3.8
          env:
            - name: RDS_URI
              value: "jdbc:mysql://mysql-host:3306"
            - name: RDS_USERNAME
              value: "user"
            - name: RDS_PASSWORD
              value: "password"
          command:
            - /bin/sh
            - -c
            - |
              chmod +x /sea/bin/rds_start.sh && bash /sea/bin/rds_start.sh
          volumeMounts:
            - name: seatunnel-config
              mountPath: /sea
```
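Both Deployments keep the credentials as plain `value:` entries in the manifest. A Kubernetes Secret can hold them instead, and the container can load every key of that Secret as an environment variable through `envFrom` with `secretRef`. A minimal sketch that prints such a Secret from variables set in the current shell, base64-encoding each value as the Secret `data` field requires; `secret_manifest` and the Secret name `seatunnel-db` are hypothetical.

```shell
#!/bin/bash
# Sketch (hypothetical helper): print a Kubernetes Secret manifest whose keys are
# the given variable names and whose values are their base64-encoded contents.
# secret_manifest <name> <namespace> <VAR>...
secret_manifest() {
  local name=$1 namespace=$2 var
  shift 2
  printf 'apiVersion: v1\nkind: Secret\nmetadata:\n  name: %s\n  namespace: %s\ntype: Opaque\ndata:\n' \
    "$name" "$namespace"
  for var in "$@"; do
    # tr removes the line wrapping that base64 adds to long values
    printf '  %s: %s\n' "$var" "$(printf '%s' "${!var}" | base64 | tr -d '\n')"
  done
}
```

For example, `secret_manifest seatunnel-db seatunnel RDS_URI RDS_USERNAME RDS_PASSWORD | kubectl apply -f -`, after which the three RDS `env` entries can be replaced with `envFrom: [{secretRef: {name: seatunnel-db}}]` in the container spec.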

VI. FAQ

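| Problem | Solution |
|---|---|
| Script fails because of Windows (CRLF) line endings | `sed -i 's/\r$//' script.sh` |
| Connection failure | Check network connectivity and credentials |
| Placeholders not replaced | Make sure the environment variables are passed to the container correctly |
| mongo client | The bundled mongoshell binaries can be replaced with a locally installed client |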
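The "placeholders not replaced" and line-ending problems in the table can both be caught before SeaTunnel starts. A minimal sketch of a check for a rendered config, assuming bash and GNU grep; `check_rendered` is a hypothetical name.

```shell
#!/bin/bash
# Sketch (hypothetical helper): fail if a rendered config still contains a
# $(NAME) placeholder or has Windows (CRLF) line endings.
# check_rendered <conf>
check_rendered() {
  local conf=$1 leftover
  leftover=$(grep -o '\$([A-Za-z0-9_]*)' "$conf" | sort -u)
  if [ -n "$leftover" ]; then
    # $leftover is unquoted on purpose, to list the names on one line
    echo "Unreplaced placeholders in $conf:" $leftover >&2
    return 1
  fi
  if grep -q $'\r' "$conf"; then
    echo "$conf has Windows line endings; run: sed -i 's/\r\$//' $conf" >&2
    return 1
  fi
}
```

Called right after rendering, e.g. `check_rendered /sea/config/rds.conf || exit 1`, it stops the start script before a job runs with a literal `$(RDS_PASSWORD)` in its config.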

VII. References

  • SeaTunnel official documentation
  • SeaTunnel GitHub
