尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

《Debezium + Kafka Connect 实战:从零搭建 MySQL CDC 数据管道,踩坑全记录》

《Debezium + Kafka Connect 实战:从零搭建 MySQL CDC 数据管道,踩坑全记录》
📅 发布时间:2026/7/1 1:39:57
部分命令没有详细解释,不清楚的可以问AI

一、背景与目标

搭建一条从 MySQL 到 Kafka 的实时数据同步管道(CDC),为后续实时数仓(Flink + Doris + Paimon)提供数据源。

技术选型:

  • MySQL 5.7.28(开启 binlog)

  • Kafka 3.2.0(消息队列)

  • Debezium 1.9.7(CDC 工具)

  • Kafka Connect(Debezium 运行框架)


二、环境准备

组件版本安装路径
CentOS7—
JavaOpenJDK 1.8.0_412/opt/module/jdk1.8.0_212
Kafka3.2.0/opt/module/kafka
Zookeeper3.5.7/opt/module/zookeeper-3.5.7
MySQL5.7.28hadoop102:3306
Debezium1.9.7.Final/opt/module/kafka/plugins/debezium-connector-mysql

三、MySQL 开启 binlog(数据源准备)

修改配置文件/etc/my.cnf:

ini

[mysqld] server-id=1 log-bin=mysql-bin binlog_format=row binlog-row-image=FULL # 注意:如果配置了 binlog-do-db,只记录指定数据库的变更 # 如需同步所有数据库,请注释掉该配置 # binlog-do-db=retail_db

重启 MySQL:

bash

systemctl restart mysqld

验证 binlog 是否开启:

bash

mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin';" # 预期输出: log_bin = ON

四、安装 Debezium MySQL Connector

bash

cd /opt/module/kafka mkdir -p plugins cd plugins # 下载 Debezium 插件 wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.2.0/connect-plugins/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz # 解压 tar -xzvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz # 确认解压成功 ls -l debezium-connector-mysql/

五、配置 Kafka Connect

配置文件路径:/opt/module/kafka/config/connect-distributed.properties

properties

# Kafka Connect 集群标识 group.id=connect-cluster # Kafka 集群地址(三台 broker) bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 # Debezium 插件路径 plugin.path=/opt/module/kafka/plugins # 偏移量存储 Topic offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=25 # 配置存储 Topic config.storage.topic=connect-configs config.storage.replication.factor=1 # 状态存储 Topic status.storage.topic=connect-status status.storage.replication.factor=1 # 序列化格式 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false # REST API 地址 rest.advertised.host.name=hadoop102 rest.advertised.port=8083

六、踩坑记录(核心章节)


坑 1:connect-offsetsTopic 的cleanup.policy必须是compact

问题现象:

log

org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'.

根本原因:

  • Kafka Connect 要求connect-offsetsTopic 的cleanup.policy必须是compact,否则无法持久化偏移量。

  • 如果该 Topic 不存在,Kafka 自动创建时默认使用cleanup.policy=delete。

解决方案:

bash

# 1. 删除原有 Topic kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic connect-offsets # 2. 手动重建,指定 cleanup.policy=compact kafka-topics.sh --bootstrap-server hadoop102:9092 --create \ --topic connect-offsets \ --partitions 25 \ --replication-factor 1 \ --config cleanup.policy=compact \ --config min.cleanable.dirty.ratio=0.01 \ --config segment.ms=10000 # 3. 验证配置是否生效 kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic connect-offsets | grep cleanup # 预期输出: cleanup.policy=compact

坑 2:8083 端口被占用(Address already in use)

问题现象:

log

ERROR Stopping due to error org.apache.kafka.connect.errors.ConnectException: Unable to initialize REST server Caused by: java.net.BindException: Address already in use

curl http://hadoop102:8083/connectors返回Connection refused。

根本原因:

  • 上一个 Kafka Connect 进程未正常退出,仍占用 8083 端口。

  • 新的 Connect 进程启动时无法绑定端口,导致启动失败。

解决方案:

bash

# 1. 查看谁占用了 8083 端口 lsof -i:8083 # 2. 强制杀掉占用进程 kill -9 <PID> # 3. 重新启动 Kafka Connect cd /opt/module/kafka nohup bin/connect-distributed.sh config/connect-distributed.properties > /dev/null 2>&1 &

坑 3:Schema History Topic 缺失(The db history topic is missing)

问题现象:

log

WARN Database history was not found but was expected ERROR io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY

根本原因:

  • Debezium 需要schema-changes.retail_dbTopic 来存储表结构变更历史。

  • 该 Topic 不存在时,Debezium 不会自动创建,而是直接报错退出。

解决方案:

bash

# 1. 手动创建 Schema History Topic kafka-topics.sh --bootstrap-server hadoop102:9092 --create \ --topic schema-changes.retail_db \ --partitions 1 \ --replication-factor 1 # 2. 删除连接器后重新注册,使用 schema_only_recovery 模式 curl -X DELETE http://hadoop102:8083/connectors/mysql-connector-retail curl -X POST -H "Content-Type: application/json" --data ' { "name": "mysql-connector-retail", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "hadoop102", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "1", "database.server.name": "retail_db", "database.include.list": "retail_db", "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092", "database.history.kafka.topic": "schema-changes.retail_db", "include.schema.changes": "true", "snapshot.mode": "schema_only_recovery", "snapshot.locking.mode": "none", "decimal.handling.mode": "double" } }' http://hadoop102:8083/connectors

坑 4:Decimal 类型序列化为乱码

问题现象:
Flink 或kafka-console-consumer消费数据时,price字段显示为"AV8s"、"Cq38"等不可读字符,而不是6999.00。

根本原因:

  • Debezium 默认将 MySQL 的DECIMAL类型序列化为 Base64 编码的字节数组。

  • 需要配置decimal.handling.mode=double,让 Debezium 以数字形式输出。

解决方案:

在连接器配置中添加:

json

"decimal.handling.mode": "double"

完整连接器注册命令:

bash

curl -X POST -H "Content-Type: application/json" --data ' { "name": "mysql-connector-retail", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "hadoop102", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "1", "database.server.name": "retail_db", "database.include.list": "retail_db", "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092", "database.history.kafka.topic": "schema-changes.retail_db", "include.schema.changes": "true", "snapshot.mode": "initial", "snapshot.locking.mode": "none", "decimal.handling.mode": "double" } }' http://hadoop102:8083/connectors

坑 5:binlog-do-db 限制了可同步的数据库

问题现象:
Debezium 连接器状态显示RUNNING,但 Kafka 中始终没有生成对应的 Topic,消费不到任何数据。

根本原因:
MySQL 配置了binlog-do-db,只记录指定数据库的 binlog。如果目标数据库不在白名单中,Debezium 读不到任何变更。

解决方案:

修改/etc/my.cnf:

ini

# 如需同步所有数据库,注释掉该配置 # binlog-do-db=gmall # binlog-do-db=gmall2023_config # 或添加目标数据库 binlog-do-db=retail_db

重启 MySQL:

bash

systemctl restart mysqld

七、最终验证

1. 查看连接器状态:

bash

curl http://hadoop102:8083/connectors/mysql-connector-retail/status # 预期: "state":"RUNNING"

2. 查看 Kafka 中的 Topic:

bash

kafka-topics.sh --bootstrap-server hadoop102:9092 --list | grep retail # 预期输出: # retail_db # retail_db.retail_db.orders # retail_db.retail_db.products # schema-changes.retail_db

3. 消费数据验证:

bash

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 \ --topic retail_db.retail_db.products \ --from-beginning \ --max-messages 3

预期输出(JSON 格式,price为数字):

json

{"before":null,"after":{"product_id":1,"product_name":"iPhone 15","category":"Electronics","price":6999.0,"stock":50},"source":{...},"op":"r","ts_ms":...}

八、踩坑总结

坑关键词核心解决方案
1cleanup.policy=compact手动重建connect-offsets,指定compact
2Address already in uselsof -i:8083→kill -9
3The db history topic is missing手动创建schema-changes.retail_db,使用SCHEMA_ONLY_RECOVERY
4Decimal 乱码连接器配置添加decimal.handling.mode=double
5数据同步不到检查binlog-do-db是否包含目标数据库

相关新闻

  • HCIA-Datacom 课程学习心得
  • 基于PI外环-FCS-MPC内环的永磁同步电机双环调速系统仿真分析(Simulink仿真实现)
  • Tensor 是什么?PyTorch 里最重要的对象讲清楚

最新新闻

  • 跨越微伏级噪声鸿沟:硬核解析工业微弱传感器信号调理与高精度捕获实战
  • 为什么你的vmx文件压缩后反而增大?深度解析NTFS稀疏文件、零填充与TRIM指令协同失效原理
  • OpenHarness源码研究-4-AgentLoop对话引擎与工具系统
  • 如何深度掌控AMD Ryzen处理器:专业硬件调试工具完全指南
  • 机器人-混合关节架构
  • How To Secure A Linux Server:一份持续更新的服务器安全加固手册

日新闻

  • 2026年6月公司网站搭建最新热门渠道测评:四大低成本/零代码平台对比+避坑
  • 【Linux】Linux arm 编译QT程序,出现expected “}“报错
  • 【MATLAB例程】四基站二维AOA定位与距离辅助增强对比仿真。基于角度观测和测距修正的固定目标平面定位精度分析

周新闻

  • Windows字体自定义终极方案:No!! MeiryoUI完全指南
  • Deepin Boot Maker:告别命令行,3分钟制作Linux启动盘的智能解决方案
  • Plain Craft Launcher 2:重新定义你的Minecraft游戏体验

月新闻

  • 2026年6月公司网站搭建最新热门渠道测评:四大低成本/零代码平台对比+避坑
  • 【Linux】Linux arm 编译QT程序,出现expected “}“报错
  • 【MATLAB例程】四基站二维AOA定位与距离辅助增强对比仿真。基于角度观测和测距修正的固定目标平面定位精度分析

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号