Hana 到 PostgreSQL 数据同步
· 阅读需 4 分钟
简述
本篇文章主要介绍如何使用 CloudCanal 构建一条 Hana 到 PostgreSQL 的数据同步链路。
技术点
表级别 CDC 表
CloudCanal 在实现 Hana 源端增量同步时,最初采用的是单 CDC 表的模式,即所有订阅表的增量数据(插入、更新、删除)通过触发器统一写入同一张 CDC 表。这样设计的初衷是简化架构和实现方式,但是同时也带来了一些问题。
-
触发器执行效率低:采用单个 CDC 表时,我们将订阅表的字段值拼接成 JSON 字符串。虽然这种方式统一,但增加了触发器的复杂性。当字段数量超过 300 个时,触发器效率显著下降,影响同步性能。
-
增量数据积压:所有订阅表的变更数据集中写入单个 CDC 表,当 A 表增量数据较多而 B 表较少时,混合写入会导致 无法及时处理 B 表数据,造成 B 表数据积压,影响同步及时性。
后续我们对于单 CDC 表模式进行了优化。本次优化实现了表级别的 CDC 表设计,每张源表都对应一张 CDC 表,CDC 表的结构仅在原表结构的基础上增加了几个位点字段,用于增量同步。
原表:
CREATE COLUMN TABLE "SYSTEM"."TEST" (
"TEST1" INTEGER NOT NULL ,
"TEST2" INTEGER NOT NULL ,
"TEST3" INTEGER,
CONSTRAINT "TEST_KEY" PRIMARY KEY ("TEST1", "TEST2")
)
CDC 表:
CREATE COLUMN TABLE "SYSTEM"."SYSTEM_TEST_CDC_TABLE" (
"TEST1" INTEGER,
"TEST2" INTEGER,
"TEST3" INTEGER,
"__$DATA_ID" BIGINT NOT NULL ,
"__$TRIGGER_ID" INTEGER NOT NULL ,
"__$TRANSACTION_ID" BIGINT NOT NULL ,
"__$CREATE_TIME" TIMESTAMP,
"__$OPERATION" INTEGER NOT NULL
);
-- other index
触发器 (INSERT):
CREATE TRIGGER "SYSTEM"."CLOUD_CANAL_ON_I_TEST_TRIGGER_TEST" AFTER INSERT ON "SYSTEM"."TEST" REFERENCING NEW ROW NEW FOR EACH ROW
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
IF 1=1 THEN
INSERT INTO "SYSTEM"."SYSTEM_TEST_CDC_TABLE" ("__$DATA_ID", "__$TRIGGER_ID", "__$TRANSACTION_ID", "__$CREATE_TIME", "__$OPERATION", "TEST1", "TEST2", "TEST3")
VALUES(
"SYSTEM"."CC_TRIGGER_SEQ".NEXTVAL,
433,
CURRENT_UPDATE_TRANSACTION(),
CURRENT_UTCTIMESTAMP,
2,
:NEW."TEST1" ,
:NEW."TEST2" ,
:NEW."TEST3"
);
END IF;
END;
采用这种方式有以下几个好处:
- 表级别 CDC 表更加独立,方便进行多次订阅。
- 触发器只需要执行 INSERT 语句,因此对于字段较多的表也能够快速执行。
- 扫描消费 CDC 数据时,不需要做额外的处理,消费更简单。