🎉 CloudCanal 上线 V6.1.0.0:KingbaseES 分区表迁移性能大幅提升
跳到主要内容

Kafka 到 Elasticsearch

CloudCanal 支持从 Kafka 到 Elasticsearch 的数据迁移、同步、校验和链路能力。

选择对端数据库:

数据链路

基本功能

功能说明
增量实时同步

支持订阅源端 Topic 的消息,并转换为 INSERTUPDATEDELETE DML

修改订阅

新增、删除、修改订阅 Topic,文档:修改订阅

重置位点

时间戳 回溯位点,重新消费过去一段时间的数据

高级功能

功能说明
消息格式

支持以下消息格式,文档:消息格式说明

  • CloudCanal内置格式
  • AlibabaCanal兼容格式
全量前清空目标数据

运行全量任务前清除老数据,包括重跑任务、定时全量迁移都会触发此能力

重建目标表

运行全量任务前重建目标表,包括重跑任务、定时全量迁移都会触发此能力

ES 时间写入格式

以该字段的第一个时间格式写入 Elasticsearch,如果未设置时间格式,则使用 yyyy-MM-dd'T'HH:mm:ss 格式

ES 时区设置

只有当时间格式的时区为 ZZZZZ 时,才会将页面设置的时区写入到 Elasticsearch

目标端需要提前创建索引

Kafka / AutoMQ 到 Elasticsearch 不支持自动创建索引

设置目标主键

变更主键为其他字段,方便数据聚合等操作

限制和注意点

限制项说明
目标端需要提前创建表

仅支持消息自动创建 Topic

原始消息格式

仅支持 Kafka 到 Kafka,且两端的消息格式都需要选择 原始消息格式

使用示例

标题详情
跨互联网数据互通 (Kafka)

文档:跨互联网数据互通 (Kafka)

Kafka 数据中转校验

文档:Kafka 数据中转校验

Elasticsearch 对端同步技术详解

文档:Elasticsearch 对端同步技术详解


源端数据源

前置条件

条件说明
网络准备

迁移同步节点(sidecar)可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

MQ 消息格式,文档:消息格式说明

consumerGroupId

Kafka 消费组 Id

consumeParallel

消费 Kafka 的并行度

sessonTimeoutMs

Kafka Session 超时时间(毫秒)

maxPollRecords

Kafka 一次最大拉取消息数量

dbHeartbeatIntervalSec

配置对源端数据库发起心跳操作的间隔时长

dbHeartbeatToleranceStep

配置对源端数据库心跳操作可容忍的位点差值

customClientProps

自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如:AWS IAM 访问控制

Tips: 通用参数配置请参考 通用参数及功能


目标端数据源

前置条件

条件说明
账号权限

具备索引的 create, delete, create_index, delete_index, read, write 权限

网络准备

迁移同步节点(sidecar)可连接 Elasticsearch 节点

任务参数

参数名称说明
maxBulkSizeMb

单表最大攒批容量,超过此容量则刷出数据到写入队列

totalDataInMemMb

攒批写入,内存中最大数据容量,超过此容量或超过 asyncFlushIntervalSec 则刷出数据到写入队列

asyncFlushIntervalSec

攒批写入,等待刷出的间隔时间,超过此时间或超过 totalDataInMemMb 则刷出数据到写入队列

realFlushPauseSec

使用 Bulk Write 刷出数据到 ElasticSearch 的等待时间,0 则不等待

pkSeparator

拼接 _id 的分隔符(字段数 > 1)

enableBulkSizeThreshold

启用批量写入模式(默认开启)

Tips: 通用参数配置请参考 通用参数及功能

联系我们
微信二维码

扫码添加微信,获取技术支持