AutoMQ 到 Doris
CloudCanal 支持从 AutoMQ 到 Doris 的数据迁移、同步、校验和链路能力。
| 功能 | 说明 |
|---|---|
增量实时同步 | 支持订阅源端 Topic 的消息,并转换为 INSERT、UPDATE、DELETE DML |
修改订阅 | 新增、删除、修改订阅 Topic,文档:修改订阅 |
重置位点 | 按 时间戳 回溯位点,重新消费过去一段时间的数据 |
高级功能
| 功能 | 说明 |
|---|---|
消息格式 | 支持以下消息格式,文档:消息格式说明
|
全量前清空目标数据 | 运行全量任务前清除老数据,包括重跑任务、定时全量迁移都会触发此能力 |
重建目标表 | 运行全量任务前重建目标表,包括重跑任务、定时全量迁移都会触发此能力 |
Stream Load 数据写入 | 采用 Stream Load 到 Doris / SelectDB be 写入数据, 默认攒批写入,可动态调节刷出数据节奏和批次大小 |
0 值时间处理 | 支持将 0 值时间设置成不同类型的值,防止写入对端报错 |
限制和注意点
| 限制项 | 说明 |
|---|---|
目标端需要提前创建表 | 仅支持消息自动创建 Topic |
原始消息格式 | 仅支持 AutoMQ 到 AutoMQ,且两端的消息格式都需要选择 原始消息格式 |
对端表类型 | 仅支持 唯一键模型(Unique) |
源端表类型 | 不支持 无主键表 迁移同步 |
数据类型 | 不支持 BINARY, BLOB 等二进制类型 |
增量写入冲突策略 | Stream Load 写入以主键进行整行替换 |
使用示例
| 标题 | 详情 |
|---|---|
CloudCanal x AutoMQ 数据迁移同步 | |
跨互联网数据互通 (AutoMQ) | |
AutoMQ 数据中转校验 | 文 档:AutoMQ 数据中转校验 |
任务参数
| 参数名称 | 说明 |
|---|---|
schemaFormat | MQ 消息格式,文档:消息格式说明 |
consumerGroupId | AutoMQ 消费组 Id |
consumeParallel | 消费 AutoMQ 的并行度 |
sessonTimeoutMs | AutoMQ Session 超时时间(毫秒) |
maxPollRecords | AutoMQ 一次最大拉取消息数量 |
Tips: 通用参数配置请参考 通用参数及功能
前置条件
| 条件 | 说明 |
|---|---|
账号权限 | 具备 SELECT, DDL 权限(可选) |
网络准备 | 迁移同步节点(sidecar)可连接 Doris / SelectDB FE QueryPort 和 FE/BE HttpPort |
任务参数
| 参数名称 | 说明 |
|---|---|
host | MySQL 协议交互链接,对应 Doris / SelectDB FE QueryPort |
httpHost | Doris stream load 链接,对应 Doris / SelectDB FE/BE HttpPort |
totalDataInMemMb | 攒批写入,内存中最大数据容量,超过此容量或超过 asyncFlushIntervalSec 则刷出数据到写入队列 |
asyncFlushIntervalSec | 攒批写入,等待刷出的间隔时间,超过此时间或超过 totalDataInMemMb 则刷出数据到写入队列 |
flushBatchMb | 单表最大攒批容量,超过此容量则刷出数据到写入队列 |
realFlushPauseSec | 使用 stream load 刷出数据到 Doris / SelectDB 的等待时间,0 则不等待 |
soTimeoutSec | 在 QueryPort 或 HttpPort 执行操作时 tcp 超时链接(so_timeout) |
enableTimeZoneProcess | 是否对时间字段进行时区转换 |
timezone | 目标端 Doris/SelectDB 时区,例如 +08:00 Asia/Shanghai America/New_York |
maxInSizePerQuery | 校验任务中,对端单次查询的最大 IN 条件值数量,大于该值会自动拆分多次查询 |
Tips: 通用参数配置请参考 通用参数及功能
