Kafka
选择对端数据库:
Kafka
功能 | 说明 |
---|---|
结构迁移 | 如目标端不存在的 Topic , 则自动进行 Topic 创建,并支持设置分区数 |
增量实时同步 | 支持订阅源端 Topic 的消息 |
修改订阅 | 新增、删除、修改订阅 Topic,文档:修改订阅 |
重置位点 | 按 时间戳 回溯位点,重新消费过去一段时间的数据 |
高级功能
功能 | 说明 |
---|---|
消息格式 | 支持以下消息格式,文档:消息格式说明
|
使用示例
标题 | 详情 |
---|---|
跨互联网数据互通 (Kafka) | |
Kafka 数据中转校验 | 文档:Kafka 数据中转校验 |
任务参数
参数名称 | 说明 |
---|---|
schemaFormat | MQ 消息格式,文档:消息格式说明 |
consumerGroupId | Kafka 消费组 Id |
consumeParallel | 消费 Kafka 的并行度 |
sessonTimeoutMs | Kafka Session 超时时间(毫秒) |
maxPollRecords | Kafka 一次最大拉取消息数量 |
dbHeartbeatIntervalSec | 配置对源端数据库发起心跳操作的间隔时长 |
dbHeartbeatToleranceStep | 配置对源端数据库心跳操作可容忍的位点差值 |
Tips: 通用参数配置请参考 通用参数及功能
任务参数
参数名称 | 说明 |
---|---|
schemaFormat | 消息格式,文档:消息格式说明 |
batchWriteSize | 单条消息最大数据条数,超过则拆分消息 |
defaultTopic | 无法找到对应 Topic 的消息则发送到此 Topic (如新增表) |
ddlTopic | 专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区 |
compressionType | Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法 |
batchSize | Kafka batch.size 参数 |
acks | Kafka acks 参数, 默认 all |
maxRequestBytes | Kafka max.request.size 参数 |
lingerMs | Kafka linger.ms 参数, 默认 1 |
envelopSchemaInclude | 当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息 |
Tips: 通用参数配置请参考 通用参数及功能