MySQL 到 Kafka
CloudCanal 支持从 MySQL 到 Kafka 的数据迁移、同步、校验和链路能力。
| 功能 | 说明 |
|---|---|
结构迁移 | 如目标端不存在指定映射规则后的 Topic , 则自动进行 Topic 创建,并支持设置分区数 |
全量数据迁移 | 逻辑迁移,通过顺序扫描表数据,将数据分批写入到消息中间件 |
增量实时同步 | 支持 INSERT、UPDATE、DELETE 常见 DML 同步 |
修改订阅 | 新增、删除、修改订阅表,支持历史数据迁移,文档:修改订阅 |
重置位点 | 支持按照文件位点、时间戳 回溯位点,重新消费过去一段时间或指定 Binlog 文件和位点开始的增量日志 |
元数据检索 | 从源端表查对端,查询设置过过滤条件的,查询设置过对端主键的 |
高级功能
| 功能 | 说明 |
|---|---|
消息格式 | 支持以下消息格式,文档:消息格式说明
|
Topic 映射规则 | 默认按 . 拼接源端 实例id、库、表 形成对端 topic 进行匹配或待创建(如 my-vgpq6q097174t6t.dingtax.app_key),额外支持按 源端一致、转小写、转大写 映射 |
表级别 Topic | 最小按照源端表级别设置对应的 Topic, 支持自动获取表分区 |
DDL 专用 Topic | 支持指定 Topic 发送 DDL, 如未指定,则放置 DDL 时间在对应表 Topic 分区 0 中 |
定时全量迁移 | 文档1:创建定时全量任务 |
自定义代码 | 文档1:创建自定义代码任务 |
数据过滤条件 | 支持 WHERE 条件进行数据过滤,内容为 SQL 92 子集,文档:创建数据过滤任务 |
设置目标主键 | 支持变更主键为其他字段 |
限制和注意点
| 限制项 | 说明 |
|---|---|
MySQL 存储引擎 | 支持 InnoDB, MySIAM, 阿里云 XEngine, 其他存储引擎暂未测试 |
MySQL 字符集 | 支持 utf8, utf8mb4, latin1, 其他编码暂未测试 |
使用示例
| 标题 | 详情 |
|---|---|
跨互联网数据互通 (Kafka) | |
Kafka 数据中转校验 | 文档:Kafka 数据中转校验 |
MySQL 到 Kafka / AutoMQ 数据迁移同步 |
前置条件
| 条件 | 说明 |
|---|---|
账号权限 | |
开启 Binlog | [mysqld] |
任务参数
| 参数名称 | 说明 |
|---|---|
parseBinlogParallel | 增量解析 Binlog 的并发数 |
parseBinlogBufferSize | 用于增量解析 Binlog 的环形队列大小 |
maxTransactionSize | 单事务最大数据条数,超过则分段刷出 |
limitThroughputMb | 限制增量 Binlog 流量 |
extraDDL | 兼容额外的 DDL 同步,包括 PT, GHOST, ALI_DMS, PT_GHOST |
fullDataSqlConditionEnabled | 将过滤条件拼入 SQL 中进行源端数据扫描,此参数只针对全量迁移有效 |
srcTimeZone | 源端时区,例如 +08:00, Asia/Shanghai, America/New_York 等 |
Tips: 通用参数配置请参考 通用参数及功能
任务参数
| 参数名称 | 说明 |
|---|---|
schemaFormat | 消息格式,文档:消息格式说明 |
batchWriteSize | 单条消息最大数据条数,超过则拆分消息 |
defaultTopic | 无法找到对应 Topic 的消息则发送到此 Topic (如新增表) |
ddlTopic | 专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区 |
compressionType | Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法 |
batchSize | Kafka batch.size 参数 |
acks | Kafka acks 参数, 默认 all |
maxRequestBytes | Kafka max.request.size 参数 |
lingerMs | Kafka linger.ms 参数, 默认 1 |
envelopSchemaInclude | 当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息 |
customClientProps | 自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如:AWS IAM 访问控制 |
Tips: 通用参数配置请参考 通用参数及功能
