Oracle 到 Kafka
CloudCanal 支持从 Oracle 到 Kafka 的数据迁移、同步、校验和链路能力。
| 功能 | 说明 |
|---|---|
全量数据迁移 | 逻辑迁移,通过顺序扫描表数据,将数据分批写入到消息中间件 |
增量实时同步 | 支持 INSERT、UPDATE、DELETE 常见 DML 同步 |
修改订阅 | 新增、删除、修改订阅表,支持历史数据迁移,文档:修改订阅 |
重置位点 | 按 时间戳 或 Scn 回溯位点,重新消费过去一段时间 Oracle Redo Log |
元数据检索 | 从源端表查对端,查询设置过过滤条件的 |
高级功能
| 功能 | 说明 |
|---|---|
自动建字典 | 如果使用离线字典解析 Oracle Redo, 则在创建任务时自动创建字典 |
消息格式 | 支持以下消息格式,文档:消息格式说明
|
Topic 映射规则 | 默认按 . 拼接 源端 实例id、库、schema、表 形成对端 topic 进行匹配或待创建(如 ora-vgpq6q097174t6t.ORCL.dingtax.app_key),额外支持按 源端一致、转小写、转大写 映射 |
表级别 Topic | 最小按照源端表级别设置对应的 Topic, 支持自动获取表分区 |
DDL 专用 Topic | 支持指定 Topic 发送 DDL, 如未指定,则放置 DDL 时间在对应表 Topic 分区 0 中 |
定时全量迁移 | 文档1:创建定时全量任务 |
自定义代码 | 文档1:创建自定义代码任务 |
数据过滤条件 | 支持 WHERE 条件进行数据过滤,内容为 SQL 92 子集,文档:创建数据过滤任务 |
设置目标主键 | 支持变更主键为其他字段 |
使用示例
| 标题 | 详情 |
|---|---|
Oracle 数据迁移同步优化与思考 | |
Oracle 数据迁移同步优化(三) | |
跨互联网数据互通 (Kafka) | |
Kafka 数据中转校验 | 文档:Kafka 数据中转校验 |
前置条件
| 条件 | 说明 |
|---|---|
账号权限 | 文档:Oracle 需要的权限 |
增量同步准备 | |
网络准备 | 迁移同步节点(sidecar)可连接 ORACLE 标准交互接口(如 1521) |
任务参数
| 参数名称 | 说明 |
|---|---|
fullFetchSize | 全量扫描数据设置的 fetch size |
eventStoreSize | 缓存解析完毕的增量事件缓存大小 |
logminerUser | 执行 Logminer SQL 的 Oracle 连接用户 |
logminerPasswd | 执行 Logminer SQL 的 Oracle 连接密码 |
logminerConnectType | 执行 Logminer SQL 的 Oracle 连接类型(PDB),包括 ORACLE_SID, ORACLE_SERVICE 两种可选 |
logminerSidOrService | 执行 Logminer SQL 的 Oracle 连接串 SID 或服务名(PDB) |
parseRedoSqlParallel | 解析 Logminer 数据的并发度 |
parseRedoSqlBufferSize | 解析 Logminer 数据的环形队列大小 |
redoFetchSize | 单次获取 Logminer 分析数据条数 |
redoOfferTransMaxSize | 未消费但已提交事务最大缓存数量 |
oraMiningSessionPauseSec | 使用 Logminer 挖掘日志间隙停顿时间,单位为秒 |
maxEventCountPerTxInMem | 内存中每个事务的最大事件数 |
logMiningScnStep | Oracle Logminer 分析 redo log 时指定的分析范围大小 |
abandonUnCommitTxTimeoutSec | 不带数据变更的事务未提交超过设置的值,则自动放弃该事务 |
restartTxWithDataTimeoutSec | 带数据变更的事务未提交超过设置的值,则自动重启任务 |
oraUseOnlineDic | 是否使用在线日志,false 使用离线日志对 Oracle 压力较大 |
oraReleaseIntervalSec | 重建分析链接的间隔,以释放 Oracle 服务端资源 |
fallBackScnStep | 和 Redo log 最新数据保持的距离,0 表示紧跟 |
sqlCaseConversionEnabled | 是否打开 DDL 大小写转换(根据当前数据库默认大小写规则) |
Tips: 通用参数配置请参考 通用参数及功能
任务参数
| 参数名称 | 说明 |
|---|---|
schemaFormat | 消息格式,文档:消息格式说明 |
batchWriteSize | 单条消息最大数据条数,超过则拆分消息 |
defaultTopic | 无法找到对应 Topic 的消息则发送到此 Topic (如新增表) |
ddlTopic | 专门发送 DDL 的 Topic, 为空则发送到对应 Topic 的第 0 个分区 |
compressionType | Kafka compression.type 参数, 设置压缩算法, 支持 GZIP, SNAPPY, LZ4, ZSTD 算法 |
batchSize | Kafka batch.size 参数 |
acks | Kafka acks 参数, 默认 all |
maxRequestBytes | Kafka max.request.size 参数 |
lingerMs | Kafka linger.ms 参数, 默认 1 |
envelopSchemaInclude | 当 schemaFormat 设置为 DEBEZIUM_ENVELOP_JSON_FOR_MQ 时,消息体是否包含 schema 信息 |
customClientProps | 自定义传入到 Kafka Client 参数,JSON 格式,key为参数名,value为参数值。此配置项以最高优先级生效。例如:AWS IAM 访问控制 |
Tips: 通用参数配置请参考 通用参数及功能
