跳到主要内容

Kafka 数据中转校验

案例简述

CloudCanal 支持 MySQL -> Kafka完整的传输链路。

Kafka是消息系统,不支持SQL查询数据。针对这种异构数据源,我们如何来校验源对端数据的一致性呢?

本案例将通过 CloudCanal 先将Kafka数据再次回流到MySQL, 最后 检验两个 MySQL 的数据是否一致,即可间接地对比数据一致性。本案例使用 DebeziumEnvelope 消息格式。

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 2 个 MySQL 实例,1 个 Kafka 实例(本例使用自己搭建的两台 MySQL 5.6,阿里云 Kafka 2.2)
  • 两台 MySQL 实例准备两个相同的表 kafka_migration_test.table1
create schema kafka_migration_test collate utf8mb4_unicode_ci;
create table table1 (
`col1` varchar(25) null,
`col2` varchar(25) null,
`col3` varchar(45) null
);
  • 登录 CloudCanal 平台,添加 Kafka,MySQL image.png

  • kafka 自定义一个主题 topic_1,并创建一条 MySQL(1) -> Kafka 链路作为增量数据来源

    • 因为默认的消息格式是不带 Schema 字段的,这里的消息我们会反写到 MySQL 要利用 Schema 的信息,所以创建成功后需要在 源端配置页面 设置 SchemaInclude 参数为 trueimage.png
  • kafka 自定义一个主题 topic_2,并创建消费者组 migration,后续用于反写数据, 并创建一条 Kafka -> MySQL(2) 链路作为增量数据来源

任务创建 1 [ MySQL(1) -> Kafka ]

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库,并选择 DebeziumEnvelope 消息格式,和 topic_1 主题 image.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认 image.png

  • 选择需要迁移同步的表 table1和对应的 Kafka 主题 topic_1 image.png

  • 没有主键自动忽略,方便后序生成数据 image.png

  • 确认创建任务,创建成功后在源端配置页面设置 SchemaInclude 参数为 true image.png

  • 然后点击生效配置,确认重启 image.png

任务创建 2 [ Kafka -> MySQL(2) ]

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库 image.png

  • 选择 数据同步,其他选项默认

  • 选择消费的 Kafka 主题 topic_2和需要迁移同步的表 table1 image.png

  • 确认创建任务,没有主键自动忽略,方便后序生成数据

数据验证

  • 程序造数据,向 MySQL(1) 生成数据,MySQL(1) -> Kafka(topic_1) -> Kafka(topic_2) -> MySQL(2)

  • Kafka(topic_1) 数据会下沉到 topic_2 主题

  • 任务正常运行一段时间后,停止造数据,等待数据全部同步完成

  • 创建 MySQL 到 MySQL 的数据准确校验 image.png

  • 选着数据校验功能,其他的默认 image.png

  • 数据校验 OK image.png

总结

本文展示了 Kafka 的双向同步案例,验证了 CloudCanal 传输数据的准确性。