Kafka 到 ClickHouse
CloudCanal 支持从 Kafka 到 ClickHouse 的数据迁移、同步、校验和链路能力。
| 功能 | 说明 |
|---|---|
增量实时同步 | 支持订阅源端 Topic 的消息,并转换为 INSERT、UPDATE、DELETE DML |
修改订阅 | 新增、删除、修改订阅 Topic,文档:修改订阅 |
重置位点 | 按 时间戳 回溯位点,重新消费过去一段时间的数据 |
高级功能
| 功能 | 说明 |
|---|---|
消息格式 | 支持以下消息格式,文档:消息格式说明
|
追加模式写入 | INSERT 和 UPDATE 以追加模式批量写入, DELETE 单独通过 ALTER 方式执行 |
定时优化表 | 通过设置 autoOptimizeThresholdSec 参数,定时优化表 |
限制和注意点
| 限制项 | 说明 |
|---|---|
目标端需要提前创建表 | 仅支持消息自动创建 Topic |
原始消息格式 | 仅支持 Kafka 到 Kafka,且两端的消息格式都需要选择 原始消息格式 |
特殊操作 | DELETE 操作过多(>50 条/秒)将大幅影响数据同步性能 |
目标端表引擎 | 仅支持以下表引擎以及对应的源端表类型:
|
使用示例
| 标题 | 详情 |
|---|---|
跨互联网数据互通 (Kafka) | |
Kafka 数据中转校验 | 文档:Kafka 数据中转校验 |
任务参数
| 参数名称 | 说明 |
|---|---|
schemaFormat | MQ 消息格式,文档:消息格式说明 |
consumerGroupId | Kafka 消费组 Id |
consumeParallel | 消费 Kafka 的并行度 |
sessonTimeoutMs | Kafka Session 超时时间(毫秒) |
maxPollRecords | Kafka 一次最大拉取消息数量 |
dbHeartbeatIntervalSec | 配置对源端数据库发起心跳操作的间隔时长 |
dbHeartbeatToleranceStep | 配置对源端数据库心跳操作可容忍的位点差值 |
customClientProps | 自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如:AWS IAM 访问控制 |
Tips: 通用参数配置请参考 通用参数及功能
任务参数
| 参数名称 | 说明 |
|---|---|
multiReplica | 是否为多副本集群 |
clusterName | 集群名称,当 multiReplica 为 true, 则自动在 DDL/DML 中加入 ON CLUSTER clusterName 子句 |
ckTableEngine | 当前支持以下表引擎:
|
autoOptimizeThresholdSec | 定时优化表(optimize table final)间隔,<=0 则关闭此功能 |
enableTimeRangeClamping | 是否启用时间范围裁剪,强制将时间和日期值收束到 ClickHouse JDBC 的合法区间内,超出的数值将被截断至最小值或最大值。默认关闭(false)。 收束后范围(UTC):
|
Tips: 通用参数配置请参考 通用参数及功能
