Elasticsearch 到 Elasticsearch 数据同步
简述
Elasticsearch 是一款流行的搜索引擎,和关系型数据库、缓存数据库、实时数据仓库、消息中间件共同组成应用的现代化数据堆栈。
写入数据到 Elasticsearch 相对容易,但是如何将其数据实时同步出来,则要困难一些。
本文主要介绍如何通过 CloudCanal 结合 Elasticsearch 增量数据捕获插件,实现 Elasticsearch 到 Elasticsearch 数据迁移同步。
技术点
使用 Elasticsearch 插件机制
Elasticsearch 并没有明确给出如何实时获取其中的变更数据,但是其插件 API IndexingOperationListener 可订阅到 INDEX 事件和 DELETE 事件,前者即 INSERT 或 UPDATE 操作, 后者即传统意义上的 DELETE 操作。
明确增量数据的获取机制,如何让下游获取这些数据则成了接下来的问题。
我们采用了一个单独索引 cc_es_trigger_idx 作为增量数据的容器。
这个方式有几个好处:
- 不依赖第三方组件(如消息中间件)。
- Elasticsearch 索引管理方便。
- 和 CloudCanal 其他数据源 Trigger 方式增量数据获取风格一致,机制代码可重用。

索引 cc_es_trigger_idx 结构如下, 其中 row_data 保留 INDEX 操作变更后数据,pk 则保存文档 _id。
{
"mappings": {
"_doc": {
"properties": {
"create_time": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ssSSS"
},
"event_type": {
"type": "text",
"analyzer": "standard"
},
"idx_name": {
"type": "text",
"analyzer": "standard"
},
"pk": {
"type": "text",
"analyzer": "standard"
},
"row_data": {
"type": "text",
"index": false
},
"scn": {
"type": "long"
}
}
}
}
}
Trigger 数据扫描机制
CloudCanal 使用 Elasticsearch 插件产生的增量数据,只要按照 cc_es_trigger_idx 索引 scn 字段顺序批量扫描消费即可。
消费代码风格和 SAP Hana 源端保持一致。
插件开源
Elasticsearch 插件加载会严格识别插件所依赖的三方包,如和 Elasticsearch 本体三方依赖包重复或版本不一致,则无法加载,所以插件需要和 Elasticsearch 版本保持一致(包括小版本)。
鉴于发布大量预编译包可操作性差,同时为了插件能够有更加广泛的使用,我们将插件开源在了 GitHub 上。
操作示例
步骤 1: 源端 Elasticsearch 安装插件
参考 Elasticsearch 源端同步准备 文档安装增量数据捕获插件。
