跳到主要内容

42 篇博文 含有标签「同步链路演示」

Data pipeline demonstration

查看所有标签

MongoDB Atlas 到 DocumentDB 数据同步

· 阅读需 4 分钟
John Li
John Li
Chief Executive Officer

简述

MongoDB Atlas 是一个全托管的云数据库即服务 (DBaaS),由 MongoDB 官方提供,支持在主流云平台(AWS、Azure、GCP)上快速部署、扩展和管理 MongoDB 集群,为用户提供高可用、安全合规的数据库托管服务。

本文主要介绍 CloudCanal 如何快速构建一条 MongoDB Atlas 到 DocumentDB 数据迁移同步链路。链路特点包括:

  • 基于 ChangeStream 增量订阅方式
  • 支持索引迁移

技术点

基于 ChangeStream 增量订阅方式

CloudCanal 利用 MongoDB Change Stream 的能力来实时订阅数据库的数据变更,当数据库中的数据发生 DML 等操作时,MongoDB Change Stream 会立即将这些 DML 事件,以流的形式实时推送给 CloudCanal。

CloudCanal 在消费 ChangeStream 事件时,会持久化保存其同步位点。当任务因异常或重启而中断后,CloudCanal 会将位点发送给 MongoDB,从而实现从上一次中断的位置 无缝续传,保证数据变更事件的“不重不漏”。

支持索引迁移

CloudCanal 提供对 MongoDB 索引的迁移支持,确保目标端数据库 继承与源端一致的查询性能优化,保障业务的无缝过渡。

信息

由于 DocumentDB 不支持哈希(Hashed)索引,CloudCanal 会在迁移时自动识别并忽略此类索引,从而保障其余所有兼容索引能够成功迁移,确保任务的平滑进行。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 准备数据源

  1. 登录 AWS 管理控制台并启动一台 EC2 实例,启动时开启公网以便 CloudCanal 连接。

  2. 切换至 Amazon DocumentDB 管理控制台,在左侧面板点击 参数组,创建参数组:

    1. 填写 新集群参数组名描述 后,点击 创建

    1. 进入刚创建的参数组,选择 tls 参数,点击 编辑,设置为 disabled,保存生效。

  3. 在左侧面板点击 集群,创建集群:

    1. 连接 部分中,选择 连接到 EC2 计算资源,并选择第一步创建的 EC2。

    1. 高级设置 > 集群选项 中,切换集群参数组为第二步创建的参数组。

    1. 其他所有选项保持默认,并点击 创建

步骤 2: 添加数据源

登录 CloudCanal 平台,点击 数据源管理 > 添加数据源,分别添加 2 个数据源。

  1. 添加 MongoDB Atlas 集群实例时,需设置额外参数 isAtlas 为 true。

  2. 添加 DocumentDB 数据源时,需要在 Sidecar 容器中创建 SSH 隧道,参考以下命令。

ssh -i "ec2Access.pem" -L 27017:sample-cluster.node.us-east-1.docdb.amazonaws.com:27017 ubuntu@ec2-34-229-221-164.compute-1.amazonaws.com -N 

-- 使用 SSH 隧道时,建议使用集群端点连接到集群

创建 SSH 隧道后,连接 localhost:27017 即可。

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务
  2. 配置源和目标数据源。
    1. 选择源和目标实例,并分别点击 测试连接
    2. 在源端实例下方 高级配置 中选择 是否迁移索引:是 / 否。

  1. 选择 数据同步 并勾选 全量初始化

  2. 点击 确认创建

信息

任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

MongoDB 源端的任务创建会有以下几个步骤:

  • 结构迁移
  • 分配任务执行机器
  • 创建任务状态机
  • 完成任务创建
  1. 任务启动,自动进行流转。
信息

当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

  • 结构迁移: MongoDB 源端的集合/索引将会迁移到对端,如果同名集合/索引在对端已经存在,则会忽略。
  • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
  • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简要介绍了使用 CloudCanal 实现 MongoDB Atlas 到 DocumentDB 数据迁移同步,加速数据的自由流动,满足多样的业务需求。

TDengine 到 MySQL 数据同步

· 阅读需 5 分钟
Zoe
Zoe
Docusaurus maintainer

简述

TDengine 是一款开源、高性能、云原生的时序数据库,专为物联网、车联网、工业互联网、金融、IT 运维等场景优化设计。在工业自动化的时代,时序数据库在电力、轨道交通、智能制造等领域有着广泛的应用。

MySQL 是全球广泛使用的开源关系型数据库,能够高效处理大量数据和复杂查询需求,并且具有较强的稳定性和可靠性。

本文主要介绍如何通过 CloudCanal 实现 TDengine 到 MySQL 数据迁移同步。

应用场景

  • 数据备份与归档:将 TDengine 的数据迁移到 MySQL 作为备份或归档,确保数据长期保存和恢复,提升数据的安全性和高可用性。
  • 复杂查询:MySQL 支持复杂 SQL 查询和事务处理,适合需要对时序数据进行深度分析或关联查询的场景。
  • 数据集成与共享:企业通常会同时使用多种数据库,将 TDengine 的数据同步到 MySQL,便于将时序数据与其他业务数据进行关联分析。
  • 数据分析:将 TDengine 的数据同步到 MySQL 后,可再通过 CloudCanal 将数据同步到其他分析型数据库或数仓,支持更复杂的数据分析和操作,满足更多业务需求。

技术点

增量数据同步整体流程

CloudCanal 基于 Query Topic 实现 TDengine 到 MySQL 的数据同步,同步流程如下:

  1. 创建 Topic,通过 Topic 订阅 TDengine 数据库的变更事件(无法捕获 DELETE 事件)。
  2. 执行增量数据同步。
  3. 捕获变更事件,记录表级 offset 位点。

表级别多位点

CloudCanal 支持 TDengine 源端表级别多位点,即可以在表级别进行多位点的配置,确保每个表能够消费各自对应的增量同步位点。位点的具体体现为:

[
{
"db": "us_power",
"table": "s1",
"topic": "canalt7g262cm6jy_increment_us_power_s1",
"offset": 1010,
"vgroup": 3,
"timestamp": 1715828416114
},
{
"db": "us_power",
"table": "s2",
"topic": "canalt7g262cm6jy_increment_us_power_s2",
"offset": 2093,
"vgroup": 3,
"timestamp": 1715828311123
},
...
]

纳秒级时间戳同步

TDengine 最高支持纳秒级 Timestamp 类型,而 MySQL 最高支持微秒级 Timestamp 类型。CloudCanal 支持纳秒级时间戳同步,将 TDengine 纳秒级时间戳转换为 MySQL BIGINT 类型数据。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

  3. 选择 数据同步 并勾选 全量初始化

    信息

    先取消勾选 自动启动任务,后续仍需要修改个别参数。

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果是超级表增量同步,可以点击 操作 > 子表订阅过滤条件,设置子表订阅过滤条件,具体可参考 TDengine Query Topic,默认订阅所有子表。

    如果需要进行纳秒级时间戳同步,需在对端手动创建表,源端 Timestamp 列要映射到对端 BIGINT 类型列。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    TDengine 源端的任务创建会有以下几个步骤:

    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 进入任务详情页,点击右上角 功能列表 > 修改任务参数,修改以下任务参数:

    • 源端参数 srcTimezone:源端时区,默认 UTC,需要与源端时区保持一致。
    • 源端参数 supportTimestampToEpochNano:是否开启 Timestamp-Number 数值转换,默认 false。
    • 对端参数 dstTimezone:对端时区,需要与对端时区保持一致。

8. 启动任务,CloudCanal 会自动进行任务流转,其中的步骤包括:

  • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
  • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

TDengine 适用于高吞吐的时序数据存储与查询,而 MySQL 适用于事务处理和复杂查询。使用 CloudCanal 进行 TDengine 到 MySQL 数据迁移同步,能够兼顾时序数据的高效存储与业务数据的灵活分析,从而实现时序数据价值的最大化。

Redis 到 Redis 数据同步

· 阅读需 3 分钟
John Li
John Li
Chief Executive Officer

简述

本文主要介绍如何通过 CloudCanal 实现 Redis 到 Redis 数据迁移同步。

技术点

基于 PSync 协议同步

CloudCanal 基于 PSync 协议实现 Redis 到 Redis 的数据同步,同步流程如下:

  1. CloudCanal 与 Redis Master 建立 Socket 长连接。
  2. 发送 Auth 命令验证身份(如果有)。
  3. CloudCanal 向 Redis Master 发送 PSync 命令,伪装成一个 Redis Slave 节点。
  4. Redis Master 节点持续向 CloudCanal 伪装的 Redis Slave 节点推送 二进制数据流
  5. CloudCanal 将二进制数据流解析为 Redis 命令,发送给目标端执行。

redis_sync_redis_1

双向同步防循环能力

CloudCanal 采用辅助命令实现 Redis 之间的双向同步防循环。

当收到正常指令,CloudCanal 计算其 hash 值,构建辅助指令 Key,反向查询辅助指令是否存在,如果存在则为循环。随后,CloudCanal 反向删除对应辅助命令,如果删除成功,过滤即可。

对于对端写入以及源端查询辅助指令,CloudCanal 进行了批量和多线程优化,同步性能得到有效提升。

redis_sync_redis_2

过滤超大 Key 能力

在实际的 Redis 业务使用中,由于历史遗留问题或维护不当,可能会出现一些体积过大的 Key。

在数据同步场景下,这些超大 Key 容易引发任务同步异常。CloudCanal 提供了灵活的任务参数配置,支持过滤超出指定大小的 Key。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

    1. 选择源和目标实例,并分别点击 测试连接
    2. 在源端实例下方 高级配置 中选择 启用 DB 映射 :是 / 否。
      信息

      当启用 DB 映射时,需要确保源端 Redis 实例与目标端 Redis 实例 DB 数量相同。

  3. 选择 数据同步 并勾选 全量初始化

  4. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 ** 详情** 即可查看。

    Redis 源端的任务创建会有以下几个步骤:

    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  5. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 Redis 到 Redis 数据迁移同步,打通数据流动的渠道,实现端到端的精准数据传输。

OceanBase(Oracle 租户) 到 OceanBase(MySQL 租户) 数据同步

· 阅读需 3 分钟
Barry
Chief Technology Officer

简述

CloudCanal 4.4.0.0 版本开始支持 OceanBase (Oracle 租户) 作为源/对端的数据迁移能力。本篇文章主要介绍如何使用 CloudCanal 构建一条 OceanBase(Oracle 租户)到 OceanBase(MySQL 租户)的数据同步链路(以下简称 ObForOracle 和 OceanBase)。

技术点

订阅增量日志

ObForOracle 增量数据变更的订阅依赖其官方提供的日志代理组件 oblogproxy。CloudCanal 不断接收并解析 oblogproxy 推送的增量事件,最终写入对端。

前提条件

ObForOracle 版本为 3.0、3.1、3.2。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

  1. 登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

  2. 数据库类型选择 ObForOracle,并填写网络地址和认证信息。

    信息
    • 网络地址格式为 ip1:sql_port1;ip2:sql_port2
    • 多个 Root Server 用英文分号分隔。
    • 支持填写 OBProxy 地址,格式为 proxy_ip:proxy_port
  3. 配置 额外参数

    参数描述
    obLogProxyHostoblogproxy server 的地址,格式为 ip:port,默认端口统一为 2983。如果需要订阅增量数据,该参数不可为空。
    clusterUrl可以为空。为空时订阅增量数据时会使用 root server list。不为空时订阅增量数据会优先使用 cluster url。
    rpcPortList订阅增量数据时,该参数不可为空,默认端口为 2882。如果网络地址包含多个 Root Server(假设为 3 个),此处填写格式为 2882:2882:2882。
    tenantOceanBase Oracle 模式的租户名称,不可为空。若为空,使用 OceanBase 驱动在连接时会默认连接到 sys 租户(MySQL 模式)。
    clusterNameOceanBase 集群名称,可以为空。用于拼接连接串(用户名@租户名#集群名)以通过 OceanBase 官方驱动连接数据库。

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    ObForOracle 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: ObForOracle 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 ObForOracle 到 OceanBase 数据迁移同步,打通数据流动的渠道,实现端到端的精准数据传输。

HANA 到 MySQL 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

SAP HANA 是一款列式存储的内存数据库系统,相比传统的硬盘存储数据库,数据处理速度更快,支持联机分析处理(OLAP)和联机事务处理(OLTP),常常用于实时分析处理、应用程序开发等场景。

MySQL 是在全球广泛使用的开源关系型数据库,历史悠久,运行稳定可靠,简便易用,灵活可扩展,因而受到许多组织的青睐。常用于 Web 应用的后端数据库、企业资源规划(ERP)系统的数据库、开发和测试数据库等。

本篇文章主要介绍如何使用 CloudCanal 构建一条 HANA 到 MySQL 的数据同步链路。

技术点

数据同步整体流程

CloudCanal 实现 HANA 源端增量数据同步,主要使用其触发器捕获变更事件。整体流程如下:

  • 安装触发器,通过触发器捕获增量变更数据
  • 记录位点,记录增量数据同步的起点
  • 执行全量数据迁移
  • 执行增量数据同步

表级别 CDC 表

CloudCanal 实现了表级别的 CDC 表设计,每张源表都对应一张 CDC 表,CDC 表的结构仅在原表结构的基础上增加了几个位点字段,用于增量同步。

相比于所有数据写入单一 CDC 表,表级别的 CDC 表更加独立,方便多次订阅表。此外,触发器只需要执行 INSERT 语句,因此对于字段较多的表也能够快速执行。扫描消费 CDC 数据时,不需要做额外的处理,消费更简单。

image.png

原表

CREATE COLUMN TABLE "SYSTEM"."TABLE_TWO_PK" (
"ORDERID" INTEGER NOT NULL ,
"PRODUCTID" INTEGER NOT NULL ,
"QUANTITY" INTEGER,
CONSTRAINT "FANQIE_pkey_for_TA_171171268" PRIMARY KEY ("ORDERID", "PRODUCTID")
)

CDC 表

CREATE COLUMN TABLE "SYSTEM"."SYSTEMDB_FANQIE_TABLE_TWO_PK_CDC_TABLE" (
"ORDERID" INTEGER,
"PRODUCTID" INTEGER,
"QUANTITY" INTEGER,
"__$DATA_ID" BIGINT NOT NULL ,
"__$TRIGGER_ID" INTEGER NOT NULL ,
"__$TRANSACTION_ID" BIGINT NOT NULL ,
"__$CREATE_TIME" TIMESTAMP,
"__$OPERATION" INTEGER NOT NULL
);
-- other index

触发器 (INSERT)

CREATE TRIGGER "FANQIE"."CLOUD_CANAL_ON_I_TABLE_TWO_PK_TRIGGER_104" AFTER INSERT ON "SYSTEM"."TABLE_TWO_PK" REFERENCING NEW ROW NEW FOR EACH ROW 
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
IF 1=1 THEN
INSERT INTO "SYSTEM"."SYSTEMDB_FANQIE_TABLE_TWO_PK_CDC_TABLE" (__$DATA_ID, __$TRIGGER_ID, __$TRANSACTION_ID, __$CREATE_TIME, __$OPERATION, "ORDERID","PRODUCTID","QUANTITY")
VALUES(
"SYSTEM"."CC_TRIGGER_SEQ".NEXTVAL,
433,
CURRENT_UPDATE_TRANSACTION(),
CURRENT_UTCTIMESTAMP,
2,
:NEW."ORDERID" ,
:NEW."PRODUCTID" ,
:NEW."QUANTITY"
);
END IF;
END;

表级别任务位点

在表级别 CDC 表模式下,同步增量数据时,每个表都有自己的位点,原有的单一位点无法满足这种同步需求。

因此,CloudCanal 引入了表级别的增量同步位点,确保每个表能够消费各自对应的增量同步位点。位点的具体体现为:

[
{
"db": "SYSTEMDB",
"schema": "FANQIE",
"table": "TABLE_TWO_PK",
"dataId": 352,
"txId": 442441,
"timestamp": 1715828416114
},
{
"db": "SYSTEMDB",
"schema": "FANQIE",
"table": "TABLE_TWO_PK_2",
"dataId": 97,
"txId": 11212,
"timestamp": 1715828311123
},
...
]

这样的设计有以下好处:

  • 位点精细控制:每个表都有自己的增量同步位点,在增量任务中可以重新消费特定表中的增量数据,而无需消费所有表的数据,实现更加精细的控制,减少不必要的数据传输和处理,提高同步效率。

  • 数据并行处理:由于每个表有自己的位点,可以实现表级别的并行处理。不同表的增量数据可以同时处理,避免了单一位点导致的串行处理瓶颈,从而加快了同步速度。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

    1. 选择源和目标实例,并分别点击 测试连接
    2. 在目标实例下方 高级配置 中选择源端 CDC 表模式:单 CDC 表 / 表级 CDC 表
  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    HANA 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 HANA CDC 表以及对应触发器
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: HANA 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 HANA 到 MySQL 数据迁移同步,打通数据流动的渠道,实现端到端的精准数据传输。

Oracle 到 Elasticsearch 数据同步

· 阅读需 5 分钟
Zoe
Zoe
Docusaurus maintainer

简述

Elasticsearch 是一个分布式的实时搜索与数据分析引擎,具有强大的可扩展性和高度的灵活性。CloudCanal 对于 Elasticsearch 的支持经过了多轮迭代,支持版本从 6.x 和 7.x 一路扩展到 8.x,并适配了其丰富多样的 API。

同时 CloudCanal 对 Oracle 源端同步技术进行了多处优化,大幅提升了数据同步的稳定性和可靠性。

本文主要介绍如何快速构建一条 Oracle 到 Elasticsearch 的数据同步链路。

技术点

Oracle 基于 LogMiner 的实时同步

CloudCanal 在 Oracle 源端的增量同步中,通过 LogMiner 分析 redo 日志,并结合多轮优化,显著提升了同步的稳定性与效率,已在用户生产环境中成功验证。主要特点包括:

  • Oracle RAC 支持与优化:专为 Oracle RAC 场景优化,确保数据同步的完整性和一致性。
  • 标准化 LogMiner 解析:默认采用 LogMiner 标准方法(ADD_FILE)解析 redo 日志,并提供 CONTINUOUS_MINE 作为补充(取决于 Oracle 版本)。
  • 全事件消费模式:支持全事件消费,保障同步过程中的稳定性。
  • 超大事务处理:本地缓存超大变更数据,处理源端 Oracle 超过百万级的变更。
  • 位点回溯:在消费出错时,支持使用时间戳、SCN 回溯位点,增强容错能力。
  • 数据校验与订正:提供定时的数据校验和订正机制,确保数据质量稳定。

经过以上优化,CloudCanal 在 Oracle 同步场景中表现更加稳健可靠,适应各种复杂的数据同步需求。

自动创建 Elasticsearch 索引和映射结构

CloudCanal 迁移同步任务支持自动将源端数据库表结构映射成 Elasticsearch 索引,该过程中允许用户在 级别上,个性化设置自己需要的索引和映射(Mapping)结构。这些设置包括:

  • 每个列可以指定是否需要索引。
  • 可以对 TEXT 类型的列设置 Elasticsearch 映射中的分词器(标准分词器)。
  • 自定义设置索引分片数、副本数。

生成内置 _id 并指定路由字段

在写入 Elasticsearch 时,_id 字段用于唯一标识每个文档(Doc)。数据路由基于 _id 值,确保数据分片定位一致。CloudCanal 数据同步默认遵循以下策略生成 _id:

  • 单主键表:对于单一主键表,默认使用源端关系表的主键列值作为 _id。
  • 多主键表:对于多主键表,通过分隔符 $ 连接多个主键列的值,组合成唯一 _id(可以通过参数 pkSeparator 自定义分隔符)。
  • 无主键表:若表无主键,则将所有列的值用 $ 连接,生成唯一的 _id。

此默认设置确保了每条数据在 Elasticsearch 中具有唯一的标识,同时提供稳定的分片路由。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

    1. 选择源和目标实例,并分别点击 测试连接
    2. 在源端实例下方 高级配置 中选择增量模式:LogMiner / 物化视图
    3. 在目标实例下方 高级配置 中选择时区配置:+08:00(默认)。
  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好索引。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    Oracle 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 Oracle 表级补全日志
    • 初始化 Oracle LogMiner 位点
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Oracle 源端的表定义将会迁移到对端,如果同名索引在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 Oracle 到 Elasticsearch 数据迁移同步,帮助企业增强数据分析及实时搜索能力,提升数据的价值。

MySQL 到 StarRocks 数据同步

· 阅读需 3 分钟
Zoe
Zoe
Docusaurus maintainer

简述

简述

本文介绍如何通过 CloudCanal ,五分钟内创建一条长期稳定运行的 MySQL -> StarRocks 实时数据迁移同步链路。

技术点

写入StarRocks采用StreamLoad导入方式

CloudCanal 采用了 StarRocks StreamLoad 方式进行导入,源端数据和变更转成字节流,以通过 HTTP 协议批量写入 StarRocks。

基于 StreamLoad 方式,写入对端的操作均为 INSERT,CloudCanal 自动将 INSERT / UPDATE / DELETE 转成 INSERT 语句,并填入 __op 值(删除标识符),StarRocks 将自动进行数据合并。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 选择源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    MySQL 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化位点
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Hana 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL 到 StarRocks 数据迁移同步,助力企业快速构建数据管道,增强数据分析能力。

Kafka 到 Kafka 数据同步

· 阅读需 3 分钟
Barry
Chief Technology Officer

简述

Kafka 为处理实时数据提供了一个统一、高吞吐、低延迟的平台,其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。因此实现 Kafka 到 Kafka 的数据同步也成了一项重要工作。

本篇文章主要介绍如何使用 CloudCanal 构建一条 Kafka 到 Kafka 的数据同步链路。

技术点

消费者消息推送

在任务创建后,CloudCanal 会自动创建消费组,并订阅需要同步消息的 Topic。CloudCanal 从源端拉取到消息后,会将消息推送到目标端。

心跳机制

在源端 Kafka 未产生消息时,CloudCanal 便无法正常感知消息的延时时间。

我们采用心跳模式解决这个问题。在 打开 Kafka 源端心跳 后,CloudCanal 会监测所有分区的消费位点,若所有分区的最新的位点与当前位点差值均小于设定的最长位点间隔(通过 dbHeartbeatToleranceStep 参数设置),则会产生一条包含当前系统时间的心跳记录。CloudCanal 在消费到该记录后,会根据其包含的时间计算延迟。

操作示例

步骤1: 配置 Kafka 权限

参考 Kafka 需要的权限 文档,设置 CloudCanal 需要的账号权限。

步骤2: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 3: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤4: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 选择源和目标数据源,并分别点击 测试连接

  3. 选择同步的 消息格式

    信息

    倘若没有特定的消息格式,请选择 原始消息格式

  4. 选择 增量同步

  5. 选择需要同步的 Topic。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    Kafka 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Kafka 源端会自动为对端创建 Topic,如果目标 Topic 在对端已存在,则会忽略。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 Kakfa 到 Kafka 数据迁移同步,助力企业快速构建数据管道,增强数据分析能力。

PostgreSQL 到 PostgreSQL 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

PostgreSQL 是一个历史悠久且广泛使用的数据库,不仅具备标准的关系型数据库能力,还具有相当不错的复杂 SQL 执行能力。用户常常会将 PostgreSQL 应用于在线事务型业务,以及部分数据分析工作,所以 PostgreSQL 到 PostgreSQL 数据迁移同步成为了一个重要工作。

本文主要介绍如何通过 CloudCanal 实现 PostgreSQL 到 PostgreSQL 数据迁移同步。

技术点

使用 PostgreSQL 逻辑复制机制

CloudCanal 使用 PostgreSQL 逻辑复制 (Logical) 机制实现对其增量数据的订阅。

发布(Publication)和同步任务一一关联,任务修改订阅后也会自动增减 Publication 中的表。

Trigger 方式实现 DDL 同步

DDL 同步对于在线数据库灾备等场景必不可少,但 PostgreSQL 逻辑复制并未提供 DDL 操作。

为此,我们采用了行业通用的 Trigger 方式捕获 DDL 操作,并且自动写入一张普通表 cc_pg_ddl_capture_tab。CloudCanal 正常订阅该表即可获取 DDL 操作,和普通 DML 增量订阅机制一致,可准确地保障 DDL 和涉及表 DML 事件顺序。

pg_ddl_capture

双向同步防循环能力

数据库多活能力要求常常出现在一些在线数据库核心应用场景中,对于数据同步工具,主要需要做到同步数据无循环。

对于 PostgreSQL 之间的多活同步防循环,我们采用同一事务标记操作方式实现。

通过一张额外的标记表,同步任务写入正常业务数据到对端时,在同一个事务中对标记表做操作。当任务从 PostgreSQL 中获取到该标记表事件时,则忽略当前事务所有操作,即达到防循环目的。

pg_loop_sync

操作示例

步骤 1: 修改 PostgreSQL 日志级别

  1. 参考 PostgreSQL 需要的权限 文档创建用户和授予权限。
  2. 设置 PostgreSQL wal_levellogical
信息

自建数据库可修改 postgresql.conf,设置 wal_level=logical 和 wal_log_hints = on。

  1. 设置同步账号网络权限。
信息

自建数据库可修改 pg_hba.conf,添加以下配置:

host replication 同步账号 CIDR网段 md5
host 同步库 同步账号 CIDR网段 md5
host postgres 同步账号 CIDR网段 md5

  1. 重启 PostgreSQL 生效。

步骤 2: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 3: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 4: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 选择源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

    信息

    勾选 同步 DDL 将会自动创建对应的 DDL 捕获 trigger 和 event,需要较高权限。

  4. 选择需要同步的索引。

  5. 选择索引对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表即可。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    PostgreSQL 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 DDL 捕获触发器和表
    • 初始化 PostgreSQL 增量复制位点
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动步骤流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: 将 PostgreSQL 源端的表结构迁移到对端,如果同名表在对端已存在,则忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端,支持断点续传。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 PostgreSQL 到 PostgreSQL 数据迁移同步。

PostgreSQL 作为流行的关系型数据库,通过 CloudCanal 数据迁移同步加持,让数据进出更加便利和顺畅。

Hana 到 PostgreSQL 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

本篇文章主要介绍如何使用 CloudCanal 构建一条 Hana 到 PostgreSQL 的数据同步链路。

技术点

表级别 CDC 表

CloudCanal 在实现 Hana 源端增量同步时,最初采用的是单 CDC 表的模式,即所有订阅表的增量数据(插入、更新、删除)通过触发器统一写入同一张 CDC 表。这样设计的初衷是简化架构和实现方式,但是同时也带来了一些问题。

  • 触发器执行效率低:采用单个 CDC 表时,我们将订阅表的字段值拼接成 JSON 字符串。虽然这种方式统一,但增加了触发器的复杂性。当字段数量超过 300 个时,触发器效率显著下降,影响同步性能。

  • 增量数据积压:所有订阅表的变更数据集中写入单个 CDC 表,当 A 表增量数据较多而 B 表较少时,混合写入会导致无法及时处理 B 表数据,造成 B 表数据积压,影响同步及时性。

后续我们对于单 CDC 表模式进行了优化。本次优化实现了表级别的 CDC 表设计,每张源表都对应一张 CDC 表,CDC 表的结构仅在原表结构的基础上增加了几个位点字段,用于增量同步。

原表

CREATE COLUMN TABLE "SYSTEM"."TEST" (
"TEST1" INTEGER NOT NULL ,
"TEST2" INTEGER NOT NULL ,
"TEST3" INTEGER,
CONSTRAINT "TEST_KEY" PRIMARY KEY ("TEST1", "TEST2")
)

CDC 表

CREATE COLUMN TABLE "SYSTEM"."SYSTEM_TEST_CDC_TABLE" (
"TEST1" INTEGER,
"TEST2" INTEGER,
"TEST3" INTEGER,
"__$DATA_ID" BIGINT NOT NULL ,
"__$TRIGGER_ID" INTEGER NOT NULL ,
"__$TRANSACTION_ID" BIGINT NOT NULL ,
"__$CREATE_TIME" TIMESTAMP,
"__$OPERATION" INTEGER NOT NULL
);
-- other index

触发器 (INSERT)

CREATE TRIGGER "SYSTEM"."CLOUD_CANAL_ON_I_TEST_TRIGGER_TEST" AFTER INSERT ON "SYSTEM"."TEST" REFERENCING NEW ROW NEW FOR EACH ROW 
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
IF 1=1 THEN
INSERT INTO "SYSTEM"."SYSTEM_TEST_CDC_TABLE" ("__$DATA_ID", "__$TRIGGER_ID", "__$TRANSACTION_ID", "__$CREATE_TIME", "__$OPERATION", "TEST1", "TEST2", "TEST3")
VALUES(
"SYSTEM"."CC_TRIGGER_SEQ".NEXTVAL,
433,
CURRENT_UPDATE_TRANSACTION(),
CURRENT_UTCTIMESTAMP,
2,
:NEW."TEST1" ,
:NEW."TEST2" ,
:NEW."TEST3"
);
END IF;
END;

采用这种方式有以下几个好处:

  • 表级别 CDC 表更加独立,方便进行多次订阅。
  • 触发器只需要执行 INSERT 语句,因此对于字段较多的表也能够快速执行。
  • 扫描消费 CDC 数据时,不需要做额外的处理,消费更简单。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

    1. 选择源和目标实例,并分别点击 测试连接
    2. 在目标实例下方 高级配置 中选择源端 CDC 表模式:单 CDC 表 / 表级 CDC 表
  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    Hana 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 Hana CDC 表以及对应触发器
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Hana 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文介绍了如何使用 CloudCanal 进行 Hana 到 PostgreSQL 数据迁移同步,操作简便的同时带来高效的数据同步体验,大大加快了企业的数据流通与数据平台构建。

Elasticsearch 到 Elasticsearch 数据同步

· 阅读需 4 分钟
John Li
John Li
Chief Executive Officer

简述

Elasticsearch 是一款流行的搜索引擎,和关系型数据库、缓存数据库、实时数据仓库、消息中间件共同组成应用的现代化数据堆栈。

写入数据到 Elasticsearch 相对容易,但是如何将其数据实时同步出来,则要困难一些。

本文主要介绍如何通过 CloudCanal 结合 Elasticsearch 增量数据捕获插件,实现 Elasticsearch 到 Elasticsearch 数据迁移同步。

技术点

使用 Elasticsearch 插件机制

Elasticsearch 并没有明确给出如何实时获取其中的变更数据,但是其插件 API IndexingOperationListener 可订阅到 INDEX 事件和 DELETE 事件,前者即 INSERT 或 UPDATE 操作, 后者即传统意义上的 DELETE 操作。

明确增量数据的获取机制,如何让下游获取这些数据则成了接下来的问题。

我们采用了一个单独索引 cc_es_trigger_idx 作为增量数据的容器。

这个方式有几个好处:

  • 不依赖第三方组件(如消息中间件)。
  • Elasticsearch 索引管理方便。
  • 和 CloudCanal 其他数据源 Trigger 方式增量数据获取风格一致,机制代码可重用。

es_es_sync

索引 cc_es_trigger_idx 结构如下, 其中 row_data 保留 INDEX 操作变更后数据,pk 则保存文档 _id。

{
"mappings": {
"_doc": {
"properties": {
"create_time": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ssSSS"
},
"event_type": {
"type": "text",
"analyzer": "standard"
},
"idx_name": {
"type": "text",
"analyzer": "standard"
},
"pk": {
"type": "text",
"analyzer": "standard"
},
"row_data": {
"type": "text",
"index": false
},
"scn": {
"type": "long"
}
}
}
}
}

Trigger 数据扫描机制

CloudCanal 使用 Elasticsearch 插件产生的增量数据,只要按照 cc_es_trigger_idx 索引 scn 字段顺序批量扫描消费即可。

消费代码风格和 SAP Hana 源端保持一致。

插件开源

Elasticsearch 插件加载会严格识别插件所依赖的三方包,如和 Elasticsearch 本体三方依赖包重复或版本不一致,则无法加载,所以插件需要和 Elasticsearch 版本保持一致(包括小版本)。

鉴于发布大量预编译包可操作性差,同时为了插件能够有更加广泛的使用,我们将插件开源在了 GitHub 上。

操作示例

步骤 1: 源端 Elasticsearch 安装插件

参考 Elasticsearch 源端同步准备 文档安装增量数据捕获插件。

步骤 2: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 3: 添加数据源

登录 CloudCanal 控制台 > 数据源管理 > 新增数据源

步骤 4: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 选择源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的索引。

  5. 选择索引对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好索引即可。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    Elasticsearch 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 ES 触发器和位点
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Elasticsearch 源端的索引映射定义将会迁移到对端,如果同名索引在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 Elasticsearch 到 Elasticsearch 数据迁移同步。

Elasticsearch 作为现代数据应用的重要组成部分,通过 CloudCanal 数据迁移同步加持,让数据进出更加便利和顺畅。

Hana 到 Starrocks 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

SAP HANA 是由 SAP 开发的一款内存列式数据库, 具有预测分析、空间数据处理、文本分析、文本搜索、流分析、图形数据处理等高级分析功能。

HANA 内存列式数据库特性,即启动后可以把所有数据载入内存,相比传统基于硬盘的数据库,性能提升10~10,000倍。

HANA 一般内置在 SAP ERP 系统中提供服务,在制造业应用广泛。

现如今企业尝试建立统一数据分析平台,SAP HANA 保存了ERP相关数据,如何实时同步 HANA 数据到数据平台成为困扰企业的一个难题。

CloudCanal 最新版本已支持 HANA 作为源端迁移同步数据到 StarRocks 来构建实时数仓, 本文简要介绍使用 CloudCanal 快速构建一个 HANA 到 StarRocks 数据迁移同步任务

技术要点

数据同步整体流程

CloudCanal 实现 HANA 增量数据同步主要使用其触发器捕获变更事件,整体流程如下:

  • 安装触发器,通过触发器捕获增量变更数据
  • 记录位点,记录增量数据数据同步的起点
  • 执行全量数据迁移
  • 执行增量数据同步

数据捕获触发器

触发器是一种自动触发执行的存储过程,它可以在数据变更前执行也可以在数据变更后执行,因为本质也是存储过程,所以存储过程支持的操作触发器均支持。

不同数据库对触发器的支持程度不同,HANA 的触发器支持监听 I(新增)/U(更新)/D(删除) 三种事件,因此数据的所有变更都可以通过触发器捕获。

安装触发器的方式与创建存储过程类似,即通过执行 SQL 创建触发器。

通过触发器实现增量数据同步,需要触发器捕获数据的I/U/D变更事件并写入增量 CDC 数据表,数据的变更事件最终都会写到增量 CDC 数据表,执行流程如下:

其他 HANA 同步方案

目前支持同步 HANA 数据的产品还有 Informatica、Qlik 等,实现方案也是通过触发器。

因为 HANA 的触发器不能监听 DDL 变更,因此 CloudCanal 与 Informatica、Qlik 一样,都不支持DDL同步。

操作示例

准备动作

添加数据源

  • 登录 CloudCanal ,数据源管理->添加数据源 添加数据源

  • 创建源端数据源, 选择自建数据源,选择 HANA 并填写相关信息

    默认数据库: 即需要同步的数据所在数据库,常见默认数据库:SYSTEMDB、HXE、DB0

    image.png

  • 创建目标端数据源,选择自建数据源,选择StarRocks,并填写相关信息

    Client地址: CloudCanal 用其查询库表表的元数据信息,对应 StarRocks QueryPort,默认端口为 9030

    额外参数 Http地址: StarRocks 接收 streamload 的 http 请求,此处可填写 BE 节点地址,默认端口为 8040 , 如需负载均衡也可直接填写 FE节点 地址和端口,FE节点默认端口 8030

    image.png

  • 数据源创建成功 image.png

任务创建

  • 任务管理 > 创建任务 image.png

  • 源端选择 HANA 数据源,目标端选择 StarRocks 数据源,分别点击测试连接按钮并设置数据库映射关系

  • 点击下一步 image.png

  • 选择 增量同步,并且勾选 全量初始化

  • 点击下一步 image.png

  • 选择订阅的表

  • 点击下一步 image.png

  • 配置列映射

  • 点击下一步 image.png

  • 点击创建任务 image.png

  • 任务创建成功并启动后,会自动执行结构迁移、全量迁移、增量同步 image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 HANA 到 StarRocks 数据迁移同步。

StarRocks 作为新兴的实时数仓产品,为传统数据业务带去更加实时、一致的体验,让数据得到更加广泛的使用,CloudCanal希望助一臂之力,让数据流动更加平滑顺畅。

MySQL 到 GaussDB 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简介

CloudCanal 近期开放了 MySQL -> GaussDB for MySQL /OpenGauss 数据链路,本篇文章将简要概述 CloudCanal 对于 GaussDB for MySQL/OpenGauss 目标数据迁移同步的支持。

功能介绍

结构迁移类型自动处理与优化

不同数据库对于数据类型支持存在差异,CloudCanal 结构迁移时会进行类型自动转换与优化。

例如:在 MySQL 中可以定义的 VARCHAR(0) 数据类型,在 OpenGauss 中不支持,CloudCanal 结构迁移时会自动将源端 MySQL 的 VARCHAR(0) 类型映射为 VARCHAR(1)

自定义数据处理

用户在迁移、实时同步期间如需要对传输的数据行进行自定义的加工可以采用 CloudCanal 提供的自定义数据处理能力,这对于实时宽表构建、新增动态列、基于微服务、缓存的数据清洗等数据处理场景都非常有帮助。关于更多自定义数据的使用方式可以参考:数据处理插件使用方式

支持高性能写入模式

CloudCanal 中默认采用 OpenGauss的驱动通过JDBC的方式进行批量写入。如果用户对性能要求很苛刻,可以尝试开启基于Copy模式的高性能写入模式。在Copy写入模式下,写入性能相比采用JDBC的方式有很大的提升。

可视化创建

CloudCanal 创建 GaussDB for MySQL/OpenGauss 数据迁移同步任务是完全可视化的,通过获取数据库元数据,让用户在 浏览器页面上即可决定哪些库、表、列进行迁移同步等。

自动化流程

GaussDB for MySQL/OpenGauss 数据迁移同步任务创建后,CloudCanal 将自动流转各个阶段的任务,用户无需干涉,直达数据实时同步状态。

监控图表支撑

CloudCanal 为 GaussDB for MySQL/OpenGauss 数据迁移同步任务提供了多个实用监控指标,包括增量缓存RPS增量缓存延迟(ms)内存队列数据个数等,当调优任务性能或排查任务异常原因时,监控指标提供了很好的判断依据。

告警支持

CloudCanal 为 GaussDB for MySQL/OpenGauss 数据迁移任务提供了包括钉钉/企业微信/飞书/自定义等 webhook 类型告警,对于企业级客户,可额外选择邮件,以及短信告警,实时保障同步任务的高可用。

简单示例

本示例以将数据从 MySQL 数据库同步到 GaussDB for OpenGauss 数据库为操作案例,以便更好地说明 CloudCanal 在不同数据库之间进行数据同步的能力。

准备动作

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 MySQL 数据库(本例使用 8.0 版本)和 GaussDB for OpenGauss 数据库(本例使用 5.0 版本)
  • 登录 CloudCanal 平台 ,添加 GaussDB for OpenGauss 和 MySQL

任务创建

  • 任务管理 -> 新建任务

  • 测试连接并选择 源 和 目标 数据库

  • 点击下一步

  • 选择 数据同步,并勾选全量数据初始化以及开启一次性校验,其他选项默认

  • 选择需要迁移同步的表

  • 确认创建任务

  • 任务自动执行结构迁移、全量同步和增量同步,执行数据校验,结果显示数据校验通过

总结

本文主要介绍了 CloudCanal 支持 GaussDB for MySQL/OpenGauss 目标端数据迁移同步功能,通过这个能力,用户可以便利地将数据实时同步到 GaussDB for MySQL/OpenGauss 数据库,实现数据更广泛、更实时的应用。

SQL Server 到 StarRocks 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

本篇文章主要介绍如何使用 CloudCanal 构建一条 SQLServerStarRocks 的数据同步链路。

技术点

源端SQLServer基于CDC代理

当数据库启用 CDC 能力后,SQL Server 代理上会生成一个专门分析ldf文件的作业,再将具体的表启用 CDC, 则该作业开始持续分析文件中的变更事件到指定的表中。

image.png

写入StarRocks采用StreamLoad导入方式

CloudCanal 采用了 StarRocks StreamLoad 方式进行导入,源端数据和变更转成字节流,以通过 HTTP 协议批量写入 StarRocks。

基于 StreamLoad 方式,写入对端的操作均为 INSERT,CloudCanal 自动将 INSERT / UPDATE / DELETE 转成 INSERT 语句,并填入 __op 值(删除标识符),StarRocks 将自动进行数据合并。

准备工作

SQL Server源端准备工作

  • 源库需要启用 CDC 执行命令

    • 建议使用 sa 账号,启用 CDC 需要 sysadmin 权限
    • 切换数据库,可以采用以下命令(假设你的数据库名称为example)
    use example;
  • 执行启用 CDC 功能 可以执行如下命令

    exec sys.sp_cdc_enable_db
  • 准备一个 CloudCanal 同步账号,并为这个账号授权(db_ownerpublic 权限) image.png image.png

  • 确认 SQL SERVER 代理是启用状态 image.png

  • 源端 SQL Server 实例中待同步的表需具备主键(CloudCanal 在创建同步任务的时候会帮你自动选择有主键的表)

StarRocks 对端准备工作

  • StarRocks 最高支持版本为:2.4.0
  • Cloudcanal 添加StarRocks 数据源 image.png
    • Client 地址
      • MySQL 协议端口,用于查询元数据,对应 StarRocks QueryPort,默认为 IP:9030
    • Http 地址
      • Stream Load 导入数据用途,对应 StarRocks HttpPort,默认为 IP:8030

注意事项

  • SQL SERVER 作为源端结构迁移仅中支持 Schema、Table 迁移。

SQL Server -> StarRocks 的数据类型支持

CloudCanal 结构迁移和数据迁移同步时会自动进行数据类型映射。详情见下表:

SQL ServerStarRocks
BIGINTBIGINT
BINARYNot Supported
BITTINYINT
CHARCHAR
DATEDATE
DATETIMEDATETIME
DATETIME2DATETIME
DATETIMEOFFSETDATETIME
DECIMALDECIMAL
FLOATFLOAT
GEOGRAPHYSTRING
GEOMETRYSTRING
HIERARCHYIDNot Supported
IMAGENot Supported
INTINT
MONEYFLOAT
NCHARCHAR
NTEXTSTRING
NUMERICDECIMAL
NVARCHARVARCHAR
REALDOUBLE
ROWVERSIONLARGEINT
SMALLDATETIMEDATETIME
SMALLINTSMALLINT
SMALLMONEYFLOAT
SQL_VARIANTNot Supported
TEXTSTRING
TIMESTRING
TIMESTAMPLARGEINT
TINYINTSMALLINT
UNIQUEIDENTIFIERVARCHAR
VARBINARYNot Supported
VARCHARVARCHAR
XMLSTRING
sysnameVARCHAR

操作示例

前置条件

  • 登录ClouGence官网 下载私有部署版,使用参见快速上手文档

  • 准备一个 SQL Server 数据库,和 StarRocks 实例(本例分别使用自建 SQL Server 2016 和 StarRocks 2.4.0)

  • 登录 CloudCanal 平台 ,添加 SQL Server 和 StarRocks image.png

  • 创建一条 SQL Server -> StarRocks 链路作为增量数据来源

任务创建

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库

  • 点击下一步 image.png

  • 选择 增量同步,其它默认 image.png

  • 此时如果 SQL Server 上数据库还没有启用 CDC 功能,则会在点击下一步的时候提示如何启用 CDC。只要按照提示的参考语句执行即可。 image.png

  • 选择需要迁移同步的 image.png image.png

  • 确认创建任务 image.png

  • 任务自动做结构迁移全量迁移增量同步 image.png

常见问题

数据不同步了都有那些情况?

  • SQL Server CDC 需要依赖 SQL Server 代理,首先要确定 SQL Server 代理服务是否启动
  • 表在启动 CDC 的时候会确定要捕获的列清单,此时如果修改列的类型可能会导致 CDC 中断。目前解决办法只能重建任务。
  • 增/减 同一个列名的列,对一个列删除后在增加。虽然 CDC 表中字段依然存在但是也会导致整个 CDC 中断。

什么情况下会影响稳定的数据同步?

  • 如果任务在同步期间出现了异常导致任务延迟。这时候需要格外注意,如果过长时间的延迟,即便是修复了延迟的问题(比如对端数据库长时间出现不可用)在后续数据同步上也可能存在丢失数据的风险。
  • SQL Server 为了防止 CDC 表数据无限膨胀 SQL Server 会每天定时执行清理作业,清理超过 3天的数据。
  • 为了增加延迟的容忍度可以执行这条 SQL 来增加 CDC 数据的保存时间,代价是这些数据需要存放到数据库表中,如果每日数据变更很多对磁盘开销会有额外的要求。
    • execute sys.sp_cdc_change_job @job_type = n'cleanup', @retention = 4320
    • msdb.dbo.cdc_jobs 表中保存了具体 捕获任务的数据保存时间。

总结

本文简单介绍了如何使用 CloudCanal 进行 SQL Server -> StarRocks 数据迁移同步。

Oracle 到 PostgreSQL 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

本篇文章主要介绍如何使用 CloudCanal 构建一条 OraclePostgreSQL 的数据同步链路。

技术要点

缩小的数据库权限要求

CloudCanalOracle 数据库的高权限要求,主要来自两个面向 DBA 的操作,自动构建字典自动切换归档日志,这两个操作主要是让用户使用更加自动化和便利,但是问题也比较明显,对数据库运维标准严苛的客户来说,这些权限对于我们的客户是没有的,所以新版本 CloudCanal ,通过参数配置,支持了关闭自动字典构建能力(默认打开)关闭自动切换归档日志能力(默认关闭)

多版本 schema 以支持位点回拉

对于关系型数据库同步工具而言,增量数据本身往往和元数据分离,也就是消费到的增量数据和即时从数据库里面获取的元数据不一定匹配(两个时间点之间有DDL),所以维持一个多版本的元数据以应对增量数据解析是必要的, CloudCanal 以每天的 schema dump 为基准,辅以到当前位点的 DDL 语句列表,可构建出任何时间点的元数据(实际上是更加精确的 scn 位点),单个 DDL 前后的数据变更事件,能够精确匹配到相对应的元数据进行解析, CloudCanal 才有可能在此版本产品上提供了回拉位点重复消费一段时间增量数据的能力

支持的版本

源端 Oracle 支持的版本:10.X11.X12.X18.X19.X

对端 PostgreSQL 支持的版本:8.49.09.19.29.39.49.59.610.X11.X12.X13.X14.X15.X16.X17.X

支持的DDL&数据类型映射

  • Oracle -> PostgreSQL 链路支持的DDL暂时只有 ALTER TABLE ,后续我们将不断进行完善
  • CloudCanal 结构迁移和数据迁移同步时会自动进行数据类型映射

类型映射见下表:

Oracle 字段类型PostgreSQL 字段类型
CHAR、NCHAR、VARCHAR2、NVARCHAR、NVARCHAR2、ROWID、HTTPURITYPECHARACTER_VARYING
LONG、CLOB、NCLOBTEXT
NUMBER_BIGINTBIGINT
NUMBER_DECIMAL、BINARY_FLOAT、BINARY_DOUBLENUMERIC
FLOATREAL
DATE、TIMESTAMPTIMESTAMP_WITHOUT_TIME_ZONE、TIMESTAMP_WITHOUT_TIME_ZONE
TIMESTAMP_WITH_TIME_ZONE、TIMESTAMP_WITH_LOCAL_TIME_ZONETIMESTAMP_WITH_TIME_ZONE
XMLTYPEXML
信息

针对于 Oracle -> PostgreSQL 链路,源端 Oracle 不在上表的字段类型暂时不支持。

操作示例

准备工作

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 新增数据源 image.png

  • 选择自建数据库 -> 选择对应数据库 -> 输入相关信息 -> 测试连接-> 新增数据源 image.png

信息

Oracle 相较于其他数据源有一些额外的参数可以调整

  • logminerUser:ORACLE源端增量任务使用redo解析(logminer)方式时使用的账号,需要CDB类型用户
  • logminerPasswd:ORACLE源端增量任务使用redo解析(logminer)方式时使用的账号密码
  • logminerConnectType:ORACLE源端增量任务使用redo解析(logminer)方式时使用的连接方式,目前支持ORACLE_SID或ORACLE_SERVICE模式
  • logminerSidOrService:ORACLE源端增量任务使用redo解析(logminer)方式时使用的连接标识符,和logminerConnectType参数配合使用,ORACLE_SID连接方式,则填写sid,ORACLE_SERVICE连接方式,则填写service name
  • 添加 OraclePostgreSQL 之后可以在数据源列表中看到新增的数据源 image.png

创建同步任务

  • 任务管理 -> 创建任务

  • 源端选择 Oracle 数据源,对端选择 PostgreSQL数据源

  • 分别点击测试连接,选择源端和对端需要订阅的数据库,选择下一步 image.png

  • 选择 全量迁移 -> 勾选 增量同步 -> 根据自身机器配置选择 任务规格

  • 选择 下一步 image.png

  • 选择源端需要同步的表,如果目标表显示橙色表示对端不存在该表,任务创建之后,会自动生成该表

  • 点击 下一步 image.png

  • 可以在左侧选择添加 数据过滤条件

  • 点击 下一步 image.png

  • 点击 创建任务 image.png

任务执行

任务创建并且启动后,会自动进行如下的三个阶段:

  • 结构迁移:任务创建之后,如果对端没有表结构,那么 CloudCanal 会去自动在对端创建表结构
  • 全量迁移:将源端存量数据整体迁移到对端
  • 增量同步:全量迁移期间以及全量完成以后的源端增量数据变更会实时同步到对端 image.png image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 Oracle -> PostgreSQL 数据迁移同步。

OceanBase 企业版源端订阅

· 阅读需 7 分钟
John Li
John Li
Chief Executive Officer

简介

CloudCanal 从 2.3.1.1 版本开始支持 OceanBase 企业版( MySQL 兼容模式)源端的订阅。本文将介绍如何使用 CloudCanal 构建一条 OceanBase 到 OceanBase 的迁移同步链路。

技术点

同时兼容社区版和企业版

OceanBase 增量变更的订阅依赖其官方提供的日志代理组件 oblogproxy

部署对应版本的 oblogproxy server,即可使用 CloudCanal 订阅企业版或社区版 OceanBase。

支持集群模式和 cluster url 订阅

CloudCanal 支持订阅集群,添加数据源填写 root server listcluster url 即可。

社区版需要额外部署 config server 才可以使用 cluster url,企业版则需要部署 OCP

使用 cluster url 好处是 CloudCanal 可自动完成 OceanBase 集群节点的变化感知,而使用 root server list 则无法达成此目标,当集群节点发生变化时,需要手动调整 CloudCanal 任务配置。

登录 OceanBase 后采用如下命令即可获取 root server listcluster url

## 使用 sys 用户执行时,返回的 value 字段即为该值。 
show parameters like 'obconfig_url'
## 使用 sys 用户执行时,返回的 value 字段即为该值。
show parameters like 'rootservice_list'

支持常用DDL实时同步

OceanBase 源端支持同步到多个异构数据源对端,并且均支持常用 DDL 的同步,目前支持常用的新增列删除列新增和删除索引操作。

编辑订阅功能

CloudCanal 提供了平滑的修改订阅能力。

对于一个已经创建好正在运行的增量同步任务,如果由于业务需求有新增表需要订阅,可在原有任务基础上新增需要订阅的表,生成子任务,自动完成全量增量同步,在完成后会子任务会自动与原有的任务合并

支持的能力

支持的 OceanBase 版本

3.0、3.1、3.2、4.0

OceanBase 源端支持的对端

MySQL、StarRocks、OceanBase、Kafka

前置准备

部署增量订阅组件 oblogproxy

社区版 OceanBase 请部署社区版oblogproxy

企业版 OceanBase 请联系 OceanBase 官方人员协助部署企业版 oblogproxy server。

在下文的例子中,使用企业版 oblogproxy 2.1.2

部署企业版 OceanBase

企业版 OceanBase 请联系OceanBase官方人员协助部署,并自行准备好测试数据。

操作示例

添加数据源

点击 数据源管理 -> 新增数据源 选择自建数据库中的OceanBase

  • 网络地址

    • 格式为 ip1:sql_port1;ip2:sql_port2
    • 多个root server 用英文分号分隔
    • 支持填写 obproxy 地址,格式为 proxy_ip:proxy_port
  • 额外参数

    • obLogProxyHost
      • oblogproxy server 的地址,格式为 ip:port,默认端口统一为 2983,如果需要订阅增量,该参数不可为空
    • clusterUrl
      • 可以为空,为空时订阅增量时会使用 root server list,不为空时订阅增量会优先使用 cluster url
    • rpcPortList
      • 订阅增量时,该参数不可为空,默认端口为2882。如果网络地址包含多个root server(假设为3个),此处填写格式为2882:2882:2882

    image.png

创建任务

  • 选择 任务管理 -> 创建任务

  • 第一步 配置源端和目标端,测试连通性通过后选择需要订阅的源端库和目标端 并选择订阅和映射的库

  • 源端勾选高级配置可以设置OceanBase的租户,默认为sys。CloudCanal的一个任务只支持订阅属于一个租户下的表。此处请设置需要订阅的表所属的租户。 image.png

  • 第二步 选择任务类型和规格,一般需要同时进行全量初始化和长周期增量同步可以按照如图的配置。 image.png

  • 第三步 表和Action过滤,在第三步用户可以勾选自己需要订阅的表以及事件类型 image.png

  • 第四步 数据处理:在这一步用户可以进行列裁剪和映射;如果有自定义实时处理数据的需求,在这一步也可以上传自定义代码进行自定义处理。自定义代码处理可以参考样例工程image.png

  • 第五步 确认配置:这一步用户可以确认任务的完整配置,确认无误后点击创建任务即可完成任务创建的流程。 image.png

任务运行

  • 任务创建完毕后会自动开始表结构迁移、全量迁移和增量同步 image.png

[可选]老版本 CloudCanal 订正数据源配置

  • 支持填写集群模式的host,原有OceanBase数据源的网络地址需要替换成新的格式,即ip:sql_port,或者直接填写obproxy的ip和port。点击 数据源管理->更多 修改内网/外网地址 修改 image.png image.png

  • 支持了独立的数据源额外配置参数,点击 数据源管理->更多->查看配置 修改。补充obLogProxyHost、rpcPortList等信息 image.png

  • 完成以上配置后,新创建的任务即可正常进行全量迁移和增量实时同步。

[可选]老版本 CloudCanal 订正任务配置

方法一:创建相似任务(推荐)

  • 任务管理点击任务进入任务详情页,选择 功能列表->创建相似任务

  • 创建相似任务,不勾选自动启动,不勾选全量初始化,然后点击下一步直到完成相似任务的创建 image.png image.png

  • 停止老的增量任务,并且记录其位点时间戳T1 image.png

  • 对通过相似任务创建出来的任务在任务详情页选择 回溯位点,设置一个比之前记录的时间戳T1更早的一个时间,启动任务,即可将老的OceanBase源端历史任务升级成支持新版代码的任务 image.png

方法二:修改任务配置

  • 任务管理点击任务进入任务详情页,选择 功能列表->参数修改->源端数据配置 image.png

  • 新版CloudCanal针对OceanBase源端新增了以下参数,补全新增参数的值

    新增参数填写值
    workingModestorage
    timezone非跨时区情况设置+8:00
    clusterUrl如果有configServer或者OCP则填写对应地址,格式为ip:port
    obLogProxyHost填写oblogproxy server的地址,格式为ip:port,默认端口为2983
    rpcPortListOceanBase的RPC PORT,默认为2882,如果使用多个root server,则格式为2882;2882;2882

FAQ

问:报错message: "Failed to parse configuration"

答:出现该报错应该是添加数据源时没有正确配置host、rpcPortList、logproxy host等地址引起。请检查相关配置是否正确配置。

问:报错message: "Failed to auth"

答:如果oblogproxy server开启了鉴权,CloudCanal需确保配置的租户有相关库表的权限。默认情况下CloudCanal使用的租户名为SYS,这可能与用户实际订阅的库表所属的租户不同。请在 **任务详情->功能列表->修改参数 **搜索obTenant参数设置正确的值。

构建数据库多版本 Schema 仓库

· 阅读需 5 分钟
John Li
John Li
Chief Executive Officer

简介

本文主要介绍通过 CloudCanal 快速构建一条 MySQL -> Kafka 同步链路,以订阅源端数据库 DDL 变更和相关 schema 信息来构建数据库的多版本Schema仓库。

技术点

轻量便利的 SCHEMA 管理

常见的 Schema 管理方式如 Schema Registry, 用来管理 Schema 的演变历史,所有数据序列化反序列化需要用到的 Schema 保存在注册表里(构成一个多版本 Schema 仓库)。

每一个数据引用 Schema ID ,序列化或反序列化时从注册表里拉取对应的 Schema 进行相应动作。

image.png

CloudCanal 使用的是一种相对轻巧便利的解决方案:将 DDL 消息携带上最新的 Schema 信息直接发送到指定的 DDL Topic 上,有如下好处:

  • 集中化管理:DML消息会发送到对应的 Topic 中,DDL 变更发送到指定的 DDL Topic,更加集中。
  • 消费者无需解析DDL:下游不需要做任何处理直接消费(不需要做 SQL 解析,不需要序列化等前置操作)。
  • Schema信息实时复用:多个应用程序可以直接复用实时的 Schema 信息。
  • 简单轻巧:不需要依赖第三方组件,轻巧方便,自主可控。

image.png

新增 SCHEMA 元信息字段

CloudCanal 2.3.x 版本之前之前的 DDL 只是携带了对应的 DDL SQL,用户侧一些数据入湖的场景需要有数据的 Schema,否则数据湖的消费者要从每条消息去解析 Schema 信息,十分浪费计算资源,也不能给其他应用复用。

2.3.x 版本之后,发送到 Kafka 的 DDL 消息携带了整个表的结构化数据(Schema信息),并且支持将 DDL 同步到指定的 Topic 中,方便下游的 DDL 的统一化管理。

在CloudCanal发给消息系统的消息体中, tableChanges 字段携带的就是具体的源数据库的 Schema 信息,该 tableChanges 字段包含一个数组,其中包含表中每一列的元信息。由于结构化数据以 JSON 呈现数据,因此消费者无需先通过 DDL 解析器处理消息即可轻松读取消息。具体信息如下,其他字段详情见 Kafka 消息同步格式

字段类型说明
typeStringDDL 类型。
primaryKeyColumnNamesList主键名列表。
columnsJson列信息。
jdbcTypeStringjdbc 类型。
nameString字段名称。
positionString字段的顺序。
typeExpressionString类型描述,如:varchar(20),int。
typeNameString类型名称,如:varchar,int。

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备一个 MySQL 数据库,和 Kafka 实例(本例分别使用自建 MySQL 8.0.30 和 Kafka 2.2)
  • 登录 CloudCanal 平台 ,添加 MySQL 和 Kafka
  • Kafka 需要先创建一个 DDL Topic,用于后续 Topic 订阅。 image.png

任务创建

  • 任务管理-> 任务创建。

  • 测试链接并选择 目标 数据库。

  • 选择 Kafka 消息同步格式,这里选择 CloudCanal 默认消息格式,其他详情见 Kafka 消息同步格式

  • 选择 DDL Topic,后续所有的 DDL 都将发送到这个 Topic。

  • 点击下一步。 image.png

  • 因为这里只订阅 DDL,只勾选ALTER,RENAME,TRUNCATEimage.png

  • 持续点击下一步,并创建出数据同步任务。

  • 任务自动做增量同步,进行 DDL 订阅。 image.png

消费订阅数据

  • 源端 MySQL 执行模拟多条 DDL 执行,最后会以用户选择的序列化器,同步到对端 Kafka 中。
alter table alltype_mysql.aaa add c int null;

truncate table e;

alter table alltype_mysql.aaa drop column d;
...

  • 由于我们使用的是阿里云 kafka,可以直接查看对端 Kafka 消息内容。

image.png

  • 同样也可以使用 Kafka 消费程序直接消费 DDL 信息,多个消费程序可以对 Schema 信息进行复用,具体 Demo 详情见:Kafka 消费实例程序

image.png

  • 消费示例消息:
{
"action":"ALTER",
"before":[],
"bid":0,
"data":[],
"db":"alltype_mysql",
"dbValType":{
"a":"int",
"b":"int",
"c":"int"
},
"ddl":true,
"entryType":"ROWDATA",
"execTs":1669880624000,
"jdbcType":{
"a":4,
"b":4,
"c":4
},
"pks":[],
"schema":"alltype_mysql",
"sendTs":1669880625842,
"sql":"alter table aaa add c int null",
"table":"aaa",
"tableChanges":{
"table":{
"columns":[
{
"jdbcType":4,
"name":"a",
"position":0,
"typeExpression":"int",
"typeName":"int"
},
{
"jdbcType":4,
"name":"b",
"position":1,
"typeExpression":"int",
"typeName":"int"
},
{
"jdbcType":4,
"name":"c",
"position":2,
"typeExpression":"int",
"typeName":"int"
}
],
"primaryKeyColumnNames":[]
},
"type":"ALTER"
}
}

常见问题

支持订阅哪些携带 Schema 消息的数据源

目前源端只支持 MySQL 订阅携带 Schema 的消息,而 Oracle、PostgreSQL 等支持普通 DDL 订阅,后续会陆续支持。

总结

本文简单介绍了如何使用 CloudCanal 进行 DDL 消息订阅,可以方便的选择多种 Kafka 消息格式,将 DDL 发送到指定的 Topic,方便了下游的 Schema 管理。

Oracle 到 Starrocks 数据同步

· 阅读需 5 分钟
John Li
John Li
Chief Executive Officer

简述

CloudCanal 当前最新版本已经支持源端 Oracle、SqlServer 等主流传统数据库作为源端迁移同步数据到 StarRocks 来构建实时数仓。

本文简要介绍如何快速构建一个 Oracle 到 StarRocks 数据迁移同步任务

技术要点

基于StreamLoad的导入方式

CloudCanal 采用了 StreamLoad 方式进行导入,源端的消息会转成字节流,最后会以批量发送的形式通过 HTTP 协议发往 StarRocks。

CloudCanal 默认采用 json 格式来进行StreamLoad导入,如果用户内容特殊字符较少,也可以开启 csv 格式导入,分隔符可以通过参数 columnSeparator 和 lineSeparator 设置。

基于 StreamLoad 的写入方式,实际写入对端的操作均为 INSERT,CloudCanal 同步时会自动将 UPDATE / DELETE 转成 INSERT 语句,并修改 __op 值,StarRocks会自动进行数据合并。

支持常用DDL实时同步

Oracle -> StarRocks 支持新增列删除列DDL。DDL实时同步到对端时会自动转换成 StarRocks 兼容的语法,数据类型也会根据映射关系进行自动转换。

编辑订阅功能

CloudCanal 提供了便利的修改订阅能力。对于一个已经创建好的正在运行的增量同步任务,如业务需求有新增表需要订阅,可以在原有任务的基础上新增需要订阅的表,生成子任务,自动完成全量、增量迁移同步,在完成后会子任务会自动与原有的任务合并。

数据类型映射

CloudCanal结构迁移和数据迁移同步时会自动进行数据类型映射。类型映射见下表:

Oracle类型StarRocks类型
CHARVARCHAR
NCHARVARCHAR
VARCHAR2VARCHAR
NVARCHARVARCHAR
NVARCHAR2VARCHAR
LONGSTRING
NUMBER_BIGINTBIGINT
NUMBER_DECIMALDECIMAL
FLOATFLOAT
BINARY_FLOATDECIMAL
BINARY_DOUBLEDECIMAL
CLOBSTRING
NCLOBSTRING
DATADATETIME
TIMESTAMPDATETIME
TIMESTAMP_WITH_TIME_ZONEDATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONEDATETIME
INTERVAL_YEAR_TO_MONTHDATETIME
INTERVAL_DAY_TO_SECONDDATETIME
ROWIDSTRING
PLSQL_BOOLEANBOOLEAN
XMLTYPESTRING
HTTPURITYPESTRING

操作示例

准备CloudCanal

Oracle源端前置准备

CloudCanal在做Oracle作为源端的数据迁移同步时,需要做一些前置准备。具体可以参考 ORACLE LogMiner同步准备

添加数据源

  • 登录CloudCanal平台

  • 数据源管理->添加数据源 添加数据源

  • 创建源端数据源:选择自建数据源,选择Oracle,并填写相关数据库信息 image.png

    • logminerUser
      • ORACLE源端增量任务使用redo解析(logminer)方式时使用的账号,需要CDB类型用户
    • logminerPasswd
      • ORACLE源端增量任务使用redo解析(logminer)方式时使用的账号密码
    • logminerConnectType
      • ORACLE源端增量任务使用redo解析(logminer)方式时使用的连接方式,目前支持ORACLE_SID或ORACLE_SERVICE模式
    • logminerSidOrService
      • ORACLE源端增量任务使用redo解析(logminer)方式时使用的连接标识符,和logminerConnectType参数配合使用,ORACLE_SID连接方式,则填写sid,ORACLE_SERVICE连接方式,则填写service name
  • 创建目的端数据源,选择自建数据源,选择StarRocks,并填写相关数据库信息 image.png

  • Client地址

    • StarRocks 提供的 MySQL 交互协议端口,CloudCanal 用其查询库表表的元数据信息,对应 StarRocks QueryPort,默认端口为 9030
  • Http地址

    • StarRocks 接收 streamload 的 http 请求,此处可填写 BE 节点地址,默认端口为 8030
    • 如需负载均衡也可直接填写 FE 地址和端口
  • 数据源创建成功 image.png

任务创建

  • 任务管理->创建任务 image.png

  • 选择集群

  • 源端选择刚添加的Oracle数据源,目标端选择StarRocks数据源,分别点击测试连接按钮以测试数据库连通性和获取schema级别元信息,显示连接成功后,设置数据库映射关系

  • StarRocks的结构迁移支持用户自定义分桶数等自定义建表信息

  • 点击下一步 image.png

  • 选择增量同步,并且勾选全量初始化

  • 点击下一步 image.png

  • 选择订阅的表

  • 点击下一步 image.png

  • 配置列映射

  • 点击下一步 image.png

  • 点击创建任务 image.png

  • 任务创建成功并启动后,会自动执行结构迁移、全量迁移、增量同步 image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 Oracle 到 StarRocks 数据迁移同步。

StarRocks 作为新兴的实时数仓产品,为传统数据业务带去更加实时、一致的体验,让数据得到更加广泛的使用,CloudCanal希望助一臂之力,让数据流动更加平滑顺畅。

跨互联网数据互通(HTTP)

· 阅读需 7 分钟
Barry
Chief Technology Officer

简介

CloudCanal 实现的 基于 Kafka 构建安全的跨互联网数据同步 方案被客户用于生产后,又出现了新的需求,主要集中在方案能否更加轻量化和可控性上,简而言之,去掉 Kafka 中转,直接在 CloudCanal 中实现跨网络安全互通。

本篇文章即介绍 CloudCanal 实现的更加轻量化方案,特点包括

  • 无消息等独立软件依赖
  • 两端数据库完全不开放公网端口
  • 两端数据库元数据可映射
  • 基于 HTTPS 传输
  • 具备用户名密码鉴权机制
  • 支持多种数据库异构互通

技术点

image.png

Tunnel数据源

去掉消息依赖的跨互联网数据库互通,我们是通过一个虚拟的数据源 Tunnel 实现。 Tunnel 数据源本身并不是实体数据库,而是一组逻辑信息,包括

  • ip(或域名)
  • port
  • 用户名
  • 密码
  • TLS 证书文件和密码
  • 元数据

通过这个虚拟数据源,我们可以使用 HTTP(S) 或 TCP 实现数据拉取或者接收数据的目的,同时完全匹配 CloudCanal 业务模型,达到功能的完整性。

PUSH模型

对于数据传输模式 PUSH 或 PULL,我们选择了 PUSH 模式,即客户端将数据推送到服务端,本质原因在于

  • 主要解决互通问题,而非订阅问题
  • 目标端同步写入数据更加匹配 CloudCanal 其他目标端风格
  • 数据通道无数据持久化,无需维护 store 来暂存数据

当然,PUSH 模式也带来一些问题,包括

  • 如何确保最终数据写入再提交位点
  • 位点回溯复杂(全量和增量、不同数据源位点格式不一致)

对于上述两个问题,我们采用 延迟提交位点技术 解决

延迟提交位点技术

采用 PUSH 模式后,位点管理是比较复杂且危险的工作,如果提早提交位点,可能丢数据。 为此,我们实现了 延迟提交位点技术,即客户端每一次写入 server 端,只返回已经提交的位点,并且将所有数据源、所有任务类型的位点序列化成 json 字符串。

通过这项技术,我们能够确保位点之前的数据肯定已经写到对端,并且在某些业务场景下,通过客户端任务的位点回溯,达到重复消费某一段时间数据的目的。

元数据映射

因为使用了虚拟的 Tunnel 数据源,并且其带有 schema(存储于CloudCanal kv配置表中), 所以针对这个数据源,我们模拟了表结构获取和迁移的过程,让其在任务创建和运维过程中如同一个真实存在的数据库。 一个真实的 Tunnel 数据源的元数据如下:

[
{
"db": "cc_virtual_db",
"schemas": [
{
"schema": "cc_virtual_schema",
"tables": [
{
"table": "WORKER_STATS",
"columns": [
{
"name": "ID",
"jdbcType": -5,
"typeName": "LONG",
"key": true
},
{
"name": "GMT_CREATE",
"jdbcType": 93,
"typeName": "TIMESTAMP",
"key": false
},
{
"name": "AUCRDT",
"jdbcType": 93,
"typeName": "TIMESTAMP",
"key": false
}
]
},
{
"table": "KBS_QUESTION",
"columns": [
{
"name": "ID",
"jdbcType": -5,
"typeName": "LONG",
"key": true
},
{
"name": "CATEGORY",
"jdbcType": 12,
"typeName": "STRING",
"key": false
}
]
}
]
}
]
}
]

我们可以通过结构迁移 (MySQL/SQLServer/Oracle -> Tunnel) 扩充它,或者直接通过 数据源管理->更多->查看配置->_**dbsJson **_进行修改。

操作示例

本示例使用阿里云资源模拟杭州 RDS for MySQL 到深圳 RDS for MySQL , 两端数据库均不开公网端口,数据走互联网, 采用 HTTPS 传输和用户名密码认证。

环境准备

  • 杭州环境部署 CloudCanal ,并购买 RDS for MySQL 作为源端 blog/tech_share/http_sync_3 blog/tech_share/http_sync_4

  • 深圳环境部署 CloudCanal , 并购买 RDS for MySQL 作为目标端 blog/tech_share/http_sync_1 blog/tech_share/http_sync_2

  • 因 CloudCanal 为 docker 版本 ,深圳环境 CloudCanal 安装包解压后 ,需要修改 docker-compose.yml 端口映射再安装/升级,并开放 ECS 安全组相关端口,以便远程连接

  • 此例以 18443 端口作为 Tunnel 数据源监听端口 blog/tech_share/http_sync_5 blog/tech_share/http_sync_6

为目标端数据库初始化元数据

  • 因无法通过 Tunnel 到对端数据库做结构迁移,所以需要事先使用 mysqldump 等工具初始化对端数据库结构

添加 Tunnel 数据源

  • 分别在源端和目标端 CloudCanal 配置 Tunnel 数据源 blog/tech_share/http_sync_7

  • 源端数据源列表 blog/tech_share/http_sync_9

  • 目标端数据源列表
    blog/tech_share/http_sync_8

为 Tunnel 初始化元数据

  • 源端创建一个 MySQL -> Tunnel 结构迁移,并完成 blog/tech_share/http_sync_10 blog/tech_share/http_sync_11

  • 从源端 Tunnel 数据源拷贝结构并复制到目标端 blog/tech_share/http_sync_12 blog/tech_share/http_sync_13 blog/tech_share/http_sync_14

目标端任务创建

  • 选择 Tunnel 和 目标数据库 blog/tech_share/http_sync_15

  • 选择数据同步 blog/tech_share/http_sync_16

  • 选择表、列、映射略

  • 任务正常运行,监听端口并准备接收数据 blog/tech_share/http_sync_17

源端任务创建

  • 选择源端数据库 和 Tunnel 数据源 blog/tech_share/http_sync_18

  • 选择数据同步,并初始化数据 blog/tech_share/http_sync_19

  • 数据持续同步中 blog/tech_share/http_sync_20

数据验证

造增量数据

  • 为了造数据简便,开下源端数据库公网地址 blog/tech_share/http_sync_21

数据校验

  • 在深圳环境添加源端数据源,并做数据校验。结果显示数据一致。 blog/tech_share/http_sync_22 blog/tech_share/http_sync_23

常见问题

  • 目前支持哪些链路的互通?

    • MySQL/SQLServer/ORACLE -> MySQL , 其他互通按需添加。
  • Tunnel 到对端数据库能做结构迁移么?准备表结构比较麻烦

    • 因为数据库结构对元数据精度要求很高,Tunnel中间结构主要为同步服务,所以元数据级别上还无法构成精确的结构迁移源端。建议构建临时实例(只dump表结构)并开公网,再使用CloudCanal结构迁移解决问题。
  • Tunnel 数据源有结构,能动态编辑么?

    • Tunnel 数据源模拟了一个数据库,编辑任务能力天然具备。加表先编辑目标端任务,再编辑源端任务,否则反之。我们后续计划用一篇专门的文章介绍这个运维操作。
  • 目前数据互通还存在什么问题?

    • 对于 blob 等字段类型还需要进一步支持和验证
    • 跨互联网,性能层面需要经过特别的优化
    • 安全层面,目前仅用到 HTTPS 证书加密,配合自定义的账号密码

总结

本文主要介绍纯粹通过 CloudCanal 进行数据互通实践,通过引入虚拟数据源,达成数据互通和元数据映射等能力,具备不错的可落地性。

MySQL 到 PostgreSQL/Greenplum 数据同步

· 阅读需 5 分钟
John Li
John Li
Chief Executive Officer

简述

本文主要介绍如何使用 CloudCanal 构建一条 MySQLGreenplum / PostgreSQL 的数据同步链路。

支持版本

源端 MySQL 支持的版本为:5.65.78.X 对端 PostgreSQL 支持的版本为:8.49.09.19.29.3 9.49.59.610.X11.X12.X13.X14.X15.X16.X17.X 对端 Greenplum 支持的版本为:6.X

技术点

流程自动化&功能丰富

支持创建结构迁移、全量迁移、增量同步、数据校验、数据订正类型的任务。结构迁移、全量迁移和增量同步可作为一个任务的多个阶段自动化进行。

新增表自动迁移同步

CloudCanal 提供了修改订阅的能力。对于一个正在运行的增量同步的任务,CloudCanal 提供了动态修改订阅的方式,可以对一个正在运行的增量同步任务新增需要订阅的表。对于新增的表,CloudCanal 会自动迁移、同步其数据。

自定义数据处理

用户在迁移、实时同步期间如需要对传输的数据行进行自定义的加工可以采用 CloudCanal提供的自定义数据处理能力,这对于实时宽表构建、新增动态列、基于微服务、缓存的数据清洗等数据处理场景都非常有帮助。关于更多自定义数据的使用方式可以参考:数据处理插件使用方式

支持多种 DDL

MySQL -> Greenplum / PostgreSQL 链路支持的DDL有 Create TableDrop TableAlter TableRname TableCreate Index

支持高性能写入模式

CloudCanal 中默认采用 PostgreSQL/Greenplum的驱动通过JDBC的方式进行批量写入。如果用户对性能要求很苛刻,可以尝试开启基于Copy模式的高性能写入模式。在Copy写入模式下,写入性能相比采用JDBC的方式有很大的提升。了解基于Copy的高性能写入模式可以参考:开启PG/GP高性能写入

支持地理信息类型写入

PostgreSQLGreenplum对于地理信息类型的处理比较友好,因此常常被用于存储地理信息数据。CloudCanal 支持迁移同步源端地理信息类型的数据并且对其做自动转换后写入对端。了解更多 CloudCanal对于地理信息类型的处理可以参考文章:CloudCanal地理数据同步与处理

结构迁移类型自动处理

异构数据源之间对类型的处理都存在差异,CloudCanal 会进行自动的转化和优化,例如在 MySQL 中可以定义的VARCHAR(0)数据类型,在 PostgreSQL / Greenplum 中不支持,CloudCanal 结构迁移时会自动将源端MySQLVARCHAR(0)类型映射为 VARCHAR(1)

数据类型映射

CloudCanal 结构迁移和数据迁移同步时会自动进行数据类型映射。类型映射见下表:

MySQL 类型PostgreSQL / Greenplum 类型
BITBIT
TINYINTSMALLINT
SMALLINTSMALLINT
MEDIUMININTINTEGER
INTINTEGER
BIGINTBIGINT
DECIMALNUMERIC
FLOATNUMERIC
DOUBLENUMERIC
DATETIMESTAMP WITHOUT TIME ZONE
DATETIMETIMESTAMP WITHOUT TIME ZONE
TIMESTAMPTIMESTAMP WITHOUT TIME ZONE
TIMETIME WITHOUT TIME ZONE
YEARINTEGER
CHARCHARACTER
VARCHARCHARACTER VARYING
BINARYBYTEA
VARBINARYBYTEA
TINYBLOBBYTEA
BLOBBYTEA
MEDIUMBLOBBYTEA
LONGBLOBBYTEA
TINYTEXTBYTEA
TEXTTEXT
MEDIUMTEXTTEXT
LONGTEXTTEXT
ENUMTEXT
SETTEXT
JSONJSON
GEOMETRYTEXT
POINTPOINT
LINESTRINGTEXT
POLYGONPOLYGON
MULTIPOINTTEXT
GEOMETRY_COLLECTIONTEXT
GEOM_COLLECTIONTEXT
MULTILINESTRINGTEXT

准备工作

操作示例

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 新增数据源 -> 自建数据库 image.png

  • 添加 Greenplum 或者 PostgreSQL后可以在数据源列表看到新增的数据源。 image.png

创建同步任务

  • 任务管理 -> 创建任务

  • 源端选择 MySQL 数据源,对端选择 Greenplum / PostgreSQL

  • 分别点击 测试连接,选择源端对端需要订阅的库,选择 下一步 image.png

  • 选择 增量同步 -> 选择 全量初始化

  • 根据自身机器配置选择 规格

  • 选择 下一步 image.png

  • 选择源端需要同步的表,如果目标表显示橙色表示对端不存在该表,任务创建之后,会自动生成该表

  • 点击下一步 image.png

  • 可以在左侧,添加 数据过滤条件

  • 选择 下一步 image.png

  • 选择 创建任务 image.png

任务执行

任务创建并且启动后,会自动进行如下的三个阶段:

  • 结构迁移:任务创建之后,如果对端没有表结构,那么 CloudCanal 会去自动在对端创建表结构
  • 数据初始化:将源端存量数据整体迁移到对端
  • 数据同步:全量迁移期间以及全量完成以后的源端增量数据变更会实时同步到对端 image.png image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL -> Greenplum 数据迁移同步。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

分库分表数据汇聚

· 阅读需 6 分钟
John Li
John Li
Chief Executive Officer

简述

CloudCanal 2.X 版本近期支持了自定义代码能力,带来了丰富的场景化数据能力,本文主要介绍在面向 To C 业务分库分表情况下,如何通过 CloudCanal 进行数据实时汇聚。

本方案特点:

  • 数据处理灵活,适配多变的业务数据汇聚需求
  • 针对大部分带结构数据源互通,可举一反三
  • 稳定性较好

技术点

约束冲突

对于一部分分库分表中间件或业务自己写的拆分逻辑,并没有考虑写入数据主键或者唯一字段值的全局唯一问题,导致做数据汇聚时约束冲突。

另一类系统,在业务上就独立,做数据汇集时,除了约束冲突,还存在结构不一致,数据规范不统一的问题。

对以上两种情况,添加额外的字段以消除分表之间的约束冲突,进行数据清洗、结构调整,将数据进行规整。自定义代码能够很好的完成这种使命。

DDL 同步

分库分表数据汇聚还存在一个较大的问题是 DDL 同步,对于大部分这类场景, 类似的 DDL 会在源端执行多遍,但是在对端只能执行一遍,并且数据和部分 DDL 有顺序依赖问题 --- 只有 DDL 在对端执行成功之后,新的数据才能写入或者执行。

我们目前建议不同步 DDL, 按照一定规范进行源和目标端 DDL 变更,达到不延迟且 DDL 不处于中间状态的目的。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档

  • 准备好 MySQL 数据库(本例源端 5.7 ,目标端 8.0)

  • 源端 MySQL 上创建 2 个分库( shard_1shard_2), 表结构一致(本例每一个分库只有一张分表)

     CREATE TABLE `shard_1`.`my_order` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `product_id` bigint(20) NOT NULL,
    `user_id` bigint(20) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=35 DEFAULT CHARSET=utf8

    CREATE TABLE `shard_2`.`my_order` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `product_id` bigint(20) NOT NULL,
    `user_id` bigint(20) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=35 DEFAULT CHARSET=utf8
  • 目标 MySQL 上创建 1 个汇聚库(no_shard),并包含1张汇聚表

    • 额外多出 region 字段,该字段通过自定义代码固定生成
    • 源端主键 id 和生成字段 region 组合成联合主键,方便数据汇聚时保持唯一
     CREATE TABLE `my_order` (
    `id` bigint NOT NULL,
    `region` varchar(64) NOT NULL,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `product_id` bigint NOT NULL,
    `user_id` bigint NOT NULL,
    PRIMARY KEY (`id`,`region`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3

开发宽表代码

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • 将源端和目标端MySQL 分别添加 截屏2021-12-20 下午5.06.06.png

分库shard_1任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源
  • 选择 数据同步,并勾选 全量数据初始化, 其他选项默认
  • 选择需要迁移同步的表, 此处只要选择待聚合表即可,对端选择聚合表 截屏2021-12-20 下午5.04.03.png
  • 修改自定义代码,并打包 截屏2021-12-20 下午5.08.02.png
    % pwd
    /Users/zylicfc/source/product/cloudcanal/cloudcanal-data-process
    % mvn -Dtest -DfailIfNoTests=false -Dmaven.javadoc.skip=true -
    Dmaven.compile.fork=true clean package
    截屏2021-12-20 下午4.00.36.png
  • 选择列,默认全选,选择上传代码包1 截屏2021-12-20 下午4.57.59.png
  • 确认创建,并自动运行

分库shard_2任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源
  • 选择 数据同步,并勾选 全量数据初始化, 其他选项默认
  • 选择需要迁移同步的表, 此处只要选择待聚合表即可,对端选择聚合表 截屏2021-12-20 下午4.16.00.png
  • 修改自定义代码,并打包 截屏2021-12-20 下午3.59.05.png
    % pwd
    /Users/zylicfc/source/product/cloudcanal/cloudcanal-data-process
    % mvn -Dtest -DfailIfNoTests=false -Dmaven.javadoc.skip=true -
    Dmaven.compile.fork=true clean package
    截屏2021-12-20 下午4.00.36.png
  • 选择列,默认全选,选择上传代码包2
  • 确认创建,并自动运行

分库任务状态

  • 两个分库汇聚任务正常运行 截屏2021-12-20 下午3.57.27.png

校验数据

  • 变更shard_1数据 截屏2021-12-20 下午3.51.54.png
  • 变更shard_2数据 截屏2021-12-20 下午3.50.47.png
  • 查看no_shard汇聚库数据 截屏2021-12-20 下午3.52.30.png

常见问题

是否支持带数字后缀的分表

支持,就是在自定义代码中匹配表名会稍微复杂些,需要自行修改匹配逻辑。

是否支持异构数据库

支持,自定义代码是 CloudCanal 通用功能,可实现自由的数据变幻。但是对于具体的目标数据源,行为可能会发生一些细微变化,需要进行一定的测试和验证。

如果遇到出错或者问题怎么办?

如果会 java 开发,建议打开任务的 printCustomCodeDebugLog 观察输出的数据是否符合预期,如果不符合预期,可以打开任务的 debugMode 参数,对数据转换逻辑进行调试。

如果不会 java 开发, 找 CloudCanal 同学协助。

总结

本文简单介绍了如何使用 CloudCanal 进行分库分表数据汇聚。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

Kafka 数据中转校验

· 阅读需 3 分钟
Barry
Chief Technology Officer

案例简述

CloudCanal 支持 MySQL -> Kafka完整的传输链路。

Kafka是消息系统,不支持SQL查询数据。针对这种异构数据源,我们如何来校验源对端数据的一致性呢?

本案例将通过 CloudCanal 先将Kafka数据再次回流到MySQL, 最后 检验两个 MySQL 的数据是否一致,即可间接地对比数据一致性。本案例使用 DebeziumEnvelope 消息格式。

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 2 个 MySQL 实例,1 个 Kafka 实例(本例使用自己搭建的两台 MySQL 5.6,阿里云 Kafka 2.2)
  • 两台 MySQL 实例准备两个相同的表 kafka_migration_test.table1
create schema kafka_migration_test collate utf8mb4_unicode_ci;
create table table1 (
`col1` varchar(25) null,
`col2` varchar(25) null,
`col3` varchar(45) null
);
  • 登录 CloudCanal 平台,添加 Kafka,MySQL image.png

  • kafka 自定义一个主题 topic_1,并创建一条 MySQL(1) -> Kafka 链路作为增量数据来源

    • 因为默认的消息格式是不带 Schema 字段的,这里的消息我们会反写到 MySQL 要利用 Schema 的信息,所以创建成功后需要在 **源端配置页面 **设置 SchemaInclude 参数为 true image.png
  • kafka 自定义一个主题 topic_2,并创建消费者组 migration,后续用于反写数据, 并创建一条 Kafka -> MySQL(2) 链路作为增量数据来源

任务创建 1 [ MySQL(1) -> Kafka ]

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库,并选择 DebeziumEnvelope 消息格式,和 topic_1 主题 image.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认 image.png

  • 选择需要迁移同步的表 table1和对应的 Kafka 主题 topic_1 image.png

  • 没有主键自动忽略,方便后序生成数据 image.png

  • 确认创建任务,创建成功后在源端配置页面设置 SchemaInclude 参数为 true image.png

  • 然后点击生效配置,确认重启 image.png

任务创建 2 [ Kafka -> MySQL(2) ]

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库 image.png

  • 选择 数据同步,其他选项默认

  • 选择消费的 Kafka 主题 topic_2和需要迁移同步的表 table1 image.png

  • 确认创建任务,没有主键自动忽略,方便后序生成数据

数据验证

  • 程序造数据,向 MySQL(1) 生成数据,MySQL(1) -> Kafka(topic_1) -> Kafka(topic_2) -> MySQL(2)

  • Kafka(topic_1) 数据会下沉到 topic_2 主题

  • 任务正常运行一段时间后,停止造数据,等待数据全部同步完成

  • 创建 MySQL 到 MySQL 的数据准确校验 image.png

  • 选着数据校验功能,其他的默认 image.png

  • 数据校验 OK image.png

总结

本文展示了 Kafka 的双向同步案例,验证了 CloudCanal 传输数据的准确性。

数据校验与订正

· 阅读需 6 分钟
Barry
Chief Technology Officer

简述

CloudCanal 除了提供最核心的数据迁移和同步能力以外,还提供数据校验和数据订正两种非常实用的能力。

这两种功能为用户保障数据迁移同步链路的数据质量提供了非常大的便利性。

例如对端数据库因为各种原因产生一些异常写入导致的数据不一致或者丢失,用户均可以使用CloudCanal提供的数据校验和数据订正能力来基于同步链路的源端数据来恢复数据,使得对端数据库中相比源端丢失或者不一致的数据得到恢复。

技术点

基于校验结果的针对性订正

执行完CloudCanal的校验任务后,在运行任务的机器上会生成一个文件compre_rs.log用于记录校验的结果信息。日志路径为~/logs/cloudcanal/tasks/taskName/compare_rs.log,其格式如下:

库表名称,结果类型,主键信息

{"tableUnit":"test15.test_huasheng1","type":"DIFF","pkColMap":{"id":"9"}}
{"tableUnit":"test15.test_huasheng1","type":"LOSS","pkColMap":{"id":"12"}}

结果类型分为两种:

  • DIFF:对端相比源端不一致的行,例如上面例子中,源端主键id=9的行和对端存在不一致。
  • LOSS:在源端表中存在,但是在对端表中不存在的行。上面例子中源端主键id=12的行,在对端不存在

主键信息记录的是源端的,支持联合主键。

为了性能考虑,这里DIFF时不展示具体哪一列的数据不一致。如果需要查看这个信息,这个数据信息记录在~/logs/cloudcanal/tasks/${taskName}/diff.log中

利用数据库的upsert能力进行订正

针对支持upsert语义写入的数据源作为对端时,CloudCanal的订正可以正常工作。CloudCanal根据校验结果去源端反查数据后写入对端,如果对端不存在该主键的行,则直接INSERT写入,如果存在则自动转换为UPDATE进行更新。

使用in multi column处理联合主键的情况

针对实现SQL标准中in multi column语法的数据库作为源端时,CloudCanal支持对其进行数据订正。CloudCanal根据主键扫描源端表时,如遇联合主键的场景,会根据in multi column的语法来扫描源端的数据。不支持in multi column SQL语法的数据源CloudCanal不支持订正其数据。in multi column语法的使用例子可以参考如下:

-- works in PostgreSQL, Oracle, MySQL, DB2, HSQLDB
SELECT whatever
FROM t --- you missed the FROM
WHERE (col1, col2) --- parentheses here
IN ((val1a, val2a), (val1b, val2b), ...) ;

使用须知

  • 以下源端、对端之间支持创建订正任务:
    • 源端:Oracle、PostgreSQL、MySQL、OceanBase、PolarDBMySQL
    • 对端: MySQL、PolarDBMySQL、Oracle、PostgreSQL、OceanBase
  • 支持该特性的CloudCanal版本:v2.2.6.8(商业版)
  • 订正是以源端数据为准:校验结果中会记录对端相比源端缺失、不一致的行的源端主键信息。订正则会基于该源端主键进行订正。假设对端多出了一些源端不存在的主键,在订正的时候CloudCanal是不会去删除这些行的请知悉。
  • 基于校验结果的订正依赖校验任务的校验结果文件,因此当关联的校验任务在不同机器上执行过的话,则无法基于该校验任务创建订正任务。在校验任务详情,点击功能列表->重启历史记录 可以查看校验任务是否在多台机器上运行过。 image.png

操作说明

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备两个支持数据订正的数据库,一个作为源端,一个作为对端。本次例子采用的源对端数据源类型为阿里云的PolarDBMySQL

校验订正的基本流程

使用CloudCanal的校验订正能力恢复异常数据的典型流程如下图所示。

image.png

数据校验

  • 在任务管理页点击创建任务,进入创建任务的第一步,配置源对端的数据库并且选择需要订阅的库。 image.png

  • 选择任务类型为校验,开启一次性校验,设置自动启动。 image.png

  • 选择需要进行校验的表。 image.png

  • 选择需要进行校验的列,支持映射和裁剪。裁剪的列将不参与校验。 image.png

  • 确认任务整体配置情况,无误后点击创建 image.png

  • 校验完成后可以查看具体每张表相比源端缺失或者不一致的数据。 image.png

数据订正

  • 校验任务的详情页,点击功能列表中的创建订正任务可以直接基于该次校验的结果,创建对应表的订正任务。 image.png

  • 订正任务的源对端信息和订阅的库信息与之前的校验任务保持一致,此处源对端测试连接成功后可直接点击下一步。 image.png

  • 规格与校验任务保持一致,可以直接下一步 image.png

  • 订阅信息仅供确认,无法修改,与校验任务保持一致,直接点击下一步 image.png

  • 确认列的映射、裁剪信息,无法修改,与校验任务保持一致,直接点击下一步 image.png

  • 确认订正任务的配置无误后点击创建任务 image.png

  • 校验完成后可以看到具体订正的统计信息 image.png

总结

本文介绍了如何利用 CloudCanal 的校验订正能力来快速恢复数据。

MySQL 数据历史变更构建

· 阅读需 5 分钟
John Li
John Li
Chief Executive Officer

简述

CloudCanal 为满足用户对于核心数据风控需求,在 MySQL->Greenplum 链路支持了历史数据能力,构建的历史数据特点包括:

  • exactly once(有且仅有一次)
  • 支持事务内多次变更,反复写入删除等特殊场景

本文通过一个实际案例简要介绍如何使用这个功能。

技术点

核心数据变更风控的必要性

用户数据、交易数据的变更追踪,常用方式是通过应用埋点、数据网关或代理、数据管理控制平台等方法进行,通过规则预防事后止损两大方向控制风险。

通过数据库增量日志追踪数据变更历史,从风控角度属于事后止损方向,常见落地方案如准实时数据对账数据历史算法分析与统计等。

CloudCanal 目前根据用户需求,支持 MySQL->Greenplum 链路数据实时变更历史追踪功能。

历史数据结构形态

历史数据构建过程中,幂等性能够大幅度便利用户数据分析,用户无需分辨相类似数据是否是业务层面变更还是系统原因造成的重复数据。为此,我们结合具体数据链路,构建了幂等历史数据表结构。表结构如下。

CREATE TABLE "public"."worker_stats" (
"id" bigint NOT NULL ,
"__action" varchar(64),
"__seq_in_tx" bigint,
"__gtid" varchar(255),
...other column...
PRIMARY KEY ("id","__action","__seq_in_tx","__gtid")
) ;

其中 id 为原始表主键,可以为联合主键(也就会有多个字段),__action 为此变更操作类型,包含 INSERT/UPDATE/DELETE 3个候选值, __seq_in_tx 为本变更在事务中的序号,__gtid 表示事务号。以上几个字段,在历史数据表结构中构成联合主键,达到幂等目标。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 MySQL 数据库(本例版本为 8.0) 和 Greenplum 数据库(本例为阿里云 ADB for PG)
  • 准备好 MySQL 示例表以及 Greenplum 中表,并造一些数据。
-- MySQL

CREATE TABLE `worker_stats` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`worker_id` bigint(20) NOT NULL,
`cpu_stat` text,
`mem_stat` text,
`disk_stat` text,
`col_new` varchar(255) NOT NULL DEFAULT '123',
`unsigned_col` bigint(20) unsigned DEFAULT NULL,
`new_col_col` varchar(123) DEFAULT NULL,
`timestamp_col` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=503723 DEFAULT CHARSET=utf8mb4;

-- Greenplum

CREATE TABLE "public"."worker_stats" (
"id" bigint NOT NULL ,
"__action" varchar(64),
"__seq_in_tx" bigint,
"__gtid" varchar(255),
"gmt_create" timestamp NOT NULL,
"worker_id" bigint NOT NULL,
"cpu_stat" text,
"mem_stat" text,
"disk_stat" text,
"col_new" varchar(255) NOT NULL DEFAULT '123',
"unsigned_col" bigint DEFAULT NULL,
"new_col_col" varchar(123) DEFAULT NULL,
"timestamp_col" timestamp NOT NULL,
PRIMARY KEY ("id","__action","__seq_in_tx","__gtid")
) ;

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • 将源端MySQL和目标端Greenplum 分别添加 截屏2022-01-10 下午3.22.57.png

任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源
  • 选择 数据同步,不勾选 全量数据初始化, DDL 选择 不同步,并且选择 不自动启动任务(为了修改参数) 截屏2022-01-10 下午3.25.29.png
  • 选择需要做历史同步的表 截屏2022-01-10 下午3.28.25.png
  • 选择列,默认全选
  • 确认创建
  • 查看异步任务,确认创建步骤正常 截屏2022-01-10 下午3.31.18.png
  • 进入 任务详情->修改参数->目标DATASOURCE配置,修改参数 incrementHistory 为 true 截屏2022-01-10 下午3.32.39.png
  • 启动任务,并正常同步 截屏2022-01-10 下午3.36.13.png

验证历史数据

  • 手动造几条增删改,查看历史数据效果 截屏2022-01-10 下午3.49.38.png 截屏2022-01-10 下午3.50.12.png
  • 再手动增删改几条数据,重启任务,查看幂等效果 截屏2022-01-10 下午3.54.41.png 截屏2022-01-10 下午3.54.47.png

常见问题

是否支持其他链路

暂时没有支持,并且目前源端强依赖MySQL GTID,以防止主备切换等情况,如果源端是PostgreSQL,SQLServer,Oracle 可以选择对应事务的 lsn 或 scn 作为替代。

总结

本文简单介绍了如何使用 CloudCanal history功能精确构建数据变更历史。

主流 RDB 到 Kudu 数据同步

· 阅读需 6 分钟
John Li
John Li
Chief Executive Officer

简述

Kudu 是 Cloudera 开源的新型列式存储系统,是 Apache Hadoop 生态圈的成员之一。它专门为了对快速变化的数据进行快速的分析,填补了以往Hadoop 存储层的空缺。

本篇文章将会介绍几种数据数据同步到 Kudu 的方案选择,然后从功能和使用角度介绍 CloudCanal 如何帮助我们解决数据实时同步到 Kudu。

几种方案

Kudu 是一个存储层组建,若要同步数据到 Kudu 的可以有三种选择

  • 基于RDB)选用类似 DataX 这类开源中间件,同时 Kudu 搭建上层 SQL 引擎。可选的 SQL 引擎有:Hive、Impala
  • 基于MQ)选用 Kafka 生态,基于 Kafka + Flume + Kudu 实现数据同步。
  • 基于编码)开发对应的数据迁移程序,直接将数据写入 Kudu 存储。

如何选择?

基于RDB方案

  • 需要 Kudu 上层有一个 SQL 引擎。在官方文档上提到了两种使用Kudu 的方式
    • Hive 是基于 MapReduce 架构、Impala 是基于 MPP 架构
  • 单纯的数据写入并不需要复杂的计算逻辑,选择 MPP 架构显然更适合一些。
    • 这种情况下推荐 Impala + Kudu + DataX

基于MQ方案

  • 比较适合源端数据变化有多个不同的消费者,数据同步仅仅是其中一条链路。

基于编码方案

  • 比较适合在数据同步过程中需要对数据进行加工。

同步的技术点

建表

  • 对于 Kudu 上的建表需要通过 kudu-client 来进行,例如:
List<ColumnSchema> columns = new ArrayList(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
List<String> rangeKeys = new ArrayList<>();
rangeKeys.add("key");
Schema schema = new Schema(columns);
client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys));
  • Kudu 由于存储引擎限制,每张表必须要指定 Partition Column。
  • 被设置为 Partition 的列不允许 update,如若修改 Partition Column 列的值。需要删除整行数据重新插入。

数据写入

  • Kudu 数据写入 Kudu 支持 InsertUpdateDeleteUpsert 四种操作
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
for (int i = 0; i < 3; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt(0, i);
row.addString(1, "value " + i);
session.apply(insert);
}
session.flush();
session.close();

数据类型

  • Kudu 1.6 之前,不支持 Decimal。因此源端数据类型如果是浮点数只可以选择:float、double、int、string 来承载
  • Kudu 1.15 开始,提供了 varchar 类型可以与 数据库的 varchar 相互对应。在此之前应当选择 string 类型。

基于 CloudCanal 快速实现数据同步

前面介绍的三种方式中 RDB方案要求增加配置 SQL 引擎、MQ 方案 则要求增加 Kafka 和 Flume。它们的数据同步链路都较长一旦出现问题,比较不容易排查原因所在。CloudCanal 是采用了第三种方式。

  • 从源端直接订阅数据变更
  • 通过 kudu-client 直接将数据写入到 Kudu 存储引擎
  • 整个过程无需 MQ 或 SQL 引擎的支持。

对比前面两种方式具有下列几个优势

  • 资源开销小
  • 链路简单高效
  • 产品化操作便捷

举个"栗子"

准备 CloudCanal

配置 Host

  • 由于 Kudu Client 通信时会通过链接串配置的 master 地址获取到所有 Tablet Server 节点信息,这些节点信息通常是 host 名。因此 sidecar 无法直接解析,需要通过 Host 配置文件来指定。
  • (Docker 方式)配置 hosts 时候需要在 sidecar 容器中进行
  • (Tar)版本部署,直接配置物理机 host 文件
# 比如 笔者机器的配置是如下的
192.168.0.244 kudu001
192.168.0.245 kudu002
192.168.0.246 kudu003

192.168.0.176 cdh6-1
192.168.0.248 cdh6-2
192.168.0.254 cdh6-3
192.168.0.247 cdh6-4

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理 -> 添加数据源
  • 选择 自建数据源 ,并填写相关数据库信息,其中 网络地址 请按提示带上端口号 3a8f9a9c-b778-4dd5-a509-4032691b37a5-image.png
  • 这里需要有两点提示
    • kudu-client 默认访问的是 7051 端口,并非提供 Web 界面的 8051
    • 如果 Kudu 是集群化部署,那么在配置网络地址时需要填写集群 ip:port 列表。逗号间隔。

同步任务

  • 任务管理管理 -> 创建任务
    71e902c8-dae2-466f-ba43-4f0fac04314c-image.png

  • 在创建任务的时候,默认高级选项中会要求至少 3副本。可以根据 Kudu 集群情况来修改对应配置。

  • 任务类型环节选择 数据同步 fd4e3aec-71fb-4c3e-93e1-4a48e1111f8a-image.png

  • 在表选择环节可以选择要同步的源端表以及同步的动作 9fbd24b3-e7c6-4a49-8574-ee589fe5f864-image.png

  • 最后到确认环节创建任务即可。 3d7cd041-9fcc-4305-acc6-b73263e2d60f-image.png

  • 在结构迁移任务跑完之后,就可以看到 Kudu 控制台上已经把对应的 Kudu 表全部创建完毕 53f2f62b-6b2b-4beb-ad1f-dbe9ae19fbe9-image.png

在最后举例使用 Impala 来查询位于 Kudu 中的数据,如下是建表语句:

CREATE EXTERNAL TABLE `canal_test_case_column_default` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'my-j52hri3880d6dka.canal_test_case.column_default',
'kudu.master_addresses' = '192.168.0.254:7051')
  • 任务在进入增量同步之后尝试对源端就会发现数据已经实时的进入到 Kudu 啦,此时通过 SQL 查询 Impala 上的 Kudu 表就可以看到数据 072225fc-562f-4313-a1f8-3e4caa3b0cd9-image.png

能力和限制

  • 支持 MySQL、PostgreSQL、Oracle 作为源端到 Kudu 的同步
  • 支持主键变更同步,转换为 Kudu 删除在插入
  • 对端 Kudu 要求 1.6 版本
  • 不支持源端 无主键表
  • 不支持 DDL 增量同步

总结

本文简单介绍了几种将数据同步到 Kudu 的方式,以及基于 CloudCanal 是如何实现实时同步数据到 Kudu。

双向数据同步(进阶)

· 阅读需 5 分钟
John Li
John Li
Chief Executive Officer

简述

本文主要介绍 CloudCanal MySQL 双向同步新版本功能,特点包括:

  • 不依赖 GTID
  • 不依赖事务的顺序,可并行
  • 对端操作减少
  • 对云数据库(MySQL)的普遍支持
  • 支持库表列裁剪、映射以及自定义数据处理

技术点

防冲突标记

GTID 防冲突标记包含 MySQL 的 server_uuid 和事务号,新方案我们选择 binlog 数据标记。

DML 在 Binlog 中正常的事件顺序依次为 QueryEvent(TxBegin)、TableMapEvent、WriteRowEvent(IUD)、QueryEvent(TxEnd) , 如果需要对同步的数据打上标记,除非对 WriteRowEvent 做标记,否则没有可下手的地方。但是要达成这个目标,以我们的认知来看,除非改引擎代码。

然后,我们盯上了 MySQL binlog 中一个描述变更行 statement 的事件 RowsQueryLogEvent ,这个事件只在打开 MySQL binlog_rows_query_log_events 参数才会出现,而这个事件可以带上执行 sql 的注释。

为此,我们设计了 CloudCanal 写入对端时自动带上 /*ccw*/ 标记,这样在 RowsQueryLogEvent 的 sql 语句中也会出现该标记,在双向同步中,如果遇到这个标记,过滤即可。

新的 DML 事件顺序变成了 QueryEvent(TxBegin)、TableMapEvent、RowsQueryLogEvent、WriteRowEvent(IUD)、QueryEvent(TxEnd)。

操作示例

准备 CloudCanal

添加数据源

  • 本案例采用 阿里云 RDS for MySQL, 为测试便利起见,2台数据库都位于 hangzhou 区域
  • 在 RDS 实例详情->参数设置,设置 binlog_rows_query_log_events 为 on
  • 登录 CloudCanal 平台 ,数据源管理 -> 添加数据源 , 将准备的数据库逐步添加进来
  • 建议对数据源进行描述修改,防止配置正反链路时,识别错数据库 loop_sync_1

创建正向同步任务

  • 任务管理->新建任务

  • 双向同步中,正向任务一般指源端有数据,目标端无数据的链路,涉及对端数据初始化

  • 第一个页面,选择源端和目标端数据源和相关信息,点击下一步 loop_sync_2

  • 第二个页面

    • 选择 数据同步,并且勾选 全量数据初始化
    • 勾选 DDL 同步
    • 置灰自动启动,以便创建任务后设置双向同步参数
    • 点击 下一步

    loop_sync_3

  • 第三个、第四个页面,表、列映射裁剪...此处省略,点击 下一步

  • 第五个页面

    • 点击确认创建 loop_sync_4
  • 任务详情 -> 参数设置

    • 设置目标数据源配置 deCycle 参数为 true
    • 此处和 GTID 方案有较大差别, 不需要开启 enableTransaction 和 gtidMode
    • 生效配置并启动

    截屏2021-10-28 下午2.26.17.png

创建反向同步任务

  • 任务管理->新建任务

  • 第一个页面,选择源端和目标端选择数据源(请和正向任务所选数据源对调)和相关信息,点击下一步 loop_sync_5

  • 第二个页面

    • 选择 数据同步,并去除全量数据初始化勾选
    • 勾选 DDL 同步
    • 置灰自动启动,以便创建任务后设置双向同步参数
    • 点击 下一步

    loop_sync_6

  • 第三、四个页面,表、列映射裁剪...此处省略,点击 下一步

  • 第五个页面,点击确认创建

  • 任务详情 -> 参数设置

    • 设置目标数据源配置 deCycle 参数为 true
    • 此处和 GTID 方案有较大差别, 不需要开启 enableTransaction 和 gtidMode
    • 生效配置并启动

    loop_sync_7

  • 任务正常运行 loop_sync_8

测试

  • 源端数据库做数据变更,正向任务监控有变更,反向任务没有(即无循环) loop_sync_9 loop_sync_10 loop_sync_11

  • 目标端数据库做数据变更,反向任务监控有变更,正向任务没有(即无循环) loop_sync_12 loop_sync_13 loop_sync_14

常见问题

新方案有什么不利因素

需要修改 MySQL 全局变量 binlog_rows_query_log_events 为 on ,这个参数默认是关闭的,相比 GTID 普遍打开,这是不利因素。

再者 , binlog 增长相对快速,可能带来磁盘增长烦恼,清理 binlog 周期会变短。

最后,对于 CloudCanal 而言,增加了语句文本的内存占用,导致资源损耗加大。

不过相对于新方案带来的好处-性能和稳定性大幅度提升,我们认为这些损失是值得的。

新方案除了 MySQL->MySQL 还支持哪些链路?

其他数据源是否存在 DML 语句或者数据可标记,我们还没来得及深入研究,不过这个实现方向(数据打标)我们认为是比较友好的。

总结

本文简单介绍了如何使用 CloudCanal 构建 MySQL->MySQL 双向同步链路新功能。

主流 RDB 到 Flink 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

实时数据处理领域中,使用 Flink 方式,除了从日志服务订阅埋点数据外,总离不开从关系型数据库订阅并处理相关业务数据,这时就需要监测并捕获数据库增量数据,将变更按发生的顺序写入到消息中间件以供计算(或消费)。

本文主要介绍如何通过 CloudCanal 快速构建一条高效稳定运行的 MySQL -> Kafka -> Flink 数据同步链路。

技术点

兼容多种常见消息结构

CloudCanal 目前支持 Debezium Envelope (新增)CanalAliyun DTS Avro 等多种流行消息结构,对数据下游消费比较友好。 本次对 Debezium Envelope 消息格式的支持,我们采用了一种轻量的方式做到完全兼容,充分利用 CloudCanal 增量组件,扩展数据序列化器 (EnvelopDeserialize),得到 Envelop 消息并发送到 Kafka 中。 其中 Envelop 的消息结构分为 PayloadSchema 两部分

  • Payload:存储具体数据
  • Schema:定义 Payload 的解析格式 (默认关闭)
{
"payload":{
"after":{
"column_1":"3",
...
},
"before":null,
"op":"c",
"source":{
"db":"kafka_test",
"table":"new_table"
"pos":110341861,
"ts_ms":1659614884026,
...
},
"ts_ms":1659614884026
},
"schema":{
"fields":[
{
"field":"after",
"fields":[
{
"field":"column_1",
"isPK":true,
"jdbType":4,
"type":"int(11)"
},
...
],
"type":"struct"
},
...
],
"type":"struct"
}
}

高度可视化的CDC

CDC 工具如 FlinkCDCMaxwellDebezium ... 各有特色,CloudCanal 相对这些产品,最大的特点是高度可视化,自动化,下表针对目标端为Kafka 的 CDC 简要做了一些对比。

CloudCanalFlinkCDCMaxwell
产品化完备基础
同步对象配置可视化代码配置文件
封装格式多种常用格式自定义JSON
高可用
数据初始化(snapshot)实例级实例级单表
源端支持ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL...ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL...MySQL

CloudCanal 在平衡性能的基础上,提供多种关系型数据源的同步,以及反向同步。

提供便捷的可视化操作、轻巧的数据源添加、轻便的参数配置。

提供多种常见的消息格式,仅仅通过鼠标点击,就可以使用其他 CDC 的消息格式的传输,让数据处理变的异常的快捷、方便。

其中经过我们在相同环境的测试下, CloudCanal 在高写入的 MySQL 场景中,处理数据的效率表现的很出色,后续我们会继续对 CloudCanal 进行优化,提升整体的性能。

综上,相比与类似的 CDC 产品来说,CloudCanal 简单轻巧并集成一体化的操作占据了很大的优势。

Flink 流式计算中不仅要订阅日志服务器的日志埋点信息,同样需要业务数据库中的信息,通过 CDC 工具订阅数据,能减少查询对业务数据库产生的压力还能以流的形式传输,方便与日志服务器中的数据进行关联处理。

实际开发中,可以将业务数据库中的信息提取过滤之后动态的放入 Hbase 中作为维度数据,方便相关联的宽表进行关联查询。

也可以对数据进行开窗、分组、聚合,同样也可以下沉到其他的 Kafka 消费者组中,实现数据的分层。 image.png

操作示例

前置条件

  • 本例使用 Envelop 消息格式,关系型数据库 MySQL 为示例,展示 MySQL 对接 Flink 的 Demo

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档

  • 准备好 1 个 MySQL 实例,1 个 Kafka 实例(本例使用自己搭建的 MySQL 5.6,阿里云 Kafka 2.2)

  • 准备好 Flink 消费端程序,配置好相关信息:flink-demo 下载

  • 登录 CloudCanal 平台,添加 Kafka,MySQL 截屏2022-08-17 17.12.13.png

  • Kafka 自定义一个主题 topic_1,并创建一条 MySQL -> Kafka 链路作为增量数据来源

任务创建

  • 首先配置 FlinkDemo程序的阿里云 Kafka 相关信息 截屏2022-08-17 17.09.12.png

  • 运行 FlinkDemo 程序,等待消费 MySQL 同步 Kafka 的数据(程序不要关闭) 截屏2022-08-17 17.08.50.png

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库,并选择 DebeziumEnvelope 消息格式,和 topic_1 主题(在阿里云里提前创建) 截屏2022-08-17 17.08.18.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认 截屏2022-08-17 17.07.46.png

  • 选择需要迁移同步的表 table1和对应的 Kafka 主题 topic_1 截屏2022-08-17 17.07.19.png

  • 持续点击下一步,并创建出数据同步任务。

  • MySQL生成数据,MySQL-> Kafka(topic_1) -> Flink
  • FlinkDemo 接收到 Kafka(topic_1) 数据,下沉到 topic_2 主题,打印并输出;这里 Flink 程序可以做更多的流式计算的操作,FlinkDemo 只是演示了最基本的数据传输案例截屏2022-08-17 17.10.05.png

常见问题

还支持哪些源端数据源呢?

目前开放 MySQL、Oracle,SQLServer,Postgres,MongoDB 到 Kafka,如果各位有需求,可以在社区反馈给我们。

支持 DDL 消息同步吗?

目前 关系型数据到 kafka 是支持 DDL 消息的同步的,可以将 关系型数据库 DDL 的变化同步到 Kafka 当中。

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL -> Kafka -> Flink 数据迁移同步。

主流数据库到 OceanBase 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.2.0.7 版本开始支持 OceanBase 作为对端的数据迁移同步能力

本文通过 MySQL->OceanBase的数据迁移同步案例简要介绍这个源端的能力。链路特点:

  • 结构迁移、全量迁移、增量同步(数据)
  • 流程全自动化
  • 高度产品化:任务管理、监控、审计一应俱全

使用须知

  • 仅支持 OceanBases MySQL 模式
  • 支持的源端数据源类型为 Oracle/PostgreSQL/MySQL,本文主要以 MySQL 源端为例说明使用方法。
  • DDL同步当前仅支持 MySQL->OceanBase

技术点

面向在线业务的编辑订阅能力

数据长周期增量同步过程中,常有订阅表增减的情况,CloudCanal 编辑订阅 能力,可在原有任务基础上进行变更。其中新增表会产生一个子任务,自动完成数据全量迁移和增量同步,然后和原有主任务合并,自动完成整个过程。 截屏2022-03-04 上午10.47.57.png

全自动化

CloudCanal 自动帮用户完成 结构迁移全量数据迁移增量数据同步,大大提升创建数据同步任务的效率。

自定义代码加工

CloudCanal 允许用户添加自定义代码处理数据,应用场景包括数据清洗数据脱敏宽表构建新系统数据库重构等。可参考文章《5分钟搞定 MySQL 到 ElasticSearch 宽表构建和同步》 以了解基本使用。

库表列裁剪映射

CloudCanal 提供了数据迁移同步中常用的产品化能力-在库、表、列等级别进行裁剪和映射,有效提升数据迁移同步任务的适配性。 截屏2022-03-04 上午10.44.41.png

断点续传

CloudCanal 支持迁移和同步任务的断点续传,通过定期记录的位点,让任务重启后自动从上一次位点开始继续迁移或同步。

操作示例

添加数据源

  • 登录 CloudCanal 平台

  • 选择 数据源管理->新增数据源

  • 选择 自建数据库中的OceanBase 截屏2022-03-04 上午10.50.05.png

    截屏2022-03-04 上午10.51.28.png

创建任务

  • 任务管理->任务创建

  • 选择 源 和 目标 数据库

  • 点击 下一步 截屏2022-03-04 上午10.52.47.png

  • 选择 增量同步,并且启用 全量数据初始化

  • 点击下一步 截屏2022-03-04 上午10.55.35.png

  • 选择订阅的表,结构迁移自动创建的表会按照默认类型映射进行处理。对端表如果已经提前建好,这里也可以直接映射对端已经存在的表

  • 点击下一步 截屏2022-03-04 上午10.56.54.png

  • 配置列映射、点击下一步 截屏2022-03-04 上午10.58.23.png

    如果是通过 CloudCanal 结构迁移自动建表,这边不允许重命名、裁剪以及列映射; 如果映射的是对端已经提前建好的表,这边支持列的裁剪和映射

  • 创建任务 截屏2022-03-04 上午10.59.39.png

  • 查看任务状态。任务创建后,会自动完成结构迁移、全量、增量阶段。 截屏2022-03-04 上午11.01.03.png

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL 到OceanBase 的数据迁移同步。

OceanBase 到主流数据库数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.2.2.1 版本开始支持 OceanBase 作为对端的数据迁移同步能力.

本文通过 OceanBase->OceanBase的数据迁移同步案例简要介绍这个源端的能力。链路特点:

  • 结构迁移、全量迁移、增量同步(数据)
  • 流程全自动化
  • 高度产品化:任务管理、监控、审计一应俱全

使用须知

  • 仅支持 OceanBases MySQL 模式
  • 支持OceanBase 3.x版本
  • 支持的对端数据源类型为 OceanBase/StarRocks/MySQL,本文主要以 OceanBase 对端为例说明使用方法。
  • DDL同步当前仅支持 MySQL->OceanBase和OceanBase->OceanBase。开启方式为创建任务的时候设置同步DDL,并且在任务参数writeParallel中设置目标端执行并行度为1(执行DDL需串行避免写入异常)
  • 安装的oblogproxy默认保存增量24小时,如果需要调整,可以调整oblogproxy的参数log_clean_cycle_time_in_hours
  • 现在ob log proxy client不支持clientID复用,重启增量任务会启动新的client,ob log proxy所在机器会有较多磁盘占用,请留意或者定时清理
  • 当前支持的oblog proxy可以通过如下命令安装
docker run --name oblogproxy --net=host   -e OB_SYS_USERNAME=密文   -e OB_SYS_PASSWORD=密文   -d whhe/oblogproxy

技术点

面向在线业务的编辑订阅能力

数据长周期增量同步过程中,常有订阅表增减的情况,CloudCanal 编辑订阅 能力,可在原有任务基础上进行变更。其中新增表会产生一个子任务,自动完成数据全量迁移和增量同步,然后和原有主任务合并,自动完成整个过程。 截屏2022-03-04 上午10.47.57.png

全自动化

CloudCanal 自动帮用户完成 结构迁移全量数据迁移增量数据同步,大大提升创建数据同步任务的效率。

自定义代码加工

CloudCanal 允许用户添加自定义代码处理数据,应用场景包括数据清洗数据脱敏宽表构建新系统数据库重构等。可参考文章《5分钟搞定 MySQL 到 ElasticSearch 宽表构建和同步》 以了解基本使用。

库表列裁剪映射

CloudCanal 提供了数据迁移同步中常用的产品化能力-在库、表、列等级别进行裁剪和映射,有效提升数据迁移同步任务的适配性。 截屏2022-03-04 上午10.44.41.png

断点续传

CloudCanal 支持迁移和同步任务的断点续传,通过定期记录的位点,让任务重启后自动从上一次位点开始继续迁移或同步。

准备工作

添加数据源

  • 登录 CloudCanal 平台

  • 选择 数据源管理->新增数据源

  • 选择 自建数据库中的OceanBase 截屏2022-03-04 上午10.50.05.png

    8b6cf03f-dfba-494e-8057-e344d5a60036-image.png

信息

添加 OceanBase 数据源请提前安装好 oceanbase log proxy 用于订阅增量。

创建任务

  • 任务管理->任务创建

  • 选择 源 和 目标 数据库

  • 点击 下一步 09de85cb-04de-41d2-9d2b-d0651f631a0a-image.png

  • 选择 增量同步,并且启用 全量数据初始化

  • 点击下一步 截屏2022-03-04 上午10.55.35.png

  • 选择订阅的表,结构迁移自动创建的表会按照默认类型映射进行处理。对端表如果已经提前建好,这里也可以直接映射对端已经存在的表

  • 点击下一步 截屏2022-03-04 上午10.56.54.png

  • 配置列映射、点击下一步 截屏2022-03-04 上午10.58.23.png

    如果是通过 CloudCanal 结构迁移自动建表,这边不允许重命名、裁剪以及列映射; 如果映射的是对端已经提前建好的表,这边支持列的裁剪和映射

  • 创建任务 截屏2022-03-04 上午10.59.39.png

  • 查看任务状态。任务创建后,会自动完成结构迁移、全量、增量阶段。 7bfb7478-aa5d-4b86-823c-3ad377a9219b-image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 OceanBase 到 OceanBase 的数据迁移同步。

跨互联网数据互通(RocketMQ)

· 阅读需 2 分钟
John Li
John Li
Chief Executive Officer

操作示例

准备 CloudCanal

添加数据源

  • 本案例采用 MySQL 和阿里云 RocketMQ
  • 登录 CloudCanal 平台 ,数据源管理 -> 添加数据源 , 将准备的数据库逐步添加进来
  • 建议对数据源进行描述修改,防止配置正反链路时,识别错数据库 RocketMQ 的 topic 创建要选择分区顺序消息 53bdb8eb-d4cc-45cb-b114-ea8fb0dc7755-image.png

创建 MySQL->RocketMQ 同步任务

  • 任务管理->新建任务

  • 任务同步一般指源端有数据,目标端无数据的链路,涉及对端数据初始化

  • 第一个页面 ·选择源端和目标端数据源和相关信息,点击下一步

  • 第二个页面 ·选择全量/增量等功能配置相关信息,点击下一步

  • 第三个、第四个页面 ·表、列映射裁剪…此处省略 ·点击 下一步

  • 确认创建 696865b5-1121-4ba4-a4e5-8617302148cf-image.png

  • 任务详情 -> 参数设置 ·设置目标 DATASOURCE 参数配置 nameServer 参数为阿里云RocketMQ 的 HTTP 协议客户端公网访问接入点 ·设置目标 DATASOURCE 参数配置 protocol 为 HTTP ·生效配置并启动 8d7ed86c-d152-4e29-a6da-3f84c25135f0-image.png

创建RocketMQ->MySQL同步任务

  • 任务管理->新建任务

  • 任务同步一般指源端有数据,目标端无数据的链路,涉及对端数据初始化

  • 第一个页面 ·选择源端和目标端数据源和相关信息,点击下一步

  • 第二个页面 ·选择全量/增量等功能配置相关信息,点击下一步

  • 第三个、第四个页面 ·表、列映射裁剪…此处省略 ·点击 下一步

  • 确认创建 1473637d-5475-4061-baf6-5571fd74e9f1-image.png

  • 任务详情 -> 参数设置 ·设置源端 DATASOURCE 参数配置 nameServer 参数为阿里云RocketMQ 的 HTTP 协议客户端公网访问接入点 ·设置源端 DATASOURCE 参数配置 protocol 为 HTTP ·生效配置并启动 dfb7282d-31b4-4db2-9598-a057106dbcb0-image.png

跨互联网数据互通 (Kafka)

· 阅读需 7 分钟
John Li
John Li
Chief Executive Officer

简介

本文主要介绍如何使用 CloudCanal 快速构建安全的混合云在线数据生态。

此方案具有以下几个特点

  • 采用 Kafka 做数据中转
  • 双边数据库都不开公网端口
  • 互联网数据经过 SSL 加密
  • 数据出口经过用户名、密码验证,并设置 ip 白名单加强安全管控

例子中的云数据库、云消息产品、自建数据库等都可以替换成自己当前环境的自建资源或各种云资源。

技术点

混合云数据生态主要的难点在于 网络安全 ,部分用户因为传输同步数据较多,也比较在意流量资源损耗。

基于 CloudCanal 实现的方案,更加关注 网络安全 层面的问题,做到敏感资源 网络单向隔离链接鉴权传输加密

下图简要示例了下 互联网模式专线模式 数据上下云。 截屏2021-08-04 下午7.48.27.png

举个"栗子"

本文案例主要演示 互联网模式 的跨云数据迁移和同步,具体场景是如何进行数据上云(自建机房数据库-> 阿里云云数据库),并且长期维持混合云数据体系。当然,相同的方案也可以反过来使用,只是在数据源选择外网内网有所区别。

安装 CloudCanal

准备 Kafka

  • 阿里云 Kafka 购买页 购买相应的 Kafka , 验证能力可先购买按量实例 购买kafka_new.png
    • 注意选择 公网/VPC实例,并且选择稍大的公网流量
  • 部署实例请选择 2.x.x 版本,最大消息大小 建议调整稍大些(比如 4MB) 部署kafka.png
  • 进入实例,创建 Consumer Group,并记下名称 创建consumer_group.png

子账号授权并添加数据源

  • 按照 阿里云子账号准备 文档,创建或者授权子账号,并记住子账号 AK 和 SK , 请授予基本的数据库访问权限 AliyunRDSFullAccess,AliyunKafkaFullAccess

  • 分别添加云下自建数据库阿里云 RDS for MySQL阿里云 Kafka 添加数据源.png

    • 添加阿里云资源时,请在第二步选择自动添加迁移机器白名单
    • Kafka 用户名密码可以在 阿里云 Kafka 实例详情页最底下安全配置处找到
    • TLS 文件请从 阿里云Kafka根证书下载

开始造数据

  • 源端数据库为云下自建数据库,IUD 20:60:20, 12 KB/条数据, 每个表 24 并发, 每个事务 2~4 条变更, RPS 1000 左右。 造数据.png

使用 CloudCanal 创建云下数据同步任务

  • 选择数据源,并选择合适的选项 创建云下任务_1.png
    • 1 处请选择云下或自建集群
    • 2 ,3 源端自建数据库选择内网,对端 Kafka 选择公网
    • 4 处可以选择兼容 开源 Canal 消息格式,或自带的 CloudCanal 消息格式
  • 选择数据同步,并勾选初始化数据 创建云下任务_2.png
  • 选择表,此处不要修改对端 topic,按照默认规则生成即可。 创建云下任务_3.png
  • 选择列,可以裁剪掉一部分列不进行迁移同步 创建云下任务_4.png
  • 创建确认 创建云下任务_5.png
  • 任务正常流转运行中 云下任务运行中.png

使用 CloudCanal 创建云上迁移同步任务

  • 选择数据源,并选择合适的选项 创建云上任务_1.png
    • 1 请选择 ECS 上客户端所在集群
    • 2,3 都选择内网分别访问 Kafka 和 RDS for MySQL
    • 4 填写之前在 Aliyun Kafka 控制台创建的 Consumer Group
    • 5 选择和云下任务一致的消息格式
  • 中间略过表、列选择,一路点击下一步即可
  • 创建确认 创建云上任务_5.png
  • 两者任务正常运行中 任务正常运行.png
  • 云上消费任务创建因为是在云下任务运行之后,所以需要将云上消费任务位点回溯到云下任务创建之前,以涵盖全部数据。 重置云上任务位点_1.png 重置云上任务位点_2.png

校验下数据

此次案例数据较多,我们偷个懒,直接打开 RDS for MySQL 公网链接,用云下 CloudCanal 集群链接过来直接做一个数据校验 (生产环境禁止!!!!)

  • 为了让校验结果更加清晰,停止造数据
  • 申请 RDS for MySQL 公网地址,并修改 CloudCanal 数据源管理页面对应实例的公网地址 RDS开公网.png
  • 创建校验任务 创建数据校验任务_1.png
    • 1 选择云下或本地集群
    • 2 ,3 源端自建 MySQL 选择内网访问,目标端 RDS for MySQL 选择外网访问
  • 选择任务类型为数据校验 创建数据校验任务_2.png
  • 中间略过表、列选择,一路点击即可
  • 创建确认 创建数据校验任务_5.png
  • 任务运行完毕,结果正确 创建数据校验任务_6.png

总结

本文简要介绍如何使用 CloudCanal 快速构建一条安全、跨互联网数据迁移同步方案。

此方案有以下特点:

  • 双边数据库都不开公网端口
  • 互联网数据经过 SSL 加密
  • 数据出口经过用户名、密码验证,并设置 ip 白名单加强安全管控

MongoDB 到 MongoDB 数据同步

· 阅读需 3 分钟
John Li
John Li
Chief Executive Officer

简述

MongoDB 是一种广泛使用的文档型数据库,对于 schema 要求低、可扩展性强,让其在很多场景普遍适用。

本文主要介绍如何使用 CloudCanal 快速构建一条稳定高效运行的 MongoDB 到 MongoDB 数据同步链路。示例中 MongoDB 均为副本集(Replica Set)。

技术点

MongoDB 源端增量技术

CloudCanal 通过 local 库的 oplog.rs 集合(collection)获取增量变更数据(需要搭配副本集),其中事件包含以下子文档(不同版本 MongoDB 有些许差异)。CloudCanal 通过解析事件记录同步增量数据。

子文档名称数据含义
op操作类型,CloudCanal 支持的类型包括 c (控制操作) i (INSERT) u (UPDATE) d (DELETE)
ns命名空间(namespace),格式为 dbName.collectionName,其中 collectionName 可以为 $cmd,表示在对应数据库上的操作
ts执行操作的时间戳,单位秒
o变更的数据,对应 INSERT/UPDATE 后镜像数据,DELETE 为前镜像数据
需要注意的是,MongoDB 4.x 版本和其他版本在这个文档有所不一样
o2只在 UPDATE 事件中有值,可以理解为主键或者定位数据的标识符

CloudCanal 支持 MongoDB 分片模式及副本集数据同步,兼容 MongoDB 最高至 8.x 版本。

MongoDB 的数据类型支持

无论直接从 MongoDB 做全量迁移,还是消费 oplog 进行增量同步,类型转换对自定义代码处理和下游数据源写入都有重要意义。CloudCanal 从 2021 年 8 月份支持 MongoDB 开始,经历多个版本迭代,逐步丰富了对 MongoDB 数据类型的支持。

从 MongoDB 全量读取支持的类型包括: null、ObjectId、Date、Number、String

从 MongoDB oplog 增量同步支持的类型包括:ObjectId、Date、Number、String、Integer、Long、BigInteger、Double、BigDecimal

随着越来越多用户使用 CloudCanal,支持的数据类型还在不断扩展中。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的集合。

  5. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    MongoDB 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  6. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: MongoDB 源端的集合定义将会迁移到对端,如果同名集合在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 MongoDB 到 MongoDB 数据迁移同步。

PostgreSQL 到 Doris 数据同步

· 阅读需 4 分钟
John Li
John Li
Chief Executive Officer

简述

Apache Doris 是一个现代化的 MPP 分析型数据库产品,仅需 亚秒级 响应时间即可获得查询结果,能有效地支持实时数据分析

本文主要介绍如何使用 CloudCanal 快速构建一条稳定高效运行的 PostgreSQLDoris 数据同步链路。

技术点

基于 StreamLoad 的导入方式

Doris 提供了多种导入方式。CloudCanal 采用了 StreamLoad 的方式进行导入,源端的消息会转成字节流,最后会以 Batch 的形式通过 HTTP 协议发往 Doris

相比直接通过 SQL 写入的方式,StreamLoad 方式会有更好的性能,写入的数据直接经 FE 转发给 BE 处理。如果直接采用 SQL 写入,在 FE 侧,会有额外的 SQL 解析开销。

CloudCanal 默认采用 json 格式来进行StreamLoad导入,如果用户内容特殊字符较较少,也可以开启 csv 格式导入,分隔符可以通过参数 columnSeparatorlineSeparator 设置。

基于 StreamLoad 的写入方式,实际写入对端的操作均为 INSERT,CloudCanal 同步时会自动将 UPDATE / DELETE 转成 INSERT 语句,并修改 __op 值。

PG -> Doris 的数据类型支持

CloudCanal 从 2020 年开始支持 PG 同步后就不断的丰富 PG 的对端数据源,支持 PG 到 DORIS 是一个非常重要的数据源扩充。

Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析等,可以让数据分析工作更加简单高效!

PG 到 DORIS 全量和增量同步,不仅覆盖主流使用的数据类型,对地理信息相关类型也有很好的支持。关于CloudCanal对于地理信息的支持可以参考文章如何利用现代化数据栈高效处理地理信息数据

Doris 关键技术

Doris 内部自行管理数据的多副本自动修复。保证数据的高可用高可靠。在服务器宕机的情况下,服务依然可用,数据也不会丢失。

MySQL 兼容性好,兼容 MySQL 的网络协议,兼容 MySQL 语法

支持 MMP 一条 SQL 如果包含了合并、聚合计算、排序等多种操作;在执行计划的时候,MPP 会将其拆分成多份,分布到每台机器执行,最后再将结果汇总,大大提升了效率

操作示例

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档

  • 准备一个 PG 数据库,和 DORIS 实例(本例分别使用自建 PG 12.4 和 Doris 1.0)

  • 登录 CloudCanal 平台 ,添加 PG 和 DORIS image.png

  • 创建一条 PG -> DORIS 链路作为增量数据来源

任务创建

  • 任务管理-> 任务创建

  • 测试链接并选择 目标 数据库

  • 点击下一步 image.png

  • 选择 数据同步,并勾选 全量数据初始化,其他选项默认 image.png

  • 选择需要迁移同步的 image.png image.png

  • 确认创建任务 image.png

  • 任务自动做结构迁移全量迁移增量同步 image.png

校验数据

  • 我们使用程序对源端制造了一些数据 image.png

  • 任务正常运行一段时间后,停止造数据

  • 点击 PG -> DORIS 任务详情功能列表 -> 创建相似任务,在创建任务的第二步选择数据校验 image.png

  • 数据校验 OK image.png

常见问题

支持什么版本的 PG 和 DOIRS ?

目前源端 PG 12.x, 13.x, 14.x 皆可使用 CloudCanal 进行迁移同步,对端 DORIS 支持 1.x 版本,后序将不断扩展源端 PG 的数据类型。

总结

本文简单介绍了如何使用 CloudCanal 进行 PG -> Doris 数据迁移同步。

SQLServer 到 MySQL 数据同步 (一)

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.1.0.x 版本开始支持 SQLServer 作为源端的数据迁移同步能力。

本文通过 SQLServer 到 MySQL 的数据迁移同步案例简要介绍这个源端的能力。链路特点:

  • 结构迁移、全量迁移、增量同步(数据)、数据校验俱全
  • 流程全自动化

技术点

SQLServer开CDC

SQLServer 开启 CDC (change data capture) 首先需要安装并启动 SQL Server Agent , 通过 agent 异步解析其变更日志写到 [目标db].cdc 的一系列表中。

安装并启动 Agent 后, 对 目标db 需要开启 CDC 能力。USE [目标db]切换到目标数据库,再执行EXEC sys.sp_cdc_enable_db;打开这个库的 CDC。 执行完毕后会在 [目标db].cdc schema下出现一系列表。

目标db 开启 CDC 后,还需要开启针对哪些表的 CDC, 命令为

EXEC sys.sp_cdc_enable_table   
@source_schema = N'dbo',
@source_name = N'worker_stats',
@role_name = NULL,
@supports_net_changes = 0;

默认会生成 source_shema_source_name_CT 的表,当然创建过程中也可以直接指定 capture_instance 指定这个表除 _CT 这个后缀之前部分的命名。

以上步骤运维稍显繁琐,但是使用 CloudCanal , 创建任务时都会自动准备好,前提是给到足够权限的账号。

增量 DDL 同步

DDL 对于数据同步的影响在于两方面,如果没有处理好,可能导致数据同步根本无法成立的致命问题。

第一,大部分数据库的增量日志都不带字段类型长度等属性,需要依赖从源端数据库独立获取的元数据进行日志解析,DDL 如果更改了解析日志相关元数据,则需要刷新,否则会导致字段不存在对不齐类型错误等问题,这个刷新需要严格按照事件变更顺序,一般和位点紧相关。

其次,对于上下游需要保证结构一致性的同步链路,DDL 变更则需要准确应用到下游数据库中,这中间步骤包括 DDL 获取、转换、执行等多个步骤,a 种源端 * b 种目标端 * c 种 DDL , 导致全数据库相互同步难度相当高,其中还伴随着 DDL 应用导致的链路延迟,应用失败导致位点回溯等棘手问题。

以上两点,第一点几乎无法回避,第二点可以通过预先做 DDL 执行解决,特别通过平台化方案自动关联执行,相对方便且合理。

CloudCanal 对 SQLServer DDL 同步解决了第一点,但是对于第二点,暂时没有解决,需要手动到对端进行 DDL 变更。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 SQLServer 数据库(本例版本为 2016)和 MySQL 数据库(本例版本为 8.0)
  • 在 SQLServer 的准备一些库表和数据(本例使用另外一个 MySQL 迁移同步数据到 SQLServer)

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • 将源端SQLServer和目标端MySQL 分别添加 截屏2021-12-28 上午11.25.43.png

任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源 截屏2021-12-28 下午12.12.48.png
  • 选择 数据同步,勾选 全量数据初始化, 勾选 DDL 不同步 截屏2021-12-28 下午12.14.38.png
  • 选择需要迁移同步的表
  • 选择列,默认全选
  • 确认创建 截屏2021-12-28 下午12.15.51.png
  • 查看异步任务,确认创建步骤正常 截屏2021-12-28 下午12.16.34.png
  • 任务自动运行 截屏2021-12-28 下午12.17.26.png

校验数据

  • 持续造增量数据,INSERT & UPDATE & DELETE 比例 2:7:1 截屏2021-12-28 下午12.18.12.png
  • 停止增量造数据
  • 创建校验任务,并校验任务结果 截屏2021-12-28 下午12.24.38.png

常见问题

是否支持 DDL 同步

暂时没有支持,不过 cdc schema下存在 ddl_history 表,可能可以结合 lsn 找到具体 DDL 语句,从而支持 DDL 同步。

是否支持其他目标端

最近会支持 Kafka , 其他数据源按具体需求逐步进行开放。

总结

本文简单介绍了如何使用 CloudCanal 进行 SQLServer 到 MySQL 的数据迁移同步。

SQLServer 到 MySQL 数据同步 (二)

· 阅读需 7 分钟
Barry
Chief Technology Officer

SQL Server 是一个值得信赖的老牌数据库系统,自从 1988 年由 Microsoft、Sybase 和 Ashton-Tate 三家公司共同推出之后就一直不断迭代更新。而如今我们提到 SQL Server 通常是指 Microsoft 从 SQL Server 2000 之后的版本。至今 SQL Server 家族已经非常繁茂涵盖了 云上(Azure SQL Server)、IoT 设备(边缘 SQL Server)、以及经典版本(本地 SQL Server)。

实现 SQL Server 作为源端的实时数据同步,一般都会用到它的 CDC 功能,这个功能是从 2008 版本才开始支持。

本文主要也是基于 SQL Server 2008 版本介绍如何使用 CloudCanal 快速构建一条稳定高效运行的 SQL ServerMySQL 数据同步链路。

技术点

基于 SQL Server 的 CDC

image.png SQL Server 将用户的每一个数据操作都记录在后缀为 ldf 日志文件中。这些日志会保存在 ldf 文件中。当数据库启用 CDC 能力后,SQL Server 代理上会生成一个专门分析ldf文件的作业,再将具体的表启用 CDC, 则该作业开始持续分析文件中的变更事件到指定的表中。

作业执行用到 SQL Server 代理,该组件如果处于非启动状态,则生成任何可消费的变更数据。通常,我们可以在 Windows 对象资源管理器中查看是否已经开启了 SQL Server 代理。

image.png

由于 SQL Server 执行作业时无法设置起始位置,因此对于一个表的变更记录我们最早只能追溯到表启用 CDC 的那个时间点。具体的起始位点可以在 “cdc.change_tables” 表中查询得到。

还需要注意的另外一个细节是 CDC 表也是一张普通的表它和用户共享同一个数据空间。为了防止 CDC 表数据无限膨胀 SQL Server 会每天定时执行清理作业,清理过期的数据(具体时间视数据库配置而定)。

SQL Server -> MySQL 的数据类型支持

CloudCanal 从 2021 年开始支持 SQL Server 同步后就不断地丰富它的对端数据源,支持 SQL Server 到 MySQL 是一个非常重要的同步链路。 目前 CloudCanal 已经可以支持的类型和映射关系如下:

SQL Server 类型MySQL 类型备注
BITBIT
DECIMALDECIMAL
NUMERICDECIMAL
SMALLINTSMALLINT
TINYINTTINYINT映射为 tinyint unsigned�
INTINT
BIGINTBIGINT
SMALLMONEYFLOAT
MONEYFLOAT
FLOATFLOAT
REALDOUBLE
DATEDATE
DATETIMEOFFSETDATETIME由于 MySQL 类型限制,会丢弃时区信息同时最多保留 6 位精度
DATETIME2DATETIME由于 MySQL 类型限制,会保留最多 6 位精度
SMALLDATETIMEDATETIME
DATETIMEDATETIME由于 MySQL 类型限制,会保留最多 6 位精度
TIMETIME由于 MySQL 类型限制,会保留最多 6 位精度
CHARCHAR
VARCHARVARCHAR源端 SQL Server 如果为 VARCHAR(MAX),则按照 TEXT 来处理
TEXTTEXT
NCHARCHAR
NVARCHARVARCHAR源端 SQL Server 如果为 NVARCHAR(MAX),则按照 NTEXT� 来处理
NTEXTTEXT
BINARYBINARY
VARBINARYVARBINARY源端 SQL Server 如果为 VARBINARY(MAX),则按照 IMAGE� 来处理
IMAGEBLOB
TIMESTAMPBIGINT会映射为 bigint unsigned
ROWVERSIONBIGINT会映射为 bigint unsigned
HIERARCHYID--暂不支持
UNIQUEIDENTIFIERVARCHAR(36)
SQL_VARIANT--暂不支持
XMLTEXT
GEOMETRY--暂不支持
GEOGRAPHY--暂不支持
SYSNAMEVARCHAR(128)

操作示例

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备一个 SQL Server 数据库,和 MySQL 实例(本例分别使用自建 SQL Server 2008 和 MySQL 8.0)
  • 登录 CloudCanal 平台 ,添加 SQL Server 和 MySQL

image.png

  • 创建一条 SQL Server -> MySQL 链路作为增量数据来源

任务创建

  • 任务管理-> 任务创建
  • 测试链接并选择 目标 数据库
  • 点击下一步

image.png

  • 选择 数据同步,并勾选 全量数据初始化,其他选项默认

image.png

  • 此时如果 SQL Server 上数据库还没有启用 CDC 功能,则会在点击下一步的时候提示如何启用 CDC。只要按照提示的参考语句执行即可。

image.png

  • 选择需要迁移同步的

image.png image.png

  • 确认创建任务

image.png

  • 任务自动做结构迁移全量迁移增量同步

image.png

校验数据

  • 程序造数据, SQL Server -> MySQL,在源端以 1:1:1 的比例随机执行Insert、Update、Delete三种类型语句。使用20个线程并发写入变更。
    image.png
  • 任务正常运行一段时间后,停止造数据
  • 点击 SQLServer -> MySQL 任务详情功能列表 -> 创建相似任务,在创建任务的第二步选择数据校验

image.png

  • 数据校验 OK
    • 下面这个是校验结果。如果我们对端和源端一旦出现数据不一致就会像下面这样非常醒目的提示给用户,有多少数据不一致,有多少数据丢失。

image.png

常见问题

支持什么版本的 SQL Server 和 MySQL ?

  • 目前源端 SQL Server 2008 及以上版本皆可使用 CloudCanal 进行迁移同步(推荐使用 SQL Server 2016 或 SQL Server 2008)
  • 对端 MySQL 支持 5.6、5.7、8.0 版本,也可以选用 阿里云 RDS for MySQL 对应的版本,或者其它云服务商的 MySQL 版本

数据不同步了都有哪些情况?

  • SQL Server CDC 需要依赖 SQL Server 代理,首先要确定 SQL Server 代理服务是否启动
  • 表在启动 CDC 的时候会确定要捕获的列清单,此时如果修改列的类型可能会导致 CDC 中断。目前解决办法只能重建任务。
  • 增/减 同一个列名的列,对一个列删除后在增加。虽然 CDC 表中字段依然存在但是也会导致整个 CDC 中断。

什么情况下会影响稳定的数据同步?

  • 如果任务在同步期间出现了异常导致任务延迟。这时候需要格外注意,如果过长时间的延迟,即便是修复了延迟的问题(比如对端数据库长时间出现不可用)在后续数据同步上也可能存在丢失数据的风险。
  • SQL Server 为了防止 CDC 表数据无限膨胀 SQL Server 会每天定时执行清理作业,清理超过 3天的数据。
  • 为了增加延迟的容忍度可以执行这条 SQL 来增加 CDC 数据的保存时间,代价是这些数据需要存放到数据库表中,如果每日数据变更很多对磁盘开销会有额外的要求。
    • execute sys.sp_cdc_change_job @job_type = n'cleanup', @retention = 4320
    • msdb.dbo.cdc_jobs 表中保存了具体 捕获任务的数据保存时间。

总结

本文简单介绍了如何使用 CloudCanal 进行 SQL Server -> MySQL 数据迁移同步。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

ORACLE 到 MySQL 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.1.0.x 版本开始支持 Oracle 作为源端的数据迁移同步能力。

本文通过 Oracle 到 MySQL 的数据迁移同步案例简要介绍这个源端的能力。链路特点:

结构迁移、全量迁移、增量同步(数据)、数据校验俱全流程全自动化

此文章简要介绍如何快速构建一条长期稳定运行的 Oracle->MySQL 数据链路。

技术要点

将数据从 Oracle 中同步出来有两种方式可以选择

  • 物化视图日志
  • 使用 Redo 日志

权限问题

请确保添加的数据源账号可以访问如下 13 张表 ,或者使用一个具有 DBA 权限的 Oracle 账号。

  • 表 SYS.DBA_USERS
  • 表 SYS.DBA_TABLES
  • 表 SYS.DBA_TAB_COLS
  • 表 SYS.DBA_TAB_COMMENTS
  • 表 SYS.DBA_COL_COMMENTS
  • 表 SYS.PRODUCT_COMPONENT_VERSION
  • 表 SYS.DBA_CONSTRAINTS
  • 表 SYS.DBA_CONS_COLUMNS
  • 表 SYS.DBA_INDEXES
  • 表 SYS.DBA_IND_COLUMNS
  • 表 v$version
  • 表 v$database
  • 表 v$tablespace

对于物化视图方案来讲需要有额外的下列权限

  • 语句 CREATE MATERIALIZED VIEW LOG ON xxx
  • 语句 CREATE INDEX xxxx

对于 Redo 方案来将需要有 LOGMNR 相关的权限

  • 表 SYS.ALL_LOG_GROUPS
  • 表 v$logfile
  • 表 v$log
  • 表 v$archived_log
  • 表 v$logmnr_logs
  • 存储过程 SYS.DBMS_LOGMNR_D.BUILD
  • 存储过程 SYS.DBMS_LOGMNR.ADD_LOGFILE
  • 存储过程 SYS.DBMS_LOGMNR.START_LOGMNR
  • 存储过程 SYS.DBMS_LOGMNR.END_LOGMNR
  • 语句 ALTER TABLE xxxx DROP SUPPLEMENTAL LOG xxx
  • 语句 ALTER TABLE xxxx ADD SUPPLEMENTAL LOG xxx
  • 语句 ALTER SYSTEM ARCHIVE LOG CURRENT

在配置同步任务之前需要确保上面的 Oracle 权限,另外作为源端 Oracle 全量阶段还需要读取对应表的权限。

操作示例

准备 CloudCanal

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 添加数据源

  • 选择 自建数据源 ,并填写相关数据库信息,其中 网络地址 请按提示带上端口号 9d87303b-45dd-4bd1-bd73-f053b59046cf-image.png

  • 如下已添加完 Oracle 和 MySQL a718b8f7-7961-4a1d-bb41-445ba81c0037-image.png

创建同步任务

  • 任务管理->新建任务

  • 源端选择刚添加的 Oracle 数据源,目标选择 MySQL, 分别点击 测试连接 按钮以测试数据库连通性和获取 schema 级别元信息

  • 点击下一步 1a3d90f2-4efb-49d8-b579-8919e9debca6-image.png

  • 选择 数据同步,并且勾选全量数据初始化

  • 规格可以根据任务重要度以及部署机器的内存容量合理选择,一般 2GB 内存规格即可

  • 点击下一步 a3722630-7625-4485-ad56-3340d3fd904d-image.png

  • 勾选需要同步的表,如果目标表为橙色,表示不存在同名表,任务创建完成后自动进行结构迁移。也可以下拉框选择表进行映射

  • 勾选需要同步的 INSERT/UPDATE/DELETE 操作,默认全选

  • 点击下一步 c70a7c0f-cbd9-4f6a-849c-21c607cf769e-image.png

  • 通过勾选做列映射列裁剪

  • 点击下一步 d20d2906-6ba5-4fea-bd43-f2033aa3509e-image.png

  • 对任务内容进行创建 ,如果任务不需要立刻运行 , 可置灰自动启动任务 按钮

  • 点击确认创建 77f49016-0cec-4d33-93d1-a94b741d788f-image.png

任务同步

  • 任务分为 3 个阶段:结构迁移数据初始化数据同步,每一个阶段完成时,状态自动流转,直到同步稳态 afd38301-a854-4543-b814-8782ed1c094d-image.png c2772d7c-2ab7-470c-8551-280f723221ad-image.png
    • 结构迁移:当对端数据库不存在对应的库表时 CloudCanal 会自动将 Oracle 的表在对端创建出来
    • 数据初始化:将源端所选库表数据以全量迁移方式搬迁到对端
    • 数据同步:准实时的同步增量数据,即源端数据库上发生的增、删、改操作同步到对端数据库上

FAQ

目前 Oracle 源端还支持哪些数据源?

除了 Oracle 到 MySQL 之外,截止社区版 2.0.1.1 版本,还支持下面这些链路

  • Oracle -> PostgreSQL
  • Oracle -> Greenplum
  • Oracle -> TiDB
  • Oracle -> Oracle
  • Oracle -> Kudu 。

预检失败会有哪些影响?

一些小伙伴可能在创建任务的时候遇到类似如下报错信息,可能会有一些疑惑。 8f3f8d80-437a-4ce9-b3d6-450a0f5e0466-image.png 在创建任务的最后阶段我们会进行一些检测,Oracle 作为源端会存在如下一些检测项目。

物化视图模式下

  • 如果表已经创建了物化视图日志表那么预检失败。

Redo 模式下

  • 开启日志归档模式 alter database archivelog 开启过程需要数据库离线。
  • 需要开启最小补全日志 alter database add supplemental log data

总结

本文简单介绍了如何使用 CloudCanal 快速构建Oracle-> MySQL 数据迁移同步链路,更多的源端和目标端陆续开放。

MySQL 到 PolarDB-X 数据同步

· 阅读需 6 分钟
John Li
John Li
Chief Executive Officer

简述

CloudCanal 近期支持了 PolarDB-X 对端, 目前开放的链路为 MySQL 到 PolarDB-X 。

本链路特点包括

  • 完整支持结构迁移、全量迁移、增量同步、数据校验
  • 支持 PolarDB-X 云版本 API 级对接(自动获取实例、添加白名单)
  • 支持 PolarDB-X 开源自建版

PolarDB-X 前身 DRDS (内部产品名称 TDDL),经过 10 几年发展, 很好解决了 ToC 端业务对数据库超高并发、严苛事务的需求,并且近几年也努力尝试解决企业级数据需求(复杂SQL、分布式事务、在线数据的实时计算),而这样的一个产品,目前有云版本,同时近期也进行了开源。所以我们认为有必要对其生态做良好支撑。

技术点

结构迁移

PolarDB-X 是分布式数据库领域产品,所以存在 partition 概念 ,提供了两种拆分模型:sharding(即分库分表)和partitioning。 前者按用户自定义拆分,后者对应用透明。可以通过类似 create database d1 partition_mode="sharding"create database d1 partition_mode="partitioning" 指定。

sharding模式下,具体创建语句和算法可参考 官方文档

对于响应时间、RPS 要求的严苛应用场景(相对较窄),设定业务感知的分库分表算法是合理的,这也是为何很多分布式数据库在类似 sysbench 或 tpcc 或 天猫双十一 场景中达到难以置信性能的朴素原理。

CloudCanal 目前自动创建 database 为 sharding 模型,并且通过产品化方式支持这个能力-选择相应的算法和字段。

目前支持分库分表类型包括

  • DB(只分库不分表)
  • DB_TAB(既分库又分表)
  • NONE(单表)

设定分库分表字段的时候,支持的算法包括

  • HASH
  • WEEK
  • MM
  • DD
  • MMDD

一张普通的表,如果应用 DB_TAB 分库分表类型,并且选择常用的 HASH 算法,字段都选择worker_id, 经过 CloudCanal 结构迁移,会在 PolarDB-X 中生成如下表

CREATE TABLE `worker_stats` (
`id` bigint(20) NOT NULL AUTO_INCREMENT BY GROUP,
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`worker_id` bigint(20) NOT NULL,
`cpu_stat` text,
`mem_stat` text,
`disk_stat` text,
`col_new` varchar(255) NOT NULL DEFAULT '123',
PRIMARY KEY (`id`),
KEY `auto_shard_key_worker_id` USING BTREE (`worker_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 278489 DEFAULT CHARSET = utf8mb4 DEFAULT COLLATE = utf8mb4_0900_ai_ci dbpartition by hash(`worker_id`) tbpartition by hash(`worker_id`) tbpartitions 4

任务对分库分表的处理

与普通单机数据库不同, 如果需要达到 PolarDB-X 最好的写入性能, 增量同步需要处理分库分表字段,对于 update 和 delete 操作, 带上拆分字段。数据校验进行对端数据获取时,也需要带上拆分字段,并且保证数据的获取效率。

if (col.isKey() || (partitionKeys != null && partitionKeys.contains(col.getName()))) {
if (col.isUpdated()) {
// put before column in it , PK/UKs may be change.
pkCols.add(SqlUtilCommon.pickBeforeColumn(rowData, col.getName()));
} else {
pkCols.add(col);
}

if (firstPk) {
where.append("`").append(col.getName()).append("`= ?");
firstPk = false;
} else {
where.append(" AND `").append(col.getName()).append("`= ?");
}
}

另外,如同变更主键,如果变更拆分字段值, 目前 PolarDB-X 能够自行处理这种变化,无需用户自己删除老数据,插入新数据。

操作示例

前置条件:

造数据

  • 混合负载,IUD 比例 2:7:1 截屏2021-10-29 下午12.12.22.png

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • 分别选择 自建 部署模式下的 PolarDB-XMySQL 并添加 截屏2021-10-29 下午12.03.15.png

任务创建

  • 任务管理->任务创建

  • 选择源和目标数据源 截屏2021-10-29 下午12.04.55.png

  • 选择数据同步,并勾选 全量数据初始化, 其他选项默认 截屏2021-10-29 下午12.07.05.png

  • 选择需要迁移同步的表 截屏2021-10-29 下午12.08.54.png

  • 选择列,默认全选

  • 设定分库分表算法和字段 截屏2021-10-29 下午12.11.08.png

  • 确认创建,并自动运行 截屏2021-10-29 下午12.13.18.png

校验任务

  • 停止增量负载
  • 创建校验任务
    • 目前类似任务创建开发中,暂时库表列选择和拆分选择请和同步任务保持一致
  • 校验完毕数据一致 截屏2021-10-29 下午12.14.32.png

常见问题

是否会支持 PolarDB-X 源端?

目前 PolarDB-X 版本支持 CDC , 和 MySQL binlog 交互很相似(外在表现出些许差别),我们将在不久推出 PolarDB-X 源端,目标端仍首选 MySQL (让业务有去有回)。

是否会支持更多源端?

CloudCanal 新增数据源之后,后续相互打通相对简单,目前 Oracle 是首选源端。之后 PostgreSQL、SqlServer(开发中)都是候选。

目前这条链路还存在什么不足?

功能层面目前自动创建数据库未支持 PolarDB-X 的 partitioning 模式,另外已存在表,无法感知对端分库分表字段(急需修复)。性能层面则需要更多调优。

总结

本文简单介绍了使用 CloudCanal 进行 MySQL 到 PolarDB-X 的数据迁移同步,帮助用户快速构建一条验证分布式数据库的数据链路。

MySQL 到 TiDB 数据同步

· 阅读需 5 分钟
John Li
John Li
Chief Executive Officer

简述

TiDB 是国内非常火热的一款分布式数据库,参考 Google Percolator 和 Spanner 模型进行构建,具备很好的扩展性,并且支持强一致事务和一定的计算能力,应用广泛。

CloudCanal 提供了从传统关系型数据库实时同步到 TiDB 的能力,并且附带数据迁移数据校验数据订正等能力。此文章简要介绍如何快速构建一条长期稳定运行的 MySQL->TiDB 数据链路。

技术要点

MySQL 协议兼容性

TiDB 对于 MySQL 协议兼容做得不错(4.X 版本),但是其中也有瑕疵点,对于数据迁移同步来说,关注 2 个方面的信息:数据类型DDL 支持层面

数据类型 ,层面主要是要考虑 TiDB 对于 MySQL 数据类型的兼容性,基于MySQL 8.0 官方文档中类型的定义在 TiDB 5.1 版本上

  • 支持的类型如下: a6bd4083-7894-44a1-a0c5-63aeabb532ef-image.png

DDL 支持层面,比较明显的是不支持同时做多次 DDL action , 比如: alter table add column col1 varchar(255), add col2 bigint(20) not null, modify col3 datetime 。而往往这种 SQL 在数据库运维中非常常见(提高 DDL 效率)

针对差异点,CloudCanal 都做了兼容,特别是 DDL 同步的兼容,加上库、表、列映射,存在一定的工作量。

举个 "栗子"

准备 CloudCanal

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 添加数据源

  • 选择 自建数据源 ,并填写相关数据库信息,其中 网络地址 请按提示带上端口号 ba3134f2-82ad-4867-bec1-a833b0f7653a-image.png

  • 如下已添加完 MySQL 和 TiDB 5bb89e47-2db9-4bd9-8a1c-898a8fd69806-image.png

创建同步任务

  • 任务管理->新建任务 3e504a08-f52d-4e69-ac89-220e35266094-image.png

  • 源端选择刚添加的 MySQL 数据源,目标选择 TiDB, 分别点击 测试连接 按钮以测试数据库连通性和获取 schema 级别元信息

  • 选择源端和目标端 schema , 可以选取多个

  • 点击下一步 8d8114fc-1e8f-4b7c-9750-506b28ff17ce-image.png

  • 选择 数据同步,并且勾选全量数据初始化

  • 规格可以根据任务重要度以及部署机器的内存容量合理选择,一般 2GB 内存规格即可

  • 勾选 DDL 同步, CloudCanal 将同步常用的 create table /alter table/rename table DDL ,但是不同步其他 DDL

  • 勾选 开启xx校验 , 则自动为同步任务创建一个子任务,在同步 catch up 后 ,自动运行数据校验。当然也可以单独创建数据校验任务

  • 点击下一步 26c40f3a-20c1-4497-9570-dab4d3e51913-image.png

  • 勾选需要同步的表,如果目标表为橙色,表示不存在同名表,任务创建完成后自动进行结构迁移。也可以下拉框选择表进行映射

  • 勾选需要同步的 INSERT/UPDATE/DELETE 操作,默认全选

  • 点击下一步 6e23a877-fa8c-4f59-8c29-7d4b108e6533-image.png

  • 通过勾选做列映射列裁剪

  • 点击下一步 9c31a51b-56f9-4f93-be6f-8b487c3fd40c-image.png

  • 对任务内容进行创建 ,如果任务不需要立刻运行 , 可置灰自动启动任务 按钮

  • 点击确认创建 6d891550-d1ca-4407-b4ec-99879746b410-image.png

任务同步

  • 任务分为 3 个阶段:结构迁移数据初始化数据同步,每一个阶段完成时,状态自动流转,直到同步稳态 e9c4abe2-32e4-4401-a36b-867470502693-image.png

    ef85e50d-6303-4a49-ab37-e99d5ad95cc2-image.png

    • 结构迁移:当对端数据源不存在对应的库表结构时自动创建,包括RDB库表、消息 topic、搜索引擎 index 等
    • 数据初始化:将源端所选库表数据以全量迁移方式搬迁到对端
    • 数据同步:准实时的同步增量数据,即源端数据库上发生的增、删、改操作将以亚秒级延迟出现在对端数据源上

FAQ

目前源端还支持哪些数据源?

除了 MySQL 到 TiDB 之外,截止社区版 1.0.2 版本,还支持 Oracle -> TiDB 链路,更多的链路如果有需求,可以按需添加,请到我们需求贴反馈

总结

本文简单介绍了如何使用 CloudCanal 快速构建 MySQL->TiDB 数据迁移同步链路,更多的源端和目标端陆续开放。

MySQL 到 Elasticsearch 宽表构建

· 阅读需 7 分钟
John Li
John Li
Chief Executive Officer

简述

CloudCanal 2.0.X 版本近期支持了宽表构建能力,在数据预处理领域向前走了一步。

方案特点

  • 相对灵活,对业务数据和结构贴合性好
  • 能很好的支持事实表与维表打宽表需求

本文以 MySQL 到 Elasticsearch 单事实表双维表为案例,介绍 CloudCanal 宽表构建和同步的操作步骤。

技术点

打宽表的必要性

关系型数据库为了应对在线业务对于并发、毫秒级响应,同时操作相对趋向 kv 化,一般基于关系范式进行设计,通过外键或约定外键(非物理约束)进行关联。

当业务需求涉及到多张关联表(业务运营、报表、BI 需求),查询表的先后顺序成为操作响应时间的关键要素,但排列组合式种类随关联表数量(n)增加会膨胀非常快(n!),导致查询效率低下。

关联表数量排列种类
22
36
424
5120
6720
75040
840320
9362880
103628800
1139916800

数据库或者数仓做 join 查询(特别是5张表以上),最难的事情变成了如何从这么多可能性中(搜索空间)找到最好的那一个。

数据迁移和同步 以相对比较小的代价,将多张关联表进查询库或者数据仓库之前,通过 反查 或者 窗口等待变更数据做关联,打成一张宽表写入对端,显著减轻后续查询对于 SQL 优化器的要求。

宽表的种类和方式

这里的宽表种类指其数据来源表种类(常见但不全面),常见的我们分 事实表维度表,比如订单表被定义成事实表,用户表被定义成维度表,商品表被定义成维度表。

一般事实表和维度表数据具备类似 n:1 的关系,也就是 1 个用户会有 n 个订单 (1个订单属于1个用户),1 个商品也会存在 n 个订单 (1个订单会关联 1 个或有限个数商品)。

打宽表的方式有多种,根据场景,最常见的包含以下两种

  • 多事实表 打宽表
    • 一般场景是在有限的时间内,关联的变更在这些表上发生(多流join),打宽表工具只要在一定的时间范围内等到这些数据(window),即可打成宽表数据。
  • 单事实表和多维度表打宽表
    • 一般场景是事实表变更,但是维度表没有任何变化,这时打宽表工具需要通过事实表变更数据反查维度表数据,打成宽表。

CloudCanal 目前打宽表的方式主要通过反查实现,对于多流 join , 实际上也可以通过反查实现,只是可能缺乏数据一致性。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 MySQL 数据库(本例使用 5.7 版本)和 Elasticsearch 数据库(本例使用 6.x 版本)
  • MySQL 上创建 1 张事实表(order)和 2 张维表 (user 、product)
  • Elasticsearch 上创建 1 个索引 order , 并额外包含两张维表相关数据
    • user_id (关联user.id), user_name(对应user.name)
    • product_id(关联product.id) ,product_name(对应product.name),product_price (对应product.price)

开发宽表代码

打包

  • 进入工程目录,使用命令进行打包
    % pwd
    /Users/zylicfc/source/product/cloudcanal/cloudcanal-data-process
    % mvn -Dtest -DfailIfNoTests=false -Dmaven.javadoc.skip=true -Dmaven.compile.fork=true clean package

自定义代码包

  • 打包命令后,代码包位于工程目录下的 wide-table/target 目录 截屏2021-12-10 下午12.30.39.png

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • MySQLElasticsearch 分别添加 数据源.png

任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源
  • 选择 数据同步,并勾选 全量数据初始化, 其他选项默认
  • 选择需要迁移同步的表, 此处只要选择事实表即可,维表会通过自定义代码反查补充 截屏2021-12-10 下午12.35.53.png
  • 选择列,默认全选,选择上传代码包(路径如上所示) 截屏2021-12-10 下午12.37.18.png
  • 确认创建,并自动运行 截屏2021-12-10 下午12.38.33.png

校验数据

  • 变更事实表数据 截屏2021-12-10 下午12.40.05.png 截屏2021-12-10 下午12.40.30.png
  • 变更维表数据 截屏2021-12-10 下午12.41.19.png 截屏2021-12-10 下午12.41.41.png

数据变化规律

  • 事实表插入,更新都会反查维表最新数据并写入对端
  • 维表更新,基础模式中,需要触发事实表更新才能带上最新的维表变更数据写入对端
  • 维表数据删除,基础模式中,如果触发事实表更新,默认将会把对应的维表数据(已删除)置为null, 但是根据对端数据源不同,效果可能会有所差别(比如不会置空)

常见问题

维表变化后怎么办?

目前我们提供的基础模式,维表变化不会直接触发事实表更新,因为这个基本上意味着大规模对端数据变更,可能影响对端数据服务稳定性。所以源端触发事实表更新(比如变更一个时间字段),带上最新的维表数据进行对端数据刷新。

另外对于维表数据的删除,如果触发事实表更新从而刷新对端数据,则默认置为null。

不会开发 java 代码怎么办?

如果能打包不会 java 开发,在 cloudcanal-data-process 寻找相应模版,修改配置即可。

如果不能打包也不会开发,找 CloudCanal 同学协助。

如果遇到出错或者问题怎么办?

如果会 java 开发,建议打开任务的 printCustomCodeDebugLog 观察输出的数据是否符合预期,如果不符合预期,可以打开任务的 debugMode 参数,对数据转换逻辑进行调试。

如果不会 java 开发, 找 CloudCanal 同学协助。

还支持其他数据源么?

这个是 CloudCanal 通用能力,只要源和目标之间实现了全量迁移和增量同步,即支持。

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL -> Elasticsearch 的宽表构建,帮助业务快速构建数据搜索服务。

MySQL 到 Elasticsearch 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

本文介绍如何通过 CloudCanal ,五分钟内创建一条长期稳定运行的 MySQL -> Elasticsearch (以下简称 ES) 实时数据迁移同步链路 。

技术内幕

限流

MySQL 到 ES 数据迁移同步过程中,往往会面临源端写入对端 RPS 较大问题,导致 ES 负载较大,影响业务对 ES 的正常读写。CloudCanal 为了应对这个情况,提供限流能力。同步任务创建完毕后,可在 任务详情 -> 参数设置 对源端流量进行限流。 8ab5e2b2-3a48-4042-b53b-1e469734b157-image.png

时区处理

CloudCanal 允许用户在创建数据迁移同步任务时指定时区。写入ES 时,源端时间类型数据将会格式化并带上时区信息 , 支持用户在跨国、跨地域场景下使用。

自动创建索引和 Mapping 结构

CloudCanal 迁移同步任务支持自动将源端数据库表结构映射成 ES 索引,该过程中允许用户在 列(column/field) 级别上,个性化设置自己需要的索引和 Mapping 结构。这些设置包括:

  • 每个列可以指定是否需要索引
  • 可以对 text 类型的 field 设置 ES mapping 中的分词器(标准分词器)
  • 索引分片数、副本数自定义设置

映射已建索引

用户可能已经在 ES 中提前建好了索引,这种情况下 CloudCanal 会自动探测,并允许用户配置映射,一张表可映射对端一个索引。

类型支持

不支持父子文档join,支持NESTED/OBJECT。NESTED有更好的索引性能,建议用NESTED,避免ES侧join

支持用户自定义分词器

映射用户已经创建的索引时候,支持自定义分词器

内置 _id 生成和 routing field 指定

写入 ES 时候 _id 用于唯一标识一个 doc。CloudCanal 数据同步默认遵循以下原则:

  • routing 使用 _id 值
  • 单主键表,会默认使用源端关系表的主键列的列值作为 _id 的值
  • 多主键表,会通过分隔符$连接多个主键列的值,组成唯一的 _id 值
  • 无主键表,会将所有列的值通过$连接,生成唯一的 _id 值

举个"栗子"

准备 CloudCanal

添加数据源

  • CloudCanal 支持 6.8 及以上版本 ES,我们点击 数据源管理->新增数据源 添加 ES 数据源 1048936c-46bb-4495-b33f-6bbdece681ba-image.png

  • 填写必要 host 信息后点击 新增数据源 2c28c66a-6392-4994-a98d-e4dd3a672c98-image.png

创建任务

  • 点击任务管理,选择创建任务 8637011b-9477-45d1-b9c5-f2e166bed7d0-image.png

数据源设置

  • 勾选源端和目标端数据库,并且选择相应的数据库 deb89653-e04d-430a-b6c6-1a27d0d785df-image.png

功能配置

  • 选择数据同步,并勾选数据初始化(带全量迁移) 450e225c-db1b-477f-8a81-bd5c5e8ea87e-image.png

表&action过滤

  • 此处可以进行的操作主要是:

    • 勾选需要订阅的表
    • 选择需要映射的索引(支持映射已经存在的索引)
    • 勾选 IUD 过滤
    • 批量设置分片数
    信息

    CloudCanal 的结构迁移支持自动帮用户按照源端表结构创建索引。

    cbd229cc-2803-482a-bddc-26048f392571-image.png

数据处理

  • 本页面提供的主要能力有:

    • 列裁剪设置(包括批量筛选和设置)
    • 设置源端where过滤条件
    • 索引设置
    • 分词器设置
    • 列映射(如果同步的是已经存在的索引,支持列映射)

    c250f412-12c1-45bd-b1a2-bbf77a0b152c-image.png

创建确认

  • 最后一步,确认创建内容无误后点击确认创建。 e308cb9d-0016-41d1-982a-84f742b4a12f-image.png

查看任务状态

  • 回到 CloudCanal 控制台,刷新并查看任务实时状态,从结构迁移、数据初始化,到数据同步。 253e90e4-6d8e-41c4-b421-eb95a0f73d9f-image.png

  • 登录 ES Kibana 控制台,查看迁移同步过去的结构和数据。 9ddba516-afa8-4d9b-aeec-f64a1485f3ef-image.png

总结

本文简单介绍了如何使用 CloudCanal 快速构建 MySQL->Elasticsearch 数据迁移同步链路,更多的源端和目标端陆续开放。

MySQL 到 ClickHouse 宽表构建

· 阅读需 6 分钟
Barry
Chief Technology Officer

简述

本文简要介绍 CloudCanal 如何支持 MySQL -> ClickHouse 的宽表构建。

技术点

ClickHouse 表关联之觞

ClickHouse 作为标准的列存数据库,其特点相当鲜明,对于多维度数据聚合、筛选特别高效,对于列存面向计算的特点,用得相当不错,包括但不限于以下特点

  • io 效率高
  • 列压缩
  • 少数列数据存取io放大效应较小
  • 极致计算优化
    • 向量化
    • 利用 SSE 等 SIMD 指令集加速
    • 未来可选 AVX 512 等指令集优化
    • 未来对于计算卸载到 FPGA、GPU 较便利

但是 ClickHouse 对于数据关联(join), 相比于其 多维聚合筛选 能力要弱一些。对于这个问题,我们觉得有必要通过 CloudCanal 的宽表能力,让其适用性得到进一步提升。大宽表 + 突出的数据 多维聚合筛选 能力,几乎等于交互式分析的杀手锏。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档

  • 准备好 MySQL 数据库(本例使用 5.7 版本)和 ClickHouse 数据库(本例使用 21.8.X 版本)

  • MySQL 上创建 1 张事实表(my_order)和 2 张维表 (user 、product)

     CREATE TABLE `my_order` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `product_id` bigint(20) NOT NULL,
    `user_id` bigint(20) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1460 DEFAULT CHARSET=utf8;

    CREATE TABLE `product` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `name` varchar(255) NOT NULL,
    `price` decimal(20,2) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2719 DEFAULT CHARSET=utf8;

    CREATE TABLE `user` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `name` varchar(255) NOT NULL,
    `level` varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2224 DEFAULT CHARSET=utf8
  • ClickHouse 上创建 1 张宽表 my_order , 并额外包含两张维表相关数据

    • user_id (关联user.id), user_name(对应user.name)
    • product_id(关联product.id) ,product_name(对应product.name),product_price (对应product.price)
    CREATE TABLE trade.my_order
    (
    `id` Int64,
    `gmt_create` DateTime,
    `gmt_modified` DateTime,
    `product_id` Int64,
    `user_id` Int64,
    `user_name` Nullable(String),
    `product_name` Nullable(String),
    `product_price` Nullable(Decimal(20, 2))
    )
    ENGINE = ReplacingMergeTree
    ORDER BY id
    SETTINGS index_granularity = 8192

开发宽表代码

打包

  • 进入工程目录,使用命令进行打包
    % pwd
    /Users/zylicfc/source/product/cloudcanal/cloudcanal-data-process
    % mvn -Dtest -DfailIfNoTests=false -Dmaven.javadoc.skip=true -Dmaven.compile.fork=true clean package

自定义代码包

  • 打包命令后,代码包位于工程目录下的 wide-table/target 目录 截屏2021-12-10 下午12.30.39.png

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • MySQLClickHouse 分别添加 截屏2021-12-16 下午6.33.14.png

任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源
  • 选择 数据同步,并勾选 全量数据初始化, 其他选项默认
  • 选择需要迁移同步的表, 此处只要选择事实表即可,维表会通过自定义代码反查补充 截屏2021-12-16 下午7.07.18.png
  • 选择列,默认全选,选择上传代码包(路径如上所示) 截屏2021-12-16 下午7.07.45.png
  • 确认创建,并自动运行 截屏2021-12-16 下午7.12.16.png

校验数据

  • 变更事实表数据 截屏2021-12-16 下午7.08.18.png 截屏2021-12-16 下午7.08.50.png
  • 变更维表数据 截屏2021-12-16 下午7.09.17.png 截屏2021-12-16 下午7.09.42.png

数据变化规律

  • 事实表插入,更新都会反查维表最新数据并写入对端
  • 维表更新,需要触发事实表更新才能带上最新的维表变更数据写入对端
  • 维表数据删除,如果触发事实表更新,默认将会把对应的维表数据(已删除)置为null

常见问题

维表变化后怎么办?

维表变化不会直接触发事实表更新。需要源端触发事实表更新(比如变更一个时间字段),带上最新的维表数据进行对端数据刷新。

另外对于维表数据的删除,如果触发事实表更新从而刷新对端数据,则默认置为null。

不会开发 java 代码怎么办?

如果能打包不会 java 开发,在 cloudcanal-data-process 寻找相应模版,修改配置即可。

如果不能打包也不会开发,找 CloudCanal 同学协助。

如果遇到出错或者问题怎么办?

如果会 java 开发,建议打开任务的 printCustomCodeDebugLog 观察输出的数据是否符合预期,如果不符合预期,可以打开任务的 debugMode 参数,对数据转换逻辑进行调试。

如果不会 java 开发, 找 CloudCanal 同学协助。

还支持其他数据源么?

这个是 CloudCanal 通用能力,只要源和目标之间实现了全量迁移和增量同步,即支持。

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL -> ClickHouse 的宽表构建,以最常见的单事实表多维表方式举例。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

MySQL 到 MySQL 数据迁移同步

· 阅读需 6 分钟
John Li
John Li
Chief Executive Officer

简述

MySQL 到 MySQL 在线数据迁移同步不是一个新鲜话题了,但是面对数据源异构、高度产品化创建、并且稳定运行于在线严苛场景,需要做的工作会比一个单纯工具或者脚本多得多。

本篇文章仅从功能角度介绍 CloudCanal 如何快速创建并运行此种数据链路。

技术点

"异构"和面临的问题

数据库异构主要体现在 数据库差异业务表结构约束差异业务数据差异 三个方面。后两者属于传统 ETL Transform 阶段要处理的内容,相对弹性。而数据库本身差异则是刚性需求,对产品架构和核心数据结构有较高要求,分解下原因,包括 增量数据获取方式多样SQL 方言差异约束差异元数据差异部署形态(如分布式)等。

增量数据获取方式

如 ORACLE 的 redo 日志、物化视图,MySQL binlog 、Postgres WAL、MongoDB oplog等,相差较大,系统级设计和抽象,也是一个漫长的过程。

SQL 方言差异

包含如数据分页差异、写入冲突处理、大小写处理、表结构差异等。

约束差异

如索引 owner 归属偏差、partition key/sharding key 和 PK/UK 关系差异等。上述两者,对于数据迁移同步影响较小,但是 DDL 同步、结构迁移存在巨大挑战,在库、表、列映射、裁剪,数据自定义处理等情况下,更加复杂。

元数据差异

包括数据类型转换、层次对应关系、partition key / sharding key 等特定结构支持、分布式数据库分片差异、缓存/消息等schemaless系统的挑战,让数据互融互通工作量大增。

但是今天,我们并不展开介绍 数据库类型不同 的数据迁移同步功能,而是介绍 MySQL 到 MySQL 在线数据迁移同步所面临的异构问题,更多是业务需求所产生的 表结构不一致数据差异,其中包括库、表、列映射、裁剪 和 数据过滤。希望此篇文章能够让你在 5 分钟内搞定这个事情(环境 ready 的情况下)。

举个"栗子"

准备 CloudCanal

数据库准备

  • 创建 3个 MySQL 数据库,源实例上的库叫 drds_1drds_2 ,目标实例上有 drds_merge 库,其中源库里面有若干张表,并且存在一些测试数据,有些表正常迁移(worker_stats表), 有些表需要汇聚(shard_x表),有些表需要结构迁移(kbs_question),有些表有映射(shard_x表),有些表不同步(kbs_article表),有些表字段需要映射和裁剪(data_job)。接下来我们花 5 分钟时间来搞定这个事情,并且做一次数据校验。
信息

订阅源端MySQL增量和全量,需要提前授予权限:select, replication_client, replication_slave

造些数据

  • 启动源端造增量数据程序,IUD 比例 30:50:20

    开始造数据.png

创建任务

  • 选择源数据库和目标数据库,做好库映射。操作完毕点击下一步。

    创建任务第一步.png

  • 选择数据同步,并默认勾选数据初始化、DDL 同步,此处暂不勾选做一次性数据校验。在任务增量追上后,我们停掉造数据程序,创建逻辑一致的校验任务,那么结果更加清晰。

    截屏2021-07-31 下午2.27.52.png

  • 选择表,并做好表映射,以及去除不想迁移同步的表。操作完毕点击下一步。

    数据同步表选择.png

  • 选择列,并做好列映射、裁剪掉某些不需要表的列。

    列选择.png

  • 批量设置唯一键为主键。

    截屏2021-07-31 下午1.54.01.png

  • 设置数据过滤,如图中 id < 3000。操作完毕点击下一步。

    截屏2021-07-31 下午1.53.11.png

  • 确认创建任务。确认完毕点击创建任务。

    数据同步总结.png

任务运行

  • 如果选择的库、表、列在对端不存在,则进行结构迁移

  • 结构迁移完成后自动进行数据初始化,映射、裁剪、数据过滤按照设定运行。目前 CloudCanal 数据初始化都是逻辑初始化,速度可能没有某些物理初始化快,但是灵活度高。

  • 数据初始化完毕,开始进行增量同步。增量同步会自动从数据初始化位点拉取 MySQL binlog 进行回放,同样会执行映射、裁剪、以及数据过滤等操作。

    任务追平前夕.png

数据校验

  • 停止造数据进程,并且在 CloudCanal 上按同步任务相同逻辑创建一个校验任务。(暂未推出创建相似任务)

  • 校验任务跑完,获取差异数据,并抽样对比,数据一致。校验的差异主要是数据多次更新、同步延迟等造成,一般非大规模不一致或丢失,则数据正常。

  • 数据差异数据目前需要到任务日志目录获取,暂时未提供日志下载能力。

    校验结果.png

总结

此篇文章主要从功能角度简单介绍 CloudCanal 支持 MySQL 到 MySQL 在线数据迁移同步,包含库、表、列白名单、映射、裁剪,分表汇聚,数据过滤等主要能力,对于在线大部分场景,能够较好满足,但是在严苛的核心场景上,监控告警、容灾、特殊数据处理、抗峰值、问题快速排查等都是不可缺少的。