MongoDB
功能 | 说明 |
---|---|
全量数据迁移 | 逻辑迁移,通过顺序扫描表数据,将数据分批写入到对端数据库,支持的 _id 类型为 ObjectId、Long、Integer |
增量实时同步 | 支持 INSERT、UPDATE、DELETE 同步 |
修改订阅 | 新增、删除、修改订阅表,支持历史数据迁移,文档:修改订阅 |
增量位点回溯 | 支持按照 时间戳 回溯位点,重新消费过去一段时间的 oplog |
部署形态支持 | 支持 主备、副本集、分片集群 |
高级功能
功能 | 说明 |
---|---|
消息格式 | 支持以下消息格式(文档):
|
表级别 topic | 最小按照源端表级别设置对应的 topic, 支持自动获取表分区 |
DDL 专用 topic | 支持指定 topic 发送 DDL, 如未指定,则放置 DDL 时间在对应表 topic 分区 0 中 |
限制和注意点
限制项 | 说明 |
---|---|
oplog 大小和保留时间设置 | MongoDB 默认配置 replication.oplogSizeMB 过小 或 storage.oplogMinRetentionHours 过小, 如数据同步延迟过大,可能导致未消费的 oplog 被清除。需调大此参数 |
MongoDB 主备架构的任务设置 | 源端 MongoDB 主备架构模式,需要将源端任务参数 oplogCollection 设置为 oplog.$main |
changeStream 模式 | MongoDB 3.6 以上支持 changeStream 获取增量变更,同步任务可以设置源端参数 captureMode 为 CHANGE_STREAM。MongoDB 分片集群可以填写 Mongos 连接串进行同步。 |
oplog 模式 | 当使用 oplog 模式进行 MongoDB 源端同步时,需要确保该能够访问到 local 库。 |
目标端需要提前创建 Topic | MongoDB 到 Kafka 不支持自动创建 Topic |
使用示例
标题 | 详情 |
---|---|
跨互联网数据互通(Kafka) | |
Kafka 数据中转校验 | 文档:Kafka 数据中转校验 |
任务参数
参数名称 | 说明 |
---|---|
captureMode | 配置 MongoDB 增量源端模式,支持 OP_LOG 和 CHANGE_STREAM 模式 |
changeStreamBatchSize | 配置 MongoDB Change Stream 每一批拉取变更事件的最大条数 |
oplogCollection | 配置 MongoDB oplog 的集合名,默认是 oplog.rs |
timezone | 需要转换的源端时区(默认UTC) |
Tips: 通用参数配置请参考 通用参数及功能
任务参数
参数名称 | 说明 |
---|---|
schemaFormat | 消息格式,文档:消息格式说明 |
batchWriteSize | 单条消息最大数据条数,超过则拆分消息 |
defaultTopic | 无法找到对应 topic 的消息则发送到此 Topic (如新增表) |
ddlTopic | 专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区 |
compressionType | Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法 |
batchSize | Kafka batch.size 参数 |
acks | Kafka acks 参数, 默认 all |
maxRequestBytes | Kafka max.request.size 参数 |
lingerMs | Kafka linger.ms 参数, 默认 1 |
envelopSchemaInclude | 当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息 |
Tips: 通用参数配置请参考 通用参数及功能