跳到主要内容

Kafka

选择对端数据库:

数据链路

基本功能

功能说明
结构迁移

如目标端不存在的 Topic , 则自动进行 Topic 创建,并支持设置分区数

增量实时同步

支持订阅源端 Topic 的消息

修改订阅

新增、删除、修改订阅 Topic,文档:修改订阅

重置位点

时间戳 回溯位点,重新消费过去一段时间的数据

高级功能

功能说明
Message Format

The following message formats are supported. See Message Format.

  • CloudCanal format
  • AlibabaCanal format

限制和注意点

限制项说明
Creating Tables in the Target in Advance

Only support automatic Topic creation for messages.

Raw Message Format

Only support raw message replication from Kafka to Kafka, and Raw Message Format needs to be selected at both the Source and the Target.


源端数据源

前置条件

条件说明
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.

任务参数

参数名称说明
schemaFormat

MQ Message format. For more information, see Message Format.

consumerGroupId

Kafka consumer group ID.

consumeParallel

Degree of consuming Kafka topics in parallel.

sessionTimeoutMs

Kafka session timeout in milliseconds.

maxPollRecords

Maximum number of messages fetched in one poll from Kafka.

dbHeartbeatIntervalSec

Interval for initiating heartbeat on the source database.

dbHeartbeatToleranceStep

The threshold of gap between the latest offset and the current offset. If the actual gap is bigger than the threshold, BladePipe won‘t send heartbeat message.

Tips: 通用参数配置请参考 通用参数及功能


目标端数据源

前置条件

条件说明
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.

任务参数

参数名称说明
schemaFormat

Message format. For more information, see Message Format.

batchWriteSize

The maximum data size of a single message. If the size exceeds the limit, the message will be split.

defaultTopic

Messages that cannot find a corresponding topic are sent to this topic (such as adding a new table)

ddlTopic

A topic specifically used to receive DDL events. If it is empty, the DDL events will be sent to the 0th partition of the corresponding topic.

compressionType

Kafka compression.type parameter to set compression algorithm. Support GZIP, SNAPPY, LZ4, ZSTD.

batchSize

Kafka batch.size parameter.

acks

Kafka acks parameter. By default, it is all.

maxRequestBytes

Kafka max.request.size parameter.

lingerMs

Kafka linger.ms parameter. By default, it is 1.

envelopSchemaInclude

When schemaFormat is set to DEBEZIUM_ENVELOP_JSON_FOR_MQ, it means whether the message body contains schema information.

Tips: 通用参数配置请参考 通用参数及功能

数据链路

基本功能

高级功能

限制和注意点

使用示例

链路FAQ

源端数据源

前置条件

任务参数

目标端数据源

前置条件

任务参数