跳到主要内容
Barry
Chief Technology Officer
查看所有作者

RAG 青铜升级:知识库召回率优化实战攻略

· 阅读需 9 分钟
Barry
Chief Technology Officer

s 在大模型(LLM)驱动的问答系统中,RAG(Retrieval-Augmented Generation)架构正迅速成为主流;然而在实际应用中,即便接入了如 GPT-4 或 Claude 等先进模型,但生成结果仍然不够理想。

问题的根源往往并不在于模型本身,而在于——它没有检索到相关信息,这就引出了评估 RAG 检索质量的核心指标:召回率(Recall)

本文将深入探讨召回率的本质,以及如何通过构建一个结构化、丰富且高质量的知识库,显著提升 RAG 系统的召回效果,从而增强问答系统的准确性与实用性。

召回率是什么?

在 RAG 检索系统中,召回率指的是在所有真正相关的文档中,有多少被成功地检索了出来。

计算方式:

召回率 = 检索到的相关内容数量 ÷ 所有相关内容的总数量

举个例子,假设你有一个技术文档知识库,里面记录着产品安装、配置、调优等各类信息。

某个用户提问:“如何在 Kubernetes 上部署?”

假设知识库中有 6 条与这个问题高度相关的内容,但系统只返回了其中 3 条。那么,召回率就是 3 ÷ 6 = 50%。

在 RAG 系统中,召回率尤为关键,因为大模型能不能“答对”,很大程度取决于有没有拿到相关内容;召回率越高,LLM 在生成答案时能参考的有效信息就越多,回答的质量和准确性也就越有保障。

召回不准的原因

很多人将注意力放在向量数据库、查询优化、模型推理上,却忽视了最根本的基础设施 —— 知识库构建

召回失败,往往源于三个层面:

  1. 数据覆盖不足:知识来源单一,未能全面汇聚 FAQ、产品文档、技术手册、历史工单等高价值内容。
  2. 语义表达偏差:不合理的 分块策略(如按固定字数切割)会割裂上下文;**Embedding 模型 **选择不当则会无法精准捕捉文本的深层语义,导致向量表达失真。
  3. 结构策略粗糙:没有上下文信息、缺少结构化字段或文档元数据。

如何构建高召回率的知识库

提高数据覆盖率

任何检索的前提是知识库中相关信息。因此在构建 RAG 专属知识库时,需要聚焦以下能力:

  • 汇聚多渠道内容:FAQ、文档、部署手册、工单记录等
  • 支持多种接入方式:数据库、OSS/S3、Google Docs、语雀、本地文件等

提高语义嵌入质量

选择合适的 Embedding 模型,决定了 用户问题 能否成功匹配到 知识块

目前业界有许多优秀的 Embedding 模型,以下是一个简单对比:

模型名称适用语言优势局限部署方式
text-embedding-3(OpenAI)英文 / 多语言精度高,覆盖全面,是当前最强通用模型之一需联网,调用成本高API
text-embedding-v3(Alibaba)中文为主中文语义理解深,适合企业知识库模型大,本地部署门槛略高Ollama / API
Qwen-Embedding-7B中文为主中文结构化问答效果好,向量表达自然显存占用高,不适合轻量场景本地部署
BGE-M3中文轻量、开源,适合本地快速部署和测试英文能力较弱本地部署

分块策略合理

分块(Chunking) 是指将长文档切割成适合 RAG 检索的、更小的文本单元:

  • 若分块太小:上下文缺失,回答不准确。
  • 若分块太大:Embedding 过于抽象,无法命中具体问题。

在具体实践中,应考虑:

  • 按语义、标题、段落切块,避免语义断层。
  • 支持 Chunk Overlap,每块有一定重叠,如每 300 个 Token 滑动切,同时根据语义分段,召回命中率更高。

结构化向量库

传统向量检索仅依赖 Embedding 相似度,虽具备语义匹配能力,但仍存在明显短板:向量相似但语义不相关的内容易被误召回

结构化向量库在此基础上引入了丰富的元信息结构字段,进一步提升了召回的准确性。

  • 使用大模型提取 FAQ、摘要、标签、时间字段等,可有效补充语义缺失的上下文信息。
  • 将 Markdown、客服记录等非结构化内容转为统一格式,显著提升整体检索命中率。
  • 利用结构化 Schema 支撑后续精准检索、过滤、排序。

构建高质量知识库

在实际落地过程中,CloudCanal 提供了一套面向企业的自动化知识库构建能力,支持:

  • 多源文档采集(OSS、S3、SSH、Google Docs、语雀)
  • LLM 提取结构化字段,支持自定义 Prompt
  • 段落分块 + 重叠控制 + 元信息附带
  • 向量化写入 StarRocks 等向量库,支持 Qwen 等主流模型接入

下面将使用 StarRocks 作为目标向量库,展示如何在 CloudCanal 中快速构建高质量的知识库。

添加数据源

登录 CloudCanal 控制台,点击 「数据源管理」>「新增数据源」,添加以下数据源:

文件数据源(SshFile)

用于读取本地或远程服务器中的 Markdown 文档。添加步骤如下:

  • 类型选择:自建 > SshFile(同理可配置 S3、OSS 等)
  • 基础配置:填写服务器 IP、端口、用户名和密码。
    • 网络地址:localhost:22
    • 用户名:root****
    • 密码:***
  • 额外参数
    • fileSuffixArray:填写 **.md**,仅处理 Markdown 文件
    • enableLLMExtractiontrue,启用 LLM 提取 额外信息 的功能。
    • defaultLineSchemaJson:定义需要 LLM 提取的 结构化字段(第一行 line 表示原文)
[
{
"column": "line",
"jdbcType": 12,
"typeName": "TEXT"
},
{
"column": "doc_title",
"jdbcType": 12,
"typeName": "TEXT",
"desc": "从文本中提取出文档的明确标题"
},
{
"column": "summary",
"jdbcType": 12,
"typeName": "TEXT",
"desc": "为这段文本生成一个不超过100字的简明摘要"
},
{
"column": "keywords",
"jdbcType": 12,
"typeName": "TEXT",
"desc": "提取5个最能代表文本核心内容的关键词,以逗号分隔"
},
{
"column": "faq_pairs",
"jdbcType": 12,
"typeName": "TEXT",
"desc": "从文本中提取出潜在的问答对(FAQ),以[{'question':'...', 'answer':'...'}]的JSON数组格式返回"
}
]
  • dbsJson:用于指定要同步的文档目录,你可以将其中的 **schema 字段 **修改为你实际存放 Markdown 文件的根目录路径。
[
{
"db":"cc_virtual_fs",
"schemas":[
{
"schema":"/Users/barry/source/cloudcanal-doc-v2",
"tables":[]
}
]
}
]
  • db:虚拟文件库的逻辑名称,默认保持为 cc_virtual_fs 即可,无需修改。
  • schema:表示你希望读取的本地或远程目录路径,CloudCanal 会以该路径作为文档扫描入口。例如:/Users/barry/source/cloudcanal-doc-v2。该字段必须填写为绝对路径。
  • tables:用于指定目录中要处理的具体文件名,若设置为空数组([]),则表示自动抓取该目录下所有符合后缀规则(如 .md)的文件,无需逐一列出文件名。

向量数据库(StarRocks)

用于存储通过大模型编码后的文档向量,是整个 RAG 检索流程的核心数据源。

  • 类型选择:自建 > StarRocks
  • 准备工作:
    • 部署 StarRocks:参考文档
    • 版本要求:3.4 及以上
    • 打开 Vector Index
ADMIN SET FRONTEND CONFIG ("enable_experimental_vector" = "true");
  • 配置说明:
    • 网络地址:localhost:9030
    • 用户名:root** **
  • 额外参数
    • privateHttpHost:填写 localhost:8030。用于 Stream Load 写入,可填写 FE 或 BE 的地址。

大模型平台(Ollama)

CloudCanal 支持通过 Ollama 提供完整的向量生成与推理能力,适用于完全私有化的 RAG 场景。

  • 类型选择:自建 > Ollama
  • 配置说明:
    • 部署 Ollama:参考文档
    • 网络地址:localhost:11434
    • **额外参数 **
      • llmEmbedding:嵌入大模型配置
{
"qwen3:8b": {
"dimension": 4096
}
}
  • **llmChat:**对话大模型配置
{
"deepseek-r1": {
"temperature": 0.9,
"topP": 0.9,
"showReasoning": false
}
}

创建任务:构建知识库

  1. 点击 同步任务 > 创建任务
  2. 选择以下数据源,并点击 测试连接 确认网络与权限正常。
    • 源端:SshFile
    • 目标端:StarRocks

  1. 功能配置 页面,任务类型选择 全量迁移,任务规格选择默认 2 GB 即可。

  2. 表&action过滤 页面,进行以下配置:

    1. 点击 配置大模型 > Ollama,选择刚添加的大模型实例

      1. 嵌入模型选择 qwen3:8b(用于数据嵌入)
      2. 对话模型选择 deepseek:r1(用于结构化信息提取)
    2. 选择需要定时数据迁移的文件,可同时选择多个。

    3. 点击 批量修改目标名称 > 统一表名 > 填写表名(如 my_knowledge),并确认,方便将不同文件向量化并写入同一个表。

  3. 数据处理 页面,点击 批量操作 > 大模型嵌入

    1. 设置数据分隔长度 1000,分隔重叠 100。
    2. 选择需要嵌入的字段,并全选表。

  1. 创建确认 页面,点击 创建任务,开始运行。任务会自动根据源端定义的格式,自动在 StarRocks 中创建向量表,并把源端文件处理分块后,嵌入,最终导入到 StarRocks

  1. 到 StarRocks 中查询知识库数据。
select 
doc_title, -- 标题
summary, -- 总结
keywords, -- 关键词
faq_pairs, -- 问答
__content -- 原始内容
from my_knowledge limit 3 \G

至此,我们就构建好了一个结构化、丰富且高质量内容的知识库,后续可在 CloudCanal 中继续创建任务,构建 RAG API,直接与你的知识库对话。

总结

召回率决定了 RAG 系统的检索表现,也影响最终生成结果的准确性,只有策略得当、工具可靠,才能构建真正智能的问答能力。

通过 CloudCanal RAG,可在 更短周期内,以 更简单的方式 实现更高检索质量的知识库,为企业打造实用、可信的 AI 应用打下坚实基础。

标签:

CloudCanal RAG x Ollama 构建企业级 RAG 服务

· 阅读需 11 分钟
Barry
Chief Technology Officer

在企业级应用中,RAG(Retrieval-Augmented Generation)技术正在逐步从探索走向落地。与面向个人使用者的轻量级问答系统不同,企业对 RAG 的要求更高:它必须可靠、可控、可扩展,最重要的是——安全。许多企业对于数据上传至在线大模型或公有云向量数据库持谨慎甚至禁止态度,因为这可能导致敏感信息暴露给外部服务。

CloudCanal 近期已支持通过 Ollama 本地部署 RAG 服务,有效解决了传统方案中存在的数据安全隐患。本文将聚焦于企业级 RAG 服务,介绍如何通过本地部署实现一个不依赖公网的企业级 RAG 构建方案

什么是企业级 RAG 服务?

企业级 RAG 服务强调端到端的集成能力、安全可控以及对业务系统的适配性。它不仅能实现基于知识的智能问答,还能提供高可靠、高安全、高扩展的服务。在保护企业数据的前提下,真正服务于业务流程的自动化与智能化。

相比于面向个人或实验场景的轻量 RAG 项目,企业级 RAG 服务具有以下核心特征:

  • 技术栈全私有,部署可控
    服务能够完全运行在本地数据中心或私有云环境中,避免数据外泄风险,满足对数据合规性要求较高的行业需求。
  • 数据来源多样,格式丰富
    不局限于文本文件类型,还支持数据库等多种数据来源,实现真正的“全域知识接入”。
  • 支持增量数据同步,确保时效性与一致性
    在企业级场景中,知识信息更新频繁,服务需支持高效的增量同步能力,确保 RAG 索引内容始终与业务系统保持同步。
  • 可与多种工具链协同,完成复杂任务链路(类MCP能力)
    企业级 RAG 服务不仅要检索和生成,还要与函数调用(Function Calling)、工具执行(如 SQL 查询)等能力打通,构建更完整的智能任务执行流程。

那么,如何在不依赖任何在线服务的前提下,安全地构建一个完全私有的企业级 RAG 服务呢

CloudCanal RAG

CloudCanal 推出的 RagApi 封装了向量检索与模型问答能力,并兼容 MCP 协议,让用户能快速上手搭建属于自己的 RAG 服务。

相比传统 RAG 架构手动部署流程,CloudCanal 提供的 RAG 服务具有以下独特优势:

  • 双任务完成全流程:文档导入 + API 发布
  • 零代码部署:无需开发,自定义配置即可构建 API 服务。
  • 参数可调:支持设置向量 Top-K 数量、匹配阈值、Prompt 模板、模型温度等核心参数。
  • 多模型与平台适配:支持阿里云 DashScope、OpenAI、DeepSeek 等主流模型与 API 平台。
  • OpenAI API 兼容接口:直接接入现有 Chat 应用或工具链,无需额外适配。

实例演示

下面将使用以下组件,展示如何快速搭建一个完全离线、数据可控、安全可靠的企业级 RAG 服务。

  • Ollama:用于本地运行大语言模型
  • PostgreSQL 向量数据库:作为本地向量检索存储引擎
  • CloudCanal RagApi:用于构建本地化的 RAG 问答服务

整体工作流程如下:

前置部署

部署本地 Ollama

Ollama 可以让你在本地轻松部署和管理大语言模型。在本方案中,Ollama 不仅用于文档向量化阶段生成嵌入向量 (Embeddings),也作为对话模型用于最终的问答交互。CloudCanal 支持通过 Ollama 提供完整的向量生成与推理能力,完美契合纯私有化 RAG 场景。

  1. 下载与安装 Ollama

访问 Ollama 官方网站下载对应操作系统的安装包:https://ollama.com/download

  1. 拉取并运行模型:

安装完成后,打开终端,执行以下命令来拉取并运行一个适合嵌入和推理的模型,例如 qwen3:8b (也可以根据硬件情况选择其他模型,如 qwen3:14b 等。请注意,部分大型模型对硬件资源要求较高):

ollama run qwen3:8b

模型下载并首次运行后,可以按 Ctrl + D 退出当前模型的交互模式。

部署本地 PostgreSQL 向量数据库

  1. 安装 Docker(如已安装可跳过)

不同操作系统可参考以下步骤进行安装:

## centos / rhel
sudo yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

sudo yum install -y docker-ce-20.10.9-3.* docker-ce-cli-20.10.9-3.*

sudo systemctl start docker

sudo echo '{"exec-opts": ["native.cgroupdriver=systemd"]}' > /etc/docker/daemon.json

sudo systemctl restart docker
  • Ubuntu:参考下面脚本
## ubuntu
curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gpg | sudo apt-key add -

sudo add-apt-repository "deb [arch=amd64] https://mirrors.aliyun.com/docker-ce/linux/ubuntu $(lsb_release -cs) stable"

sudo apt-get update

sudo apt-get -y install docker-ce=5:20.10.24~3-0~ubuntu-* docker-ce-cli=5:20.10.24~3-0~ubuntu-*

sudo systemctl start docker

sudo echo '{"exec-opts": ["native.cgroupdriver=systemd"]}' > /etc/docker/daemon.json

sudo systemctl restart docker
  1. 启动 PostgreSQL + pgvector 容器服务

打开终端,执行以下命令,一键完成 PostgreSQL 环境部署:

cat <<'EOF' > init_pgvector.sh
#!/bin/bash

# 创建 docker-compose.yml 文件
cat <<YML > docker-compose.yml
version: "3"
services:
db:
container_name: pgvector-db
hostname: 127.0.0.1
image: pgvector/pgvector:pg16
ports:
- 5432:5432
restart: always
environment:
- POSTGRES_DB=api
- POSTGRES_USER=root
- POSTGRES_PASSWORD=123456
YML

# 启动容器服务(后台运行)
docker-compose up --build -d

# 等待容器启动后,进入数据库并启用向量扩展
echo "等待容器启动中..."
sleep 5

docker exec -it pgvector-db psql -U root -d api -c "CREATE EXTENSION IF NOT EXISTS vector;"
EOF

# 赋予执行权限并运行脚本
chmod +x init_pgvector.sh
./init_pgvector.sh

执行完毕后,本地 PostgreSQL 将自动启用 pgvector 插件,作为私有化向量检索引擎,为 CloudCanal RAG 提供底层存储支持。

部署 CloudCanal 私有化版本

下载并解压 CloudCanal 私有部署版本,完成全新安装

RAG 服务构建

添加数据源

登录 CloudCanal 控制台,点击 「数据源管理」>「新增数据源」,添加以下数据源:

文件数据源(SshFile)

用于读取本地或远程服务器中的 Markdown 文档。添加步骤如下:

  • 类型选择:自建 > SshFile
  • 配置说明:
    • 网络地址:填写目标服务器的 IP 地址(本机如** 127.0.0.1**)和 SSH 端口(默认端口为 22)。
    • 账号密码:填写用于 SSH 登录该服务器的用户名和密码。

如果你在 Mac 或 Linux 上操作,可以直接填写当前用户名和本地密码即可。

  • 额外参数 fileSuffixArray:填写 .md,用于筛选 Markdown 文件,避免无关文件被加载。
  • 额外参数 dbsJson:用于指定要同步的文档目录。你可以复制系统提供的默认值,并将其中的 schema 字段修改为你实际存放 Markdown 文件的根目录路径。
[
{
"db":"cc_virtual_fs",
"schemas":[
{
"schema":"/Users/barry/source/cloudcanal-doc-v2",
"tables":[]
}
]
}
]
  • db:虚拟文件库的逻辑名称,默认保持为 cc_virtual_fs 即可,无需修改。
  • schema:表示你希望读取的本地或远程目录路径,CloudCanal 会以该路径作为文档扫描入口。例如:/Users/barry/source/cloudcanal-doc-v2。该字段必须填写为绝对路径。
  • tables:用于指定目录中要处理的具体文件名,若设置为空数组([]),则表示自动抓取该目录下所有符合后缀规则(如 .md)的文件,无需逐一列出文件名。

向量数据库(PostgreSQL)

用于存储通过大模型编码后的文档向量,是整个 RAG 检索流程的核心数据源。

  • 类型选择:自建 > PostgreSQL
  • 配置说明:
    • 网络地址:localhost:5432
    • 用户名:root** **
    • 密码:123456

大模型平台(Ollama)

CloudCanal 支持通过 Ollama 提供完整的向量生成与推理能力,适用于完全私有化的 RAG 场景。

  • 类型选择:自建 > Ollama
  • 配置说明:
    • 网络地址:localhost:11434
    • 额外参数 llmEmbedding:
{
"qwen3:8b": {
"dimension": 4096
}
}
  • 额外参数 llmChat:
{
"qwen3:8b": {
"temperature": 1,
"topP": 0.9,
"showReasoning": false
}
}

RagApi 服务(CloudCanal)

RagApi 是最终面向用户提供问答接口的服务模块,用于对接 Chat 界面或上层应用。

  • 类型选择:自建 > RagApi
  • 配置说明:
    • 网络地址localhost:18089
    • API 密钥:自定义一个字符串(例如 my-cc-rag-key),用于后续调用 RagApi 接口时进行身份验证。

创建任务 1:数据向量化

  1. 点击 同步任务 > 创建任务
  2. 选择以下数据源,并点击 测试连接 确认网络与权限正常。
    • 源端:SshFile
    • 目标端:PostgreSQL

  1. 功能配置 页面,任务类型选择 全量迁移,任务规格选择默认 2 GB 即可。

  2. 表&action过滤 页面,进行以下配置:

    1. 选择需要定时数据迁移的文件,可同时选择多个。
    2. 点击 批量修改目标名称 > 统一表名 > 填写表名(如 knowledge_base),并确认,方便将不同文件向量化并写入同一个表。
  3. 数据处理 页面,进行以下配置:

    1. 点击 配置大模型 > Ollama,选择刚添加的大模型实例,并选择 qwen3:8b
    2. 点击 批量操作 > 大模型嵌入,选择需要嵌入的字段,并全选表。
  4. 创建确认 页面,点击 创建任务,开始运行。

创建任务 2:RagApi 服务

  1. 点击 同步任务 > 创建任务
  2. 选择以下数据源,并点击 测试连接 确认网络与权限正常。
    • 源端:已配置的 PostgreSQL(向量表所在库)
    • 目标端:RagApi

  1. 功能配置 页面,任务类型选择 全量迁移,任务规格选择默认 2 GB 即可。
  2. 表&action过滤 页面,选择要使用的向量表(可多选)。

  1. 数据处理 页面,配置大模型
    1. 嵌入模型:选择 Ollama。

      注意: PostgreSQL 中的向量维度需与选定嵌入模型一致。

    2. 聊天模型:选择 Ollama。

  1. 创建确认 页面,点击 创建任务,系统将自动完成 RagApi 服务构建。

  1. 构建完成后,可使用以下命令进行简单测试:
curl http://localhost:18089/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer my-cc-rag-key" \
-d '{
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
],
"stream": false
}'

效果测试

RagApi 支持通过可视化工具 CherryStudio 进行交互测试。CherryStudio 兼容 OpenAI 接口标准,适合用于接口联调、上下文调试和模型效果验证。

  1. 打开 CherryStudio,点击左下角 设置图标
  2. 模型服务 中搜索 OpenAI,并配置如下参数:
    • API 密钥:填写在 CloudCanal 中 RagApi 设置的密钥:my-cc-rag-key
    • API 地址http://localhost:18089

  • 模型名称:CC_RAG

  1. 回到对话页面:
    • 点击 添加助手 > Default Assistant。
    • 右键点击 Default Assistant > 编辑助手 > 模型设置,绑定上一步添加的模型。

  1. 在对话框输入:MySQL 源端权限需要什么?,RagApi 将根据向量数据检索相关内容,并通过对话模型生成响应。

总结

企业级 RAG 服务对数据安全有极高的要求。通过 CloudCanal 与 Ollama 的组合,可以轻松实现全私有部署 RAG 服务,打造一个真正不依赖公网、稳定可靠的企业级 RAG 解决方案。

标签:

CloudCanal + Apache Paimon + StarRocks 实时构建湖仓一体架构

· 阅读需 8 分钟
Barry
Chief Technology Officer

在实时数仓的浪潮下,企业越来越重视如何以低延迟、高一致性的方式将数据写入数据湖,并结合下游分析引擎完成统一分析。Apache Paimon 作为新一代流批一体的数据湖存储引擎,因其高效写入、实时更新等能力,成为构建实时湖仓架构的理想选型。

本文将介绍如何基于 Paimon、CloudCanal、StarRocks 快速构建一套真正实时、灵活、高可维护的数据湖仓架构。

Apache Paimon 介绍

Apache Paimon 是一个开源的、面向流计算的湖仓存储格式,源于 Apache Flink 社区(原 Flink Table Store)。它的核心创新在于将 湖存储格式日志结构合并树(LSM-tree) 技术融合,为数据湖带来了强大的实时流式更新能力,这使得高吞吐、低延迟的数据摄取、变更日志追踪和高效的即时分析成为可能。

核心能力:

  • 流批一体处理:支持 Streaming Write / Snapshot Read
  • 主键支持:高效 UPSERT / DELETE
  • Schema 演进:新增、删除、修改列,无需重写旧数据
  • ACID 事务保障:支持并发读写一致性
  • 生态兼容:支持 StarRocks、Flink、Spark 等引擎
  • 对象存储兼容:支持 S3OSS 等文件系统

举个例子:数据湖中的实时订单状态变更

以大型电商平台的实时销售大屏为例,业务团队希望能够每秒更新订单状态,例如订单从“已支付”变为“已发货”,并立即体现在可视化大屏中。

  • 传统方案的挑战(Merge-on-Read)

在传统的数据湖架构中,变更数据通常会先追加写入日志区,再依赖定期的 Merge-on-Read 批处理任务将新旧数据合并成最新快照。

  • 实时写入采用 追加方式,变更记录不会立即反映在查询结果中,需等待合并操作完成后才可见。
  • 读旧、写新、重写文件的过程通常以分钟为单位,难以满足秒级迟需求。
  • Paimon 的解决方式(LSM-tree)

Paimon 从架构层面对这一痛点进行了优化,引入类似数据库的主键更新能力。

  • 订单在交易库(如 MySQL)中状态变更。
  • 变更事件(如 UPDATE orders SET status = '已发货' WHERE order_id = '123')实时写入 Paimon。
  • Paimon 基于 LSM-tree,更新后读取,通常可在 秒级 内完成。
  • 实际效果

下游如 StarRocks 等分析引擎可以秒级查询到更新后的状态,从而确保实时大屏始终反映业务的最新动态。

Paimon vs. Iceberg:关键特性对比

Apache Paimon 与 Apache Iceberg 是当前数据湖领域的两大主流方案,分别在实时处理和批量分析方向各有侧重,体现了两种不同的技术演进路径。

Paimon 设计之初就面向流式数据处理,底层采用 LSM-tree 架构,原生支持主键和高效变更,特别适合 高频更新CDC等行级数据实时入湖场景。

Iceberg 以快照机制为核心,强调 强一致性,同时也逐步增强对流式写入与变更处理的支持,具备实现近实时入湖的能力。

特性Apache PaimonApache Iceberg
核心更新机制LSM-treeCopy-on-Write / Merge-on-Read
主键与更新模型支持 Primary Key,流式 UPSERT 写入更高效通过 Merge-on-Read 机制支持 UPSERT
流式写入支持支持
数据更新延迟毫秒到秒级,更适合实时性要求极高的场景通常为分钟级,更侧重于批处理和微批处理
生态成熟度较新,在 Flink 社区中发展迅速更成熟,拥有更广泛的引擎和工具支持
适用场景实时数仓、CDC 数据实时入湖、流批一体分析通用数据湖、大规模批处理、数据仓库

总体来看,Paimon 在实时更新、变更频繁的场景中具备更直接的能力优势,而 Iceberg 则在批量处理、版本治理和生态成熟度方面表现更稳健

构建湖仓一体架构

尽管 Paimon 可以通过 Flink CDC 写入数据,但管理作业状态、处理容错和检查点等工作,对多数团队而言仍有不小的技术门槛和运维成本。CloudCanal 提供了一种更轻量、更自动化的方式实现数据入湖。

  • 外部数据源:企业的核心交易库(如 MySQL, PostgreSQL)、日志数据(Kafka)等。
  • CloudCanal
    • 基于日志实时捕获数据的变更,流式批量写入,实现秒级延迟。
    • 支持 **结构自动迁移 **和 DDL 同步。
    • 支持数据迁移同步、数据校验与订正、断点续传、任务监控、告警。
  • Apache Paimon
    • 作为湖仓底座,承接 CloudCanal 写入的实时数据流。
    • 采用 LSM-tree 架构自动去重合并,管理数据分区与后台 Compaction
    • 基于 **S3、OSS **等其他对象存储,构建存算分离架构。
  • StarRocks:直接进行实时查询分析,无需额外的数据导入或转换。

实操演示

接下来将以 MySQL -> Paimon 链路为例,展示如何快速将数据实时同步至 Paimon,并使用 StarRocks 外表实时查询入湖数据。

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

  1. 登录 CloudCanal 平台,点击 数据源管理 > 添加数据源,分别添加 MySQL 和 Paimon 数据源。

  1. 添加 Paimon 数据源时,需配置额外参数,具体可参考:Paimon 数据源配置

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务
  2. 选择源和目标实例,并分别点击 测试连接

  1. 功能配置 页面,选择 增量同步,并勾选 全量初始化

  1. 表和操作过滤 页面,选择需要迁移同步的表,可同时选择多张。

  1. 数据处理 页面,保持默认配置。

6. 在 创建确认 页面,点击 创建任务,开始运行。

任务启动后,CloudCanal 会自动完成全量数据的初始化,并实时捕获增量变更写入 Paimon。

步骤 4: 使用 StarRocks 查询数据

Paimon 中的数据最终需要被下游分析引擎查询,StarRocks 原生支持 Paimon Catalog,可通过外部 Catalog 的方式直接查询 Paimon 中的实时数据,无需额外导入或数据转换。

  1. 在 StarRocks 中创建 Paimon Catalog

在 StarRocks 中执行 CREATE EXTERNAL CATALOG 语句,即可将整个 Paimon 仓库映射进 StarRocks:

CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
"type" = "paimon",
"paimon.catalog.type" = "filesystem",
"paimon.catalog.warehouse" = "<s3_paimon_warehouse_path>",
"aws.s3.use_instance_profile" = "true",
"aws.s3.endpoint" = "<s3_endpoint>"
);

具体配置可参考:https://docs.starrocks.io/docs/data_source/catalog/paimon_catalog/#examples

  1. 实时查询数据

Catalog 创建成功后,可以像 操作本地表 一样查询 Paimon 中的数据:

-- 查看 Catalog 中有哪些数据库
SHOW DATABASES FROM paimon_catalog;

-- 切换 Catalog
SET CATALOG paimon_catalog;

-- 切换至指定数据库
USE your_database;

-- 查询表数据
SELECT COUNT(*) FROM your_table LIMIT 10;

当 MySQL 中的数据发生变化后,CloudCanal 会将变更实时同步至 Paimon,StarRocks 查询到的数据也将同步更新,从而实现真正的端到端 秒级实时分析能力,无需构建复杂的 ETL 流程。

总结

Apache Paimon 以其创新的 实时更新 能力,为现代数据湖仓架构注入了新的活力,而 CloudCanal + Paimon 的组合,则通过零代码、自动化的方式,快速落地高性能的实时湖仓,最后与 StarRocks 等下游引擎配合,打通了从数据同步到实时查询的完整闭环。

深入浅出数据库宽表

· 阅读需 9 分钟
Barry
Chief Technology Officer

在企业级数据场景中,一个报表查询往往需要 3 张以上表的 JOIN,这类查询在数据量较大的场景下,需要数分钟甚至个把小时才能返回。

本文将简要探讨宽表技术的来龙去脉,以及它如何帮助解决多表关联带来的性能瓶颈,并结合 CloudCanal 最新推出的可视化宽表构建功能,无痛实现跨表数据的实时整合。

数据分析困境

在结构化数据系统中,随着业务模型的复杂化,数据表之间的关联不断增多。以电商系统为例,订单、商品、用户等表结构天然具有关联性:

  • 订单数据:商品 ID (关联 商品数据)、数量、总价、优惠信息、买家 ID (关联 用户数据)等
  • 商品数据:名称、颜色、质地、库存、商家 ID (关联 用户数据)等
  • 用户数据:账号、密码、昵称、手机、邮箱等

关系型数据库通过建立关系范式实现了存储效率最大化(冗余信息少),并优化了事务型操作的性能。但一旦进入数据分析阶段,这种表结构会给查询带来极大挑战。

在进行批量聚合、筛选等复杂分析时,如“计算过去 1 个月商品销售额 Top 10”,往往需要多表 JOIN 操作。而随着关联表数量增加,可能的执行计划(搜索空间)也呈几何级增长:

关联表数量排列种类
22
424
6720
840320
103628800

对于一个 ERP 或 CRM 系统而言,5 张表以上的关联是常态。面对如此庞大的执行计划可能性,如何高效地找到最优解 成为数据库分析查询的核心难题。

面对这一挑战,行业演进出了两种技术路线:查询优化预计算。而 打宽表 是预计算的核心能力之一。

查询优化 vs 预计算

查询优化

面对这么多种可能性,解决方式之一就是通过减少可能性加快搜索,这就是数据库优化器术语——剪枝。其中衍生出两种技术:基于规则的优化器(RBO: Rule-Based Optimizer)基于代价的优化器(CBO: Cost-Based Optimizer)

  • 基于规则的优化器(RBO):RBO不考虑数据实际分布,而是依据预设的静态规则对 SQL 执行计划进行调整。市面上的数据库产品,都有一些通用的优化规则,如谓词下推等。不同数据库产品基于其业务属性和架构特点,还拥有独特的优化规则,如 SAP Hana 承载 SAP ERP 业务,针对全内存和大量 JOIN 的场景,其优化器规则便与其他数据库存在显著差异。

  • 基于代价的优化器(CBO):相较于 RBO,CBO 通过评估不同执行计划的 I/O、CPU 等资源消耗,选择代价最低的计划。这种优化方式会通过数据的具体分布与查询 SQL 的特点来动态调整,即使两条一模一样的 SQL,因为设置的参数值不一样,可能最后的执行计划大相径庭。CBO 一般具备一个精密复杂的统计信息子系统,包括各个表的数据量、基于主键或者关联维度的数据分布直方图等信息。

现代数据库内核一般会联合使用 RBO 和 CBO 对 SQL 进行优化。

预计算

预计算认为 关联表之间的数据关系是确定的,所以在数据生成时就进行聚合与关联,形成一张“宽”的表结构,从而避免后续每次查询都重新进行关联。数据发生变化时,仅需增量更新对应的宽表数据。它的早期实现可以追溯到关系型数据库的 物化视图

从计算量消耗角度而言,预计算大幅优于即时查询优化。然而,其局限性亦不容忽视:

  • JOIN 语义实现受限:预计算对于非 Left Join 语义的实现逻辑复杂且代价巨大,通常难以在流式处理中高效支持。
  • 大量数据更新影响稳定性:在 1 对 N 的数据关系中,若“1”端数据发生变化,可能导致宽表数据大规模更新,对服务稳定性构成挑战。
  • 功能与性能权衡:预计算通常无法像即时查询那样支持各种聚合、过滤条件及函数,部分是功能实现难度,部分是性能代价考量。

最佳实践

在真实场景中,业界普遍倾向于 “双管齐下” 的策略:预计算生成中间数据,而即时查询则在此基础上完成最终的聚合与过滤,实现功能与性能的平衡。

  • 预计算:当前最流行的方案即 流计算,近些年出现的流式数据库也在尝试更好地解决这类问题,传统关系型数据库或者数据仓库实现的物化视图也是一种很好的解决方案。

  • 即时查询:随着 列式行列混合 数据结构普及,AVX 512 等新指令集落地,FPGA、GPU 等专用或高性能计算硬件尝试,以及 知识网格分布式计算 等软件手段应用,实时分析数据库在数据筛选和聚合性能上获得了显著提升。

两者结合使用,为在线数据分析场景带来了巨大的助益。

CloudCanal 的宽表历程

CloudCanal 在宽表构建方面的演进,实现了从“高代码门槛”到“零代码构建”的转变。早期版本中通过自定义代码支持用户在数据迁移同步过程中,自行查询关联表数据并构建宽表,再回写到目标端。尽管功能可行,但用户需要写代码,难度太高,因而未形成大规模应用。

为此,CloudCanal 实现了 可视化宽表构建。用户仅需在界面中选择需要打宽表的表,并选择关联的列,即可构建出一个既支持历史数据迁移,又支持实时更新的宽表任务。

目前,该功能已支持链路包括:

  • MySQL -> MySQL/StarRocks/Doris/SelectDB
  • PostgreSQL/SQL Server/Oracle/MySQL -> MySQL
  • PostgreSQL -> StarRocks/Doris/SelectDB

未来将持续开放更多链路。

CloudCanal 可视化构建宽表

宽表定义

CloudCanal 宽表构建中的核心角色包括:

  • 驱动表:仅可选择一张,作为宽表的基准数据源。
  • 关联表:可选择多张,提供驱动表需要补充的字段信息。

CloudCanal 默认实现 Left Join 语义,确保返回驱动表所有数据,即使关联表中无匹配数据。

目前,CloudCanal 支持 2 种关联拓扑结构:

  • 线形 关系:A.b_id = B.id and B.c_id = C.id and ...,其中所有表均不可重复选择,且不支持形成环状关联。
  • 星形 关系:A.b_id = B.id and A.c_id = C.id and ...,不支持形成环状关联。

其中 A 表定义为驱动表,B、C 表定义为关联表。

数据变更处理策略

对端为关系型数据库(如 MySQL)

  • 驱动表 INSERT:自动补全关联表字段。
  • 驱动表 UPDATE/DELETE:不做关联表字段关联。
  • 关联表 INSERT:如有下一层关联表,则做关联表字段关联,INSERT 操作转换为 UPDATE。
  • 关联表 UPDATE:如有下一层关联表,不做关联表字段关联。
  • 关联表 DELETE:如有下一层关联表,则做关联表字段关联,DELETE 操作转换为全字段为 NULL 的 UPDATE 操作。

对端为整行覆盖类数据库(如 StarRocks/Doris)

  • 驱动表 INSERT/UPDATE/DELETE:自动补全关联表字段。

  • 关联表 INSERT/UPDATE/DELETE:自动忽略。

    信息

    若需实现到覆盖类数据库 关联表变更,可以分阶段完成。

    首先创建 源库 > 关系型数据库宽表 迁移同步任务,再创建 关系型数据库宽表 > 覆盖类数据库 迁移同步任务。

操作示例

  1. 进入 CloudCanal 控制台,点击 同步任务 > 创建任务
  2. 进入 选择表 步骤,选择需要打宽表的表,对端均选择宽表进行映射。
  3. 进入 数据处理 步骤。
    1. 页面左侧选择 驱动表,并点击 操作 > 宽表,设定宽表定义。

      • 关联列 选择和设置,支持多个。

      • 关联表 其他列 选择,以及宽表映射字段填写,此操作主要防止多张来源表字段名在目标宽表中重复。

        信息

        1. 关联表如果级联关联其他表,务必选择其对应关联列。如 A.b_id = B.id and B.c_id = C.id,则选择 B 的其他列时,必须选择 c_id。

        2. 在复杂关联关系中,不同驱动表或关联表存在相同列名,故在选择其他列时,映射到不同列以规避冲突

    2. 点击 确定 完成设置。

    3. 页面左侧选择 关联表,确认列和宽表列映射正确。

  4. 继续完成任务创建流程,即可启动宽表同步任务。

总结

宽表作为一种核心的数据预计算策略,有效解决了关系数据库范式化设计在 OLAP 场景下多表关联查询的性能瓶颈。CloudCanal 提供的可视化宽表构建能力,不仅大幅简化了构建流程,而且显著降低了技术门槛。它为构建具备实时性与灵活性的数据分析平台提供了关键工具,助力数据架构师和 DBA 高效构建实时数据分析层。

CloudCanal RAG 实战|如何让问答机器人“智能”起来

· 阅读需 8 分钟
Barry
Chief Technology Officer

在之前的文章中,我们介绍了如何使用 CloudCanal 和 Ollama 搭建全栈私有的 RAG 问答服务,为企业级 RAG 应用提供了部署简单、安全可靠的解决方案。

最近,我们用这套方案,在 CloudCanal 官网上线了基于 知识库 的智能问答机器人。在发布前的测试过程中,却发现它似乎没有想象中那么“智能”,出现了答非所问、检索失焦、上下文理解有误等问题

今天,我们将从 CloudCanal 官网问答机器人工程实践出发,深入分析传统 RAG 的瓶颈,并介绍 CloudCanal RAG 的针对性优化策略,展示如何构建一个真实可用的智能问答机器人。

传统 RAG 模式的问题

传统 RAG 流程简化如下:

  1. 原始文档 → 切分 → 向量化 → 存入向量数据库
  2. 用户提问 → 转换为查询向量 → 相似度检索 → 拼接上下文
  3. 构造 Prompt → 输入大模型 → 大模型推理并回答

这一流程看似十分合理,但实际操作时却会发现各种各样的问题:

数据处理粗糙,信息密度低

  • 问题表现:直接对原始文档分块向量化,缺乏摘要、关键词、标签等关键元信息,导致向量无法精准表达文本核心内容。
  • 典型场景:一篇关于各数据库版本支持情况的文档,若无摘要或关键词提炼,模型在检索时很难精准匹配到“版本支持”这一核心概念,导致召回失败。

知识库未分类,检索范围模糊

  • 问题表现:将 FAQ、操作指南、更新日志等不同类型的文档混合存储在同一向量空间,未进行有效分类,容易导致检索范围定位错误。
  • 典型场景:当用户提问“社区版支持哪些数据源?”时,系统可能因无法区分“产品功能”与“操作步骤”,错误地返回一篇关于“如何配置数据源连接”的指南,导致答非所问。

对话历史理解脱节,无法有效追溯

  • 问题表现:简单拼接历史对话作为上下文,缺乏结构化标注,导致模型难以理解指代关系和焦点变化。
  • 典型场景:用户先问“CloudCanal 免费吗?”,再问“Hana 呢?”。若缺乏上下文关联,模型可能将第二问理解为“什么是 Hana 数据库”,而不是“CloudCanal 对 Hana 数据源的支持在社区版是否免费”,无法准确理解用户问题的含义。

用户意图识别不清,向量检索偏离目标

  • 问题表现:用户提问往往简洁甚至模糊,如果模型不能主动推断其真实意图,就会导致向量检索偏离甚至完全失效。
  • 典型场景:对于一个孤立的问题“Hana 免费吗?”,若系统未能结合潜在上下文(如用户正在浏览 CloudCanal 文档)推断出“Hana”是指“CloudCanal 的数据源”,“免费”是指“社区版功能”,那么检索结果很可能跑偏,返回无关的 Hana 数据库介绍。

CloudCanal RAG:从“检索驱动”走向“理解驱动”

传统 RAG 模型的问题在于 检索驱动理解缺失,而 CloudCanal RAG 对以上问题做了针对性的优化。

  1. 查询重构:将用户的模糊提问补全为精准问题;
  2. 查询扩展:生成多种等价问法,提升向量召回率;
  3. 查询路由:定位到最相关的知识模块,避免全库检索;
  4. 智能检索:并行将多种查询送入向量库,智能判断最优查询方式后,结合 **分表召回 **与 相关性重排(Rerank),过滤掉弱相关或无关内容;
  5. 结构化提示:用“权威事实+任务指令”框定模型输出,杜绝幻觉。

这些能力均内置于 CloudCanal,用户只需根据自身业务场景,稍作提示词调整,即可快速实现高精度、可落地的智能问答服务。

查询案例

下面通过一个真实的模糊查询案例来说明:

用户先问:CloudCanal 免费吗?

紧接着又问:Hana 支持吗?

在传统 RAG 中,第二问往往被理解为“什么是 Hana?”,没有关联到对话历史中的“免费”,最终返回 Hana 数据库的相关介绍,而非“CloudCanal 社区版是否支持 SAP Hana 同步”。

在 CloudCanal RAG 中,整个执行流程如下:

第一步:查询重构

CloudCanal RAG 首先进行上下文分析,将模糊问题补全并精准化。

  • 对话历史分析:检查到用户之前的对话提及了 社区版
  • 实体关联:Hana 识别为 SAP Hana 数据源,关联到核心内容 数据同步社区版
  • 重构结果:Hana 支持吗?=> CloudCanal 的社区版是否支持免费同步 SAP Hana 数据源?

这一步彻底解决了传统 RAG 中 用户提问模糊、对话历史关联弱 的问题,为后续操作提供了清晰而精准的输入。

第二步:查询扩展

为了避免单一查询方式在向量检索中的失败,CloudCanal RAG 会将重构后的问题扩展为多个语义等价的问法,提升召回率。

  • 社区版 CloudCanal 能连接 SAP Hana 吗?
  • 使用 CloudCanal 社区版可以免费同步 Hana 数据吗?
  • CloudCanal 社区版本包含对 SAP Hana 数据源的支持吗?

这一步有效应对了 数据处理粗糙 的问题,即使原始文档的措辞与用户提问不完全一致,也能精准命中相关知识。

第三步:查询路由

在进入具体向量检索前,CloudCanal RAG 会先判断当前问题应当查阅哪些类别的知识文档,以实现精确查询。

  • 结合问题关键词和意图,判断是属于 FAQ、产品功能说明、价格策略、版本日志还是安装手册。
  • 针对问题 “CloudCanal 社区版支持 SAP Hana 吗?”,系统判断属于“产品功能 + 产品定价”模块,只在相关知识表中查询。

这一步解决了 检索范围模糊 的问题,避免在全局知识库中进行大海捞针式的检索,提升了检索效率和准确性,防止不相关的知识干扰结果

第四步:智能检索

将多个查询并行送入向量数据库,并对返回的结果进行智能筛选。

  • 分表检索:从“产品功能”和“产品定价”知识表中召回初步匹配的片段。例如:
    • 召回片段 1:...当前社区版默认支持 MySQL、PostgreSQL...
    • 召回片段 2:...SAP Hana 数据源仅在商业版中开放...
    • 召回片段 n:...
  • 相关性重排 (Rerank):对所有召回片段进行细致评估,判断其与用户核心意图的关联度,剔除不相关或弱相关的部分。这一步至关重要,它能有效过滤掉噪声信息,确保最终用于生成答案的上下文是高度准确且相关的,避免大模型被无关信息误导。

这一环节确保了最终用于生成答案的上下文是高度准确且相关的。

第五步:结构化提示词生成

最后,构建一个高度结构化的提示词(用户可自定义),让大模型更好地推理。下面为简单的例子(实际会更复杂):

## 角色设定
“你是 CloudCanal 的官方技术支持助手。”

## 权威材料 (事实):
- “SAP Hana 数据源同步功能仅在 CloudCanal 企业版中支持。”
- “CloudCanal 社区版暂不包含对该数据源的支持。”

## 用户原始问题:
“CloudCanal 的 Hana 社区版免费吗?”

## 任务指令
“请基于权威材料,直接回答用户问题,并提供清晰指引。”

通过这种严谨的结构化方式,大模型的回答被严格限定在已验证的事实范围内,从根本上杜绝了“幻觉”的产生,保证了回答的权威性与可信度。

实际应用

构建好 CloudCanal RAG API 后,将其转化为面向用户的智能问答服务变得轻而易举。

通过整合 CloudCanal 自己开发的机器人应用,可以将 RAG 能力快速对接至多种企业协作平台,真正实现智能问答的落地应用。

上线后,CloudCanal 官网的智能问答机器人在多种复杂场景下均能基于文档准确解答用户问题,哪怕面对模糊表达、多轮追问或不规范术语,依然能提供清晰可信的答案。

总结

传统 RAG 偏向“被动检索”,而 CloudCanal RAG 通过一系列优化措施,让 RAG 应用走向 “主动理解 + 智能编排”。

  • 从模糊提问中提炼用户真实意图
  • 从上下文中构建精准查询路径
  • 从海量信息中筛选最关键事实

它不仅能找答案,更能理解问题、规划流程、控制输出,为企业级 RAG 问答系统提供真正可落地的技术方案。

标签:

零代码构建 RAG 私有知识问答服务

· 阅读需 7 分钟
Barry
Chief Technology Officer

在之前的文章中,我们已经厘清了 GenAI 的关键概念:RAG、Function Calling、MCP、AI Agent。接下来的问题在于,如何从概念到实操?

目前,网上可以搜到很多 RAG 构建教程,但大部分教程都基于 LangChain 等,对小白来说仍有一定的入门门槛。

CloudCanal 本身作为数据同步平台,已经具备多源异构数据的接入与加工能力,在 RAG 系统构建语义搜索基础方面具备天然优势。近期 CloudCanal 推出的 RagApi 封装了向量检索与模型问答能力,为用户提供一个即插即用的智能查询接口。只需在 CloudCanal 中创建两个任务,即可获得你的专属 RAG 服务,全程无需使用代码。

CloudCanal RagApi 优势

相比传统 RAG 架构手动部署流程,CloudCanal 提供的 RagApi 服务具有以下独特优势:

  • 双任务完成全流程:文档导入 + API 发布
  • 零代码部署:无需开发,自定义配置即可构建 API 服务。
  • 参数可调:支持设置向量 Top-K 数量、匹配阈值、Prompt 模板、模型温度等核心参数。
  • 多模型与平台适配:支持阿里云 DashScope、OpenAI、DeepSeek 等主流模型与 API 平台。
  • OpenAI API 兼容接口:直接接入现有 Chat 应用或工具链,无需额外适配。

实例演示

本文将以 CloudCanal 官方文档为知识库,构建关于 CloudCanal 产品的 RAG 问答服务。

先展示效果:

RagApi showoff.mp4

创建这样一个 RAG 私有知识问答服务,需要用到:

  • CloudCanal:自动创建 RagApi 服务
  • PostgreSQL:向量数据库
  • 嵌入模型:阿里云百炼平台(DashScope)的 text-embedding-v3
  • 对话模型:阿里云百炼平台(DashScope)的 qwq-plus

整体工作流程如下:

操作步骤

下载 CloudCanal

下载安装 CloudCanal 私有部署版本

准备资源

  1. 登录 阿里云百炼 并创建 API-KEY。
  2. 本地安装免费的 PostgreSQL 数据库
#!/bin/bash

# 创建 docker-compose.yml 文件
cat <<EOF > docker-compose.yml
version: "3"
services:
db:
container_name: pgvector-db
hostname: 127.0.0.1
image: pgvector/pgvector:pg16
ports:
- 5432:5432
restart: always
environment:
- POSTGRES_DB=api
- POSTGRES_USER=root
- POSTGRES_PASSWORD=123456
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
EOF

# 自动执行 docker-compose 启动
docker-compose up --build

# 进入 PG 命令行
docker exec -it pgvector-db psql -U root -d api
  1. 创建高权限账号并登录。
  2. 切换到需要建表的目标 schema (如public)。
  3. 执行以下 SQL 开启向量能力。
CREATE EXTENSION IF NOT EXISTS vector;

添加数据源

登录 CloudCanal 平台,点击 数据源管理 > 新增数据源

添加文件:

选择 自建 > SshFile 数据源,可设定额外参数

  • 网络地址:填写目标文件所在机器和 SSH 端口(默认 22)。
  • 账号密码:即登录目标机器的用户名、密码。
  • 参数 fileSuffixArray:填写 .md 以过滤出所有 markdown 文件。
  • 参数 dbsJson:复制默认值并修改 schema 值(即目标文件所在根目录)。
[
{
"db":"cc_virtual_fs",
"schemas":[
{
"schema":"/Users/johnli/source/cloudcanal-doc-v2",
"tables":[]
}
]
}
]

添加向量数据库:

选择 自建 > PostgreSQL,获取数据源并添加。

添加大模型:

选择 阿里云 > 手动填写 > DashScope 数据源,填写之前步骤获取的 API-KEY。

添加 RagApi 服务:

选择 自建 > RagApi

  • 网络地址:填写为 localhost,端口默认使用 18089
  • API 密钥:自定义一个 API-KEY,用于后续调用 RagApi 接口。

创建任务 1:数据向量化

  1. 点击 同步任务 > 创建任务
  2. 选择以下数据源,并点击 测试连接 确认网络与权限正常。
    • 源端:SshFile
    • 目标端:PostgreSQL

  1. 功能配置 页面,任务类型选择 全量迁移,任务规格选择默认 2 GB 即可。

  2. 表&action过滤 页面,进行以下配置:

    1. 选择需要定时数据迁移的文件,可同时选择多个。
    2. 点击 批量修改目标名称 > 统一表名 > 填写表名(如 file_vector),并确认,方便将不同文件向量化并写入同一个表。
  3. 数据处理 页面,进行以下配置:

    1. 点击 配置大模型 > DashScope,选择刚添加的大模型实例,并选择某一个嵌入模型(如 text-embedding-v3)。
    2. 点击 批量操作 > 大模型嵌入,选择需要嵌入的字段,并全选表。
  4. 创建确认 页面,点击 创建任务,开始运行。

创建任务 2:RagApi 服务

  1. 点击 同步任务 > 创建任务
  2. 选择以下数据源,并点击 测试连接 确认网络与权限正常。
    • 源端:已配置的 PostgreSQL(向量表所在库)
    • 目标端:RagApi

  1. 功能配置 页面,任务类型选择 全量迁移,任务规格选择默认 2 GB 即可。
  2. 表&action过滤 页面,选择要使用的向量表(可多选)。

  1. 数据处理 页面,配置大模型
    1. 嵌入模型:选择 DashScope 实例与向量数据使用的嵌入模型(如 text-embedding-v3)。

      注意: PostgreSQL 中的向量维度需与选定嵌入模型一致。

    2. 聊天模型:选择 DashScope 实例与对话模型(如 qwq-plus)。

  1. 创建确认 页面,点击 创建任务,系统将自动完成 RagApi 服务构建。

效果测试

RagApi 支持通过可视化工具 CherryStudio 进行交互测试。CherryStudio 兼容 OpenAI 接口标准,适合用于接口联调、上下文调试和模型效果验证。

  1. 打开 CherryStudio,点击左下角 设置图标
  2. 模型服务 中搜索 openai,并配置如下参数:

  • 模型名称:CC_RAG

  1. 回到对话页面:
    • 点击 添加助手 > Default Assistant。
    • 右键点击 Default Assistant > 编辑助手 > 模型设置,绑定上一步添加的模型。

  1. 在对话框输入:CloudCanal 增量同步任务延迟是什么原因?应该怎么处理?,RagApi 将根据向量数据检索相关内容,并通过对话模型生成响应。

总结

经过简单的几步,我们完成了从零构建 RagApi 服务的全过程:从数据向量化、接入向量库、配置大模型、构建 Prompt,到部署兼容 OpenAI 接口的对话服务 RagApi。

整个过程无需编写任何代码,借助 CloudCanal 提供的可视化平台和多模型支持,企业可以快速构建具备私有知识问答能力的智能服务。

标签:

深入浅出 GenAI 关键概念—— 从 RAG、Function Calling、MCP 到 AI Agent

· 阅读需 24 分钟
Barry
Chief Technology Officer

简介

随着大语言模型飞速演进,其在知识时效、生成准确性以及与外部系统交互方面的局限也愈发显现。

为此,检索增强生成(RAG)、函数调用(Function Calling)、模型上下文协议(MCP)与 AI 智能体(AI Agent)等一系列技术相继涌现,为模型补足“知识新鲜度”与“操作执行力”。

近期 CloudCanal 也推出了 RagApi 功能,并引入了 MCP 协议。本文将聚焦 RAG、Function Calling、MCP、AI Agent 等核心概念,并介绍 CloudCanal 在 RAG 架构上的具体实现。

RAG:检索增强生成

RAG(Retrieval-Augmented Generation) 是一种将“检索”与“生成”结合的 AI 架构。与传统大模型直接回答问题不同,RAG 会先从外部知识库(如文档库、数据库、向量数据库)中找到与用户问题相关的上下文信息,再将这些内容作为提示输入给大语言模型,从而生成更加准确的回答。

RAG 的优势

  • 模型不再完全依赖预训练知识,可结合实时或特定领域的信息;
  • 对私有数据支持更强,安全性与定制性更高;
  • 减少模型“胡编乱造”的情况,提高回答可靠性。

RAG 的工作流程

  1. 构建知识库:准备大量文本资料,并将其向量化,存储到向量数据库中(如 PGVector)。
  2. 相似度检索:用户提问时,问题也会被向量化,然后通过相似度计算检索出最相关的文本段落。
  3. 生成回答:将这些相关内容作为上下文,提供给生成模型,用于回答用户问题。

什么是向量?

在 RAG 的工作流程中,数据向量化是第一步。那么,什么是向量呢?

为了方便理解,让我们来举个例子。对于“苹果”这个概念,人类靠经验理解,但计算机不懂“苹果”,它需要一种可以量化的方式来表示这个词。

于是,AI 会用一种叫做嵌入的方式,把“苹果”变成一个高维度的向量(Embedding),比如:

[0.12, 0.85, -0.33, ..., 0.07](假设有 768 维)

你可以理解为:计算机试图用很多个“语义维度”来描述“苹果”这件事。例如:

  • 第 12 维可能代表“是不是水果”
  • 第 47 维可能代表“是不是食物”
  • 第 202 维可能代表“是不是公司名字”
  • 第 588 维可能代表“颜色偏红”

每一维都像是在回答一个隐形的问题,而这个维度上的数值就是模型给出的“打分”,越高表示这个特征越明显。

不同的词在这些语义维度上的“打分”不同,最终就构成了不一样的向量。

相似度如何计算?

虽然“苹果”和“香蕉”的词面不同,但它们在语义向量空间中的表示非常相近——因为在很多语义维度上,它们的“打分”都很接近,这就是语义相似性

我们可以用向量来描述这些词的语义特征。例如,每个词用 [类别, 可食性, 颜色] 三个维度表示如下:

词语[类别, 食用属性, 颜色]向量说明
苹果食物 + 可食 + 红色[1.0, 1.0, 0.8]是食物,能吃,颜色偏红
香蕉食物 + 可食 + 黄色[1.0, 1.0, 0.3]是食物,能吃,颜色偏黄
飞机交通工具 + 不可食 + 银色[0.1, 0.1, 0.9]是交通工具,不能吃,金属色居多

在语义向量中,我们判断两个词是否相似,看的不是它们的数值大小,而是它们“指向的方向”是否一致。为此,我们通常使用 余弦相似度

cos(θ) = (A · B) / (||A|| × ||B||)

它的核心思想是:比较两个向量之间的夹角

  • 夹角越小 → 方向越一致 → 语义越相似(cos θ 接近 1)
  • 夹角越大 → 方向越偏离 → 语义差异越大(cos θ 接近 0,甚至为负)

Function Calling:让模型具备调用工具的能力

在日常对话中,大模型通常只需返回文字答案。但当用户提出诸如“帮我查一下明天北京的天气”这类超出模型内置知识范围的问题时,就需要借助 Function Calling,即让 AI 调用外部工具来完成任务。

Function Calling 的核心作用在于让模型具备以下能力:

  1. 判断当前问题是否需要使用工具
  2. 自动提取参数,并以结构化 JSON 形式生成调用指令
  3. 将调用交由程序执行,并接收返回结果,用于后续生成回复。

Function Calling 操作示例

举个例子:用户说

“我明天要去北京旅游,请帮我查天气”

AI 会这样处理:

  • 提取参数:城市 “北京”,时间 “明天”
  • 制定计划:调用 get_weather 工具获取天气信息
  • 生成调用指令:输出包含一次对 get_weather 的 tool_call,并传入所需参数

Function Calling 快速演示

为了让你更直观地理解 Function Calling 的原理和流程,我们准备了一份演示用的 Prompt 模板。你只需将其复制到 Cherry Studio,即可观察模型如何分析用户请求、提取参数,并生成工具调用指令。

{
"role": "AI Assistant",
"description": "You are an AI assistant. Your primary goal is to analyze user queries and respond in a structured JSON format. If a query requires a tool and all necessary parameters are present, prepare for tool use. If a query requires a tool but essential parameters are missing, you MUST ask the user for clarification. If no tool is needed, answer directly. Your entire output MUST be a single JSON object at the root level, strictly adhering to the 'response_format'. Ensure all required fields from the schema (like 'requires_tools') are always present in your JSON output.",
"capabilities": [
"Analyzing user queries for intent and necessary parameters.",
"Identifying when required parameters for a tool are missing.",
"Strictly following instructions to set 'requires_tools' to false and use 'direct_response' to ask *only* for the specific missing information required by the tool.",
"Remembering the initial query context (e.g., 'weather' intent) when a user provides previously missing information, and then proceeding to tool use if all tool requirements are met.",
"Preparing and executing tool calls when the query intent matches a tool and all its defined required parameters are satisfied. Do not ask for details beyond the tool's documented capabilities.",
"Formulating direct answers for non-tool queries or clarification questions.",
"Detailing internal reasoning in 'thought' and, if calling a tool, a step-by-step plan in 'plan' (as an array of strings)."
],
"instructions": [
"1. Analyze the user's query and any relevant preceding conversation turns to understand the full context and intent.",
"2. **Scenario 1: No tool needed (e.g., greeting, general knowledge).**",
" a. Set 'requires_tools': false.",
" b. Populate 'direct_response' with your answer.",
" c. Omit 'thought', 'plan', 'tool_calls'. Ensure 'requires_tools' and 'direct_response' are present.",
"3. **Scenario 2: Tool seems needed, but *required* parameters are missing (e.g., 'city' for weather).**",
" a. **You MUST set 'requires_tools': false.** (Because you cannot call the tool yet).",
" b. **You MUST populate 'direct_response' with a clear question to the user asking *only* for the specific missing information required by the tool's parameters.** (e.g., if 'city' is missing for 'get_weather', ask for the city. Do not ask for additional details not specified in the tool's parameters like 'which aspect of weather').",
" c. Your 'thought' should explain that information is missing, what that information is, and that you are asking the user for it.",
" d. **You MUST Omit 'plan' and 'tool_calls'.** Ensure 'requires_tools', 'thought', and 'direct_response' are present.",
" e. **Do NOT make assumptions** for missing required parameters.",
"4. **Scenario 3: Tool needed, and ALL required parameters are available (this includes cases where the user just provided a missing parameter in response to your clarification request from Scenario 2).**",
" a. Set 'requires_tools': true.",
" b. Populate 'thought' with your reasoning for tool use, acknowledging how all parameters were met (e.g., 'User confirmed city for weather query.').",
" c. Populate 'plan' (array of strings) with your intended steps (e.g., ['Initial query was for weather.', 'User specified city: Hangzhou.', 'Call get_weather tool for Hangzhou.']).",
" d. Populate 'tool_calls' with the tool call object(s).",
" e. **If the user just provided a missing parameter, combine this new information with the original intent (e.g., 'weather'). If all parameters for the relevant tool are now met, proceed DIRECTLY to using the tool. Do NOT ask for further, unrelated, or overly specific clarifications if the tool's defined requirements are satisfied by the information at hand.** (e.g., if tool gets 'current weather', don't ask 'which aspect of current weather').",
" f. Omit 'direct_response'. Ensure 'requires_tools', 'thought', 'plan', and 'tool_calls' are present.",
"5. **Schema and Output Integrity:** Your entire output *must* be a single, valid JSON object provided directly at the root level (no wrappers). This JSON object must strictly follow the 'response_format' schema, ensuring ALL non-optional fields defined in the schema for the chosen scenario are present (especially 'requires_tools'). Respond in the language of the user's query for 'direct_response'."
],
"tools": [
{
"name": "get_weather",
"description": "获取指定城市当前天气 (Gets current weather for a specified city). This tool provides a general overview of the current weather. It takes only the city name as a parameter and does not support queries for more specific facets of weather (e.g., asking for only humidity or only wind speed). Assume it provides a standard, comprehensive current weather report.",
"parameters": {
"city": {
"type": "string",
"description": "城市名称 (City name)",
"required": true
}
}
}
],
"response_format": {
"type": "json",
"schema": {
"requires_tools": {
"type": "boolean",
"description": "MUST be false if asking for clarification on missing parameters (Scenario 2) or if no tool is needed (Scenario 1). True only if a tool is being called with all required parameters (Scenario 3)."
},
"direct_response": {
"type": "string",
"description": "The textual response to the user. Used when 'requires_tools' is false (Scenario 1 or 2). This field MUST be omitted if 'requires_tools' is true (Scenario 3).",
"optional": true // Optional because it's not present in Scenario 3
},
"thought": {
"type": "string",
"description": "Your internal reasoning. Explain parameter absence if asking for clarification, or tool choice if calling a tool. Generally present unless it's an extremely simple Scenario 1 case.",
"optional": true // Optional for very simple direct answers
},
"plan": {
"type": "array",
"items": {
"type": "string"
},
"description": "Your internal step-by-step plan (array of strings) when 'requires_tools' is true (Scenario 3). Omit if 'requires_tools' is false. Each item MUST be a string.",
"optional": true // Optional because it's not present in Scenario 1 or 2
},
"tool_calls": {
"type": "array",
"items": {
"type": "object",
"properties": {
"tool": { "type": "string", "description": "Name of the tool." },
"args": { "type": "object", "description": "Arguments for the tool." }
},
"required": ["tool", "args"]
},
"description": "Tool calls to be made. Used only when 'requires_tools' is true (Scenario 3). Omit if 'requires_tools' is false.",
"optional": true // Optional because it's not present in Scenario 1 or 2
}
}
},
"examples": [
// Example for Scenario 3 (direct tool use)
{
"query": "今天北京天气如何?",
"response": {
"requires_tools": true,
"thought": "User wants current weather for Beijing. City is specified. Use 'get_weather'.",
"plan": ["Identified city: 北京.", "Tool 'get_weather' is appropriate.", "Prepare 'get_weather' tool call."],
"tool_calls": [{"tool": "get_weather", "args": {"city": "北京"}}]
}
},
// Multi-turn example demonstrating Scenario 2 then Scenario 3
{
"query": "天气如何?", // Turn 1: User asks for weather, no city
"response": { // AI asks for city (Scenario 2)
"requires_tools": false,
"thought": "用户询问天气但未指定城市。'get_weather'工具需要城市名。因此,我必须询问用户城市。",
"direct_response": "请问您想查询哪个城市的天气呢?"
}
},
{
"query": "杭州", // Turn 2: User provides city "Hangzhou"
"response": { // AI uses tool (Scenario 3)
"requires_tools": true,
"thought": "The user previously asked for weather and has now provided the city '杭州'. All required parameters for 'get_weather' are met. The tool provides a general current weather report.",
"plan": ["Initial query was for weather.", "User specified city: 杭州.", "Call 'get_weather' tool for Hangzhou."],
"tool_calls": [{"tool": "get_weather", "args": {"city": "杭州"}}]
}
},
// Another multi-turn example (English)
{
"query": "What's the weather like today?", // Turn 1
"response": { // AI asks for city (Scenario 2)
"requires_tools": false,
"thought": "User wants today's weather but no city provided. 'get_weather' tool needs the city parameter. I must ask for clarification.",
"direct_response": "Sure, I can get the weather for you! Which city are you interested in?"
}
},
{
"query": "London", // Turn 2
"response": { // AI uses tool (Scenario 3)
"requires_tools": true,
"thought": "The user asked about weather and has now specified 'London'. The 'get_weather' tool takes city and provides current weather. Proceeding with tool call.",
"plan": ["Original intent was weather.", "City provided: London.", "Call 'get_weather' tool."],
"tool_calls": [{"tool": "get_weather", "args": {"city": "London"}}]
}
},
// Example for Scenario 1 (direct answer)
{
"query": "你好",
"response": {
"requires_tools": false,
"direct_response": "你好!有什么我可以帮你的吗?"
// 'thought' is optional for very simple cases
}
},
// Example of user being too vague even after specifying city for a non-weather query, but we want to avoid this for weather.
// This example is more to ensure the AI understands the "don't ask for unnecessary details for the get_weather tool" instruction.
{
"query": "告诉我关于巴黎的事情。", // "Tell me about Paris." (General query)
"response": {
"requires_tools": false, // Assuming no general knowledge tool for this example, or AI should ask for specifics.
"thought": "User is asking for general information about Paris. This is too broad. I should ask for more specific interests if no general info tool is available.",
"direct_response": "巴黎有很多方面,比如历史、景点、美食等等。您对哪个方面比较感兴趣呢? (Paris has many aspects, like history, attractions, food, etc. Which aspect are you interested in?)"
}
},
{
"query": "查询杭州的天气", // User explicitly states "Hangzhou's weather"
"response": { // AI should directly use the tool
"requires_tools": true,
"thought": "User explicitly asked for Hangzhou's weather. City is clear. The 'get_weather' tool is appropriate and provides a general current weather report.",
"plan": ["User query: Hangzhou's weather.", "City: Hangzhou.", "Call 'get_weather' tool."],
"tool_calls": [{"tool": "get_weather", "args": {"city": "杭州"}}]
}
}
]
}

Function Calling 多轮对话流程

  • 用户提问:“天气如何?”由于未明确城市信息,AI 无法直接调用工具,此时应追问用户所在城市。

  • 用户回复:“杭州”。AI 获取了查询所需的关键信息,提取参数后生成 tool_calls。此时,应用程序识别到 requires_tools: true,根据 tool_calls 调用相应工具函数。

  • 工具执行完成后,结果返回给 AI,AI 再基于结果进行总结并回复用户。

本质上,大模型通过自然语言理解用户意图:要完成什么任务、需要哪些信息。它会自动从对话中提取出关键参数。随后,用户的程序可根据这些参数调用对应的函数完成任务,并将执行结果返回给模型,由模型生成最终回复。

MCP:让模型更好地调用工具

Function Calling 解决了“模型怎么调用自定义函数”,但在实际使用中还面临一些问题:

  • 多个工具组成的调用链(先查天气、再发邮件)
  • 工具参数结构的规范与自动注册
  • 不同调用方式的适配(HTTP、本地插件等)
  • 在不同模型间复用统一的工具体系

什么是 MCP?

MCP 是由 Anthropic 推出的开放标准协议,旨在为大模型和外部工具之间的通信提供通用接口

它不是 Function Calling 的替代,而是对其在执行层面的进一步规范和封装,使工具系统更易接入、更易管理、更易复用

MCP 核心角色

MCP Client

  • 向 MCP Server 请求工具列表
  • 使用 HTTP 或 stdio 协议发起工具调用请求

MCP Server

  • 接收 tool_calls,根据调用内容执行对应工具
  • 返回统一格式的结构化结果

MCP Server 调用方式

HTTP 模式(StreamableHttp)

MCP Server 作为 Web 服务运行,暴露如下接口:

  • /mcp:用于接收工具调用或列出工具列表
  • 支持 Event Stream(流式响应)与 JSON-RPC 协议

以下是一个天气服务的 HTTP 模式演示:

cat > streamable_weather.mjs << 'EOF'
#!/usr/bin/env node
import express from "express";
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { randomUUID } from "node:crypto";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const app = express();
app.use(express.json());

function getServer() {
const server = new McpServer({
name: "Weather",
version: "1.0.0"
});

server.resource(
"get_weather",
new ResourceTemplate("weather://{city}", { list: undefined }),
async (uri, { city }) => ({
contents: [{
uri: uri.href,
text: `Resource weather for ${city}: 晴,24°C`
}]
})
);

server.tool(
"get_weather",
{ city: z.string() },
async ({ city }) => ({
content: [{ type: "text", text: `Tool weather for ${city}: 明天晴,最高24°C,微风3km/h` }]
})
);

server.prompt(
"get_weather",
{ city: z.string() },
({ city }) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `请告诉我 ${city} 的天气情况`
}
}]
})
);
return server;
}

app.post("/mcp", async (req, res) => {
try {
const server = getServer();
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});

res.on("close", () => {
console.log("Request closed");
transport.close();
server.close();
});

await server.connect(transport);
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error("Error handling MCP request:", error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: "2.0",
error: {
code: -32603,
message: "Internal server error",
},
id: null,
});
}
}
});

app.get("/mcp", (req, res) => {
console.log("Received GET MCP request");
res.status(405).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed.",
},
id: null,
});
});

app.delete("/mcp", (req, res) => {
console.log("Received DELETE MCP request");
res.status(405).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed.",
},
id: null,
});
});

const PORT = process.env.PORT || 30001;
app.listen(PORT, () => {
console.log(
`MCP Stateless Streamable HTTP Server listening on http://localhost:${PORT}/mcp`
);
});

EOF
# 安装依赖
npm install express @modelcontextprotocol/sdk zod

# 启动服务
node streamable_weather.mjs

# 获取工具列表
curl -N -X POST http://localhost:30001/mcp \
-H 'Accept: application/json, text/event-stream' \
-H 'Content-Type: application/json' \
-d '{
"jsonrpc":"2.0",
"id":1,
"method":"tools/list",
"params":{}
}'

# > 返回工具
event: message
data: {"result":{"tools":[{"name":"get_weather","inputSchema":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"],"additionalProperties":false,"$schema":"http://json-schema.org/draft-07/schema#"}}]},"jsonrpc":"2.0","id":1}

# 执行工具调用链
curl -N -X POST http://localhost:30001/mcp \
-H 'Accept: application/json, text/event-stream' \
-H 'Content-Type: application/json' \
-d '{
"jsonrpc":"2.0",
"id":2,
"method":"tools/call",
"params":{
"name":"get_weather",
"arguments":{ "city":"北京" }
}
}'

# > 返回执行结果
event: message
data: {"result":{"content":[{"type":"text","text":"Tool weather for 北京: 明天晴,最高24°C,微风3km/h"}]},"jsonrpc":"2.0","id":2}

Stdio 模式(本地插件)

Stdio 模式适用于本地运行的插件程序。模型与 MCP Server 通过标准输入输出进行通信,不依赖网络,适合部署在受限环境下。

以下是一个天气服务的 Stdio 模式演示:

cat > weather_stdio.mjs << 'EOF'
#!/usr/bin/env node
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const server = new McpServer({
name: "Weather",
version: "1.0.0"
});

server.resource(
"get_weather",
new ResourceTemplate("weather://{city}", { list: undefined }),
async (uri, { city }) => ({
contents: [{
uri: uri.href,
text: `Resource weather for ${city}: 晴,24°C`
}]
})
);

server.tool(
"get_weather",
{ city: z.string() },
async ({ city }) => ({
content: [{ type: "text", text: `Tool weather for ${city}: 明天晴,最高24°C,微风3km/h` }]
})
);

server.prompt(
"get_weather",
{ city: z.string() },
({ city }) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `请告诉我 ${city} 的天气情况`
}
}]
})
);

const transport = new StdioServerTransport();
await server.connect(transport);
EOF
# 获取工具列表
printf '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}\n' | node weather_stdio.mjs

# > 返回工具
{"result":{"tools":[{"name":"get_weather","inputSchema":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"],"additionalProperties":false,"$schema":"http://json-schema.org/draft-07/schema#"}}]},"jsonrpc":"2.0","id":1}

# 执行工具调用链:调用 get_weather
printf '{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"get_weather","arguments":{"city":"北京"}}}\n' | node weather_stdio.mjs

# > 返回执行结果
{"result":{"content":[{"type":"text","text":"Tool weather for 北京: 明天晴,最高24°C,微风3km/h"}]},"jsonrpc":"2.0","id":4}

MCP 多轮对话流程

  • 在多轮对话中,当用户输入:“北京天气如何?”

  • AI 会识别出用户意图需要使用工具 get_weather,并生成如下结构化调用指令:
{
"tool": "get_weather",
"args": {
"city": "北京"
}
}
  • 用户的程序会将该调用指令转发至 MCP Server,MCP Server 接收到该调用请求后,执行对应工具,并返回如下结构化结果:

  • 用户的程序从 result.content 中提取文本字段,即 content.text,再进行总结和自然语言生成,最终回复用户:

“明天北京天气晴朗,最高气温24°C,微风3km/h。感谢您的咨询!如果还有其他问题,请随时提出。”

MCP 为大模型对接外部世界提供了统一且可扩展的执行框架,具备以下优势:

  • 支持多种通信方式(HTTP、Stdio);
  • 支持统一的工具注册与声明;
  • 可复用的跨模型调用协议;
  • 易于本地或远程部署。

它与 Function Calling 搭配使用,为构建模块化、可编排、可维护的 AI Agent 系统打下了基础。

AI Agent:具备认知与行动能力的智能体

AI Agent 是一个具备认知、行动、反思能力的完整智能系统,通常整合了 RAG(检索增强生成)Function CallingMCP

  • 用 RAG 获取知识;
  • 用 Function Calling 执行操作;
  • 用 MCP 统一工具调用标准。

一个成熟的 AI Agent 能够:

  • 理解目标:通过自然语言指令(如“帮我查下北京的天气”)识别用户意图;
  • 主动拆解任务:将复杂任务拆分为多个可执行步骤,按序执行;
  • 调用外部工具:自动连接 API、数据库、搜索引擎等外部系统;
  • 记忆上下文:理解当前对话历史与任务进展;
  • 自我反思:在执行失败后尝试重试、重新规划或变更路径(部分 Agent 支持)。

相较于传统的聊天式 AI,AI Agent 更像一个“可指挥、可编排”的执行者,具备在真实应用中解决复杂问题的能力,广泛适用于客服、数据处理、自动化办公、个人助理等场景。

概念对比一览

概念本质数据来源适用场景典型应用
RAG检索 + 生成知识库 / 文档专业问答、动态知识更新企业知识库、客服机器人
Function Calling调用外部函数API / 数据库实时数据交互、自动化任务天气查询、订单处理
MCP标准化工具调用协议多平台服务(如 GitHub)跨模型、跨服务协作智能工作流(如查天气+发邮件)
AI Agent自主规划 + 执行综合(RAG + 工具调用)复杂任务自动化个人助理

CloudCanal RAG

近期 CloudCanal 支持了构建 RagApi 服务,基于标准的 RAG 架构,同时引入了 MCP 协议,实现了向量化、检索、问答生成与工具链调用的端到端闭环。

构建流程

CloudCanal 构建的 RagApi 对外暴露为 OpenAI 格式的 API 接口,可直接对接业务系统或调用方。整体流程分为两个阶段:

阶段一:数据准备与嵌入(File → PostgreSQL 向量库)

  1. 数据采集与准备
    企业知识来源包括 Markdown、TXT、数据库、内部文档等。用户通过 CloudCanal 创建嵌入任务,配置数据源、模型、目标表等信息。
  2. 数据切分与向量化
    CloudCanal 自动处理原始文档并生成向量嵌入,写入 pgvector 扩展的向量字段中(如 __vector 列)。

阶段二:API 构建与服务发布(PostgreSQL → RagApi)

  1. 查询向量化与语义优化
    用户问题进入对话接口后,系统首先会使用相同的嵌入模型将问题向量化。此过程中可启用以下能力模块:
    • 压缩查询(QUERY_COMPRESS):对原始提问进行语义压缩,去除冗余、聚焦核心内容,提高向量匹配精度。
    • 扩展查询(QUERY_EXTEND):自动引入近义词、相关概念或补充说明,扩大匹配范围,提高召回率。
  2. 向量检索与知识片段选择
    在向量库中进行相似度搜索,检索结果可进一步通过 知识片段选择(KNOWLEDGE_SELECT) 进行筛选,支持多个知识库场景,系统会根据语义相关性自动选择最匹配的知识片段(支持跨表路由)。
  3. Prompt 构造与上下文拼接
    系统根据用户配置的 Prompt 模板,将问题与召回内容结合,构造出最终用于模型推理的 Prompt 输入。
  4. 模型推理与回答生成

生成的 Prompt 被送入指定的 Chat 模型进行推理(如 deepseek r1、qwq-plus、GPT-4o 等),模型返回最终回答内容。

  1. MCP 工具链集成(可选)

如需执行任务类问题(如“查 GitHub PR 状态”、“调用企业 API”),可启用 MCP 工具链调用。

支持标准化注册的 MCP 工具(HTTP / stdio),通过 Function Calling 调用链执行外部任务,补全答案或直接完成任务。

具体操作步骤可参考:

任务创建成功后,将对外暴露一个具备内置知识库支持查询语义优化能力可选 MCP 工具链执行的 RAG 服务,且兼容 OpenAI 的 API 协议

相当于将用户原有的大模型 API 接口进行了增强——无需更改客户端代码,即可接入增强后的智能问答与任务执行服务。

这里可以通过可视化工具 CherryStudio 进行交互测试。CherryStudio 兼容 OpenAI 接口标准,适合用于接口联调、上下文调试和模型效果验证。

Cherry Studio 配置步骤

  1. 打开 Cherry Studio,在“模型服务”中搜索 openai
  2. 配置参数如下:

  • 模型名称:填写 CC_RAG

  1. 回到对话页面:
  • 添加助手 → Default Assistant。
  • 右键点击 Default Assistant → 编辑助手 → 模型设置,绑定上一步添加的模型。

  1. 在对话窗口中输入:

CloudCanal 增量同步任务延迟是什么原因?应该怎么处理?

RagApi 将根据向量数据自动检索相关知识内容(本例使用 CloudCanal 文档知识库),结合模型生成自然语言回答。

MCP 工具链集成示例

如果 RagApi 配置了 MCP 工具服务(如网页抓取、GitHub 查询等),模型可自动生成工具调用。

步骤如下:

  1. 进入 CloudCanal,点击任务详情页右上角「功能列表」>「参数修改」。
  2. 进入「目标数据源配置」,找到 mcpServers,将如下配置粘贴进去。
  3. 点击右上角「生效配置」,确认参数修改内容。
  4. 点击「确认」。如部分参数需重启任务生效,系统会自动提示是否重启。
{
"mcpServers": {
"github": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-github"
],
"env": {
"GITHUB_PERSONAL_ACCESS_TOKEN": "<YOUR_TOKEN>"
}
},
"mcp-server-firecrawl": {
"command": "npx",
"args": [
"-y",
"firecrawl-mcp"
],
"env": {
"FIRECRAWL_API_KEY": "<YOUR_API_KEY_HERE>"
}
}
}
}

CloudCanal 将自动触发 MCP 工具执行,并进行多轮调用与总结,最终返回结果:

总结

RAG、Function Calling、MCP 和 AI Agent 不是孤立存在的技术,而是在现实应用中彼此协同、互为补充。CloudCanal 近期支持的 RagApi 服务,融合了这些 AI 底层能力,可以零代码傻瓜式完成 RAG 服务的构建,大大降低了使用智能 AI 的门槛。

标签:

OceanBase(Oracle 租户) 到 OceanBase(MySQL 租户) 数据同步

· 阅读需 3 分钟
Barry
Chief Technology Officer

简述

CloudCanal 4.4.0.0 版本开始支持 OceanBase (Oracle 租户) 作为源/对端的数据迁移能力。本篇文章主要介绍如何使用 CloudCanal 构建一条 OceanBase(Oracle 租户)到 OceanBase(MySQL 租户)的数据同步链路(以下简称 ObForOracle 和 OceanBase)。

技术点

订阅增量日志

ObForOracle 增量数据变更的订阅依赖其官方提供的日志代理组件 oblogproxy。CloudCanal 不断接收并解析 oblogproxy 推送的增量事件,最终写入对端。

前提条件

ObForOracle 版本为 3.0、3.1、3.2。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

  1. 登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

  2. 数据库类型选择 ObForOracle,并填写网络地址和认证信息。

    信息
    • 网络地址格式为 ip1:sql_port1;ip2:sql_port2
    • 多个 Root Server 用英文分号分隔。
    • 支持填写 OBProxy 地址,格式为 proxy_ip:proxy_port
  3. 配置 额外参数

    参数描述
    obLogProxyHostoblogproxy server 的地址,格式为 ip:port,默认端口统一为 2983。如果需要订阅增量数据,该参数不可为空。
    clusterUrl可以为空。为空时订阅增量数据时会使用 root server list。不为空时订阅增量数据会优先使用 cluster url。
    rpcPortList订阅增量数据时,该参数不可为空,默认端口为 2882。如果网络地址包含多个 Root Server(假设为 3 个),此处填写格式为 2882:2882:2882。
    tenantOceanBase Oracle 模式的租户名称,不可为空。若为空,使用 OceanBase 驱动在连接时会默认连接到 sys 租户(MySQL 模式)。
    clusterNameOceanBase 集群名称,可以为空。用于拼接连接串(用户名@租户名#集群名)以通过 OceanBase 官方驱动连接数据库。

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    ObForOracle 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: ObForOracle 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 ObForOracle 到 OceanBase 数据迁移同步,打通数据流动的渠道,实现端到端的精准数据传输。

HANA 到 MySQL 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

SAP HANA 是一款列式存储的内存数据库系统,相比传统的硬盘存储数据库,数据处理速度更快,支持联机分析处理(OLAP)和联机事务处理(OLTP),常常用于实时分析处理、应用程序开发等场景。

MySQL 是在全球广泛使用的开源关系型数据库,历史悠久,运行稳定可靠,简便易用,灵活可扩展,因而受到许多组织的青睐。常用于 Web 应用的后端数据库、企业资源规划(ERP)系统的数据库、开发和测试数据库等。

本篇文章主要介绍如何使用 CloudCanal 构建一条 HANA 到 MySQL 的数据同步链路。

技术点

数据同步整体流程

CloudCanal 实现 HANA 源端增量数据同步,主要使用其触发器捕获变更事件。整体流程如下:

  • 安装触发器,通过触发器捕获增量变更数据
  • 记录位点,记录增量数据同步的起点
  • 执行全量数据迁移
  • 执行增量数据同步

表级别 CDC 表

CloudCanal 实现了表级别的 CDC 表设计,每张源表都对应一张 CDC 表,CDC 表的结构仅在原表结构的基础上增加了几个位点字段,用于增量同步。

相比于所有数据写入单一 CDC 表,表级别的 CDC 表更加独立,方便多次订阅表。此外,触发器只需要执行 INSERT 语句,因此对于字段较多的表也能够快速执行。扫描消费 CDC 数据时,不需要做额外的处理,消费更简单。

image.png

原表

CREATE COLUMN TABLE "SYSTEM"."TABLE_TWO_PK" (
"ORDERID" INTEGER NOT NULL ,
"PRODUCTID" INTEGER NOT NULL ,
"QUANTITY" INTEGER,
CONSTRAINT "FANQIE_pkey_for_TA_171171268" PRIMARY KEY ("ORDERID", "PRODUCTID")
)

CDC 表

CREATE COLUMN TABLE "SYSTEM"."SYSTEMDB_FANQIE_TABLE_TWO_PK_CDC_TABLE" (
"ORDERID" INTEGER,
"PRODUCTID" INTEGER,
"QUANTITY" INTEGER,
"__$DATA_ID" BIGINT NOT NULL ,
"__$TRIGGER_ID" INTEGER NOT NULL ,
"__$TRANSACTION_ID" BIGINT NOT NULL ,
"__$CREATE_TIME" TIMESTAMP,
"__$OPERATION" INTEGER NOT NULL
);
-- other index

触发器 (INSERT)

CREATE TRIGGER "FANQIE"."CLOUD_CANAL_ON_I_TABLE_TWO_PK_TRIGGER_104" AFTER INSERT ON "SYSTEM"."TABLE_TWO_PK" REFERENCING NEW ROW NEW FOR EACH ROW 
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
IF 1=1 THEN
INSERT INTO "SYSTEM"."SYSTEMDB_FANQIE_TABLE_TWO_PK_CDC_TABLE" (__$DATA_ID, __$TRIGGER_ID, __$TRANSACTION_ID, __$CREATE_TIME, __$OPERATION, "ORDERID","PRODUCTID","QUANTITY")
VALUES(
"SYSTEM"."CC_TRIGGER_SEQ".NEXTVAL,
433,
CURRENT_UPDATE_TRANSACTION(),
CURRENT_UTCTIMESTAMP,
2,
:NEW."ORDERID" ,
:NEW."PRODUCTID" ,
:NEW."QUANTITY"
);
END IF;
END;

表级别任务位点

在表级别 CDC 表模式下,同步增量数据时,每个表都有自己的位点,原有的单一位点无法满足这种同步需求。

因此,CloudCanal 引入了表级别的增量同步位点,确保每个表能够消费各自对应的增量同步位点。位点的具体体现为:

[
{
"db": "SYSTEMDB",
"schema": "FANQIE",
"table": "TABLE_TWO_PK",
"dataId": 352,
"txId": 442441,
"timestamp": 1715828416114
},
{
"db": "SYSTEMDB",
"schema": "FANQIE",
"table": "TABLE_TWO_PK_2",
"dataId": 97,
"txId": 11212,
"timestamp": 1715828311123
},
...
]

这样的设计有以下好处:

  • 位点精细控制:每个表都有自己的增量同步位点,在增量任务中可以重新消费特定表中的增量数据,而无需消费所有表的数据,实现更加精细的控制,减少不必要的数据传输和处理,提高同步效率。

  • 数据并行处理:由于每个表有自己的位点,可以实现表级别的并行处理。不同表的增量数据可以同时处理,避免了单一位点导致的串行处理瓶颈,从而加快了同步速度。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

    1. 选择源和目标实例,并分别点击 测试连接
    2. 在目标实例下方 高级配置 中选择源端 CDC 表模式:单 CDC 表 / 表级 CDC 表
  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    HANA 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 HANA CDC 表以及对应触发器
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: HANA 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 HANA 到 MySQL 数据迁移同步,打通数据流动的渠道,实现端到端的精准数据传输。

Kafka 到 Kafka 数据同步

· 阅读需 3 分钟
Barry
Chief Technology Officer

简述

Kafka 为处理实时数据提供了一个统一、高吞吐、低延迟的平台,其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。因此实现 Kafka 到 Kafka 的数据同步也成了一项重要工作。

本篇文章主要介绍如何使用 CloudCanal 构建一条 Kafka 到 Kafka 的数据同步链路。

技术点

消费者消息推送

在任务创建后,CloudCanal 会自动创建消费组,并订阅需要同步消息的 Topic。CloudCanal 从源端拉取到消息后,会将消息推送到目标端。

心跳机制

在源端 Kafka 未产生消息时,CloudCanal 便无法正常感知消息的延时时间。

我们采用心跳模式解决这个问题。在 打开 Kafka 源端心跳 后,CloudCanal 会监测所有分区的消费位点,若所有分区的最新的位点与当前位点差值均小于设定的最长位点间隔(通过 dbHeartbeatToleranceStep 参数设置),则会产生一条包含当前系统时间的心跳记录。CloudCanal 在消费到该记录后,会根据其包含的时间计算延迟。

操作示例

步骤1: 配置 Kafka 权限

参考 Kafka 需要的权限 文档,设置 CloudCanal 需要的账号权限。

步骤2: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 3: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤4: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 选择源和目标数据源,并分别点击 测试连接

  3. 选择同步的 消息格式

    信息

    倘若没有特定的消息格式,请选择 原始消息格式

  4. 选择 增量同步

  5. 选择需要同步的 Topic。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    Kafka 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Kafka 源端会自动为对端创建 Topic,如果目标 Topic 在对端已存在,则会忽略。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 Kakfa 到 Kafka 数据迁移同步,助力企业快速构建数据管道,增强数据分析能力。

PostgreSQL 到 PostgreSQL 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

PostgreSQL 是一个历史悠久且广泛使用的数据库,不仅具备标准的关系型数据库能力,还具有相当不错的复杂 SQL 执行能力。用户常常会将 PostgreSQL 应用于在线事务型业务,以及部分数据分析工作,所以 PostgreSQL 到 PostgreSQL 数据迁移同步成为了一个重要工作。

本文主要介绍如何通过 CloudCanal 实现 PostgreSQL 到 PostgreSQL 数据迁移同步。

技术点

使用 PostgreSQL 逻辑复制机制

CloudCanal 使用 PostgreSQL 逻辑复制 (Logical) 机制实现对其增量数据的订阅。

发布(Publication)和同步任务一一关联,任务修改订阅后也会自动增减 Publication 中的表。

Trigger 方式实现 DDL 同步

DDL 同步对于在线数据库灾备等场景必不可少,但 PostgreSQL 逻辑复制并未提供 DDL 操作。

为此,我们采用了行业通用的 Trigger 方式捕获 DDL 操作,并且自动写入一张普通表 cc_pg_ddl_capture_tab。CloudCanal 正常订阅该表即可获取 DDL 操作,和普通 DML 增量订阅机制一致,可准确地保障 DDL 和涉及表 DML 事件顺序。

pg_ddl_capture

双向同步防循环能力

数据库多活能力要求常常出现在一些在线数据库核心应用场景中,对于数据同步工具,主要需要做到同步数据无循环。

对于 PostgreSQL 之间的多活同步防循环,我们采用同一事务标记操作方式实现。

通过一张额外的标记表,同步任务写入正常业务数据到对端时,在同一个事务中对标记表做操作。当任务从 PostgreSQL 中获取到该标记表事件时,则忽略当前事务所有操作,即达到防循环目的。

pg_loop_sync

操作示例

步骤 1: 修改 PostgreSQL 日志级别

  1. 参考 PostgreSQL 需要的权限 文档创建用户和授予权限。
  2. 设置 PostgreSQL wal_levellogical
信息

自建数据库可修改 postgresql.conf,设置 wal_level=logical 和 wal_log_hints = on。

  1. 设置同步账号网络权限。
信息

自建数据库可修改 pg_hba.conf,添加以下配置:

host replication 同步账号 CIDR网段 md5
host 同步库 同步账号 CIDR网段 md5
host postgres 同步账号 CIDR网段 md5

  1. 重启 PostgreSQL 生效。

步骤 2: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 3: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 4: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 选择源和目标数据源,并分别点击 测试连接

  3. 选择 数据同步 并勾选 全量初始化

    信息

    勾选 同步 DDL 将会自动创建对应的 DDL 捕获 trigger 和 event,需要较高权限。

  4. 选择需要同步的索引。

  5. 选择索引对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表即可。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    PostgreSQL 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 DDL 捕获触发器和表
    • 初始化 PostgreSQL 增量复制位点
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动步骤流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: 将 PostgreSQL 源端的表结构迁移到对端,如果同名表在对端已存在,则忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端,支持断点续传。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文简单介绍了如何使用 CloudCanal 进行 PostgreSQL 到 PostgreSQL 数据迁移同步。

PostgreSQL 作为流行的关系型数据库,通过 CloudCanal 数据迁移同步加持,让数据进出更加便利和顺畅。

Hana 到 PostgreSQL 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

本篇文章主要介绍如何使用 CloudCanal 构建一条 Hana 到 PostgreSQL 的数据同步链路。

技术点

表级别 CDC 表

CloudCanal 在实现 Hana 源端增量同步时,最初采用的是单 CDC 表的模式,即所有订阅表的增量数据(插入、更新、删除)通过触发器统一写入同一张 CDC 表。这样设计的初衷是简化架构和实现方式,但是同时也带来了一些问题。

  • 触发器执行效率低:采用单个 CDC 表时,我们将订阅表的字段值拼接成 JSON 字符串。虽然这种方式统一,但增加了触发器的复杂性。当字段数量超过 300 个时,触发器效率显著下降,影响同步性能。

  • 增量数据积压:所有订阅表的变更数据集中写入单个 CDC 表,当 A 表增量数据较多而 B 表较少时,混合写入会导致无法及时处理 B 表数据,造成 B 表数据积压,影响同步及时性。

后续我们对于单 CDC 表模式进行了优化。本次优化实现了表级别的 CDC 表设计,每张源表都对应一张 CDC 表,CDC 表的结构仅在原表结构的基础上增加了几个位点字段,用于增量同步。

原表

CREATE COLUMN TABLE "SYSTEM"."TEST" (
"TEST1" INTEGER NOT NULL ,
"TEST2" INTEGER NOT NULL ,
"TEST3" INTEGER,
CONSTRAINT "TEST_KEY" PRIMARY KEY ("TEST1", "TEST2")
)

CDC 表

CREATE COLUMN TABLE "SYSTEM"."SYSTEM_TEST_CDC_TABLE" (
"TEST1" INTEGER,
"TEST2" INTEGER,
"TEST3" INTEGER,
"__$DATA_ID" BIGINT NOT NULL ,
"__$TRIGGER_ID" INTEGER NOT NULL ,
"__$TRANSACTION_ID" BIGINT NOT NULL ,
"__$CREATE_TIME" TIMESTAMP,
"__$OPERATION" INTEGER NOT NULL
);
-- other index

触发器 (INSERT)

CREATE TRIGGER "SYSTEM"."CLOUD_CANAL_ON_I_TEST_TRIGGER_TEST" AFTER INSERT ON "SYSTEM"."TEST" REFERENCING NEW ROW NEW FOR EACH ROW 
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
IF 1=1 THEN
INSERT INTO "SYSTEM"."SYSTEM_TEST_CDC_TABLE" ("__$DATA_ID", "__$TRIGGER_ID", "__$TRANSACTION_ID", "__$CREATE_TIME", "__$OPERATION", "TEST1", "TEST2", "TEST3")
VALUES(
"SYSTEM"."CC_TRIGGER_SEQ".NEXTVAL,
433,
CURRENT_UPDATE_TRANSACTION(),
CURRENT_UTCTIMESTAMP,
2,
:NEW."TEST1" ,
:NEW."TEST2" ,
:NEW."TEST3"
);
END IF;
END;

采用这种方式有以下几个好处:

  • 表级别 CDC 表更加独立,方便进行多次订阅。
  • 触发器只需要执行 INSERT 语句,因此对于字段较多的表也能够快速执行。
  • 扫描消费 CDC 数据时,不需要做额外的处理,消费更简单。

操作示例

步骤 1: 安装 CloudCanal

请参考 全新安装(Docker Linux/MacOS),下载安装 CloudCanal 私有部署版本

步骤 2: 添加数据源

登录 CloudCanal 控制台,点击 数据源管理 > 新增数据源

步骤 3: 创建任务

  1. 点击 同步任务 > 创建任务

  2. 配置源和目标数据源。

    1. 选择源和目标实例,并分别点击 测试连接
    2. 在目标实例下方 高级配置 中选择源端 CDC 表模式:单 CDC 表 / 表级 CDC 表
  3. 选择 数据同步 并勾选 全量初始化

  4. 选择需要同步的表。

  5. 选择表对应的列。

    信息

    如果需要选择同步的列,可先行在对端创建好表结构。

  6. 点击 确认创建

    信息

    任务创建过程将会进行一系列操作,点击 同步设置 > 异步任务,找到任务的创建记录并点击 详情 即可查看。

    Hana 源端的任务创建会有以下几个步骤:

    • 结构迁移
    • 初始化 Hana CDC 表以及对应触发器
    • 分配任务执行机器
    • 创建任务状态机
    • 完成任务创建
  7. 等待任务自动流转。

    信息

    当任务创建完成,CloudCanal 会自动进行任务流转,其中的步骤包括:

    • 结构迁移: Hana 源端的表定义将会迁移到对端,如果同名表在对端已经存在,则会忽略。
    • 全量数据迁移: 已存在的存量数据将会完整迁移到对端。
    • 增量数据同步: 增量数据将会持续地同步到对端数据库,并且保持实时(秒级别延迟)。

总结

本文介绍了如何使用 CloudCanal 进行 Hana 到 PostgreSQL 数据迁移同步,操作简便的同时带来高效的数据同步体验,大大加快了企业的数据流通与数据平台构建。

Sap Hana 数据迁移同步优化

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

CloudCanal 近期对 Hana 源端链路做了新一轮优化,优化点主要来自用户实际场景使用,这篇文章简要做下分享。

本轮优化主要包含:

  • 新增任务级增量表
  • 新增增量表定时清理能力
  • 新增增量表表结构自动演进能力
  • 任务延迟判定优化
  • Hana 1.x 的兼容
  • 产品化和文档优化

优化点

任务级增量表

CloudCanal Hana 源端任务原本不支持修改默认增量表,导致不同任务的触发器将增量数据写入同一个表,不同任务将相互影响。

比如,A 任务订阅的表积压大量数据,将影响B,C,D等订阅相同表的任务增量同步效率。

为解决这一问题,本轮优化支持 每个任务可单独设置增量表 ,以此确保任务之间互不影响。

image.png

增量表定时清理

触发器将增量数据写入增量表后,若未及时清理,可能导致空间占用增加。

在之前的版本中,用户只能手动定期清理,过程繁琐且具备一定风险(清理错)。

本轮优化增加设置任务参数 triggerDataCleanEnabled 打开自动定时清理增量表功能,并提供两个参数进行控制:

  • triggerDataCleanIntervalMin:增量表清理间隔(单位:分钟)
  • triggerDataRetentionMin:增量表数据保留时间(单位:分钟)

通过这套机制,用户能够灵活控制增量表的清理操作,同时确保未消费的增量数据不会被意外清除。

image.png

增量表自动演进

Hana 增量任务创建时自动生成增量表,CloudCanal 依赖于增量表实现各种能力,但随着 CloudCanal 版本更新,可能对增量表进行变更(比如加入新字段)。

由此带来的问题是:用户在更新 CloudCanal 后需要手动执行 DDL 以适应增量表结构的变化,若存在大量增量表,操作相当复杂。

为解决此问题,CloudCanal 新增 增量表结构 DIFF 能力,在任务启动时 自动生成差异 DDL 实现对增量表的自动演进

image.png

延迟判定优化

Hana 源端增量同步使用位点(增量表自增ID)来判断延迟,当位点向前推进时可准确获取延迟,但若无变更事件导致位点不更新,延迟会持续增大,实际上并未发生延迟。

为解决这一问题,本轮优化 通过查询增量表来判断是否存在延迟,具体逻辑为:

  • 若存在数据,系统根据增量数据的时间戳计算延迟。
  • 若无数据,任务获取当前时间发送心跳事件,并根据心跳上的时间戳计算延迟。

时间戳仅在重置位点时才用于数据查找,且在查找时进行时区转换处理。

Hana 1.x 的兼容

CloudCanal 之前版本只支持 Hana 2.x 版本,但是随着用户使用,我们发现一些用户还是在使用 Hana 1.x 版本。

Hana 1.x 版本的触发器和 2.x 存在一定的差异,且元信息获取逻辑也不同

本轮优化对上述差异点进行了兼容性优化,使 CloudCanal 能够比较全面的支持 Hana 1.x 和 2.x 版本的数据同步。

产品化增强

本轮优化除了内核层面增强,对产品能力和文档做了一系列优化,有效解决用户在数据源添加、任务创建等环节中常见的权限问题。

这些优化举措让用户创建迁移同步链路更加流畅,节省时间。

未来方向

更多目标链路

目前 Hana 支持的目标端有 MySQLStarrocksDoris 等,接下来的版本将打通 TiDBOceanBaseAdbForMySQL 等目标链路,这个需求主要来自于用户。

优化多字段触发的处理速度

在处理多字段表(单个表 300+ 字段)时,目前触发器的执行效率不满足预期,导致 DML 操作速度较慢,我们后续将对触发器模板进行性能优化,以提高处理速度。

总结

本文简要介绍 CloudCanal 近期对 Hana 源端数据同步的优化,以及链路未来的方向,希望对读者有所帮助。

Hana 到 Starrocks 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

SAP HANA 是由 SAP 开发的一款内存列式数据库, 具有预测分析、空间数据处理、文本分析、文本搜索、流分析、图形数据处理等高级分析功能。

HANA 内存列式数据库特性,即启动后可以把所有数据载入内存,相比传统基于硬盘的数据库,性能提升10~10,000倍。

HANA 一般内置在 SAP ERP 系统中提供服务,在制造业应用广泛。

现如今企业尝试建立统一数据分析平台,SAP HANA 保存了ERP相关数据,如何实时同步 HANA 数据到数据平台成为困扰企业的一个难题。

CloudCanal 最新版本已支持 HANA 作为源端迁移同步数据到 StarRocks 来构建实时数仓, 本文简要介绍使用 CloudCanal 快速构建一个 HANA 到 StarRocks 数据迁移同步任务

技术要点

数据同步整体流程

CloudCanal 实现 HANA 增量数据同步主要使用其触发器捕获变更事件,整体流程如下:

  • 安装触发器,通过触发器捕获增量变更数据
  • 记录位点,记录增量数据数据同步的起点
  • 执行全量数据迁移
  • 执行增量数据同步

数据捕获触发器

触发器是一种自动触发执行的存储过程,它可以在数据变更前执行也可以在数据变更后执行,因为本质也是存储过程,所以存储过程支持的操作触发器均支持。

不同数据库对触发器的支持程度不同,HANA 的触发器支持监听 I(新增)/U(更新)/D(删除) 三种事件,因此数据的所有变更都可以通过触发器捕获。

安装触发器的方式与创建存储过程类似,即通过执行 SQL 创建触发器。

通过触发器实现增量数据同步,需要触发器捕获数据的I/U/D变更事件并写入增量 CDC 数据表,数据的变更事件最终都会写到增量 CDC 数据表,执行流程如下:

其他 HANA 同步方案

目前支持同步 HANA 数据的产品还有 Informatica、Qlik 等,实现方案也是通过触发器。

因为 HANA 的触发器不能监听 DDL 变更,因此 CloudCanal 与 Informatica、Qlik 一样,都不支持DDL同步。

操作示例

准备动作

添加数据源

  • 登录 CloudCanal ,数据源管理->添加数据源 添加数据源

  • 创建源端数据源, 选择自建数据源,选择 HANA 并填写相关信息

    默认数据库: 即需要同步的数据所在数据库,常见默认数据库:SYSTEMDB、HXE、DB0

    image.png

  • 创建目标端数据源,选择自建数据源,选择StarRocks,并填写相关信息

    Client地址: CloudCanal 用其查询库表表的元数据信息,对应 StarRocks QueryPort,默认端口为 9030

    额外参数 Http地址: StarRocks 接收 streamload 的 http 请求,此处可填写 BE 节点地址,默认端口为 8040 , 如需负载均衡也可直接填写 FE节点 地址和端口,FE节点默认端口 8030

    image.png

  • 数据源创建成功 image.png

任务创建

  • 任务管理 > 创建任务 image.png

  • 源端选择 HANA 数据源,目标端选择 StarRocks 数据源,分别点击测试连接按钮并设置数据库映射关系

  • 点击下一步 image.png

  • 选择 增量同步,并且勾选 全量初始化

  • 点击下一步 image.png

  • 选择订阅的表

  • 点击下一步 image.png

  • 配置列映射

  • 点击下一步 image.png

  • 点击创建任务 image.png

  • 任务创建成功并启动后,会自动执行结构迁移、全量迁移、增量同步 image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 HANA 到 StarRocks 数据迁移同步。

StarRocks 作为新兴的实时数仓产品,为传统数据业务带去更加实时、一致的体验,让数据得到更加广泛的使用,CloudCanal希望助一臂之力,让数据流动更加平滑顺畅。

Oracle 数据迁移同步优化(三)

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

CloudCanal 最近再次对其 Oracle 源端数据同步进行了一系列优化,这些优化基于用户在真实场景中的反馈,具备很强的生产级别参考意义。

本文将简要介绍这些优化项,希望带给读者一些收获。

  • 增量事件 SCN 乱序问题
  • MISSING_SCN 事件干扰
  • 新增的归档日志消费模式

优化点

增量事件 SCN 乱序问题

Oracle 源端 Logminer 数据同步原理大致如下:

  • 获取所有包含当前 SCN 位点的 Redo 或 Archive 日志文件,并添加到 Logminer 中
  • 计算本次需要分析的 SCN 范围(START_SCN, END_SCN)
  • Logminer 对于 SCN 范围进行日志分析,分析结果展现在 V$LOGMNR_CONTENTS 视图中
  • 扫描 V$LOGMNR_CONTENTS 视图,转换处理后同步到目标端

image.png

老版本 CloudCanal 扫描 V$LOGMNR_CONTENTS 视图时指定了 SCN 范围进行查询,但在实际客户场景中偶发 SCN 乱序问题

同时 Oracle 官方也建议查询视图时不要进行过多的范围过滤或排序处理,以避免查询结果乱序。

因此我们首先 进行了 2 个优化 ,以此解决该问题:

  • 扫描 V$LOGMNR_CONTENTS 视图时直接查询所有记录,其 SCN 范围完全依赖于 Logminer 所指定的文件
  • 设定 Logminer 分析的步长参数(logMiningScnStep)控制分析性能

MISSING_SCN 事件干扰

使用 Logminer 分析 Redo 日志时,有时会出现 MISSING_SCN 事件,老版本 CloudCanal 遇到该事件则会忽略,但这会导致事件漏扫从而丢数据。

MISSING_SCN 事件具体意义为

  • Logminer 分析 Redo 日志时,由于日志切换或其他特殊情况,导致部分 SCN 事件没有被 Logminer 分析到,因此在 V$LOGMNR_CONTENTS 视图中体现为 MISSING_SCN。

因此我们做了 第 3 个优化,当遇到 MISSING_SCN 事件时采取一定的策略规避漏扫问题,具体动作为:

  • 停止扫描,回退当前 SCN
  • 根据当前 SCN 重新分析和消费日志文件

image.png

重新分析后,缺失的 SCN 记录会被 Logminer 分析到,并且此类型事件出现频率较小,因此对同步效率影响非常小。

归档日志消费模式

Logminer 分析 Redo 日志时,如果 END_SCN 与最新 SCN 接近,可能会导致部分 SCN 无法被 Logminer 分析,从而出现数据丢失。

这种情况难以避免,因为很难在 Logminer 层面确定是否有 SCN 被漏掉。

CloudCanal 老版本通过设置 fallBackScnStep 参数与最新的 SCN 保持一定距离,这种做法虽牺牲了一部分实时性,但换取了数据的准确性,而该方式和 只消费归档日志模式 有一定的相似性。

归档日志不会再发生变化,从而能够保证 Logminer 分析的准确性,对于不太注重实时性的业务(比如日报),这是一个可接受的方式(增量同步的好处不光只是实时性)。

CloudCanal 第 4 个优化 即增加了只消费归档日志模式(参数:archiveLogOnlyMode)。

在该模式下, 同步任务会根据 Archive 日志文件 + SCN 双位点 的方式,以 Archive 生成的时间顺序逐个消费,这样可以保证不漏扫任何一个 Archive 文件。

image.png

未来展望

优化性能

本次优化侧重于数据的准确性,优化了 SCN 乱序问题MISSING_SCN 问题,但部分高并发场景回退 SCN 可能会导致性能下降。

所以优化性能是后续 CloudCanal Oracle 数据同步重要的一个方向。

数据订正能力

Oracle 部署形态多样,用户场景不一,数据类型复杂,在做足事前防范工作之后,事后如何补救也是非常重要的能力。

借助 CloudCanal 数据校验订正体系,后续丰富和优化 Oracle 源端数据校验和订正能力是一个重要的工作。

总结

本篇文章主要介绍 CloudCanal 对于 Oracle 源端数据同步的深度优化,希望对读者有所帮助。

MySQL 到 GaussDB 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简介

CloudCanal 近期开放了 MySQL -> GaussDB for MySQL /OpenGauss 数据链路,本篇文章将简要概述 CloudCanal 对于 GaussDB for MySQL/OpenGauss 目标数据迁移同步的支持。

功能介绍

结构迁移类型自动处理与优化

不同数据库对于数据类型支持存在差异,CloudCanal 结构迁移时会进行类型自动转换与优化。

例如:在 MySQL 中可以定义的 VARCHAR(0) 数据类型,在 OpenGauss 中不支持,CloudCanal 结构迁移时会自动将源端 MySQL 的 VARCHAR(0) 类型映射为 VARCHAR(1)

自定义数据处理

用户在迁移、实时同步期间如需要对传输的数据行进行自定义的加工可以采用 CloudCanal 提供的自定义数据处理能力,这对于实时宽表构建、新增动态列、基于微服务、缓存的数据清洗等数据处理场景都非常有帮助。关于更多自定义数据的使用方式可以参考:数据处理插件使用方式

支持高性能写入模式

CloudCanal 中默认采用 OpenGauss的驱动通过JDBC的方式进行批量写入。如果用户对性能要求很苛刻,可以尝试开启基于Copy模式的高性能写入模式。在Copy写入模式下,写入性能相比采用JDBC的方式有很大的提升。

可视化创建

CloudCanal 创建 GaussDB for MySQL/OpenGauss 数据迁移同步任务是完全可视化的,通过获取数据库元数据,让用户在 浏览器页面上即可决定哪些库、表、列进行迁移同步等。

自动化流程

GaussDB for MySQL/OpenGauss 数据迁移同步任务创建后,CloudCanal 将自动流转各个阶段的任务,用户无需干涉,直达数据实时同步状态。

监控图表支撑

CloudCanal 为 GaussDB for MySQL/OpenGauss 数据迁移同步任务提供了多个实用监控指标,包括增量缓存RPS增量缓存延迟(ms)内存队列数据个数等,当调优任务性能或排查任务异常原因时,监控指标提供了很好的判断依据。

告警支持

CloudCanal 为 GaussDB for MySQL/OpenGauss 数据迁移任务提供了包括钉钉/企业微信/飞书/自定义等 webhook 类型告警,对于企业级客户,可额外选择邮件,以及短信告警,实时保障同步任务的高可用。

简单示例

本示例以将数据从 MySQL 数据库同步到 GaussDB for OpenGauss 数据库为操作案例,以便更好地说明 CloudCanal 在不同数据库之间进行数据同步的能力。

准备动作

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 MySQL 数据库(本例使用 8.0 版本)和 GaussDB for OpenGauss 数据库(本例使用 5.0 版本)
  • 登录 CloudCanal 平台 ,添加 GaussDB for OpenGauss 和 MySQL

任务创建

  • 任务管理 -> 新建任务

  • 测试连接并选择 源 和 目标 数据库

  • 点击下一步

  • 选择 数据同步,并勾选全量数据初始化以及开启一次性校验,其他选项默认

  • 选择需要迁移同步的表

  • 确认创建任务

  • 任务自动执行结构迁移、全量同步和增量同步,执行数据校验,结果显示数据校验通过

总结

本文主要介绍了 CloudCanal 支持 GaussDB for MySQL/OpenGauss 目标端数据迁移同步功能,通过这个能力,用户可以便利地将数据实时同步到 GaussDB for MySQL/OpenGauss 数据库,实现数据更广泛、更实时的应用。

SQL Server 到 StarRocks 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

本篇文章主要介绍如何使用 CloudCanal 构建一条 SQLServerStarRocks 的数据同步链路。

技术点

源端SQLServer基于CDC代理

当数据库启用 CDC 能力后,SQL Server 代理上会生成一个专门分析ldf文件的作业,再将具体的表启用 CDC, 则该作业开始持续分析文件中的变更事件到指定的表中。

image.png

写入StarRocks采用StreamLoad导入方式

CloudCanal 采用了 StarRocks StreamLoad 方式进行导入,源端数据和变更转成字节流,以通过 HTTP 协议批量写入 StarRocks。

基于 StreamLoad 方式,写入对端的操作均为 INSERT,CloudCanal 自动将 INSERT / UPDATE / DELETE 转成 INSERT 语句,并填入 __op 值(删除标识符),StarRocks 将自动进行数据合并。

准备工作

SQL Server源端准备工作

  • 源库需要启用 CDC 执行命令

    • 建议使用 sa 账号,启用 CDC 需要 sysadmin 权限
    • 切换数据库,可以采用以下命令(假设你的数据库名称为example)
    use example;
  • 执行启用 CDC 功能 可以执行如下命令

    exec sys.sp_cdc_enable_db
  • 准备一个 CloudCanal 同步账号,并为这个账号授权(db_ownerpublic 权限) image.png image.png

  • 确认 SQL SERVER 代理是启用状态 image.png

  • 源端 SQL Server 实例中待同步的表需具备主键(CloudCanal 在创建同步任务的时候会帮你自动选择有主键的表)

StarRocks 对端准备工作

  • StarRocks 最高支持版本为:2.4.0
  • Cloudcanal 添加StarRocks 数据源 image.png
    • Client 地址
      • MySQL 协议端口,用于查询元数据,对应 StarRocks QueryPort,默认为 IP:9030
    • Http 地址
      • Stream Load 导入数据用途,对应 StarRocks HttpPort,默认为 IP:8030

注意事项

  • SQL SERVER 作为源端结构迁移仅中支持 Schema、Table 迁移。

SQL Server -> StarRocks 的数据类型支持

CloudCanal 结构迁移和数据迁移同步时会自动进行数据类型映射。详情见下表:

SQL ServerStarRocks
BIGINTBIGINT
BINARYNot Supported
BITTINYINT
CHARCHAR
DATEDATE
DATETIMEDATETIME
DATETIME2DATETIME
DATETIMEOFFSETDATETIME
DECIMALDECIMAL
FLOATFLOAT
GEOGRAPHYSTRING
GEOMETRYSTRING
HIERARCHYIDNot Supported
IMAGENot Supported
INTINT
MONEYFLOAT
NCHARCHAR
NTEXTSTRING
NUMERICDECIMAL
NVARCHARVARCHAR
REALDOUBLE
ROWVERSIONLARGEINT
SMALLDATETIMEDATETIME
SMALLINTSMALLINT
SMALLMONEYFLOAT
SQL_VARIANTNot Supported
TEXTSTRING
TIMESTRING
TIMESTAMPLARGEINT
TINYINTSMALLINT
UNIQUEIDENTIFIERVARCHAR
VARBINARYNot Supported
VARCHARVARCHAR
XMLSTRING
sysnameVARCHAR

操作示例

前置条件

  • 登录ClouGence官网 下载私有部署版,使用参见快速上手文档

  • 准备一个 SQL Server 数据库,和 StarRocks 实例(本例分别使用自建 SQL Server 2016 和 StarRocks 2.4.0)

  • 登录 CloudCanal 平台 ,添加 SQL Server 和 StarRocks image.png

  • 创建一条 SQL Server -> StarRocks 链路作为增量数据来源

任务创建

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库

  • 点击下一步 image.png

  • 选择 增量同步,其它默认 image.png

  • 此时如果 SQL Server 上数据库还没有启用 CDC 功能,则会在点击下一步的时候提示如何启用 CDC。只要按照提示的参考语句执行即可。 image.png

  • 选择需要迁移同步的 image.png image.png

  • 确认创建任务 image.png

  • 任务自动做结构迁移全量迁移增量同步 image.png

常见问题

数据不同步了都有那些情况?

  • SQL Server CDC 需要依赖 SQL Server 代理,首先要确定 SQL Server 代理服务是否启动
  • 表在启动 CDC 的时候会确定要捕获的列清单,此时如果修改列的类型可能会导致 CDC 中断。目前解决办法只能重建任务。
  • 增/减 同一个列名的列,对一个列删除后在增加。虽然 CDC 表中字段依然存在但是也会导致整个 CDC 中断。

什么情况下会影响稳定的数据同步?

  • 如果任务在同步期间出现了异常导致任务延迟。这时候需要格外注意,如果过长时间的延迟,即便是修复了延迟的问题(比如对端数据库长时间出现不可用)在后续数据同步上也可能存在丢失数据的风险。
  • SQL Server 为了防止 CDC 表数据无限膨胀 SQL Server 会每天定时执行清理作业,清理超过 3天的数据。
  • 为了增加延迟的容忍度可以执行这条 SQL 来增加 CDC 数据的保存时间,代价是这些数据需要存放到数据库表中,如果每日数据变更很多对磁盘开销会有额外的要求。
    • execute sys.sp_cdc_change_job @job_type = n'cleanup', @retention = 4320
    • msdb.dbo.cdc_jobs 表中保存了具体 捕获任务的数据保存时间。

总结

本文简单介绍了如何使用 CloudCanal 进行 SQL Server -> StarRocks 数据迁移同步。

Oracle 到 PostgreSQL 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

本篇文章主要介绍如何使用 CloudCanal 构建一条 OraclePostgreSQL 的数据同步链路。

技术要点

缩小的数据库权限要求

CloudCanalOracle 数据库的高权限要求,主要来自两个面向 DBA 的操作,自动构建字典自动切换归档日志,这两个操作主要是让用户使用更加自动化和便利,但是问题也比较明显,对数据库运维标准严苛的客户来说,这些权限对于我们的客户是没有的,所以新版本 CloudCanal ,通过参数配置,支持了关闭自动字典构建能力(默认打开)关闭自动切换归档日志能力(默认关闭)

多版本 schema 以支持位点回拉

对于关系型数据库同步工具而言,增量数据本身往往和元数据分离,也就是消费到的增量数据和即时从数据库里面获取的元数据不一定匹配(两个时间点之间有DDL),所以维持一个多版本的元数据以应对增量数据解析是必要的, CloudCanal 以每天的 schema dump 为基准,辅以到当前位点的 DDL 语句列表,可构建出任何时间点的元数据(实际上是更加精确的 scn 位点),单个 DDL 前后的数据变更事件,能够精确匹配到相对应的元数据进行解析, CloudCanal 才有可能在此版本产品上提供了回拉位点重复消费一段时间增量数据的能力

支持的版本

源端 Oracle 支持的版本:10.X11.X12.X18.X19.X

对端 PostgreSQL 支持的版本:8.49.09.19.29.39.49.59.610.X11.X12.X13.X14.X15.X16.X17.X

支持的DDL&数据类型映射

  • Oracle -> PostgreSQL 链路支持的DDL暂时只有 ALTER TABLE ,后续我们将不断进行完善
  • CloudCanal 结构迁移和数据迁移同步时会自动进行数据类型映射

类型映射见下表:

Oracle 字段类型PostgreSQL 字段类型
CHAR、NCHAR、VARCHAR2、NVARCHAR、NVARCHAR2、ROWID、HTTPURITYPECHARACTER_VARYING
LONG、CLOB、NCLOBTEXT
NUMBER_BIGINTBIGINT
NUMBER_DECIMAL、BINARY_FLOAT、BINARY_DOUBLENUMERIC
FLOATREAL
DATE、TIMESTAMPTIMESTAMP_WITHOUT_TIME_ZONE、TIMESTAMP_WITHOUT_TIME_ZONE
TIMESTAMP_WITH_TIME_ZONE、TIMESTAMP_WITH_LOCAL_TIME_ZONETIMESTAMP_WITH_TIME_ZONE
XMLTYPEXML
信息

针对于 Oracle -> PostgreSQL 链路,源端 Oracle 不在上表的字段类型暂时不支持。

操作示例

准备工作

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 新增数据源 image.png

  • 选择自建数据库 -> 选择对应数据库 -> 输入相关信息 -> 测试连接-> 新增数据源 image.png

信息

Oracle 相较于其他数据源有一些额外的参数可以调整

  • logminerUser:ORACLE源端增量任务使用redo解析(logminer)方式时使用的账号,需要CDB类型用户
  • logminerPasswd:ORACLE源端增量任务使用redo解析(logminer)方式时使用的账号密码
  • logminerConnectType:ORACLE源端增量任务使用redo解析(logminer)方式时使用的连接方式,目前支持ORACLE_SID或ORACLE_SERVICE模式
  • logminerSidOrService:ORACLE源端增量任务使用redo解析(logminer)方式时使用的连接标识符,和logminerConnectType参数配合使用,ORACLE_SID连接方式,则填写sid,ORACLE_SERVICE连接方式,则填写service name
  • 添加 OraclePostgreSQL 之后可以在数据源列表中看到新增的数据源 image.png

创建同步任务

  • 任务管理 -> 创建任务

  • 源端选择 Oracle 数据源,对端选择 PostgreSQL数据源

  • 分别点击测试连接,选择源端和对端需要订阅的数据库,选择下一步 image.png

  • 选择 全量迁移 -> 勾选 增量同步 -> 根据自身机器配置选择 任务规格

  • 选择 下一步 image.png

  • 选择源端需要同步的表,如果目标表显示橙色表示对端不存在该表,任务创建之后,会自动生成该表

  • 点击 下一步 image.png

  • 可以在左侧选择添加 数据过滤条件

  • 点击 下一步 image.png

  • 点击 创建任务 image.png

任务执行

任务创建并且启动后,会自动进行如下的三个阶段:

  • 结构迁移:任务创建之后,如果对端没有表结构,那么 CloudCanal 会去自动在对端创建表结构
  • 全量迁移:将源端存量数据整体迁移到对端
  • 增量同步:全量迁移期间以及全量完成以后的源端增量数据变更会实时同步到对端 image.png image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 Oracle -> PostgreSQL 数据迁移同步。

Redis 数据迁移同步优化

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

CloudCanal 前一段时间支持了 Redis 到 Redis 数据迁移同步能力,并支持其双向同步,但是支持的指令种类有限。

随着用户使用,指令支持不全面成为一个比较大的问题,所以最近的版本,我们对此能力,结合用户实际碰到的问题,进行了新一轮优化。

此轮优化的特点是:

  • 增加数据初始化的类型
  • 增加数据同步的指令种类
  • 双向同步策略优化

本文简要介绍以上优化点,并展望该链路未来的研发方向,希望对用户使用有所帮助。

优化点

指令支持范围提升

Redis 到 Redis 数据同步在原先支持 Set、Hset、Del、Hdel、Expire 五种指令的基础上,额外支持了 Sadd、Zadd、LPushRename、Incr、Incrby、Hsetnx、Hmset、Lpush、Rpush、Lset、Zrem、Zremrangebyscore 12 种命令。

数据初始化(FULL SYNC)在原先支持 String、Hash 类型基础上,额外支持 List、Set、ZSet 三种类型。

数据同步指令种类的丰富,以及数据初始化类型多样化,让此链路具备更好的可落地性。

指令优化

对于 List 类型数据初始化,因获取的成员数据默认倒序,CloudCanal 会对其内部成员进行 重新排序,然后使用 RPush 命令将数据统一写入目标端,确保源、目标数据准确。

对于 Expire 指令,数据同步如果直接应用到目标端,会导致目标端数据过期晚于源端,因此 CloudCanal 在处理这类命令时,将 Expire 转换为绝对时间命令 PExpireAt,从而实现源对端 Key 同时过期。

image.png

双向同步防循环优化

优化前,Redis -> Redis 双向同步防循环策略采用 辅助指令进行判定,当收到正常指令,计算 hash 值,构建辅助指令,以反查辅助指令 key 是否存在进行数据过滤。

这种策略存在以下问题:

  • 辅助指令过期时间不好把控
  • 辅助指令会占用内存空间
  • 辅助指令未过期会导致同样的命令失效

而此次优化,总体延用辅助指令策略,但采用 删除辅助指令 判定是否进行数据过滤,虽然会导致性能下降,但能解决上述三个问题,在缓存写入量不大的情况下,效果良好(缓存常用场景读多写少)。

新策略的好处:

  • 无需考虑辅助指令过期问题
  • 能支持大部分指令防循环(Incr、IncrBy等)

image.png

指令解析 / 处理优化

本次优化对于指令解析、处理结构进行进一步重构,单个指令需要实现的逻辑更加清晰、独立,对于新指令添加,更加方便,为后续加快指令集支持打好基础。

演进方向

丰富指令集

CloudCanal 目前支持 17 种 Redis 常用指令,但仍然只是 Redis 指令集冰山一角,后续我们将支持更多 Redis 指令同步(单向 / 双向)。

数据校验和订正

目前 Redis 到 Redis 链路 暂未开通数据校验和订正功能,对于数据迁移同步产品,事后补救和事前防范(同步逻辑的严谨性)同等重要,后续我们将补上这部分能力。

单独全量功能

目前 Redis 到 Redis 链路并没有开通单独的全量迁移能力,而是在增量任务中顺带完成全量迁移(FULL SYNC),对于部分用户,需要针对该链路进行定时备份(周期性全量迁移),此功能是需要的。

总结

本文简要介绍了 CloudCanal 近期对于 Redis 到 Redis 单向和双向同步链路的优化,并展望该链路未来的研发方向,希望能为用户构建在线数据生态和数据应用发挥一定的作用。

跨互联网数据互通(HTTP)

· 阅读需 7 分钟
Barry
Chief Technology Officer

简介

CloudCanal 实现的 基于 Kafka 构建安全的跨互联网数据同步 方案被客户用于生产后,又出现了新的需求,主要集中在方案能否更加轻量化和可控性上,简而言之,去掉 Kafka 中转,直接在 CloudCanal 中实现跨网络安全互通。

本篇文章即介绍 CloudCanal 实现的更加轻量化方案,特点包括

  • 无消息等独立软件依赖
  • 两端数据库完全不开放公网端口
  • 两端数据库元数据可映射
  • 基于 HTTPS 传输
  • 具备用户名密码鉴权机制
  • 支持多种数据库异构互通

技术点

image.png

Tunnel数据源

去掉消息依赖的跨互联网数据库互通,我们是通过一个虚拟的数据源 Tunnel 实现。 Tunnel 数据源本身并不是实体数据库,而是一组逻辑信息,包括

  • ip(或域名)
  • port
  • 用户名
  • 密码
  • TLS 证书文件和密码
  • 元数据

通过这个虚拟数据源,我们可以使用 HTTP(S) 或 TCP 实现数据拉取或者接收数据的目的,同时完全匹配 CloudCanal 业务模型,达到功能的完整性。

PUSH模型

对于数据传输模式 PUSH 或 PULL,我们选择了 PUSH 模式,即客户端将数据推送到服务端,本质原因在于

  • 主要解决互通问题,而非订阅问题
  • 目标端同步写入数据更加匹配 CloudCanal 其他目标端风格
  • 数据通道无数据持久化,无需维护 store 来暂存数据

当然,PUSH 模式也带来一些问题,包括

  • 如何确保最终数据写入再提交位点
  • 位点回溯复杂(全量和增量、不同数据源位点格式不一致)

对于上述两个问题,我们采用 延迟提交位点技术 解决

延迟提交位点技术

采用 PUSH 模式后,位点管理是比较复杂且危险的工作,如果提早提交位点,可能丢数据。 为此,我们实现了 延迟提交位点技术,即客户端每一次写入 server 端,只返回已经提交的位点,并且将所有数据源、所有任务类型的位点序列化成 json 字符串。

通过这项技术,我们能够确保位点之前的数据肯定已经写到对端,并且在某些业务场景下,通过客户端任务的位点回溯,达到重复消费某一段时间数据的目的。

元数据映射

因为使用了虚拟的 Tunnel 数据源,并且其带有 schema(存储于CloudCanal kv配置表中), 所以针对这个数据源,我们模拟了表结构获取和迁移的过程,让其在任务创建和运维过程中如同一个真实存在的数据库。 一个真实的 Tunnel 数据源的元数据如下:

[
{
"db": "cc_virtual_db",
"schemas": [
{
"schema": "cc_virtual_schema",
"tables": [
{
"table": "WORKER_STATS",
"columns": [
{
"name": "ID",
"jdbcType": -5,
"typeName": "LONG",
"key": true
},
{
"name": "GMT_CREATE",
"jdbcType": 93,
"typeName": "TIMESTAMP",
"key": false
},
{
"name": "AUCRDT",
"jdbcType": 93,
"typeName": "TIMESTAMP",
"key": false
}
]
},
{
"table": "KBS_QUESTION",
"columns": [
{
"name": "ID",
"jdbcType": -5,
"typeName": "LONG",
"key": true
},
{
"name": "CATEGORY",
"jdbcType": 12,
"typeName": "STRING",
"key": false
}
]
}
]
}
]
}
]

我们可以通过结构迁移 (MySQL/SQLServer/Oracle -> Tunnel) 扩充它,或者直接通过 数据源管理->更多->查看配置->_**dbsJson **_进行修改。

操作示例

本示例使用阿里云资源模拟杭州 RDS for MySQL 到深圳 RDS for MySQL , 两端数据库均不开公网端口,数据走互联网, 采用 HTTPS 传输和用户名密码认证。

环境准备

  • 杭州环境部署 CloudCanal ,并购买 RDS for MySQL 作为源端 blog/tech_share/http_sync_3 blog/tech_share/http_sync_4

  • 深圳环境部署 CloudCanal , 并购买 RDS for MySQL 作为目标端 blog/tech_share/http_sync_1 blog/tech_share/http_sync_2

  • 因 CloudCanal 为 docker 版本 ,深圳环境 CloudCanal 安装包解压后 ,需要修改 docker-compose.yml 端口映射再安装/升级,并开放 ECS 安全组相关端口,以便远程连接

  • 此例以 18443 端口作为 Tunnel 数据源监听端口 blog/tech_share/http_sync_5 blog/tech_share/http_sync_6

为目标端数据库初始化元数据

  • 因无法通过 Tunnel 到对端数据库做结构迁移,所以需要事先使用 mysqldump 等工具初始化对端数据库结构

添加 Tunnel 数据源

  • 分别在源端和目标端 CloudCanal 配置 Tunnel 数据源 blog/tech_share/http_sync_7

  • 源端数据源列表 blog/tech_share/http_sync_9

  • 目标端数据源列表
    blog/tech_share/http_sync_8

为 Tunnel 初始化元数据

  • 源端创建一个 MySQL -> Tunnel 结构迁移,并完成 blog/tech_share/http_sync_10 blog/tech_share/http_sync_11

  • 从源端 Tunnel 数据源拷贝结构并复制到目标端 blog/tech_share/http_sync_12 blog/tech_share/http_sync_13 blog/tech_share/http_sync_14

目标端任务创建

  • 选择 Tunnel 和 目标数据库 blog/tech_share/http_sync_15

  • 选择数据同步 blog/tech_share/http_sync_16

  • 选择表、列、映射略

  • 任务正常运行,监听端口并准备接收数据 blog/tech_share/http_sync_17

源端任务创建

  • 选择源端数据库 和 Tunnel 数据源 blog/tech_share/http_sync_18

  • 选择数据同步,并初始化数据 blog/tech_share/http_sync_19

  • 数据持续同步中 blog/tech_share/http_sync_20

数据验证

造增量数据

  • 为了造数据简便,开下源端数据库公网地址 blog/tech_share/http_sync_21

数据校验

  • 在深圳环境添加源端数据源,并做数据校验。结果显示数据一致。 blog/tech_share/http_sync_22 blog/tech_share/http_sync_23

常见问题

  • 目前支持哪些链路的互通?

    • MySQL/SQLServer/ORACLE -> MySQL , 其他互通按需添加。
  • Tunnel 到对端数据库能做结构迁移么?准备表结构比较麻烦

    • 因为数据库结构对元数据精度要求很高,Tunnel中间结构主要为同步服务,所以元数据级别上还无法构成精确的结构迁移源端。建议构建临时实例(只dump表结构)并开公网,再使用CloudCanal结构迁移解决问题。
  • Tunnel 数据源有结构,能动态编辑么?

    • Tunnel 数据源模拟了一个数据库,编辑任务能力天然具备。加表先编辑目标端任务,再编辑源端任务,否则反之。我们后续计划用一篇专门的文章介绍这个运维操作。
  • 目前数据互通还存在什么问题?

    • 对于 blob 等字段类型还需要进一步支持和验证
    • 跨互联网,性能层面需要经过特别的优化
    • 安全层面,目前仅用到 HTTPS 证书加密,配合自定义的账号密码

总结

本文主要介绍纯粹通过 CloudCanal 进行数据互通实践,通过引入虚拟数据源,达成数据互通和元数据映射等能力,具备不错的可落地性。

宽表实时构建案例

· 阅读需 11 分钟
Barry
Chief Technology Officer

作者介绍

蒋鹏程,苏州万店掌软件技术有限公司

前言

CloudCanal 近期提供了自定义代码构建宽表能力,我们第一时间参与了该特性内测,并已落地生产稳定运行。开发流程详见官方文档 《CloudCanal自定义代码实时加工》

能力特点包括:

  • 灵活,支持反查打宽表,特定逻辑数据清洗,对账,告警等场景
  • 调试方便,通过任务参数配置自动打开 debug 端口,对接 IDE 调试
  • SDK 接口清晰,提供丰富的上下文信息,方便数据逻辑开发

本文基于我们业务中的实际需求(MySQL -> ElasticSearch 宽表构建),梳理一下具体的开发调试流程,希望对大家有所帮助。

使用案例

案例一:商品表和SKU宽表行构建

业务背景

在对接用户的小程序进行商品搜索时,需要如下几个能力

  1. 基于分词的全文索引
  2. 同时搜索不同表中的字段

需要全文索引的初衷是希望用户搜索商品的关键词就可以搜索到想要的商品。这在传统数据库中一般支持的都比较弱甚至不支持,因此需要借助 ES 分词器搜索

而第二个能力主要是由于业务数据通常分布在多个表中,但是 ES 并不能像需要关系型数据库那样联表查询,CloudCanal 自定义代码的能力则整号解决了我们多表关联的痛点。

业务流程

在使用 CloudCanal 总体的流程变得十分清晰,在 CloudCanal 层面通过订阅表结合自定义代码中的反查数据库以及数据处理,可以直接生成可以写到对端 ES 的宽表行。 17a3934d-3cb8-4682-9dc0-12d08ab69c8e-image.png

表结构

准备的 mysql 表结构如下,一个商品会对应多个 SKU,我们在对端创建好索引,其中的 sku_detail 保存一个商品关联的 SKU 信息,是一个典型的一对多场景。

ES mapping 中的字段对应主表 tb_enterprise_goods 中字段,额外新增的 sku_detail 字段就是我们需要从子表 tb_enterprise_sku 中同步的数据。

## 商品表
CREATE TABLE `tb_enterprise_goods` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(64) NOT NULL DEFAULT '' COMMENT '商品名称',
`enterprise_id` int(11) NOT NULL DEFAULT '0' COMMENT '企业id',
`goods_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商家商品编号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9410 DEFAULT CHARSET=utf8mb4;
## SKU表
CREATE TABLE `tb_enterprise_sku` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`enterprise_goods_id` int(11) NOT NULL COMMENT '企业商品id',
`name` varchar(255) NOT NULL DEFAULT '' COMMENT 'sku{1:2,2:1}',
`sku_no` varchar(255) DEFAULT '' COMMENT '商品sku编码',
`scan_goods` varchar(255) CHARACTER SET utf8 NOT NULL DEFAULT '' COMMENT 'sku条形码',
PRIMARY KEY (`id`),
) ENGINE=InnoDB AUTO_INCREMENT=14397 DEFAULT CHARSET=utf8mb4 COMMENT='企业 sku';

ES 索引如下:

      "enterprise_id": {
"type": "integer"
},
"goods_no": {
"type": "text",
"analyzer": "custom_e",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"id": {
"type": "integer"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"standard": {
"type": "text",
"analyzer": "standard"
},
"keyword":{
"type": "keyword"
}
},
"fielddata": true
},
"sku_detail": {
"type": "nested",
"properties": {
"id": {
"type": "integer"
},
"sku_no": {
"type": "text",
"analyzer": "custom_e",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"scan_goods": {
"type": "text",
"analyzer": "custom_e",
"fields": {
"keyword": {
"type": "keyword"
}
}
}

注:为了方便大家理解,此处表字段进行了缩减

自定义代码工作流程

36f7adfb-ece5-4411-9926-d2cd0262aa3e-image.png

自定义代码源码

public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) {
List<CustomRecord> customRecordList=new ArrayList<>();
String idStr = (customRecord.getFieldMapAfter().get("id")).toString();
List<EnterpriseSku> enterpriseSkuList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));
if (enterpriseSkuList.size() > 0) {
Map<String, Object> addFieldValueMap = new LinkedHashMap<>();
addFieldValueMap.put("sku_detail", JSONArray.parseArray(JSON.toJSONString(enterpriseSkuList)));
RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap);
}
customRecordList.add(customRecord);
return customRecordList;
}

public List<CustomRecord> updateData(CustomRecord customRecord, DataSource dataSource) {
List<CustomRecord> customRecordList=new ArrayList<>();
String idStr = (customRecord.getFieldMapAfter().get("id")).toString();
List<EnterpriseSku> enterpriseSkuList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));
if (enterpriseSkuList.size() > 0) {
Map<String, Object> addFieldValueMap = new LinkedHashMap<>();
addFieldValueMap.put("sku_detail", JSONArray.parseArray(JSON.toJSONString(enterpriseSkuList)));
RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap);
}
customRecordList.add(customRecord);
return customRecordList;
}

private List<EnterpriseSku> tryQuerySourceDs(DataSource dataSource, Integer id) {
try(Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement("select * from `live-mini`.tb_enterprise_sku where is_del=0 and enterprise_goods_id=" + id)) {
ResultSet resultSet = ps.executeQuery();
BeanListHandler<EnterpriseSku> bh = new BeanListHandler(EnterpriseSku.class);
List<EnterpriseSku> enterpriseSkuList = bh.handle(resultSet);
return enterpriseSkuList;
} catch (Exception e) {
esLogger.error(e.getMessage());
return new ArrayList<>();
}
}

思路

customRecord 对象即自定义代码传入的参数,传入的 id 为子表 tb_enterprise_sku 的外键 enterprise_goods_id,查询出子表关于这个外键的所有数据,放入 addFieldValueMap 中,再利用源码提供的方法RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap),对 customRecord 进行加工。

创建任务步骤

新建源端对端数据源 660b46e7-f736-49ff-b368-63b88cb5dbb7-image.png 选择订阅表及同步到对端的索引 84454765-03f7-4b44-9304-679bb044f380-image.png 选择同步字段,选择自定义包 fcacf1a8-24c1-49e5-9d86-1f19835fbbab-image.png 完成创建任务

实现效果

{
"_index" : "live-mini_pro_enterprise_goods_sku_view",
"_type" : "_doc",
"_id" : "17385",
"_score" : 12.033585,
"_source" : {
"img" : "https://ovopark.oss-cn-hangzhou.aliyuncs.com/wanji/2020-11-30/1606786889982.jpg",
"category_name" : "无类目",
"is_grounding" : 1,
"del_time" : "2021-11-01T17:13:32+08:00",
"goods_no" : "",
"distribute_second" : 0.0,
"uniform_proportion" : 0,
"description" : "赠送私域直播流量转化平台万集&线上商城",
"video" : "",
"self_uniform_proportion" : 0,
"update_time" : "2021-11-01T17:13:32+08:00",
"allocate_video" : null,
"self_commission_properation" : 0.0,
"category_id" : 0,
"is_promote" : 0,
"price" : 0.03,
"is_distributor_self" : 0,
"limit_purchases_max_quantity" : 0,
"limit_purchases_type" : 0,
"is_del" : 0,
"is_distributor" : 0,
"activity_price" : 0.0,
"id" : 17385,
"stock" : 0,
"distribute_first" : 0.0,
"is_distribution_threshold" : 0,
"refund_configure" : 1,
"create_time" : "2021-11-01T17:13:32+08:00",
"scan_goods" : "",
"limit_purchases_cycle" : 0,
"is_sku" : 1,
"allocate_mode" : 0,
"sku_detail" : [
{
"scan_goods" : "",
"sku_no" : "",
"id" : "19943"
}
],
"enterprise_id" : 24,
"is_delivery" : 0,
"is_limit_purchases" : 0,
"name" : "测试商品测试商品测试商品测试商",
"goods_type" : 0,
"goods_order" : 0,
"ts" : "2021-11-01T17:16:42+08:00",
"delivery_price" : 0.0
}
}

案例二:订单表、商品表宽表构建

业务背景

小程序商城中需要展示猜你喜欢的商品,对猜你喜欢商品是根据用户购买商品的频率来决定,主要涉及订单表,订单商品表,用户表,商品表等,使用ES 查询同样面临多表无法 join 的问题,本案例中依然采用 CloudCanal 自定义代码同步为扁平化数据。

业务原使用技术及问题

同步 ES 的方案原先使用 logstash 的方式全量同步数据,由于数据量的问题,同步数据放在每日的凌晨,带来的问题为,数据同步不及时,并且只能是全量风险比较高。多次出现删除索引数据后并没有同步的情况。

表结构

CREATE TABLE `tb_order` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`order_sn` varchar(32) NOT NULL COMMENT '订单编号',
`user_id` int(11) NOT NULL COMMENT '用户 id',
`user_name` varchar(255) DEFAULT NULL COMMENT '用户名称',
`user_phone` varchar(11) DEFAULT NULL COMMENT '用户电话',
`store_id` int(11) NOT NULL COMMENT '门店 id',
`enterprise_id` int(11) DEFAULT '1' COMMENT '企业id',
`order_type` int(11) NOT NULL COMMENT '0:快递配送;1:门店自取; 2:美团配送即时单; 3:美团即时配送预约单;',
`order_status` tinyint(11) DEFAULT '0' COMMENT '原订单状态:1:未付款,3:待发货/待打包,5:(待收货/待取货),6:交易完成,7:订单失效,8:交易关闭, 13:用戶取消,18:商家强制关闭,19同意退款但是退款失敗(未用到),30:美团即时配送状态异常',
`total_price` decimal(10,2) DEFAULT '0.00' COMMENT '订单总价',
PRIMARY KEY (`id`,`total_goods_weight`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=18630 DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

CREATE TABLE `tb_order_goods` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL COMMENT '用户 id',
`order_id` int(11) NOT NULL COMMENT '订单 id',
`goods_id` int(11) NOT NULL COMMENT '订单商品 id',
`enterprise_goods_id` varchar(11) DEFAULT NULL COMMENT '企业商品id',
`name` varchar(512) DEFAULT '' COMMENT '订单商品名称',
`spec` varchar(100) DEFAULT NULL COMMENT '规格属性',
`img` varchar(100) DEFAULT '' COMMENT '订单商品图片',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19159 DEFAULT CHARSET=utf8mb4 COMMENT='订单商品表';

ES 索引字段

"store_id":{
"type": "integer"
},
"user_id":{
"type": "integer"
},
"sex":{
"type": "integer"
},
"birthday":{
"type": "keyword"
},
"goods_name":{
"type": "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"fields": {
"keyword":{
"type": "keyword"
}
},
"fielddata": true
},
"goods_type":{
"type": "integer"
},
"order_goods_id":{
"type": "integer"
},
"enterprise_goods_id":{
"type": "integer"
},
"goods_price":{
"type": "double"
},
"order_id":{
"type": "integer"
},
"order_create_time":{
"type": "date"
}

注:ES表结构中涉及多张表,为了方便举例,这边只贴出2张表。es_doc展示纬度为订单商品纬度。

实现流程

订阅订单表 3d95def3-3234-47f5-b626-89d95b868938-image.png 订阅字段 7574425f-6953-4640-9d0a-97c1403d07d7-image.png 画出横线的即为需要同步的字段,有一个点需要特别注意:ES 中需要展示的字段一定要勾上同步,不勾上的话在自定义代码中 add 后 也不会被同步 官方给出的解释为字段黑白名单。 这里有几个细节点,订阅的表的维度并非 ES 存储数据的维度,所以这边的 id 并不是 ES 的 _id,对于这种需要在源端同步必须传的字段,设置对端字段可以随意设置一个对端已有的字段,在自定义代码中可以灵活的去重新配置需要同步的字段。(如果设置默认,ES 的 index 会创建出这个字段,这显然不是我们想要看到的效果)

业务流程

12f805e2-e6d3-478e-b3e1-aaf989fcf59a-image.png

代码实现

查询扁平化数据

SELECT
to2.store_id,
tuc.id AS user_id,
tuc.sex AS sex,
tuc.birthday,
tog.NAME AS goods_name,
tog.goods_type,
tog.goods_id AS order_goods_id,
tog.goods_price,
tog.create_time AS order_create_time,
tog.id AS order_id,
tog.enterprise_goods_id AS enterprise_goods_id
FROM
`live-mini`.tb_order to2
INNER JOIN `live-mini`.tb_order_goods tog ON to2.id = tog.order_id
AND tog.is_del = 0
AND to2.user_id = tog.user_id
INNER JOIN `live-mini`.tb_user_c tuc ON to2.user_id = tuc.id
AND tuc.is_del = 0
WHERE
to2.is_del = 0
AND to2.id= #{占位}
GROUP BY tog.id

思路:自定义代码获取 order 表的主键后,查询上面的 SQL,先将原 customRecord 中数据删除,再以查询出的结果维度新增数据。修改的逻辑亦如此。

public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) {
List<CustomRecord> customRecordList=new ArrayList<>();
String idStr = (customRecord.getFieldMapAfter().get("id")).toString();
List<OrderGoods> orderGoodsList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));
RecordBuilder.modifyRecordBuilder(customRecord).deleteRecord();
if (orderGoodsList.size() > 0) {
for (OrderGoods orderGoods:orderGoodsList){
//添加需要的行和列
Map<String,Object> fieldMap=BeanMapTool.beanToMap(orderGoods);
customRecordList.add(RecordBuilder.createRecordBuilder().createRecord(fieldMap).build());
}
}
return customRecordList;
}

public List<CustomRecord> updateData(CustomRecord customRecord, DataSource dataSource) {
List<CustomRecord> customRecordList=new ArrayList<>();
String idStr = (customRecord.getFieldMapAfter().get("id")).toString();
List<OrderGoods> orderGoodsList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));
RecordBuilder.modifyRecordBuilder(customRecord).deleteRecord();
if (orderGoodsList.size() > 0) {
for (OrderGoods orderGoods:orderGoodsList){
//添加需要的行和列
Map<String,Object> fieldMap=BeanMapTool.beanToMap(orderGoods);
customRecordList.add(RecordBuilder.createRecordBuilder().createRecord(fieldMap).build());
}
}
return customRecordList;
}

private List<OrderGoods> tryQuerySourceDs(DataSource dataSource, Integer id) {
String sql="SELECT to2.store_id,tuc.id AS user_id,tuc.sex AS sex,tuc.birthday,tog.NAME AS goods_name,tog.goods_type,tog.goods_id AS order_goods_id,tog.goods_price,tog.create_time AS order_create_time,tog.id AS order_id,tog.enterprise_goods_id AS enterprise_goods_id FROM `live-mini`.tb_order to2 INNER JOIN `live-mini`.tb_order_goods tog ON to2.id = tog.order_id AND tog.is_del = 0 AND to2.user_id = tog.user_id INNER JOIN `live-mini`.tb_user_c tuc ON to2.user_id = tuc.id AND tuc.is_del = 0 WHERE to2.is_del = 0 and to2.id=";
try(Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql + id+" GROUP BY tog.id")) {
ResultSet resultSet = ps.executeQuery();
BeanListHandler<OrderGoods> bh = new BeanListHandler(OrderGoods.class);
List<OrderGoods> orderGoodsList = bh.handle(resultSet);
return orderGoodsList;
} catch (Exception e) {
esLogger.error(e.getMessage());
return new ArrayList<>();
}
}

实现效果

 {
"_index" : "live-mini-order-pro",
"_type" : "_doc",
"_id" : "359",
"_score" : 1.0,
"_source" : {
"goods_type" : 0,
"order_id" : 359,
"order_goods_id" : 450,
"order_create_time" : "2020-12-22T10:45:20.000Z",
"enterprise_goods_id" : 64,
"goods_name" : "【老客户专享】万店掌2021新年定制台历",
"sex" : 2,
"goods_price" : 1.0,
"user_id" : 386,
"store_id" : 1,
"birthday" : ""
}
}

写在最后

CloudCanal 的自定义代码很好地解决了我们多表关联同步 ES 的问题,简洁易用的界面和有深度的功能都令人印象深刻,期待 CloudCanal 更多新能力。关于 CloudCanal 自定义代码的能力,也欢迎大家与我交流。

Kafka 数据中转校验

· 阅读需 3 分钟
Barry
Chief Technology Officer

案例简述

CloudCanal 支持 MySQL -> Kafka完整的传输链路。

Kafka是消息系统,不支持SQL查询数据。针对这种异构数据源,我们如何来校验源对端数据的一致性呢?

本案例将通过 CloudCanal 先将Kafka数据再次回流到MySQL, 最后 检验两个 MySQL 的数据是否一致,即可间接地对比数据一致性。本案例使用 DebeziumEnvelope 消息格式。

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 2 个 MySQL 实例,1 个 Kafka 实例(本例使用自己搭建的两台 MySQL 5.6,阿里云 Kafka 2.2)
  • 两台 MySQL 实例准备两个相同的表 kafka_migration_test.table1
create schema kafka_migration_test collate utf8mb4_unicode_ci;
create table table1 (
`col1` varchar(25) null,
`col2` varchar(25) null,
`col3` varchar(45) null
);
  • 登录 CloudCanal 平台,添加 Kafka,MySQL image.png

  • kafka 自定义一个主题 topic_1,并创建一条 MySQL(1) -> Kafka 链路作为增量数据来源

    • 因为默认的消息格式是不带 Schema 字段的,这里的消息我们会反写到 MySQL 要利用 Schema 的信息,所以创建成功后需要在 **源端配置页面 **设置 SchemaInclude 参数为 true image.png
  • kafka 自定义一个主题 topic_2,并创建消费者组 migration,后续用于反写数据, 并创建一条 Kafka -> MySQL(2) 链路作为增量数据来源

任务创建 1 [ MySQL(1) -> Kafka ]

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库,并选择 DebeziumEnvelope 消息格式,和 topic_1 主题 image.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认 image.png

  • 选择需要迁移同步的表 table1和对应的 Kafka 主题 topic_1 image.png

  • 没有主键自动忽略,方便后序生成数据 image.png

  • 确认创建任务,创建成功后在源端配置页面设置 SchemaInclude 参数为 true image.png

  • 然后点击生效配置,确认重启 image.png

任务创建 2 [ Kafka -> MySQL(2) ]

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库 image.png

  • 选择 数据同步,其他选项默认

  • 选择消费的 Kafka 主题 topic_2和需要迁移同步的表 table1 image.png

  • 确认创建任务,没有主键自动忽略,方便后序生成数据

数据验证

  • 程序造数据,向 MySQL(1) 生成数据,MySQL(1) -> Kafka(topic_1) -> Kafka(topic_2) -> MySQL(2)

  • Kafka(topic_1) 数据会下沉到 topic_2 主题

  • 任务正常运行一段时间后,停止造数据,等待数据全部同步完成

  • 创建 MySQL 到 MySQL 的数据准确校验 image.png

  • 选着数据校验功能,其他的默认 image.png

  • 数据校验 OK image.png

总结

本文展示了 Kafka 的双向同步案例,验证了 CloudCanal 传输数据的准确性。

数据校验与订正

· 阅读需 6 分钟
Barry
Chief Technology Officer

简述

CloudCanal 除了提供最核心的数据迁移和同步能力以外,还提供数据校验和数据订正两种非常实用的能力。

这两种功能为用户保障数据迁移同步链路的数据质量提供了非常大的便利性。

例如对端数据库因为各种原因产生一些异常写入导致的数据不一致或者丢失,用户均可以使用CloudCanal提供的数据校验和数据订正能力来基于同步链路的源端数据来恢复数据,使得对端数据库中相比源端丢失或者不一致的数据得到恢复。

技术点

基于校验结果的针对性订正

执行完CloudCanal的校验任务后,在运行任务的机器上会生成一个文件compre_rs.log用于记录校验的结果信息。日志路径为~/logs/cloudcanal/tasks/taskName/compare_rs.log,其格式如下:

库表名称,结果类型,主键信息

{"tableUnit":"test15.test_huasheng1","type":"DIFF","pkColMap":{"id":"9"}}
{"tableUnit":"test15.test_huasheng1","type":"LOSS","pkColMap":{"id":"12"}}

结果类型分为两种:

  • DIFF:对端相比源端不一致的行,例如上面例子中,源端主键id=9的行和对端存在不一致。
  • LOSS:在源端表中存在,但是在对端表中不存在的行。上面例子中源端主键id=12的行,在对端不存在

主键信息记录的是源端的,支持联合主键。

为了性能考虑,这里DIFF时不展示具体哪一列的数据不一致。如果需要查看这个信息,这个数据信息记录在~/logs/cloudcanal/tasks/${taskName}/diff.log中

利用数据库的upsert能力进行订正

针对支持upsert语义写入的数据源作为对端时,CloudCanal的订正可以正常工作。CloudCanal根据校验结果去源端反查数据后写入对端,如果对端不存在该主键的行,则直接INSERT写入,如果存在则自动转换为UPDATE进行更新。

使用in multi column处理联合主键的情况

针对实现SQL标准中in multi column语法的数据库作为源端时,CloudCanal支持对其进行数据订正。CloudCanal根据主键扫描源端表时,如遇联合主键的场景,会根据in multi column的语法来扫描源端的数据。不支持in multi column SQL语法的数据源CloudCanal不支持订正其数据。in multi column语法的使用例子可以参考如下:

-- works in PostgreSQL, Oracle, MySQL, DB2, HSQLDB
SELECT whatever
FROM t --- you missed the FROM
WHERE (col1, col2) --- parentheses here
IN ((val1a, val2a), (val1b, val2b), ...) ;

使用须知

  • 以下源端、对端之间支持创建订正任务:
    • 源端:Oracle、PostgreSQL、MySQL、OceanBase、PolarDBMySQL
    • 对端: MySQL、PolarDBMySQL、Oracle、PostgreSQL、OceanBase
  • 支持该特性的CloudCanal版本:v2.2.6.8(商业版)
  • 订正是以源端数据为准:校验结果中会记录对端相比源端缺失、不一致的行的源端主键信息。订正则会基于该源端主键进行订正。假设对端多出了一些源端不存在的主键,在订正的时候CloudCanal是不会去删除这些行的请知悉。
  • 基于校验结果的订正依赖校验任务的校验结果文件,因此当关联的校验任务在不同机器上执行过的话,则无法基于该校验任务创建订正任务。在校验任务详情,点击功能列表->重启历史记录 可以查看校验任务是否在多台机器上运行过。 image.png

操作说明

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备两个支持数据订正的数据库,一个作为源端,一个作为对端。本次例子采用的源对端数据源类型为阿里云的PolarDBMySQL

校验订正的基本流程

使用CloudCanal的校验订正能力恢复异常数据的典型流程如下图所示。

image.png

数据校验

  • 在任务管理页点击创建任务,进入创建任务的第一步,配置源对端的数据库并且选择需要订阅的库。 image.png

  • 选择任务类型为校验,开启一次性校验,设置自动启动。 image.png

  • 选择需要进行校验的表。 image.png

  • 选择需要进行校验的列,支持映射和裁剪。裁剪的列将不参与校验。 image.png

  • 确认任务整体配置情况,无误后点击创建 image.png

  • 校验完成后可以查看具体每张表相比源端缺失或者不一致的数据。 image.png

数据订正

  • 校验任务的详情页,点击功能列表中的创建订正任务可以直接基于该次校验的结果,创建对应表的订正任务。 image.png

  • 订正任务的源对端信息和订阅的库信息与之前的校验任务保持一致,此处源对端测试连接成功后可直接点击下一步。 image.png

  • 规格与校验任务保持一致,可以直接下一步 image.png

  • 订阅信息仅供确认,无法修改,与校验任务保持一致,直接点击下一步 image.png

  • 确认列的映射、裁剪信息,无法修改,与校验任务保持一致,直接点击下一步 image.png

  • 确认订正任务的配置无误后点击创建任务 image.png

  • 校验完成后可以看到具体订正的统计信息 image.png

总结

本文介绍了如何利用 CloudCanal 的校验订正能力来快速恢复数据。

数仓实时构建案例

· 阅读需 12 分钟
Barry
Chief Technology Officer

简述

本案例为国内某大健康领域头部公司真实案例(因用户保密要求,暂不透露用户相关信息)。希望文章内容对各位读者使用 CloudCanal 构建实时数仓带来一些帮助。

业务背景

大健康背景下,用户对报表和数据大屏的实时性能要求越来越高。以核酸检测为例,检测结果需要实时统计分析,并在决策大屏中进行可视化展现。数据的及时性直接关系到区域疫情防控的精准布施从而有效防止疫情的扩散,不容半点闪失。在此之上,业务的多样性和复杂性也对公司的研发和运维成本要求也越来越高。

例如疫情防控指挥决策大屏中,数据包括流调溯源数据、物资冷链数据、居住人口数据、重点人群数据、风险排查、隔离管控、核酸检测数据、疫苗接种数据。这些来源数据标准不一,分散的数据引发数据冗余、数据不一致、数据应用困难等问题,导致研发和运维成本的上升,需要通过一个良好的接入层将这些数据做汇总和统一管理。

在此背景下,我司在更高效数据ETL方式以及高性能数据分析工具选型方面不断尝试和创新。通过引入了 CloudCanal 和 StarRocks,在数仓建设、实时数据分析、数据查询加速等业务上实现了效率最大化。

业务架构

我司旗下拥有多款大健康产品。虽然各款产品的具体业务不同,但是数据流的链路基本一致:数据接入->数据处理与分析->数据应用

下面以 疫情防控系统 为例简单介绍其中数据流的生命周期:

  • 数据接入:首先疫情防控系统的数据主要是三个来源、人员数据、冷链数据、物资数据。这些数据经过统一标准化处理之后才能用于分析
  • 数据处理与分析:原始的数据经过整合和标准化,可以从数据分析出密接人员、密接关系图谱等信息
  • 数据应用:数据处理与分析的指标可以用于实时监控大屏、以及相关预警

e983e0f0-dfd8-4515-b2ba-d7ee0ea65860-image.png

原有技术架构以及痛点

针对疫情防控系统,我们最初选择 ClickHouse 作为分析层,通过 DataX + Flink CDC 的模式实现实时+离线数据同步。随着业务的迭代,这套架构已经无法满足我们的需求。

技术架构

12141ca7-3922-40d8-bea2-1ec34c396492-image (1).png

原有疫情防控的架构总体上分为四块,自底向上分别是:

  • 数据层:源端数据源主要是 MySQL 为主的关系型数据库。

    • 业务信息:以核酸检测业务功能为例,需要支撑单日 300万 核酸检测任务。要求支撑每秒 1000 并发

    • 技术信息:数据层采用 MySQL 主从同步,配置级架构如下:

3767de93-c8fb-48c4-81f9-85924f2609a2-image (2).png

  • 痛点:

    • MySQL 从库查询效率满足不了常规读操作,查询效率低下,急需数据查询加速
    • 研发人员大量的精力和时间放到了数据库查询优化上
    • 处理层:采用离线+实时的 lambda 架构。其中离线部分采用 DataX 和 Kettle 进行定时全量,迁移源端维表到分析层的宽表中。实时部分使用 Flink-CDC 获取增量数据,一般是用于加速中间数据和近期的热数据。离线和在线数据分别存储在 ClickHouse 不同表中,提供给业务侧查询使用。
  • 业务信息:

    • 离线:将报表、大屏、数据交换服务采用离线方式构建 DM 主题数据集市。使用到的就是Datax 工具结合实现。
    • 实时:使用 Flink CDC 将MySQL 数据1:1同步到 ClickHouse 中。程序端通过改造查询SQL将慢语句通过 ClickHouse 实现。

    技术信息: 整体架构如下

    5f4d1355-45ef-4329-a378-ce1f3f23776f-image.png

  • 痛点:

      • 报表、大屏、数据交换离线场景对数据的实时性要求越来越高。大部分场景已不适用DataX这种离线方案。
    • DataX 定时任务调度带来的运维成本和源库影响:各种定时调度任务大大增加了运维管理的难度;同时这种定时触发的 SQL 很容易产生慢 SQL 影响源端数据库的正常工作。
    • Flink CDC 通过主库 Binlog 同步时出现过锁表影响业务的情况,虽然之后替换为订阅从库解决,但是会出现延迟现象。
    • Flink CDC 运维成本较高:Flink CDC 实时同步机制需要研发人员专职进行维护。例如像源端新增字段这种DDL需求,研发需要不断调整调度任务才能确保业务正常运行。
    • 分析层:分析层会保存计算好的指标数据以及用于加速查询的中间结果数据。
  • 业务信息:

    • 搭建三台单体 ClickHouse,分别对应 报表业务、大屏业务、数据交换服务、数据查询加速。
    • 以大屏业务举例,前期由于需求变化大,研发直接使用 ClickHouse 对单表过亿的数据进行数据关联、分组统计。高并发情况下也造成 ClickHouse 出现 CPU 打满的情况。ClickHouse 慢语句如下图。

    8c921320-a124-4ca0-8b61-ce37145b26d4-image (1).png

  • 痛点:

    • 集群运维较复杂,需要使用Zookeeper 搭建ClickHouse集群,运维成本高。
    • SQL 语法不兼容 MySQL,开发上手门槛高、Join 不友好
    • 修改、删除以及数据去重性能损耗大:例如使用ReplacingMergeTree()引擎,需要处理重复数据同时去重对性能要求较高。
    • 并发能力差:单机ClickHouse在高并发下,CPU经常被拉满,出现崩溃情况。
      • 业务层:业务层主要是应用程序访问分析层的指标结果或者通过查询中间结果来加速查询性能。最终的查询结果会服务疫情防控系统的实时大屏、报表以及预警等相关数据服务。
      • Clickhouse集群运维门槛高,之前在20.3版本出现过DDL任务和查询陷入死锁BUG,造成集群故障,最后放弃集群方案。采用3个单机通过Flink-CDC负责数据同步。
  • 业务层

    • 业务信息:主要是BI业务(报表、大屏)、数据查询加速、数据交换。
    • BI业务:平台报表业务和大屏业务全部接入ClickHouse数据库。
    • 数据查询加速:通过监控MySQL慢语句将慢语句也接入ClickHouse进行查询呈现。
    • 数据交换:ClickHouse负责与第三方平台进行数据交换任务。

    数据源接入

    接入的监测数据分散在各个数据库实例和数据库中。我们遇到的问题主要是:

  • 结构迁移成本高:很多表是一对一同步的,每次需要人为在ClickHouse上进行建表,增加了数据接入的成本

  • 人工操作多:需要接入的表人工筛选成本大

  • 新增表接入不方便:新增表接入需要重新修改配置

    现有系统架构以及优势

    架构介绍

    998f1a33-c886-4624-82ba-d8c4e55a5958-image (2).png

    新架构层次划分与原有架构基本相同,我们对处理层与分析层的技术栈选型进行了一些调整。在原有架构中,我们使用DataX+FlinkCDC的方案实现了数据的实时与离线同步传输。在替换CloudCanal后,统一实时离线两套技术栈,减少了运维成本。分析层中,通过使用StarRocks替换ClickHouse,在性能,运维成本,业务扩展上也带来了极大的提升。

    新架构优势说明

    引入 CloudCanal 数据同步工具

    • 异构数据源接入效率高:提供了库表列裁剪映射、各种维度的筛选能力等
    • CloudCanal人性化的操作页面以及低代码操作方式,释放了业务线的研发人员
    • 结构迁移、全量、增量一体化
    • 监控、报警运维便利

    引入 StarRocks MPP 数据库

    OLAP 数据库产品选型

    针对于分析层的问题与挑战,我们着力于寻找一款高性能,简单易维护的数据库产品来替换已有的ClickHouse架构,同时也希望在业务层上能突破 ClickHouse 单表查询的限制,通过实时多表关联的方式拓展业务层的需求。

    目前市面上的 OLAP 数据库产品百花齐放,诸如 Impala、Druid、ClickHouse 及 StarRocks。在经过一些列的对比之后,我们最终敲定选择StarRocks替换原有的ClickHouse作为分析层的数据库引擎。

    d00ae345-1af5-46b2-accc-680928eed4db-image (3).png

    接入StarRocks

    StarRocks 是一款极速全场景MPP企业级数据库产品,具备水平在线扩缩容、金融级高可用,兼容 MySQL协议和 MySQL 生态,提供全面向量化引擎与多种数据源联邦查询等重要特性,在全场景 OLAP 业务上提供统一的解决方案,适用于对性能,实时性,并发能力和灵活性有较高要求的各类应用场景。

    经过初步的考量,我们认为,StarRocks 兼容 MySQL 协议与标准SQL,相比于 ClickHouse 对于业务开发人员更加友好。同时,强大的多表关联能力可以将原有的大宽表模型转换为星型/雪花模型,增加了建模的灵活性,更好的应对业务需求的迭代。在运维方面,自动化调度机制可以支持在线扩缩容,可以极大的减少在ClickHouse上的运维成本。

    分析层改造收益

    在引入 StarRocks 对系统进行升级改造后,极大程度的减少了原本 ClickHouse 中的慢查询。整体查询效率提升2~3倍。下面是生产环境业务中两张核心表。其中以我们一个典型的统计SQL为例,可以看到StarRocks带来了明显的性能提升。

    表名行数
    rhr_person_info3451483
    chm_children_should6036599

    ClickHouse侧执行SQL

    select count(0) from rhr_person_info a 
    inner join chm_children_should b
    on a.id=b.person_id
    where toDate(b.should_start) <= toDate('2022-03-01')
    and toDate(b.should_end) >= toDate('2022-03-02')
    and (credentials_number = '%zz%' or en_name like '%zz%')
    and create_type_code =2
    and is_deleted =0
    and district_id like '130926105%'

    StarRocks侧执行SQL

    --starrocks
    select count(0) from rhr_person_info a
    inner join chm_children_should b
    on a.id=b.person_id
    where date_format( should_start,'%Y-%m-%d') <= ('2022-03-01')
    and date_format( should_end,'%Y-%m-%d') >= ('2022-03-02')
    and (credentials_number = '%zz%' or en_name like '%zz%')
    and create_type_code =2
    and is_deleted =0
    and district_id like '130926105%'

    查询响应时间比较

    StarRocksClickHouse
    平均响应时间368ms3340ms

    在体验至极性能的同时,我们在维护性,灵活建模等方面也获得了极佳的体验:

    • StarRocks 兼容 MySQL5.7 协议和 MySQL 生态
    • 支持高并发分析查询
    • 不依赖于大数据生态
    • MPP 架构,分片分桶的复合存储模型
    • 运维简单,易用性强
    • 水平扩展,不依赖外部组件,方便缩扩容
    • 支持宽表和多表 Join 查询(复杂场景),数据查询秒级/毫秒级

    新架构效果说明

    服务器资源合理释放(以核酸检测业务为例)

    对比数据层处理层分析层
    原架构4台2台3台
    新架构2台1台3台

    人力成本的释放

    原架构在数据层和处理层研发人员工作占比为60%,每一个业务的调整需要与 DBA 一起测试查询 SQL,防止出现慢语句同时业务系统随着需求的增加经常有增加字段的需求,研发人员需要不断调整和发布 Flink CDC 调度。新架构只需要 ETL 工程师负责运维即可,体现了 CloudCanal 低代码和便捷的运维优势。

    运维成本的降低

    StarRocks 部署不需要大数据组件的支撑,部署运维都很简单。StarRocks 兼容Mysql生态,业务使用可直接使用Mysql JDBC 进行连接,不用再担心SQL语法差异问题。

    未来规划

    目前,我们已经上线了 2 个产品线的 StarRocks 集群,通过 CloudCanal 更好的实现了实时数仓的搭建,已经在公司内部进行推广,后续会有更多的应用落地。感谢 CloudCanal 团队和 StarRocks 团队提供专业的支持服务。

    参考链接

深入浅出 SQL Server CDC 数据同步

· 阅读需 9 分钟
Barry
Chief Technology Officer

简介

SQL Server 是一款老牌关系型数据库,自 1988 年由 Microsoft、Sybase 和 Ashton-Tate 三家公司共同推出,不断迭代更新至今,拥有相当广泛的用户群体。

如今,我们提到 SQL Server 通常指 Microsoft SQL Server 2000 之后的版本。

SQL Server 2008 是一个里程碑版本,加入了大量新特性,包括 新的语法更丰富的类型 以及本文所提及的 CDC 能力,这个能力让数据从 SQL Server 实时同步到外部更加方便。

本文将介绍 CloudCanal 在新版本中对于 SQL Server 数据同步更进一步的优化和实践。

SQL Server CDC 长什么样?

原始日志

常见的数据库往往存在以下两种日志

  • redo 日志
    • 记录数据的正向变更,简单来说,事务的 commit 通常先记录在这个文件,再返回应用程序成功,可确保数据 持久性
  • undo 日志
    • 用于保证事务的 原子性,如执行 rollback 命令即反向执行 undo 日志中内容以达成数据回滚

一条 DML 语句写入数据库流程如下

  • 大部分关系型数据库中,一个或多个变更会被隐式或显式包装成一个事务
  • 事务开始,数据库引擎定位到数据行所在的 文件位置 并根据已有的数据生成 前镜像后镜像
  • 后镜像 数据记录到 redo 日志中,前镜像 数据记录到 undo 日志中
  • 事务提交后,日志提交位点(检查点)向前推进,已提交的日志内容即可能被覆盖或者释放

SQL Server redo/undo 日志采用了 ldf 格式 ,文件循环使用。

  • ldf 日志文件由多个 VLF(逻辑日志) 组合在一起,这些 VLF 首尾相连形成完整的数据库日志记录
  • ldf 在逻辑日志末端到达物理日志文件末端时,新的日志记录将回到物理日志文件开始,复写旧的数据

ldf 文件即 CDC 所分析的增量日志文件。

启用 CDC

在数据库上执行 exec [console].sys.sp_cdc_enable_db 命令为 console 数据库启用 CDC 功能,这个语句实际上会创建两个作业: cdc.console_capture , cdc.console_cleanup

使用 exec sp_cdc_help_jobs 命令可查看这两个作业详细信息。

  • cdc.console_capture
    • 负责分析 ldf 日志 并解析 console 数据库事件,再将其写入到 CDC 表中
    • 间隔 5 秒钟执行一次扫描,每次扫描 10 轮,每轮扫描最多 500 个事务
  • cdc.console_cleanup
    • 负责定期清理 CDC 表中较老的数据
    • 默认保留 3 天 CDC 日志数据(4320秒)

开启 CDC 功能后,SQL Server 数据库会多出一个名称为 cdc 的 schema,里面会多出下列这些表。

  • change_tables
    • 记录每一个启用了 CDC 的 源表 及其对应的 捕获表
  • captured_columns
    • 记录对应 捕获表 中每个列的信息
  • index_columns
    • 记录 源表 含有的主键信息(如果有)
  • lsn_time_mapping
    • 记录每个事务的开始/结束时间及 LSN 位置信息
  • ddl_history
    • 记录源表发生的 增/减列 对应的 DDL 信息,除此之外的 DDL 都不会被记录

有了上述准备动作和信息,即可开始对原始表开启 change data capture(CDC),即增量数据捕获了。

捕获表变更

有如下 源表

create table [dbo].[test_table] (
[id] [bigint] NOT NULL primary key,
[test] [nchar](10) NULL
)

执行下列命令即可为它启用 CDC

exec [console].[sys].[sp_cdc_enable_table]
@source_schema = [dbo],
@source_name = [test_table],
@role_name = NULL,
@capture_instance = [dbo_test_table], -- 可选项
@supports_net_changes = 0;

cdc schema 下多出一个名为 dbo_test_table_CT 的表,即 捕获表

  • 源表 [dbo].[test_table] 做若干 DML 操作,通常是 5 秒内就可在捕获表中看到变更记录
  • 源表 做一些 增/减 列 操作,对应的 DDL 会出现在 ddl_history 表中

其他表也可通过类似设置,获取到相应的增量变更。整个机制看上去相当直观和简单。

挑战是什么?

难点1:DDL 同步困难

CDC 捕获表只反馈数据的变化,无 DDL 信息

DDL 需额外获取即和 DML 的顺序关系要额外处理

解决这个问题,需要通过执行以下的 SQL 将 DDL 和 DML 事件混合到一起并保证顺序,但是实际使用中会面临严重的性能问题。

select * from (
select __$start_lsn lsn,__$operation oper,__$update_mask mask, null ddl ,id data_id,test data_test
from [console].[cdc].[dbo_test_table_CT]
union
select ddl_lsn lsn, -1 oper,null mask, ddl_command ddl ,null data_id,null data_test
from [console].[cdc].[ddl_history]
) t order by lsn

难点2:无法获取新增列数据

CDC 捕获表的结构并不会随着 DDL 事件的发生而变化,这意味着无法获取新增列的数据

难点3:数据库限制

使用 CDC 功能本身也会产生一些硬性的限制,大致可以分为两类

硬性限制

  • 已经启用 CDC 捕获的源表上不能执行 truncate table 语句,执行即报错

  • CDC 捕获表本质上也是一个普通的表,大量订阅会导致整库表的数量扩大

  • 依赖 SQL Server 代理,如没启动或作业运行失败,捕获表中不会有任何新数据写入

  • 一张表只能创建 2 张对应的 CDC 捕获表,即无法做超过 2 个以上的增量订阅

  • 一张表的 CDC 捕获只能设置启动和禁止,即不能通过重建 CDC 并指定 LSN 来获取新数据

软性限制

  • CDC 捕获表中的数据存留时间默认 3 天

  • 在插入或更新超大字段时默认 CDC 只会处理最大 64KB 个字节的数据

    • 数据内容如果超过这个限制会导致 CDC 捕获任务报错并停止工作
    • 受影响的类型有 7 个:textntextvarchar(max)nvarchar(max)varbinary(max)xmlimage

CloudCanal 的解决方法

CloudCanal SQL Server 增量消费基础处理模型如下所述,保证单个表的数据变更顺序,满足大部分场景

  • 根据 change_tables 表确定一个工作队列
  • 确定起始位点,对于捕获表的增量数据扫描从起始位点开始
  • 并发处理工作队列上的事件
  • 每个 Worker 会根据起始 LSN 扫描自身要处理的 CDC 捕获表
  • 每个 Worker 扫描都会维护自身的 LSN 进度

解决难点1:DML/DDL重排序

CDC 捕获表中的每一条记录都有一个 LSN 信息,ddl_history 表也有 LSN 信息。因此可以借助 插值 的思想将 DDL 事件插入到正常的 DML 事件序列中去,原理如下图:

  1. ddl_history 表进行预查询,获取到的 DDL 事件在稍后的处理中会进行位点比对处理
  2. 查询 dbo_test_table_CT 数据捕获表
  3. 处理每一条的捕获表的数据时检测 DDL 事件是否可以被插入
  4. 形成完整的事件流

解决难点2:反查补充缺失数据

SQL Server CDC 捕获表最多只能创建 2 张是硬性限制,但刚好能解决这个问题,在 DDL 发生后创建第二个 CDC 捕获表可以感知到 DDL 对数据的变化

  1. 创建第一个 CDC 捕获表 dbo_test_table_1_CT
  2. 在两次数据插入的中间增加一个新的列
  3. 创建第二个 CDC 捕获表 dbo_test_table_2_CT
  4. 在插入一条新数据

通过上图可看到 dbo_test_table_2_CT 相比 dbo_test_table_1_CT 已经可以感知到新增的列数据

遗憾的是 DDL 发生后到第二个 CDC 捕获表创建出来之前这中间的数据仍然是缺失的

上面的例子如下图所示(灰色的 Event 表示事件或者数据有缺损)

以 DDL 发生的 LSN 为分界点

  • 在 DDL 发生之前 dbo_test_table_1_CT 表中的数据是完全可信的
  • 在 DDL 发生之后由于 dbo_test_table_1_CT 表中并没有新列字段,因此它的数据是残缺的,不能完全信任
  • dbo_test_table_2_CT 是由于在 DDL 发生后才被创建出来,因此相比较 dbo_test_table_1_CT 它的数据是缺失的
  • 此外 dbo_test_table_1_CTdbo_test_table_2_CT 之间还存在一个盲区导致这个 INSERT 事件两个表都不可信

CloudCanal 解决办法是在此基础上将两张表都缺损的位点 反向使用 PK 从源表中补齐 的方式解决这个问题(上图中深灰色部分)

有一个极端情况是在第二张 CDC 捕获表创建过程中发生了新的 DDL ,这会导致新创建的捕获表也不可靠,因此需要重新创建第二个 CDC 捕获表,并且扩大中间需要反查补齐的数据范围(下图中深灰色部分)

CloudCanal 正是基于上述一系列机制才解决了 DDL 事件导致无法获取增量数据的难题

解决难点3:提供专业优化方案

对于硬性限制,CloudCanal 没有正面解决的方案,而是后续提供更多样的方式(如 trigger,定时增量扫描,新版本SQL Server CDC方案 等)进行补充。

软性限制,可通过以下方式优化

  • 通过以下命令中的 retention 参数来设置 CDC 捕获表中的数据存留时间

    exec console.sys.sp_cdc_change_job 
    @job_type = 'cleanup',
    @retention=4320 -- 单位:秒
  • 通过以下命令调整 CDC 处理的最大数据字节

    exec sp_configure 'show advanced options', 1 ;   
    reconfigure;
    exec sp_configure 'max text repl size', -1; -- -1 表示不限制
    reconfigure;

总结

本文简单介绍了 SQL Server CDC 技术,然后基于此能力,CloudCanal 是如何实现稳定的增量 DML + DDL 同步, 并且解决了其中遇到的难题。

主流 RDB 到 Flink 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

实时数据处理领域中,使用 Flink 方式,除了从日志服务订阅埋点数据外,总离不开从关系型数据库订阅并处理相关业务数据,这时就需要监测并捕获数据库增量数据,将变更按发生的顺序写入到消息中间件以供计算(或消费)。

本文主要介绍如何通过 CloudCanal 快速构建一条高效稳定运行的 MySQL -> Kafka -> Flink 数据同步链路。

技术点

兼容多种常见消息结构

CloudCanal 目前支持 Debezium Envelope (新增)CanalAliyun DTS Avro 等多种流行消息结构,对数据下游消费比较友好。 本次对 Debezium Envelope 消息格式的支持,我们采用了一种轻量的方式做到完全兼容,充分利用 CloudCanal 增量组件,扩展数据序列化器 (EnvelopDeserialize),得到 Envelop 消息并发送到 Kafka 中。 其中 Envelop 的消息结构分为 PayloadSchema 两部分

  • Payload:存储具体数据
  • Schema:定义 Payload 的解析格式 (默认关闭)
{
"payload":{
"after":{
"column_1":"3",
...
},
"before":null,
"op":"c",
"source":{
"db":"kafka_test",
"table":"new_table"
"pos":110341861,
"ts_ms":1659614884026,
...
},
"ts_ms":1659614884026
},
"schema":{
"fields":[
{
"field":"after",
"fields":[
{
"field":"column_1",
"isPK":true,
"jdbType":4,
"type":"int(11)"
},
...
],
"type":"struct"
},
...
],
"type":"struct"
}
}

高度可视化的CDC

CDC 工具如 FlinkCDCMaxwellDebezium ... 各有特色,CloudCanal 相对这些产品,最大的特点是高度可视化,自动化,下表针对目标端为Kafka 的 CDC 简要做了一些对比。

CloudCanalFlinkCDCMaxwell
产品化完备基础
同步对象配置可视化代码配置文件
封装格式多种常用格式自定义JSON
高可用
数据初始化(snapshot)实例级实例级单表
源端支持ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL...ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL...MySQL

CloudCanal 在平衡性能的基础上,提供多种关系型数据源的同步,以及反向同步。

提供便捷的可视化操作、轻巧的数据源添加、轻便的参数配置。

提供多种常见的消息格式,仅仅通过鼠标点击,就可以使用其他 CDC 的消息格式的传输,让数据处理变的异常的快捷、方便。

其中经过我们在相同环境的测试下, CloudCanal 在高写入的 MySQL 场景中,处理数据的效率表现的很出色,后续我们会继续对 CloudCanal 进行优化,提升整体的性能。

综上,相比与类似的 CDC 产品来说,CloudCanal 简单轻巧并集成一体化的操作占据了很大的优势。

Flink 流式计算中不仅要订阅日志服务器的日志埋点信息,同样需要业务数据库中的信息,通过 CDC 工具订阅数据,能减少查询对业务数据库产生的压力还能以流的形式传输,方便与日志服务器中的数据进行关联处理。

实际开发中,可以将业务数据库中的信息提取过滤之后动态的放入 Hbase 中作为维度数据,方便相关联的宽表进行关联查询。

也可以对数据进行开窗、分组、聚合,同样也可以下沉到其他的 Kafka 消费者组中,实现数据的分层。 image.png

操作示例

前置条件

  • 本例使用 Envelop 消息格式,关系型数据库 MySQL 为示例,展示 MySQL 对接 Flink 的 Demo

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档

  • 准备好 1 个 MySQL 实例,1 个 Kafka 实例(本例使用自己搭建的 MySQL 5.6,阿里云 Kafka 2.2)

  • 准备好 Flink 消费端程序,配置好相关信息:flink-demo 下载

  • 登录 CloudCanal 平台,添加 Kafka,MySQL 截屏2022-08-17 17.12.13.png

  • Kafka 自定义一个主题 topic_1,并创建一条 MySQL -> Kafka 链路作为增量数据来源

任务创建

  • 首先配置 FlinkDemo程序的阿里云 Kafka 相关信息 截屏2022-08-17 17.09.12.png

  • 运行 FlinkDemo 程序,等待消费 MySQL 同步 Kafka 的数据(程序不要关闭) 截屏2022-08-17 17.08.50.png

  • 任务管理 -> 任务创建

  • 测试链接并选择 目标 数据库,并选择 DebeziumEnvelope 消息格式,和 topic_1 主题(在阿里云里提前创建) 截屏2022-08-17 17.08.18.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认 截屏2022-08-17 17.07.46.png

  • 选择需要迁移同步的表 table1和对应的 Kafka 主题 topic_1 截屏2022-08-17 17.07.19.png

  • 持续点击下一步,并创建出数据同步任务。

  • MySQL生成数据,MySQL-> Kafka(topic_1) -> Flink
  • FlinkDemo 接收到 Kafka(topic_1) 数据,下沉到 topic_2 主题,打印并输出;这里 Flink 程序可以做更多的流式计算的操作,FlinkDemo 只是演示了最基本的数据传输案例截屏2022-08-17 17.10.05.png

常见问题

还支持哪些源端数据源呢?

目前开放 MySQL、Oracle,SQLServer,Postgres,MongoDB 到 Kafka,如果各位有需求,可以在社区反馈给我们。

支持 DDL 消息同步吗?

目前 关系型数据到 kafka 是支持 DDL 消息的同步的,可以将 关系型数据库 DDL 的变化同步到 Kafka 当中。

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL -> Kafka -> Flink 数据迁移同步。

主流数据库到 OceanBase 数据同步

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.2.0.7 版本开始支持 OceanBase 作为对端的数据迁移同步能力

本文通过 MySQL->OceanBase的数据迁移同步案例简要介绍这个源端的能力。链路特点:

  • 结构迁移、全量迁移、增量同步(数据)
  • 流程全自动化
  • 高度产品化:任务管理、监控、审计一应俱全

使用须知

  • 仅支持 OceanBases MySQL 模式
  • 支持的源端数据源类型为 Oracle/PostgreSQL/MySQL,本文主要以 MySQL 源端为例说明使用方法。
  • DDL同步当前仅支持 MySQL->OceanBase

技术点

面向在线业务的编辑订阅能力

数据长周期增量同步过程中,常有订阅表增减的情况,CloudCanal 编辑订阅 能力,可在原有任务基础上进行变更。其中新增表会产生一个子任务,自动完成数据全量迁移和增量同步,然后和原有主任务合并,自动完成整个过程。 截屏2022-03-04 上午10.47.57.png

全自动化

CloudCanal 自动帮用户完成 结构迁移全量数据迁移增量数据同步,大大提升创建数据同步任务的效率。

自定义代码加工

CloudCanal 允许用户添加自定义代码处理数据,应用场景包括数据清洗数据脱敏宽表构建新系统数据库重构等。可参考文章《5分钟搞定 MySQL 到 ElasticSearch 宽表构建和同步》 以了解基本使用。

库表列裁剪映射

CloudCanal 提供了数据迁移同步中常用的产品化能力-在库、表、列等级别进行裁剪和映射,有效提升数据迁移同步任务的适配性。 截屏2022-03-04 上午10.44.41.png

断点续传

CloudCanal 支持迁移和同步任务的断点续传,通过定期记录的位点,让任务重启后自动从上一次位点开始继续迁移或同步。

操作示例

添加数据源

  • 登录 CloudCanal 平台

  • 选择 数据源管理->新增数据源

  • 选择 自建数据库中的OceanBase 截屏2022-03-04 上午10.50.05.png

    截屏2022-03-04 上午10.51.28.png

创建任务

  • 任务管理->任务创建

  • 选择 源 和 目标 数据库

  • 点击 下一步 截屏2022-03-04 上午10.52.47.png

  • 选择 增量同步,并且启用 全量数据初始化

  • 点击下一步 截屏2022-03-04 上午10.55.35.png

  • 选择订阅的表,结构迁移自动创建的表会按照默认类型映射进行处理。对端表如果已经提前建好,这里也可以直接映射对端已经存在的表

  • 点击下一步 截屏2022-03-04 上午10.56.54.png

  • 配置列映射、点击下一步 截屏2022-03-04 上午10.58.23.png

    如果是通过 CloudCanal 结构迁移自动建表,这边不允许重命名、裁剪以及列映射; 如果映射的是对端已经提前建好的表,这边支持列的裁剪和映射

  • 创建任务 截屏2022-03-04 上午10.59.39.png

  • 查看任务状态。任务创建后,会自动完成结构迁移、全量、增量阶段。 截屏2022-03-04 上午11.01.03.png

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL 到OceanBase 的数据迁移同步。

OceanBase 到主流数据库数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.2.2.1 版本开始支持 OceanBase 作为对端的数据迁移同步能力.

本文通过 OceanBase->OceanBase的数据迁移同步案例简要介绍这个源端的能力。链路特点:

  • 结构迁移、全量迁移、增量同步(数据)
  • 流程全自动化
  • 高度产品化:任务管理、监控、审计一应俱全

使用须知

  • 仅支持 OceanBases MySQL 模式
  • 支持OceanBase 3.x版本
  • 支持的对端数据源类型为 OceanBase/StarRocks/MySQL,本文主要以 OceanBase 对端为例说明使用方法。
  • DDL同步当前仅支持 MySQL->OceanBase和OceanBase->OceanBase。开启方式为创建任务的时候设置同步DDL,并且在任务参数writeParallel中设置目标端执行并行度为1(执行DDL需串行避免写入异常)
  • 安装的oblogproxy默认保存增量24小时,如果需要调整,可以调整oblogproxy的参数log_clean_cycle_time_in_hours
  • 现在ob log proxy client不支持clientID复用,重启增量任务会启动新的client,ob log proxy所在机器会有较多磁盘占用,请留意或者定时清理
  • 当前支持的oblog proxy可以通过如下命令安装
docker run --name oblogproxy --net=host   -e OB_SYS_USERNAME=密文   -e OB_SYS_PASSWORD=密文   -d whhe/oblogproxy

技术点

面向在线业务的编辑订阅能力

数据长周期增量同步过程中,常有订阅表增减的情况,CloudCanal 编辑订阅 能力,可在原有任务基础上进行变更。其中新增表会产生一个子任务,自动完成数据全量迁移和增量同步,然后和原有主任务合并,自动完成整个过程。 截屏2022-03-04 上午10.47.57.png

全自动化

CloudCanal 自动帮用户完成 结构迁移全量数据迁移增量数据同步,大大提升创建数据同步任务的效率。

自定义代码加工

CloudCanal 允许用户添加自定义代码处理数据,应用场景包括数据清洗数据脱敏宽表构建新系统数据库重构等。可参考文章《5分钟搞定 MySQL 到 ElasticSearch 宽表构建和同步》 以了解基本使用。

库表列裁剪映射

CloudCanal 提供了数据迁移同步中常用的产品化能力-在库、表、列等级别进行裁剪和映射,有效提升数据迁移同步任务的适配性。 截屏2022-03-04 上午10.44.41.png

断点续传

CloudCanal 支持迁移和同步任务的断点续传,通过定期记录的位点,让任务重启后自动从上一次位点开始继续迁移或同步。

准备工作

添加数据源

  • 登录 CloudCanal 平台

  • 选择 数据源管理->新增数据源

  • 选择 自建数据库中的OceanBase 截屏2022-03-04 上午10.50.05.png

    8b6cf03f-dfba-494e-8057-e344d5a60036-image.png

信息

添加 OceanBase 数据源请提前安装好 oceanbase log proxy 用于订阅增量。

创建任务

  • 任务管理->任务创建

  • 选择 源 和 目标 数据库

  • 点击 下一步 09de85cb-04de-41d2-9d2b-d0651f631a0a-image.png

  • 选择 增量同步,并且启用 全量数据初始化

  • 点击下一步 截屏2022-03-04 上午10.55.35.png

  • 选择订阅的表,结构迁移自动创建的表会按照默认类型映射进行处理。对端表如果已经提前建好,这里也可以直接映射对端已经存在的表

  • 点击下一步 截屏2022-03-04 上午10.56.54.png

  • 配置列映射、点击下一步 截屏2022-03-04 上午10.58.23.png

    如果是通过 CloudCanal 结构迁移自动建表,这边不允许重命名、裁剪以及列映射; 如果映射的是对端已经提前建好的表,这边支持列的裁剪和映射

  • 创建任务 截屏2022-03-04 上午10.59.39.png

  • 查看任务状态。任务创建后,会自动完成结构迁移、全量、增量阶段。 7bfb7478-aa5d-4b86-823c-3ad377a9219b-image.png

总结

本文简单介绍了如何使用 CloudCanal 进行 OceanBase 到 OceanBase 的数据迁移同步。

深入浅出地理数据(GIS)迁移同步

· 阅读需 15 分钟
Barry
Chief Technology Officer

背景知识

什么是地理信息数据

地理信息数据的定义主要来自于我们熟知的星球——地球。我们知道地球表面是一个凸凹不平的表面,是一个近似的椭球体。以海平面为参照已知最高点和最低点之间有接近 2 万米的差距。

  • 珠穆朗玛峰,8848.86米含冰层(人民日报:2020年12月8日)
  • 马里亚纳海沟,相对海平面深10909米(人民日报:2020年11月30日)

即便是海平面也会在月球潮汐引力的作用下变化着,更不要提气候变化导致的海平面升高。因此要想找到一个确切的数学模型来表示地球还是挺困难的一件事。

于是人们以北极南极为两个定点,将地球按照这个轴线进行旋转。地球在这个旋转形态的下就会呈现一个椭球体的形态,这个就是理想的地球椭球体。这个椭球体就可以利用数学模型来进行表示。

正是基于这样一个共识,在 1975 年国际大地测量与地球物理联合会推荐下地球椭球体的模型数据被推荐为:半长径 6378140 米,半短径 6356755 米,扁率 1∶298.257,后续该数值有一些修正。

基于这些精准的测量数据,我们可以通过数学表示来定义地球上所有的点,从而利用现代化的定位技术我们可以精确定位地球上所有的位置。

定位技术

在地球上要想知道我们精确位置可以使用导航软件,导航软件的正常工作需要依赖全球定位系统。目前世界上共有四大导航系统,分别是

  • 中国的 北斗

  • 美国的 GPS

  • 欧盟的 伽利略

  • 俄罗斯的 格洛纳斯

这些定位系统最主要的部分是人造卫星,它们按照固定的规律围绕地球运转,其中有一些是运行在圆形的地球同步轨道上。

由于每个卫星和我们的距离不同,因此它们同一时刻发送的信标会以微小的时间间隔先后抵达我们的设备上。这样我们就有了带有延迟的信标信号,每一个延迟是可以被看作是一个距离。

我们事先知道每一个卫星的确切位置,再加上这些距离信息。当我们得到最少 3个信号之后就可以利用著名的三角定位法得到我们的准确位置,这也是所有卫星定位技术使用的核心原理。

空间数据

现有空间数据库标准主要有如下两套,两套标准之间大体是相互兼容的。

  • Simple Feature Access SQL,简称 SFA SQL:SFA SQL是 开放地理空间信息联盟(Open Geospatial Consortium,简称 OGC)制定的标准。

  • SQL Multimedia Part3: Spatial,简称 SQL/MM:SQL/MM 是 国际标准化组织(International Standard Organization,简称 ISO)制定的标准。

通过空间数据的描述我们可以定义一个具体的几何体。在这两种标准中公共的部分中都定义了下面 3 组共 6 个基础类型,这些是经常用到的类型。

  • 点、多个点
  • 线、多个线
  • 多边形、多个多边形

为了方便存储和使用这些数据 OGC 组织通过 OpenGIS 规范定了两种具体格式

  • Well-Known Text (WKT) format
  • Well-Known Binary (WKB) format

WKB/WKT 都只是通过标记语言描述点、线、面 的几何体数据,当用于几何计算时一般不需要坐标系。但是当数据需要展示在地图上时则需要将其原始的空间数据投射到大地坐标系上(这个过程称为投影)才可以得到这个几何图形具体的地理坐标。

空间引用识别号(SRID)

要将几何图形投影到坐标系,必须需要使用SRID。SRID可以理解为唯一标识了将某个几何体空间数据映射成某个具体坐标系中的方式。

当SRID为0或者不使用SRID时,表示一个几何图形实例没有被放到任何一个坐标系中,我们无法定位其位置。例如通过长宽高的具体值我们可以知道一个正方体的形状,但是我们没法知道他的具体坐标。

不同SRID值代表了将几何体映射到坐标系中的不同方式。几何体本身的空间数据结合SRID就可以具体定位这个几何体在坐标系中的位置。

下图简单演示了有无SRID得差异。像欧洲石油测绘组 (EPSG) 定义的 SRID是根据地球地理信息构建的坐标系,几何图形根据几何体空间数据以及EPSG标准的SRID值可以转成真实的地理坐标。

目前有多种公认的标准 SRID,例如欧洲石油测绘组 (EPSG) 定义的 SRID。不同数据库对于不同SRID标准的适配性也不同。

某些数据库和空间类型(如 PostgreSQL 中的 PostGIS 几何和地理或 Microsoft SQL Server 中的地理类型)使用预定义的 EPSG 代码子集,只可使用具有这些 SRID 的空间参考。

如今编制 SIRD 的工作已经转交给了国际石油和天然气生产商协会(OGP) 的手中,要想了解更多的EPSG信息,可以访问 https://epsg.io/

常见SRID标准与地理坐标系

在中国常用的坐标系有下面四个

  • WGS84:美国 GPS 系统上使用的坐标系。
  • GCJ02:由中国国家测绘局制定的地理坐标系统。
  • BD09:百度地图所使用的坐标系,它是建立在 GCJ02 坐标系之上。
  • CGCS2000:中国北斗系统所使用的坐标系。

大地坐标系与地图绘制

地图绘制的基本步骤

绘制地图构建大地坐标系主要会采用以下步骤:

  1. 首先会选择一个基准点,所有的地形数据都是基于这个基准点的进行绘制。而这个点也正是位于地球椭球体上的一个点。

  2. 地球椭球体可以充当画布,测绘员则可以在画布上勾勒出具体的街道和地形信息。从而形成最终的地图数据供我们使用。

  3. 基于定位的点和地图的基准点的之间的偏差就可以完成整个定位到地图的转换这个过程就是坐标系转换。当然实际的过程要更为复杂一些,甚至会有多次偏差修正。

存储地理信息

目前主流关系型数据库对地理信息基本都都有支持,其中最常用的类型便是 geometry 类型。在 Oracle 数据库中对应为 sdo_geometry 类型。

还有其它的几何类型,例如:PointPolygonMultiPointMultiPolygon 等等,介于篇幅的原因本文内容只针对 geometry类型。

有兴趣深入了解的朋友可以根据下方表格自行深入研究,本文不做过多展开。

数据库几何类型
MySQLPOINT、LINESTRING、POLYGON、MULTIPOINT、MULTILINESTRING、MULTIPOLYGON、
GEOMETRY、GEOMETRYCOLLECTION
PostgreSQLPOINT、LINE、LSEG、BOX、PATH、POLYGON、CIRCLE、GEOMETRY
OracleSDO_GEOMETRY、SDO_TOPO_GEOMETRY、SDO_GEORASTER
SqlServerGEOMETRY、GEOGRAPHY

不同的数据库由于存储和查询引擎的不同,针对地理信息的存储会有一些差异。这些差异主要是因为单纯的 WKB 并不能满足实际需求,最直接的问题就是 WKB 只考虑的几何图形的空间数据存储,但是并未涉及大地坐标系相关的信息。

比如在 MySQL 中地理信息数据将会在 WKB 数据前额外增加 4 个字节用于存放其对应的 SRID。而 PostgreSQL 用了更加高级的 EWKB 格式作为地理信息数据的存储格式。

因此如果想要以二进制方式直接从数据库中获取地理信息数据,了解正确的获取方式十分必要。

地理信息数据应用的问题

我们会从一个具体案例来和大家探讨地理信息数据应用中会遇到的实际问题。我们这个地理数据应用案例如下:

如何知道地球上一块土地在一段时间内的使用情况?

为了达成这个目的,我们将会不得不面临如下的一些挑战:

数据量大

首先土地的使用是随着时间变化而变化的,比如:

  • 在一些时间内这些用地可能是耕地,另外一些时间可能是用作林地。

  • 随着时间推移一块土地可能会被切割成个地块,或者合并成一个更大的地块。

因此每年获取的地图数据都只是当年最新的情况,地块数据也是不停地变化的。

基于这样一个情况,若想要知道一个时间跨度下的地块变化。通常会涵盖不同时间的地图数据。若地图数据是 1G 大小,如果要计算 10 年的变化就需要处理 10G 的历史地图数据。

计算量大

对于地图数据中还会含有很多其它结构化数据,比如:小区、门牌号、餐馆名称,地块通途以及交通道路等等信息。因此在基于业务查询需要会先进行业务维度上的数据查询和筛选。

写过业务逻辑的朋友都知道,复杂的业务查询很可能会涉及到几张表的联查操作。在加上我们还需要通过 GIS 函数进行几何图形的交并计算。这就会引发下面两个问题

  • 大量的地理的几何信息、标注信息引发出大表的 Join 性能问题。

  • 由于GIS的函数计算引发的大量计算。

没有万能的数据库

现在主流的对地理信息存储比较友好的数据库主要是PostgreSQL、Oracle和SQLServer。像PostgreSQL对地理信息数据处理的生态工具也比较友好,例如:

  • PostgreSQL 对 GeoServer、MapServer、ArcGISServer 几个地图服务中间件的支持性比较好。
  • PostgreSQL 对 PostGIS 的支持兼容性要比 Greenplum 好

这些传统数据库并不能解决所有问题,尤其是面临千万级别的 GIS 表时,表的 Join 查询又会面临严重的问题。

幸运的是,近几年新型和强力的OLAP分析型数据库不断出现,地理信息数据的处理和分析可以结合这些新型分析引擎大大提升地理信息数据处理的效率。

高效处理地理信息数据的现代化数据栈

以下现代化数据栈的方案来自于CloudCanal用户的一个真实案例。该用户原有方案是基于PostgreSQL进行地理信息数据的查询和处理。

通过CloudCanal数据互通,采用如下现代化的数据栈,轻松整合了ClickHouse强大的分析能力和ElasticSearch的强大全文索引能力来处理地理信息数据,大大提升了地理信息数据处理的效率。

数据栈架构图

以上架构图展示了整个地理信息数据的流向以及处理过程:

  1. PostgreSQL对地理信息存储和处理比较友好,业务应用先将产生的地理信息数据全部写入到PostgreSQL中

  2. 利用CloudCanal在异构数据源之间迁移和同步地理信息数据,自动化地转化地理信息数据写入新型的数据源中

  3. 利用ClickHouse强大的分析能力来高效处理地理信息数据

    1. 海量地理信息数据的聚合、join分析操作

    2. 应用利用ClickHouse强大的分析能力先进行数据初筛,生成数据量较小的有效数据,直接对数据规模较小的地理信息数据使用JTS工具进行二次几何函数计算然后生成最终处理结果

  4. 利用ElasticSearch强大的全文索引能力,应用可以直接对ElasticSearch中存储的地理信息数据进行全文检索

可以看到采用CloudCanal以后得现代化数据栈处理地理信息数据具有如下好处:

  • 可以应对复杂的业务查询需要,针对业务选用不同的新型数据库提升效率。

  • 应用可以直接使用分析引擎过滤出来的较小数据规模的地理信息数据进行几何函数计算,大大提升效率。

CloudCanal中对于地理信息数据友好兼容

表结构迁移

在使用 PostgreSQL 作为主库,ClickHouse 作为分析库的时候。第一个问题就是 ClickHouse 的建表,在没有 CloudCanal 工具,建表比较痛苦,使用 CloudCanal ,这个过程就会相当便利。

  • PostgreSQL 没有类似 MySQL show create table 的语句可以方便的获取到原始建表语句让我们参照,因此需要一张表一张表的去创建。

  • ClickHouse 的表字端类型和 PostgreSQL 的字端类型并不一致,还需要了解它们做针对的映射和转换。

即便是在 PostgreSQL 和 PostgreSQL 之间进行数据同步,还需要考虑一些问题

  • 带有 SRID 的 PostgreSQL 表结构迁移

这些问题通过使用 CloudCanal 解决,它会自动识别表的字段类型并且映射到适合的列上,这样就省了不少学习了解新数据库的时间。

同样 CloudCanal 就像 PostgreSQL 一样对 GIS 特性支持比较完整,它能够准确处理带有 SRID 的 PostgreSQL 表结构。如下表。

CREATE TABLE "city"
(
"ogc_fid" int4 NOT NULL,
"mssm" varchar(16),
"bz" varchar(16),
"provincen" varchar(50),
"provincec" varchar(50),
"cityn" varchar(50),
"cityc" varchar(50),
"shape_leng" float4,
"shape_area" float4,
"geom" geometry(geometry,4490) -- 带有 SRID 的列
);

数据迁移

CloudCanal支持将用户源端数据库中的地理信息相关数据完整迁移到对端异构数据源,并且支持断点续传。

在地理信息数据的迁移上,CloudCanal做了不少工作。当源端数据库是 PostgreSQL 时。全量数据同步过程会识别到表上的 SRID 信息,并将 PostgreSQL 使用 EWKB 格式转换为标准的 WKT 连同 SRID 一同作为最终数据。

  • 当对端是 ClickHouse 的时候我们可以得到完整的 GIS 地理信息数据以及对应的坐标系 SRID,在程序中可以进一步处理。

  • 当对端是 PostgreSQL 时也可以完整的将地理信息和坐标系同步到对端。

自定义处理

在地理信息数据从源端数据库迁移/同步到对端数据库过程中,通过 CloudCanal 自定义代码功能可以做一些非常灵活的加工操作。

用户可以自己实现自定义代码,在数据同步过程中针对每一条数据做一些额外的处理。比如:

  • 在处理 GIS 的应用中经常会用到求外切,得到几何图形的最大矩形区域。然后将这个矩形区域存储在一个新的字段中

  • 求 GIS 数据几何图形的中心点

  • 提前裁剪数据,将清洗好、裁剪好的规整数据写入对端新型数据库。

长周期的实时地理信息数据同步

CloudCanal不仅支持历史数据的迁移同时还支持异构数据源之间的实时数据同步。实时的地理信息数据同步能够加强企业在某些业务领域的竞争力。

在实时同步时,用户最关心的自然是长周期实时同步的稳定性。

在实际情况中为了保障业务运行中对于实时数据同步的稳定性 CloudCanal 采用了多种方式来实现。

  • 完全分布式:核心组件均支持分布式部署,避免单点故障

  • 容灾自动恢复:如果运行实时同步任务的机器Crash,CloudCanal会自动迁移这台机器上的实时同步任务到别的可用的机器继续实时化的同步

  • 实时同步断点续传:CloudCanal针对各种数据库源端类型都有设计专门的位点管理。当因故障产生任务重启,任务会在上一次中断的地方继续开始同步,避免数据丢失。

总结

在本文最初我们比较简略的介绍了地理信息数据相关的背景知识,文章后半段我们探讨了如何利用现代化的数据栈高效处理地理信息数据。

参考资料

  • 北斗卫星导航系统公开服务性能规范1.0 版
  • 习近平同尼泊尔总统班达里互致信函 共同宣布珠穆朗玛峰高程
  • “奋斗者”号全海深载人潜水器顺利完成万米深潜试验
  • 北斗卫星导航系统 公开服务性能规范
  • 高精度地图(一)——地理坐标系
  • The Home of Location Technology Innovation and Collaboration
  • MYSQL 8.0 中存储 GIS 数据的正确姿势

SQLServer 到 MySQL 数据同步 (一)

· 阅读需 4 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.1.0.x 版本开始支持 SQLServer 作为源端的数据迁移同步能力。

本文通过 SQLServer 到 MySQL 的数据迁移同步案例简要介绍这个源端的能力。链路特点:

  • 结构迁移、全量迁移、增量同步(数据)、数据校验俱全
  • 流程全自动化

技术点

SQLServer开CDC

SQLServer 开启 CDC (change data capture) 首先需要安装并启动 SQL Server Agent , 通过 agent 异步解析其变更日志写到 [目标db].cdc 的一系列表中。

安装并启动 Agent 后, 对 目标db 需要开启 CDC 能力。USE [目标db]切换到目标数据库,再执行EXEC sys.sp_cdc_enable_db;打开这个库的 CDC。 执行完毕后会在 [目标db].cdc schema下出现一系列表。

目标db 开启 CDC 后,还需要开启针对哪些表的 CDC, 命令为

EXEC sys.sp_cdc_enable_table   
@source_schema = N'dbo',
@source_name = N'worker_stats',
@role_name = NULL,
@supports_net_changes = 0;

默认会生成 source_shema_source_name_CT 的表,当然创建过程中也可以直接指定 capture_instance 指定这个表除 _CT 这个后缀之前部分的命名。

以上步骤运维稍显繁琐,但是使用 CloudCanal , 创建任务时都会自动准备好,前提是给到足够权限的账号。

增量 DDL 同步

DDL 对于数据同步的影响在于两方面,如果没有处理好,可能导致数据同步根本无法成立的致命问题。

第一,大部分数据库的增量日志都不带字段类型长度等属性,需要依赖从源端数据库独立获取的元数据进行日志解析,DDL 如果更改了解析日志相关元数据,则需要刷新,否则会导致字段不存在对不齐类型错误等问题,这个刷新需要严格按照事件变更顺序,一般和位点紧相关。

其次,对于上下游需要保证结构一致性的同步链路,DDL 变更则需要准确应用到下游数据库中,这中间步骤包括 DDL 获取、转换、执行等多个步骤,a 种源端 * b 种目标端 * c 种 DDL , 导致全数据库相互同步难度相当高,其中还伴随着 DDL 应用导致的链路延迟,应用失败导致位点回溯等棘手问题。

以上两点,第一点几乎无法回避,第二点可以通过预先做 DDL 执行解决,特别通过平台化方案自动关联执行,相对方便且合理。

CloudCanal 对 SQLServer DDL 同步解决了第一点,但是对于第二点,暂时没有解决,需要手动到对端进行 DDL 变更。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备好 SQLServer 数据库(本例版本为 2016)和 MySQL 数据库(本例版本为 8.0)
  • 在 SQLServer 的准备一些库表和数据(本例使用另外一个 MySQL 迁移同步数据到 SQLServer)

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • 将源端SQLServer和目标端MySQL 分别添加 截屏2021-12-28 上午11.25.43.png

任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源 截屏2021-12-28 下午12.12.48.png
  • 选择 数据同步,勾选 全量数据初始化, 勾选 DDL 不同步 截屏2021-12-28 下午12.14.38.png
  • 选择需要迁移同步的表
  • 选择列,默认全选
  • 确认创建 截屏2021-12-28 下午12.15.51.png
  • 查看异步任务,确认创建步骤正常 截屏2021-12-28 下午12.16.34.png
  • 任务自动运行 截屏2021-12-28 下午12.17.26.png

校验数据

  • 持续造增量数据,INSERT & UPDATE & DELETE 比例 2:7:1 截屏2021-12-28 下午12.18.12.png
  • 停止增量造数据
  • 创建校验任务,并校验任务结果 截屏2021-12-28 下午12.24.38.png

常见问题

是否支持 DDL 同步

暂时没有支持,不过 cdc schema下存在 ddl_history 表,可能可以结合 lsn 找到具体 DDL 语句,从而支持 DDL 同步。

是否支持其他目标端

最近会支持 Kafka , 其他数据源按具体需求逐步进行开放。

总结

本文简单介绍了如何使用 CloudCanal 进行 SQLServer 到 MySQL 的数据迁移同步。

SQLServer 到 MySQL 数据同步 (二)

· 阅读需 7 分钟
Barry
Chief Technology Officer

SQL Server 是一个值得信赖的老牌数据库系统,自从 1988 年由 Microsoft、Sybase 和 Ashton-Tate 三家公司共同推出之后就一直不断迭代更新。而如今我们提到 SQL Server 通常是指 Microsoft 从 SQL Server 2000 之后的版本。至今 SQL Server 家族已经非常繁茂涵盖了 云上(Azure SQL Server)、IoT 设备(边缘 SQL Server)、以及经典版本(本地 SQL Server)。

实现 SQL Server 作为源端的实时数据同步,一般都会用到它的 CDC 功能,这个功能是从 2008 版本才开始支持。

本文主要也是基于 SQL Server 2008 版本介绍如何使用 CloudCanal 快速构建一条稳定高效运行的 SQL ServerMySQL 数据同步链路。

技术点

基于 SQL Server 的 CDC

image.png SQL Server 将用户的每一个数据操作都记录在后缀为 ldf 日志文件中。这些日志会保存在 ldf 文件中。当数据库启用 CDC 能力后,SQL Server 代理上会生成一个专门分析ldf文件的作业,再将具体的表启用 CDC, 则该作业开始持续分析文件中的变更事件到指定的表中。

作业执行用到 SQL Server 代理,该组件如果处于非启动状态,则生成任何可消费的变更数据。通常,我们可以在 Windows 对象资源管理器中查看是否已经开启了 SQL Server 代理。

image.png

由于 SQL Server 执行作业时无法设置起始位置,因此对于一个表的变更记录我们最早只能追溯到表启用 CDC 的那个时间点。具体的起始位点可以在 “cdc.change_tables” 表中查询得到。

还需要注意的另外一个细节是 CDC 表也是一张普通的表它和用户共享同一个数据空间。为了防止 CDC 表数据无限膨胀 SQL Server 会每天定时执行清理作业,清理过期的数据(具体时间视数据库配置而定)。

SQL Server -> MySQL 的数据类型支持

CloudCanal 从 2021 年开始支持 SQL Server 同步后就不断地丰富它的对端数据源,支持 SQL Server 到 MySQL 是一个非常重要的同步链路。 目前 CloudCanal 已经可以支持的类型和映射关系如下:

SQL Server 类型MySQL 类型备注
BITBIT
DECIMALDECIMAL
NUMERICDECIMAL
SMALLINTSMALLINT
TINYINTTINYINT映射为 tinyint unsigned�
INTINT
BIGINTBIGINT
SMALLMONEYFLOAT
MONEYFLOAT
FLOATFLOAT
REALDOUBLE
DATEDATE
DATETIMEOFFSETDATETIME由于 MySQL 类型限制,会丢弃时区信息同时最多保留 6 位精度
DATETIME2DATETIME由于 MySQL 类型限制,会保留最多 6 位精度
SMALLDATETIMEDATETIME
DATETIMEDATETIME由于 MySQL 类型限制,会保留最多 6 位精度
TIMETIME由于 MySQL 类型限制,会保留最多 6 位精度
CHARCHAR
VARCHARVARCHAR源端 SQL Server 如果为 VARCHAR(MAX),则按照 TEXT 来处理
TEXTTEXT
NCHARCHAR
NVARCHARVARCHAR源端 SQL Server 如果为 NVARCHAR(MAX),则按照 NTEXT� 来处理
NTEXTTEXT
BINARYBINARY
VARBINARYVARBINARY源端 SQL Server 如果为 VARBINARY(MAX),则按照 IMAGE� 来处理
IMAGEBLOB
TIMESTAMPBIGINT会映射为 bigint unsigned
ROWVERSIONBIGINT会映射为 bigint unsigned
HIERARCHYID--暂不支持
UNIQUEIDENTIFIERVARCHAR(36)
SQL_VARIANT--暂不支持
XMLTEXT
GEOMETRY--暂不支持
GEOGRAPHY--暂不支持
SYSNAMEVARCHAR(128)

操作示例

前置条件

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档
  • 准备一个 SQL Server 数据库,和 MySQL 实例(本例分别使用自建 SQL Server 2008 和 MySQL 8.0)
  • 登录 CloudCanal 平台 ,添加 SQL Server 和 MySQL

image.png

  • 创建一条 SQL Server -> MySQL 链路作为增量数据来源

任务创建

  • 任务管理-> 任务创建
  • 测试链接并选择 目标 数据库
  • 点击下一步

image.png

  • 选择 数据同步,并勾选 全量数据初始化,其他选项默认

image.png

  • 此时如果 SQL Server 上数据库还没有启用 CDC 功能,则会在点击下一步的时候提示如何启用 CDC。只要按照提示的参考语句执行即可。

image.png

  • 选择需要迁移同步的

image.png image.png

  • 确认创建任务

image.png

  • 任务自动做结构迁移全量迁移增量同步

image.png

校验数据

  • 程序造数据, SQL Server -> MySQL,在源端以 1:1:1 的比例随机执行Insert、Update、Delete三种类型语句。使用20个线程并发写入变更。
    image.png
  • 任务正常运行一段时间后,停止造数据
  • 点击 SQLServer -> MySQL 任务详情功能列表 -> 创建相似任务,在创建任务的第二步选择数据校验

image.png

  • 数据校验 OK
    • 下面这个是校验结果。如果我们对端和源端一旦出现数据不一致就会像下面这样非常醒目的提示给用户,有多少数据不一致,有多少数据丢失。

image.png

常见问题

支持什么版本的 SQL Server 和 MySQL ?

  • 目前源端 SQL Server 2008 及以上版本皆可使用 CloudCanal 进行迁移同步(推荐使用 SQL Server 2016 或 SQL Server 2008)
  • 对端 MySQL 支持 5.6、5.7、8.0 版本,也可以选用 阿里云 RDS for MySQL 对应的版本,或者其它云服务商的 MySQL 版本

数据不同步了都有哪些情况?

  • SQL Server CDC 需要依赖 SQL Server 代理,首先要确定 SQL Server 代理服务是否启动
  • 表在启动 CDC 的时候会确定要捕获的列清单,此时如果修改列的类型可能会导致 CDC 中断。目前解决办法只能重建任务。
  • 增/减 同一个列名的列,对一个列删除后在增加。虽然 CDC 表中字段依然存在但是也会导致整个 CDC 中断。

什么情况下会影响稳定的数据同步?

  • 如果任务在同步期间出现了异常导致任务延迟。这时候需要格外注意,如果过长时间的延迟,即便是修复了延迟的问题(比如对端数据库长时间出现不可用)在后续数据同步上也可能存在丢失数据的风险。
  • SQL Server 为了防止 CDC 表数据无限膨胀 SQL Server 会每天定时执行清理作业,清理超过 3天的数据。
  • 为了增加延迟的容忍度可以执行这条 SQL 来增加 CDC 数据的保存时间,代价是这些数据需要存放到数据库表中,如果每日数据变更很多对磁盘开销会有额外的要求。
    • execute sys.sp_cdc_change_job @job_type = n'cleanup', @retention = 4320
    • msdb.dbo.cdc_jobs 表中保存了具体 捕获任务的数据保存时间。

总结

本文简单介绍了如何使用 CloudCanal 进行 SQL Server -> MySQL 数据迁移同步。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

ORACLE 到 MySQL 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

CloudCanal 2.1.0.x 版本开始支持 Oracle 作为源端的数据迁移同步能力。

本文通过 Oracle 到 MySQL 的数据迁移同步案例简要介绍这个源端的能力。链路特点:

结构迁移、全量迁移、增量同步(数据)、数据校验俱全流程全自动化

此文章简要介绍如何快速构建一条长期稳定运行的 Oracle->MySQL 数据链路。

技术要点

将数据从 Oracle 中同步出来有两种方式可以选择

  • 物化视图日志
  • 使用 Redo 日志

权限问题

请确保添加的数据源账号可以访问如下 13 张表 ,或者使用一个具有 DBA 权限的 Oracle 账号。

  • 表 SYS.DBA_USERS
  • 表 SYS.DBA_TABLES
  • 表 SYS.DBA_TAB_COLS
  • 表 SYS.DBA_TAB_COMMENTS
  • 表 SYS.DBA_COL_COMMENTS
  • 表 SYS.PRODUCT_COMPONENT_VERSION
  • 表 SYS.DBA_CONSTRAINTS
  • 表 SYS.DBA_CONS_COLUMNS
  • 表 SYS.DBA_INDEXES
  • 表 SYS.DBA_IND_COLUMNS
  • 表 v$version
  • 表 v$database
  • 表 v$tablespace

对于物化视图方案来讲需要有额外的下列权限

  • 语句 CREATE MATERIALIZED VIEW LOG ON xxx
  • 语句 CREATE INDEX xxxx

对于 Redo 方案来将需要有 LOGMNR 相关的权限

  • 表 SYS.ALL_LOG_GROUPS
  • 表 v$logfile
  • 表 v$log
  • 表 v$archived_log
  • 表 v$logmnr_logs
  • 存储过程 SYS.DBMS_LOGMNR_D.BUILD
  • 存储过程 SYS.DBMS_LOGMNR.ADD_LOGFILE
  • 存储过程 SYS.DBMS_LOGMNR.START_LOGMNR
  • 存储过程 SYS.DBMS_LOGMNR.END_LOGMNR
  • 语句 ALTER TABLE xxxx DROP SUPPLEMENTAL LOG xxx
  • 语句 ALTER TABLE xxxx ADD SUPPLEMENTAL LOG xxx
  • 语句 ALTER SYSTEM ARCHIVE LOG CURRENT

在配置同步任务之前需要确保上面的 Oracle 权限,另外作为源端 Oracle 全量阶段还需要读取对应表的权限。

操作示例

准备 CloudCanal

添加数据源

  • 登录 CloudCanal 平台

  • 数据源管理 -> 添加数据源

  • 选择 自建数据源 ,并填写相关数据库信息,其中 网络地址 请按提示带上端口号 9d87303b-45dd-4bd1-bd73-f053b59046cf-image.png

  • 如下已添加完 Oracle 和 MySQL a718b8f7-7961-4a1d-bb41-445ba81c0037-image.png

创建同步任务

  • 任务管理->新建任务

  • 源端选择刚添加的 Oracle 数据源,目标选择 MySQL, 分别点击 测试连接 按钮以测试数据库连通性和获取 schema 级别元信息

  • 点击下一步 1a3d90f2-4efb-49d8-b579-8919e9debca6-image.png

  • 选择 数据同步,并且勾选全量数据初始化

  • 规格可以根据任务重要度以及部署机器的内存容量合理选择,一般 2GB 内存规格即可

  • 点击下一步 a3722630-7625-4485-ad56-3340d3fd904d-image.png

  • 勾选需要同步的表,如果目标表为橙色,表示不存在同名表,任务创建完成后自动进行结构迁移。也可以下拉框选择表进行映射

  • 勾选需要同步的 INSERT/UPDATE/DELETE 操作,默认全选

  • 点击下一步 c70a7c0f-cbd9-4f6a-849c-21c607cf769e-image.png

  • 通过勾选做列映射列裁剪

  • 点击下一步 d20d2906-6ba5-4fea-bd43-f2033aa3509e-image.png

  • 对任务内容进行创建 ,如果任务不需要立刻运行 , 可置灰自动启动任务 按钮

  • 点击确认创建 77f49016-0cec-4d33-93d1-a94b741d788f-image.png

任务同步

  • 任务分为 3 个阶段:结构迁移数据初始化数据同步,每一个阶段完成时,状态自动流转,直到同步稳态 afd38301-a854-4543-b814-8782ed1c094d-image.png c2772d7c-2ab7-470c-8551-280f723221ad-image.png
    • 结构迁移:当对端数据库不存在对应的库表时 CloudCanal 会自动将 Oracle 的表在对端创建出来
    • 数据初始化:将源端所选库表数据以全量迁移方式搬迁到对端
    • 数据同步:准实时的同步增量数据,即源端数据库上发生的增、删、改操作同步到对端数据库上

FAQ

目前 Oracle 源端还支持哪些数据源?

除了 Oracle 到 MySQL 之外,截止社区版 2.0.1.1 版本,还支持下面这些链路

  • Oracle -> PostgreSQL
  • Oracle -> Greenplum
  • Oracle -> TiDB
  • Oracle -> Oracle
  • Oracle -> Kudu 。

预检失败会有哪些影响?

一些小伙伴可能在创建任务的时候遇到类似如下报错信息,可能会有一些疑惑。 8f3f8d80-437a-4ce9-b3d6-450a0f5e0466-image.png 在创建任务的最后阶段我们会进行一些检测,Oracle 作为源端会存在如下一些检测项目。

物化视图模式下

  • 如果表已经创建了物化视图日志表那么预检失败。

Redo 模式下

  • 开启日志归档模式 alter database archivelog 开启过程需要数据库离线。
  • 需要开启最小补全日志 alter database add supplemental log data

总结

本文简单介绍了如何使用 CloudCanal 快速构建Oracle-> MySQL 数据迁移同步链路,更多的源端和目标端陆续开放。

MySQL 到 Elasticsearch 数据同步

· 阅读需 5 分钟
Barry
Chief Technology Officer

简述

本文介绍如何通过 CloudCanal ,五分钟内创建一条长期稳定运行的 MySQL -> Elasticsearch (以下简称 ES) 实时数据迁移同步链路 。

技术内幕

限流

MySQL 到 ES 数据迁移同步过程中,往往会面临源端写入对端 RPS 较大问题,导致 ES 负载较大,影响业务对 ES 的正常读写。CloudCanal 为了应对这个情况,提供限流能力。同步任务创建完毕后,可在 任务详情 -> 参数设置 对源端流量进行限流。 8ab5e2b2-3a48-4042-b53b-1e469734b157-image.png

时区处理

CloudCanal 允许用户在创建数据迁移同步任务时指定时区。写入ES 时,源端时间类型数据将会格式化并带上时区信息 , 支持用户在跨国、跨地域场景下使用。

自动创建索引和 Mapping 结构

CloudCanal 迁移同步任务支持自动将源端数据库表结构映射成 ES 索引,该过程中允许用户在 列(column/field) 级别上,个性化设置自己需要的索引和 Mapping 结构。这些设置包括:

  • 每个列可以指定是否需要索引
  • 可以对 text 类型的 field 设置 ES mapping 中的分词器(标准分词器)
  • 索引分片数、副本数自定义设置

映射已建索引

用户可能已经在 ES 中提前建好了索引,这种情况下 CloudCanal 会自动探测,并允许用户配置映射,一张表可映射对端一个索引。

类型支持

不支持父子文档join,支持NESTED/OBJECT。NESTED有更好的索引性能,建议用NESTED,避免ES侧join

支持用户自定义分词器

映射用户已经创建的索引时候,支持自定义分词器

内置 _id 生成和 routing field 指定

写入 ES 时候 _id 用于唯一标识一个 doc。CloudCanal 数据同步默认遵循以下原则:

  • routing 使用 _id 值
  • 单主键表,会默认使用源端关系表的主键列的列值作为 _id 的值
  • 多主键表,会通过分隔符$连接多个主键列的值,组成唯一的 _id 值
  • 无主键表,会将所有列的值通过$连接,生成唯一的 _id 值

举个"栗子"

准备 CloudCanal

添加数据源

  • CloudCanal 支持 6.8 及以上版本 ES,我们点击 数据源管理->新增数据源 添加 ES 数据源 1048936c-46bb-4495-b33f-6bbdece681ba-image.png

  • 填写必要 host 信息后点击 新增数据源 2c28c66a-6392-4994-a98d-e4dd3a672c98-image.png

创建任务

  • 点击任务管理,选择创建任务 8637011b-9477-45d1-b9c5-f2e166bed7d0-image.png

数据源设置

  • 勾选源端和目标端数据库,并且选择相应的数据库 deb89653-e04d-430a-b6c6-1a27d0d785df-image.png

功能配置

  • 选择数据同步,并勾选数据初始化(带全量迁移) 450e225c-db1b-477f-8a81-bd5c5e8ea87e-image.png

表&action过滤

  • 此处可以进行的操作主要是:

    • 勾选需要订阅的表
    • 选择需要映射的索引(支持映射已经存在的索引)
    • 勾选 IUD 过滤
    • 批量设置分片数
    信息

    CloudCanal 的结构迁移支持自动帮用户按照源端表结构创建索引。

    cbd229cc-2803-482a-bddc-26048f392571-image.png

数据处理

  • 本页面提供的主要能力有:

    • 列裁剪设置(包括批量筛选和设置)
    • 设置源端where过滤条件
    • 索引设置
    • 分词器设置
    • 列映射(如果同步的是已经存在的索引,支持列映射)

    c250f412-12c1-45bd-b1a2-bbf77a0b152c-image.png

创建确认

  • 最后一步,确认创建内容无误后点击确认创建。 e308cb9d-0016-41d1-982a-84f742b4a12f-image.png

查看任务状态

  • 回到 CloudCanal 控制台,刷新并查看任务实时状态,从结构迁移、数据初始化,到数据同步。 253e90e4-6d8e-41c4-b421-eb95a0f73d9f-image.png

  • 登录 ES Kibana 控制台,查看迁移同步过去的结构和数据。 9ddba516-afa8-4d9b-aeec-f64a1485f3ef-image.png

总结

本文简单介绍了如何使用 CloudCanal 快速构建 MySQL->Elasticsearch 数据迁移同步链路,更多的源端和目标端陆续开放。

MySQL 到 ClickHouse 宽表构建

· 阅读需 6 分钟
Barry
Chief Technology Officer

简述

本文简要介绍 CloudCanal 如何支持 MySQL -> ClickHouse 的宽表构建。

技术点

ClickHouse 表关联之觞

ClickHouse 作为标准的列存数据库,其特点相当鲜明,对于多维度数据聚合、筛选特别高效,对于列存面向计算的特点,用得相当不错,包括但不限于以下特点

  • io 效率高
  • 列压缩
  • 少数列数据存取io放大效应较小
  • 极致计算优化
    • 向量化
    • 利用 SSE 等 SIMD 指令集加速
    • 未来可选 AVX 512 等指令集优化
    • 未来对于计算卸载到 FPGA、GPU 较便利

但是 ClickHouse 对于数据关联(join), 相比于其 多维聚合筛选 能力要弱一些。对于这个问题,我们觉得有必要通过 CloudCanal 的宽表能力,让其适用性得到进一步提升。大宽表 + 突出的数据 多维聚合筛选 能力,几乎等于交互式分析的杀手锏。

操作示例

前置条件:

  • 下载安装 CloudCanal 私有部署版本,使用参见快速上手文档

  • 准备好 MySQL 数据库(本例使用 5.7 版本)和 ClickHouse 数据库(本例使用 21.8.X 版本)

  • MySQL 上创建 1 张事实表(my_order)和 2 张维表 (user 、product)

     CREATE TABLE `my_order` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `product_id` bigint(20) NOT NULL,
    `user_id` bigint(20) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1460 DEFAULT CHARSET=utf8;

    CREATE TABLE `product` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `name` varchar(255) NOT NULL,
    `price` decimal(20,2) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2719 DEFAULT CHARSET=utf8;

    CREATE TABLE `user` (
    `id` bigint(19) NOT NULL AUTO_INCREMENT,
    `gmt_create` datetime NOT NULL,
    `gmt_modified` datetime NOT NULL,
    `name` varchar(255) NOT NULL,
    `level` varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2224 DEFAULT CHARSET=utf8
  • ClickHouse 上创建 1 张宽表 my_order , 并额外包含两张维表相关数据

    • user_id (关联user.id), user_name(对应user.name)
    • product_id(关联product.id) ,product_name(对应product.name),product_price (对应product.price)
    CREATE TABLE trade.my_order
    (
    `id` Int64,
    `gmt_create` DateTime,
    `gmt_modified` DateTime,
    `product_id` Int64,
    `user_id` Int64,
    `user_name` Nullable(String),
    `product_name` Nullable(String),
    `product_price` Nullable(Decimal(20, 2))
    )
    ENGINE = ReplacingMergeTree
    ORDER BY id
    SETTINGS index_granularity = 8192

开发宽表代码

打包

  • 进入工程目录,使用命令进行打包
    % pwd
    /Users/zylicfc/source/product/cloudcanal/cloudcanal-data-process
    % mvn -Dtest -DfailIfNoTests=false -Dmaven.javadoc.skip=true -Dmaven.compile.fork=true clean package

自定义代码包

  • 打包命令后,代码包位于工程目录下的 wide-table/target 目录 截屏2021-12-10 下午12.30.39.png

添加数据源

  • 登录 CloudCanal 平台
  • 数据源管理->新增数据源
  • MySQLClickHouse 分别添加 截屏2021-12-16 下午6.33.14.png

任务创建

  • 任务管理->任务创建
  • 选择 目标 数据源
  • 选择 数据同步,并勾选 全量数据初始化, 其他选项默认
  • 选择需要迁移同步的表, 此处只要选择事实表即可,维表会通过自定义代码反查补充 截屏2021-12-16 下午7.07.18.png
  • 选择列,默认全选,选择上传代码包(路径如上所示) 截屏2021-12-16 下午7.07.45.png
  • 确认创建,并自动运行 截屏2021-12-16 下午7.12.16.png

校验数据

  • 变更事实表数据 截屏2021-12-16 下午7.08.18.png 截屏2021-12-16 下午7.08.50.png
  • 变更维表数据 截屏2021-12-16 下午7.09.17.png 截屏2021-12-16 下午7.09.42.png

数据变化规律

  • 事实表插入,更新都会反查维表最新数据并写入对端
  • 维表更新,需要触发事实表更新才能带上最新的维表变更数据写入对端
  • 维表数据删除,如果触发事实表更新,默认将会把对应的维表数据(已删除)置为null

常见问题

维表变化后怎么办?

维表变化不会直接触发事实表更新。需要源端触发事实表更新(比如变更一个时间字段),带上最新的维表数据进行对端数据刷新。

另外对于维表数据的删除,如果触发事实表更新从而刷新对端数据,则默认置为null。

不会开发 java 代码怎么办?

如果能打包不会 java 开发,在 cloudcanal-data-process 寻找相应模版,修改配置即可。

如果不能打包也不会开发,找 CloudCanal 同学协助。

如果遇到出错或者问题怎么办?

如果会 java 开发,建议打开任务的 printCustomCodeDebugLog 观察输出的数据是否符合预期,如果不符合预期,可以打开任务的 debugMode 参数,对数据转换逻辑进行调试。

如果不会 java 开发, 找 CloudCanal 同学协助。

还支持其他数据源么?

这个是 CloudCanal 通用能力,只要源和目标之间实现了全量迁移和增量同步,即支持。

总结

本文简单介绍了如何使用 CloudCanal 进行 MySQL -> ClickHouse 的宽表构建,以最常见的单事实表多维表方式举例。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

CloudCanal 产品化之道

· 阅读需 7 分钟
Barry
Chief Technology Officer

前言

CloudCanal 是一款由ClouGence公司发行的集结构迁移、数据全量迁移/校验/订正、增量实时同步为一体的数据迁移同步平台。产品包含完整的产品化能力,助力企业打破数据孤岛、完成数据互融互通,从而更好的使用数据。CloudCanal社区版为免费版本,我们会持续地对其维护,请大家放心下载使用。如有兴趣使用请参考文章底部相关资料。

数据同步产品的透明化、可视化运维

数据同步产品是企业的基础设施产品,其透明化、可视化程度与产品的运维、使用效率息息相关。

透明化

透明化指产品内部本身对于用户来说不完全是一个黑盒子,产品内部核心构件以产品化的形式向用户透明,使得用户可以介入内部构件的管理,提供更加精细化的产品控制。

对于数据同步这类基础设施产品,透明化会显得更为重要。数据同步产品由于用户的数据源、数据负载、数据处理需求、机器环境等差异,用户在实际使用、运维中往往需要有对产品更强的干预能力,从而更好的满足自身的场景需求。在数据同步领域,这类干预诉求主要体现在如下几个方面:

  • 源、目标核心组件透明化:用户能够对源端的读取器和对端的写入器有更加精细的控制。例如对于源端可以配置限流、批大小、解析线程数等。
  • 内部组件异常透明化:在私有部署的情况下,数据同步产品的数据源、机器等等都是用户添加的。场景本身的复杂性,不可避免的导致一些由于使用、配置不当而产生的异常。内部组件异常透明化,可以使得用户更加准确、及时的发现问题、解决问题。例如数据同步过程中,数据源突然下线,通过透明化的日志即可看到数据源Connection refused异常,这样,可以快速将问题原因锁定在数据源无法访问这点上。
  • 运行时信息透明化:丰富的运行时信息能够使得出现问题时,提供更多诊断信息,快速定为问题原因。

可视化运维

可视化运维主要体现在产品功能各个维度的可视化成熟度。更高的可视化程度,会带来更好的易用性和产品体验。

CloudCanal透明化、可视化运维

核心组件透明可视化

CloudCanal核心组件主要包含:

  • 控制台(Console): 管控进程,负责产品化能力
  • Sidecar组件: 机器保姆进程,负责task和console之间信息转发以及task的可用性保障
  • 任务内核(Task): 具体迁移、同步任务的执行

其中针对Console组件,其内部还包含以下组件:

  • 异步任务工作流:数据同步任务的创建过程比较复杂,CloudCanal通过异步工作流来完成。
  • 状态机:状态机主要负责任务状态的切换,主要是结构迁移、全量、增量之间的阶段切换

在CloudCanal中,这些核心组件对用户都是透明的,用户可以通过可视化的界面对这些内部组件进行精细化的控制。 2919b29a-cff1-4a20-bffa-37df4056989d-image.png

Sidecar

在机器管理页面可以对Sidecar组件进行控制和检查,包含:

  • 生命周期管理
  • 远程日志查看
  • 详细机器监控(点击圆盘可以进入详细)

e5f11662-9607-42bc-b6e9-3bb07f12a22b-image.png

Console

异步任务工作流

数据同步产品中任务创建是个复杂的流程,往往设计多个子任务。CloudCanal的console内置的异步任务工作流会按顺序执行子任务。针对失败的子任务,我们可以看到具体失败的步骤,以及错误信息,便于运维同学定位问题。 138c8ba7-f494-4fc7-86d6-ca3630641fa8-image.png 6a1d90f8-2125-46ca-aa79-193c11378591-image.png

状态机

每个任务都有其关联的状态机,负责任务的阶段流转。状态的流转由条件触发器触发。 e16fa316-cf69-48c7-b0c1-bd2d17428424-image.png c5510305-3c92-452c-8ff7-4a16edba18ee-image.png

Task

任务详情

任务列表页提供了任务的列表信息和基本的生命周期控制能力以及进度查看。 bda0e5dc-698d-41f7-b260-791429684c9f-image.png

任务详情页面提供了任务完整的详情信息,包括:

  • 生命周期控制
  • 订阅关系查看(库表映射)
  • 源端、目标端数据源详细信息
  • 各阶段同步细节
  • 任务白屏化日志
  • 关联的绑定机器信息
  • 同步进度、位点细节

4a917e2c-6115-4bd5-a1fa-cd31c8d5d468-image.png

任务内核参数

除任务详情的透明可视化之外,任务内核参数是CloudCanal对于任务精细化控制的重要能力。任务详情中的参数修改支持对任务内核进行更加精细化的控制,以目标端参数配置为例,我们可以控制例如:

  • 异常跳过策略
  • 并行度
  • 约束冲突时的处理策略
  • 对端RPS限流
  • 大小写策略

a1c07ab4-5f60-4986-bbb6-452bb04b519d-image.png

内部组件异常透明化

CloudCanal的管控会搜集所有的异常日志,并且可视化的在控制台展示。核心组件的日志均会分类展示,便于用户快速查看日志和定位问题。CloudCanal会完整毫无遗漏的搜集所有运行时的异常,这也使得一些在日志中隐秘的、偶发的问题直接暴露出来。这些信息都会指导CloudCanal后续的研发,确保产品步步为营、高质量地去迭代升级。 7ac8c7db-ca45-438b-8dad-6c7e279c423f-image.png

运行时信息透明化

CloudCanal的运行时信息透明化主要体现在如下两个维度:

  • 监控
  • 日志

监控

CloudCanal在控制台上即可查看所有组件的监控信息。每个核心组件的监控下按照不同维度会细分多张图表,让用户对核心组件有着完全的掌控。例如增量源端的监控,我们监控会细致到内存队列的阻塞时间、每秒flush事务数等指标。 ac578344-579e-4061-acbc-111c96a0626a-image.png

日志

在CloudCanal平台上提供了核心组件的白屏化日志,并且对日志均按照功能进行了划分。

机器日志

在机器管理处,查看机器日志,我们可以查看机器的完整日志、慢通信日志、异常日志等信息。 aaf203f7-f78f-4bf0-8224-097e84aecb06-image.png

任务日志

在任务详情页,用户则可以查看实时的任务日志,确认任务实时运行状态或者确认问题。 12c847fc-c75b-4e09-9cfa-a807791498fa-image.png

总结

作为面向技术、运维人员的一款数据基础设施产品,在设计之初考虑产品层面的透明化、可视化运维是尤其重要的。这使得产品在后续功能变得越来越复杂和强大时,产品本身依然能够提供高质量的可运维性,同时也确保产品本身能够更好的迭代和发展。