跳到主要内容

Oracle 到 ClickHouse 同步

简述

本文主要介绍 CloudCanal 如何将 Oracle 中的数据同步到 ClickHouse, 默认使用 ReplacingMergeTree 作为 ClickHouse 表引擎, 链路特点包括

  • 支持 ReplaceMergeTree _sign, _version 字段
  • 支持 DDL 同步

技术点

优化 ReplacingMergeTree

在 CloudCanal 的早期实现中,数据同步到 ClickHouse 的 ReplacingMergeTree 表时,采用了以下策略:

  • 将 Oracle 数据库中的 Insert 和 Update 操作统一转换为 Insert 操作。
  • 对于 Delete 操作,则通过 ALTER TABLE DELETE 语句单独处理。

虽然这种方式有效,但在遇到大量 Delete 操作时,容易导致同步性能下降,影响数据的实时性。

CloudCanal 在最新版本中对同步逻辑进行了优化,支持 ReplacingMergeTree 表引擎中的 _sign_version 字段。

其中,所有 InsertUpdateDelete 操作都会被转换为带有版本信息的 Insert 操作。

结构迁移

在执行 Oracle 数据向 ClickHouse 的结构迁移时,CloudCanal 默认选择 ReplacingMergeTree 作为表引擎,并自动为表添加 _sign 和 _version 字段:

CREATE TABLE `console`.`worker_stats`
(
`id` Int64,
`gmt_create` DateTime,
`worker_id` Int64,
`cpu_stat` String,
`mem_stat` String,
`disk_stat` String,
`_sign` UInt8 DEFAULT 0,
`_version` UInt64 DEFAULT 0,
INDEX `_version_minmax_idx` (`_version`) TYPE minmax GRANULARITY 1
) ENGINE = ReplacingMergeTree(`_version`, `_sign`) ORDER BY `id`

数据导入

操作转换

在数据导入过程中,CloudCanal 采用如下的转换策略:

  • 源端的 Insert 操作:

    # 插入新数据,_sign 设置为 0
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 0, <new_version>);
  • 源端的 Update 操作(会转换为两条 Insert):

    # 逻辑删除旧数据,_sign 设置为 1
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 1, <new_version>);

    # 插入新数据,_sign 设置为 0
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 0, <new_version>);
  • 源端的 Delete 操作:

    # 逻辑删除旧数据,_sign 设置为 1
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 1, <new_version>);

数据版本

在写入数据时,CloudCanal 会维护每个表的版本信息:

  • 版本初始化:在进行第一次写入时,CloudCanal 会通过以下 SQL 语句获取当前表的最新版本号

    SELECT MAX(`_version`) FROM `console`.`worker_stats`;
  • 版本递增:每次写入新数据时,CloudCanal 都会基于上次获取的最大版本号递增,确保每次写入操作都有一个独立且递增的版本号。

查询时,通过添加 final 关键字来过滤未删除的行,从而确保查询结果的数据准确性。

SELECT `id`, `gmt_create`, `worker_id`, `cpu_stat`, `mem_stat`, `disk_stat`
FROM `console`.`worker_stats` final;

操作示例

  1. 下载安装 CloudCanal 私有部署版本
  2. 造 Insert、Update、Delete 负载,比例为 1:1:1。
  3. 添加数据源。 1.png
  4. 创建任务,选择数据源和库,并连接成功。
  5. ClickHouse 侧点开高级选项,确保 表引擎为 ReplacingMergeTree (ReplicatedReplacingMergeTree),点击下一步。 2.png
  6. 选择数据同步,建议规格至少选择 1 GB,点击下一步。 3.png
  7. 选择表、列、创建确认默认下一步。
  8. 等待任务自动结构迁移、全量迁移、数据同步追上。
  9. 延迟追平状态,停止负载。
  10. 手动触发 ClickHouse 优化后,创建一个校验任务,跑完结果一致。 4.png

常见问题

优化后还存在的问题

由于 ClickHouse 的自动合并是不可预测的,可能会导致数据校验显示不准。

可以手动触发合并运行 OPTIMIZE TABLE xxx FINAL; 命令,但这个操作可能不成功。

或者执行 CREATE VIEW xxx v AS SELECT * FROM xxx FINAL; 命令创建视图,并对视图执行查询,以确保数据完全合并。

总结

本文简要介绍了 CloudCanal 实现 Oracle 到 ClickHouse 数据迁移同步的能力,帮助业务快速构建实时数据分析环境。