MariaDB 到 Pulsar
CloudCanal 支持从 MariaDB 到 Pulsar 的数据迁移、同步、校验和链路能力。
| 功能 | 说明 |
|---|---|
结构迁移 | 如目标端不存在指定映射规则后的 Topic,则自动进行 Topic 创建,并支持设置分区数 |
全量数据迁移 | 逻辑迁移,通过顺序扫描表数据,将数据分批写入到消息中间件 |
增量实时同步 | 支持 INSERT、UPDATE、DELETE 常见 DML 同步 |
修改订阅 | 新增、删除、修改订阅表,支持历史数据迁移,文档:修改订阅 |
重置位点 | 支持按照文件位点、时间戳 回溯位点,重新消费过去一段时间或指定 Binlog 文件和位点开始的增量日志 |
元数据检索 | 从源端表查对端,查询设置过过滤条件的,查询设置过对端主键的 |
高级功能
| 功能 | 说明 |
|---|---|
消息格式 | 支持以下消息格式,文档:消息格式说明
|
Topic 映射规则 | 默认按 和源端一致 进行映射,额外支持按 转小写、转大写、以'_数字'后缀截取、按 SCHEMA_TABLE 拼接(元数据镜像)、按 SCHEMA_TABLE 拼接(元数据转大写)、按 SCHEMA_TABLE 拼接(元数据转小写) 映射 |
表级别 Topic | 最小按照源端表级别设置对应的 Topic,支持自动获取表分区 |
定时全量迁移 | 文档1:创建定时全量任务 |
自定义代码 | 文档1:创建自定义代码任务 |
数据过滤条件 | 支持 WHERE 条件进行数据过滤,内容为 SQL 92 子集,文档:创建数据过滤任务 |
设置目标主键 | 支持变更主键为其他字段 |
限制和注意点
| 限制项 | 说明 |
|---|---|
MySQL 存储引擎 | 支持 InnoDB, MySIAM, 阿里云 XEngine, 其他存储引擎暂未测试 |
MySQL 字符集 | 支持 utf8, utf8mb4, latin1, 其他编码暂未测试 |
前置条件
| 条件 | 说明 |
|---|---|
账号权限 | |
开启 Binlog | [mysqld] |
任务参数
| 参数名称 | 说明 |
|---|---|
parseBinlogParallel | 增量解析 Binlog 的并发数 |
parseBinlogBufferSize | 用于增量解析 Binlog 的环形队列大小 |
maxTransactionSize | 单事务最大数据条数,超过则分段刷出 |
limitThroughputMb | 限制增量 Binlog 流量 |
extraDDL | 兼容额外的 DDL 同步,包括 PT, GHOST, ALI_DMS, PT_GHOST |
fullDataSqlConditionEnabled | 将过滤条件拼 入 SQL 中进行源端数据扫描,此参数只针对全量迁移有效 |
srcTimeZone | 源端时区,例如 +08:00, Asia/Shanghai, America/New_York 等 |
Tips: 通用参数配置请参考 通用参数及功能
任务参数
| 参数名称 | 说明 |
|---|---|
schemaFormat | 消息格式,文档:消息格式说明 |
batchWriteSize | 单条消息最大数据条数,超过则拆分消息 |
enableBatching | Pulsar 是否启用批量发送 |
batchingMaxBytes | Pulsar 批量发送最大字节数,单位为字节 |
connectionTimeoutMs | Pulsar Client 连接超时时间,单位为毫秒 |
compressionType | 设置 Pulsar 消息压缩算法,支持 LZ4, ZLIB, ZSTD, SNAPPY 算法 |
envelopSchemaInclude | 当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息 |
Tips: 通用参数配置请参考 通用参数及功能
