Kafka 到 Elasticsearch
CloudCanal 支持从 Kafka 到 Elasticsearch 的数据迁移、同步、校验和链路能力。
| 功能 | 说明 |
|---|---|
增量实时同步 | 支持订阅源端 Topic 的消息,并转换为 INSERT、UPDATE、DELETE DML |
修改订阅 | 新增、删除、修改订阅 Topic,文档:修改订阅 |
重置位点 | 按 时间戳 回溯位点,重新消费过去一段时间的数据 |
高级功能
| 功能 | 说明 |
|---|---|
消息格式 | 支持以下消息格式,文档:消息格式说明
|
全量前清空目标数据 | 运行全量任务前清除老数据,包括重跑任务、定时全量迁移都会触发此能力 |
重建目标表 | 运行全量任务前重建目标表,包括重跑任务、定时全量迁移都会触发此能力 |
ES 时间写入格式 | 以该字段的第一个时间格式写入 Elasticsearch,如果未设置时间格式,则使用 yyyy-MM-dd'T'HH:mm:ss 格式 |
ES 时区设置 | 只有当时间格式的时区为 ZZZZZ 时,才会将页面设置的时区写入到 Elasticsearch |
目标端需要提前创建索引 | Kafka / AutoMQ 到 Elasticsearch 不支持自动创建索引 |
设置目标主键 | 变更主键为其他字段,方便数据聚合等操作 |
使用示例
| 标题 | 详情 |
|---|---|
跨互联网数据互通 (Kafka) | |
Kafka 数据中转校验 | 文档:Kafka 数据中转校验 |
Elasticsearch 对端同步技术详解 |
任务参数
| 参数名称 | 说明 |
|---|---|
schemaFormat | MQ 消息格式,文档:消息格式说明 |
consumerGroupId | Kafka 消费组 Id |
consumeParallel | 消费 Kafka 的并行度 |
sessonTimeoutMs | Kafka Session 超时时间(毫秒) |
maxPollRecords | Kafka 一次最大拉取消息数量 |
dbHeartbeatIntervalSec | 配置对源端数据库发起心跳操作的间隔时长 |
dbHeartbeatToleranceStep | 配置对源端数据库心跳操作可容忍的位点差值 |
customClientProps | 自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如 :AWS IAM 访问控制 |
Tips: 通用参数配置请参考 通用参数及功能
前置条件
| 条件 | 说明 |
|---|---|
账号权限 | 具备索引的 create, delete, create_index, delete_index, read, write 权限 |
网络准备 | 迁移同步节点(sidecar)可连接 Elasticsearch 节点 |
任务参数
| 参数名称 | 说明 |
|---|---|
maxBulkSizeMb | 单表最大攒批容量,超过此容量则刷出数据到写入队列 |
totalDataInMemMb | 攒批写入,内存中最大数据容量,超过此容量或超过 asyncFlushIntervalSec 则刷出数据到写入队列 |
asyncFlushIntervalSec | 攒批写入,等待刷出的间隔时间,超过此时间或超过 totalDataInMemMb 则刷出数据到写入队列 |
realFlushPauseSec | 使用 Bulk Write 刷出数据到 ElasticSearch 的等待时间,0 则不等待 |
pkSeparator | 拼接 _id 的分隔符(字段数 > 1) |
enableBulkSizeThreshold | 启用批量写入模式(默认开启) |
Tips: 通用参数配置请参考 通用参数及功能
