跳到主要内容

Kafka

选择对端数据库:

数据链路

基本功能

功能说明
结构迁移

如目标端不存在的 Topic , 则自动进行 Topic 创建,并支持设置分区数

增量实时同步

支持订阅源端 Topic 的消息

修改订阅

新增、删除、修改订阅 Topic,文档:修改订阅

重置位点

时间戳 回溯位点,重新消费过去一段时间的数据

高级功能

功能说明
消息格式

支持以下消息格式,文档:消息格式说明

  • CloudCanal内置格式
  • AlibabaCanal兼容格式

限制和注意点

限制项说明
目标端需要提前创建表

仅支持消息自动创建 Topic

原始消息格式

仅支持 Kafka 到 Kafka,且两端的消息格式都需要选择 原始消息格式

使用示例

标题详情
跨互联网数据互通 (Kafka)

文档:跨互联网数据互通 (Kafka)

Kafka 数据中转校验

文档:Kafka 数据中转校验


源端数据源

前置条件

条件说明
网络准备

迁移同步节点(sidecar)可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

MQ 消息格式,文档:消息格式说明

consumerGroupId

Kafka 消费组 Id

consumeParallel

消费 Kafka 的并行度

sessonTimeoutMs

Kafka Session 超时时间(毫秒)

maxPollRecords

Kafka 一次最大拉取消息数量

dbHeartbeatIntervalSec

配置对源端数据库发起心跳操作的间隔时长

dbHeartbeatToleranceStep

配置对源端数据库心跳操作可容忍的位点差值

Tips: 通用参数配置请参考 通用参数及功能


目标端数据源

前置条件

条件说明
网络准备

迁移同步节点(sidecar)可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

消息格式,文档:消息格式说明

batchWriteSize

单条消息最大数据条数,超过则拆分消息

defaultTopic

无法找到对应 Topic 的消息则发送到此 Topic (如新增表)

ddlTopic

专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区

compressionType

Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法

batchSize

Kafka batch.size 参数

acks

Kafka acks 参数, 默认 all

maxRequestBytes

Kafka max.request.size 参数

lingerMs

Kafka linger.ms 参数, 默认 1

envelopSchemaInclude

当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息

Tips: 通用参数配置请参考 通用参数及功能

数据链路

基本功能

高级功能

限制和注意点

使用示例

链路FAQ

源端数据源

前置条件

任务参数

目标端数据源

前置条件

任务参数