🎉 CloudCanal 上线 V6.1.0.0:KingbaseES 分区表迁移性能大幅提升
跳到主要内容

PolarDB for MySQL 到 Kafka

CloudCanal 支持从 PolarDB for MySQL 到 Kafka 的数据迁移、同步、校验和链路能力。

选择对端数据库:

数据链路

基本功能

功能说明
全量数据迁移

逻辑迁移,通过顺序扫描表数据,将数据分批写入到消息中间件

增量实时同步

支持 INSERTUPDATEDELETE 常见 DML 同步

修改订阅

新增、删除、修改订阅表,支持历史数据迁移,文档:修改订阅

重置位点

支持按照文件位点、时间戳 回溯位点,重新消费过去一段时间或指定 Binlog 文件和位点开始的增量日志

元数据检索

从源端表查对端,查询设置过过滤条件的,查询设置过对端主键的

高级功能

功能说明
消息格式

支持以下消息格式,文档:消息格式说明

  • CloudCanal内置格式
  • AlibabaCanal兼容格式
Topic 映射规则

默认按 . 拼接源端 实例id 形成对端 topic 进行匹配或待创建(如 pc-vgpq6q097174t6t.dingtax.app_key),额外支持按 源端一致转小写转大写 映射

表级别 Topic

最小按照源端表级别设置对应的 Topic, 支持自动获取表分区

DDL 专用 Topic

支持指定 Topic 发送 DDL, 如未指定,则放置 DDL 时间在对应表 Topic 分区 0 中

兼容 Online DDL

兼容 GH-OST, PT-OSC, Aliyun DMS Online DDL, 变更完毕之后,同步具备最新结构的消息(如新增、修改后的列,去掉删除掉的列)

定时全量迁移

文档1:创建定时全量任务
文档2:定时全量实现增量数据迁移

自定义代码

文档1:创建自定义代码任务
文档2:自定义代码任务 debug
文档3:在自定义代码中打日志

数据过滤条件

支持 WHERE 条件进行数据过滤,内容为 SQL 92 子集,文档:创建数据过滤任务

设置目标主键

支持变更主键为其他字段

使用示例

标题详情
跨互联网数据互通 (Kafka)

文档:跨互联网数据互通 (Kafka)

Kafka 数据中转校验

文档:Kafka 数据中转校验


源端数据源

前置条件

条件说明
账号权限

云数据库读写账号或高权限账号

开启 Binlog

云数据实例详情 > 配置与管理 > 参数配置 > 设置 loose_polar_log_bin 为 true

PolarDbMySQL 字符集

支持 utf8, utf8mb4, latin1, 其他编码暂未测试

任务参数

参数名称说明
parseBinlogParallel

增量解析 Binlog 的并发数

parseBinlogBufferSize

用于增量解析 Binlog 的环形队列大小

maxTransactionSize

单事务最大数据条数,超过则分段刷出

limitThroughputMb

限制增量 Binlog 流量

extraDDL

兼容额外的 DDL 同步,包括 PT, GHOST, ALI_DMS, PT_GHOST

needJsonEscape

将 json 中特殊字符进行转义,以写入到对端

Tips: 通用参数配置请参考 通用参数及功能


目标端数据源

前置条件

条件说明
网络准备

迁移同步节点(sidecar)可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

消息格式,文档:消息格式说明

batchWriteSize

单条消息最大数据条数,超过则拆分消息

defaultTopic

无法找到对应 Topic 的消息则发送到此 Topic (如新增表)

ddlTopic

专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区

compressionType

Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法

batchSize

Kafka batch.size 参数

acks

Kafka acks 参数, 默认 all

maxRequestBytes

Kafka max.request.size 参数

lingerMs

Kafka linger.ms 参数, 默认 1

envelopSchemaInclude

当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息

customClientProps

自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如:AWS IAM 访问控制

Tips: 通用参数配置请参考 通用参数及功能

联系我们
微信二维码

扫码添加微信,获取技术支持