Kafka
选择对端数据库: 
Kafka
| 功能 | 说明 | 
|---|---|
| 结构迁移 | 如目标端不存在的 Topic , 则自动进行 Topic 创建,并支持设置分区数 | 
| 增量实时同步 | 支持订阅源端 Topic 的消息 | 
| 修改订阅 | 新增、删除、修改订阅 Topic,文档:修改订阅 | 
| 重置位点 | 按 时间戳 回溯位点,重新消费过去一段时间的数据 | 
高级功能
| 功能 | 说明 | 
|---|---|
| 消息格式 | 支持以下消息格式 ,文档:消息格式说明 
 | 
使用示例
| 标题 | 详情 | 
|---|---|
| 跨互联网数据互通 (Kafka) | |
| Kafka 数据中转校验 | 文档:Kafka 数据中转校验 | 
任务参数
| 参数名称 | 说明 | 
|---|---|
| schemaFormat | MQ 消息格式,文档:消息格式说明 | 
| consumerGroupId | Kafka 消费组 Id | 
| consumeParallel | 消费 Kafka 的并行度 | 
| sessonTimeoutMs | Kafka Session 超时时间(毫秒) | 
| maxPollRecords | Kafka 一次最大拉取消息数量 | 
| dbHeartbeatIntervalSec | 配置对源端数据库发起心跳操作的间隔时长 | 
| dbHeartbeatToleranceStep | 配置对源端数据库心跳操作可容忍的位点差值 | 
Tips: 通用参数配置请参考 通用参数及功能
任务参数
| 参数名称 | 说明 | 
|---|---|
| schemaFormat | 消息格式,文档:消息格式说明 | 
| batchWriteSize | 单条消息最大数据条数,超过则拆分消息 | 
| defaultTopic | 无法找到对应 Topic 的消息则发送到此 Topic (如新增表) | 
| ddlTopic | 专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区 | 
| compressionType | Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法 | 
| batchSize | Kafka batch.size 参数 | 
| acks | Kafka acks 参数, 默认 all | 
| maxRequestBytes | Kafka max.request.size 参数 | 
| lingerMs | Kafka linger.ms 参数, 默认 1 | 
| envelopSchemaInclude | 当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息 | 
Tips: 通用参数配置请参考 通用参数及功能