跳到主要内容

Kafka

选择对端数据库:

数据链路

基本功能

功能说明
结构迁移

如目标端不存在的 Topic , 则自动进行 Topic 创建,并支持设置分区数

增量实时同步

支持订阅源端 Topic 的消息

修改订阅

新增、删除、修改订阅 Topic,文档:修改订阅

高级功能

功能说明
消息格式

Kafka 源端支持 CloudCanal内置格式AlibabaCanal兼容格式DebeziumEnvelope兼容格式原始消息格式,详细格式说明文档:MQ 消息同步格式说明

限制和注意点

限制项说明
原始消息格式

只支持 Kafka -> Kafka,且两端的消息格式都需要选择 原始消息格式

使用示例

标题详情
跨互联网数据互通(Kafka)

文档:跨互联网数据互通(Kafka)

Kafka 数据中转校验

文档:Kafka 数据中转校验


源端数据源

前置条件

条件说明
网络准备

迁移同步节点( sidecar )可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

消息格式,文档:消息格式说明

consumerGroupId

Kafka 消费组 Id

consumeParallel

消费 Kafka 的并行度

sessonTimeoutMs

Kafka Session 超时时间(毫秒)

maxPollRecords

Kafka 一次最大拉取消息数量

Tips: 通用参数配置请参考 通用参数及功能


目标端数据源

前置条件

条件说明
网络准备

迁移同步节点( sidecar )可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

消息格式,文档:消息格式说明

batchWriteSize

单条消息最大数据条数,超过则拆分消息

defaultTopic

无法找到对应 topic 的消息则发送到此 Topic (如新增表)

ddlTopic

专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区

compressionType

Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法

batchSize

Kafka batch.size 参数

acks

Kafka acks 参数, 默认 all

maxRequestBytes

Kafka max.request.size 参数

lingerMs

Kafka linger.ms 参数, 默认 1

envelopSchemaInclude

当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息

Tips: 通用参数配置请参考 通用参数及功能

数据链路

基本功能

高级功能

限制和注意点

使用示例

字段类型与映射

链路FAQ

源端数据源

前置条件

任务参数

目标端数据源

前置条件

任务参数