🎉 CloudCanal 上线 V6.1.0.0:KingbaseES 分区表迁移性能大幅提升
跳到主要内容

PostgreSQL 到 Kafka

CloudCanal 支持从 PostgreSQL 到 Kafka 的数据迁移、同步、校验和链路能力。

选择对端数据库:

数据链路

基本功能

功能说明
全量数据迁移

逻辑迁移,通过顺序扫描表数据,将数据分批写入到消息中间件

增量实时同步

支持 INSERTUPDATEDELETE 常见 DML 同步

修改订阅

新增、删除、修改订阅表,支持历史数据迁移,文档:修改订阅

DDL 同步

PostgreSQL DDL 同步基于 触发器 实现,需具备相应的 触发器 权限。文档:PostgreSQL 需要的权限

元数据检索

从源端表查对端,查询设置过过滤条件的

高级功能

功能说明
消息格式

支持以下消息格式,文档:消息格式说明

  • CloudCanal内置格式
  • AlibabaCanal兼容格式
Topic 映射规则

默认按 . 拼接源端 实例idschema 形成对端 topic 进行匹配或待创建(如 pg-vgpq6q097174t6t.pg_db.dingtax.app_key),额外支持按 源端一致转小写转大写 映射

表级别 Topic

最小按照源端表级别设置对应的 Topic, 支持自动获取表分区

定时全量迁移

文档1:创建定时全量任务
文档2:定时全量实现增量数据迁移

自定义代码

文档1:创建自定义代码任务
文档2:自定义代码任务 debug
文档3:在自定义代码中打日志

数据过滤条件

支持 WHERE 条件进行数据过滤,内容为 SQL 92 子集,文档:创建数据过滤任务

设置目标主键

变更主键为其他字段,方便数据聚合等操作

使用示例

标题详情
跨互联网数据互通 (Kafka)

文档:跨互联网数据互通 (Kafka)

Kafka 数据中转校验

文档:Kafka 数据中转校验


源端数据源

前置条件

条件说明
账号权限

需要权限如下(以自建数据库为例):

  • GRANT ALL PRIVILEGES ON DATABASE 同步库 TO 同步账号(或同步库 information_schema 中所有视图的 SELECT 权限和需要同步表、索引、约束的 SELECT 权限)
  • ALTER USER 同步账号 REPLICATION
增量同步准备

准备动作按如下步骤进行:

  • 修改 postgresql.conf, 设置 wal_level=logical 和 wal_log_hints = on
  • 修改 pg_hba.conf, 设置 host replication 同步账号 CIDR网段 md5 , host 同步库 同步账号 CIDR网段 md5, host postgres 同步账号 CIDR网段 md5
  • 重启 PostgreSQL
网络准备

迁移同步节点(sidecar)可连接 PostgreSQL 标准交互接口(如 5432)

任务参数

参数名称说明
fullFetchSize

全量扫描数据设置的 fetch size

eventStoreSize

缓存解析完毕的增量事件缓存大小

ignoreGisSRID

解析 GIS 数据类型时是否忽略 SRID

defaultGisSRID

设置 GIS 数据类型的 SRID

Tips: 通用参数配置请参考 通用参数及功能


目标端数据源

前置条件

条件说明
网络准备

迁移同步节点(sidecar)可连接 Kafka 各节点

任务参数

参数名称说明
schemaFormat

消息格式,文档:消息格式说明

batchWriteSize

单条消息最大数据条数,超过则拆分消息

defaultTopic

无法找到对应 Topic 的消息则发送到此 Topic (如新增表)

ddlTopic

专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区

compressionType

Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法

batchSize

Kafka batch.size 参数

acks

Kafka acks 参数, 默认 all

maxRequestBytes

Kafka max.request.size 参数

lingerMs

Kafka linger.ms 参数, 默认 1

envelopSchemaInclude

当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息

customClientProps

自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如:AWS IAM 访问控制

Tips: 通用参数配置请参考 通用参数及功能

联系我们
微信二维码

扫码添加微信,获取技术支持