跳到主要内容

MQ 消息同步格式说明

CloudCanal 支持选择同步到 MQ 的消息存储格式,本文介绍 MQ 多种消息格式的定义说明,方便下游消费和使用。

消息格式介绍

支持的消息同步格式

  • CloudCanal Json:CloudCanal 默认的消息格式,解析数据库增量日志传输至 Kafka,支持批量消息传输。
  • Canal Json:对于 Canal 的兼容格式,数据存储格式为 Canal Json。
  • Aliyun DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。
  • Debezium Envelope:Debezium 官方的 CDC 消息格式,携带 SCHEMA 信息,对大数据下游消费友好。

目标端 MQ 支持情况

消息格式KafkaRocketMQRabbitMQ
CloudCanal Json支持支持支持
Canal Json支持支持支持
Aliyun DTS Avro支持--
Debezium Envelope支持--

源端 MQ 支持情况

消息格式KafkaRocketMQRabbitMQ
CloudCanal Json支持支持支持
Canal Json支持支持支持
Aliyun DTS Avro---
Debezium Envelope支持--

消息格式具体说明

CloudCanal Json

参数说明:

参数类型说明
actionString操作的类型,如:INSERT / UPDATE / DELETE。
bidLongBatchEventBuffer 的 Batch Id。
beforeList变更前的数据。
dataList当前操作的数据。
dbString数据库名称。
schemaStringSCHEMA 名称。
tableString表名。
dbValTypeMap字段数据类型名称。
jdbcTypeMap字段 JDBC 数据类型
entryTypeString源端事件类型,如:ROWDATA / TRANSACTIONEND。
isDdlBoolean是否为 DDL 操作。
pksList源端主键名称。
execTsLong源端 SQL 执行的时间,13位Unix时间戳,单位为毫秒。
sendTsLong操作发送的时间,13 位 Unix 时间戳,单位为毫秒。
sqlString源端执行的 DDL 语句。
tableChangesJson该消息为 DDL 时,携带的该表的元信息,如:主键,列。

DML操作示例如下:

{
    "action":"INSERT/DELETE/UPDATE",
    "before":[
      // UPDATE 的 before 字段
      {
        "col1":"22",
        "col2":"22",
        "col_pk":"22"
      }
    ],
    "bid":0,
    "data":[
      {
        "col1":"11",
        "col2":"11",
        "col_pk":"11"
      }
    ],
    "db":"db_test",
    "dbValType":{
        "col1":"varchar(22)",
        "col2":"varchar(22)",
        "col_pk":"varchar(22)"
    },
    "isDdl":false,
    "entryType":"ROWDATA",
    "execTs":1669789152000,
    "jdbcType":{
        "col1":12,
        "col2":12,
        "col_pk":12
    },
    "pks":[],
    "schema":"db_test",
    "sendTs":1669789153377,
    "sql":"",
    "table":"table_test"
}

DDL操作示例如下:

{
    "action":"ALTER",
    "before":[],
    "bid":0,
    "data":[],
    "db":"db_test",
    "dbValType":{
        "col1":"varchar(22)",
        "col2":"varchar(22)",
        "col_pk":"varchar(22)"
    },
    "isDdl":true,
    "entryType":"ROWDATA",
    "execTs":1669789188000,
    "jdbcType":{
        "col1":12,
        "col2":12,
        "col_pk":12
    },
    "pks":[],
    "schema":"db_test",
    "sendTs":1669789189533,
    "sql":"alter table table_test add col2 varchar(22) null",
    "table":"table_test",
    "tableChanges":{
        "table":{
            "columns":[
                {
                    "jdbcType":12, // jdbc 类型。
                    "name":"col1",    // 字段名称。
                    "position":0,  // 字段的顺序。
                    "typeExpression":"varchar(22)", // 类型描述。
                    "typeName":"varchar" // 类型名称。
                },
                {
                    "jdbcType":12,
                    "name":"col2",
                    "position":1, 
                    "typeExpression":"varchar(22)", 
                    "typeName":"varchar" 
                },
                {
                    "jdbcType":12, 
                    "name":"col_pk",   
                    "position":2,  
                    "typeExpression":"varchar(22)", 
                    "typeName":"varchar" 
                }
            ],
            "primaryKeyColumnNames":["col_pk"] // 主键名列表。
        },
        "type":"ALTER"
    }
}

Canal Json

参数说明:

参数类型说明
typeString操作的类型,如:INSERT / UPDATE / DELETE。
idLong操作的序列号。
oldList变更前的数据。
dataList当前操作的数据。
databaseString数据库名称。
tableString表名。
mysqlTypeMap字段数据类型名称。
sqlTypeMap字段 JDBC 数据类型
isDdlBoolean是否为 DDL 操作。
pkNamesList源端主键名称。
esLong源端 SQL 执行的时间,13位Unix时间戳,单位为毫秒。
tsLong操作发送的时间,13 位 Unix 时间戳,单位为毫秒。
sqlString源端执行的 DDL 语句。
tableChangesJson该消息为 DDL 时,携带的该表的元信息,如:主键,列。

DML操作示例如下:

{
    "data":[
      {
        "col1":"11",
        "col2":"11",
        "col_pk":"11"
      }
    ],
    "database":"db_test",
    "es":1669790847000,
    "id":0,
    "isDdl":false,
    "mysqlType":{
        "col1":"varchar(22)",
        "col2":"varchar(22)",
        "col_pk":"varchar(22)"
    },
    "old":[
      // UPDATE 的 old 字段
      {
        "col1":"22",
        "col2":"22",
        "col_pk":"22"
      }
    ],
    "pkNames":["col_pk"],
    "sql":"",
    "sqlType":{
        "col1":12,
        "col2":12,
        "col_pk":12
    },
    "table":"table_test",
    "ts":1669790848072,
    "type":"INSERT/DELETE/UPDATE"
}

DDL操作示例如下:

{
    "data":[],
    "database":"db_test",
    "es":1669790951000,
    "id":0,
    "isDdl":true,
    "mysqlType":{
        "col1":"varchar(22)",
        "col2":"varchar(22)",
        "col_pk":"varchar(22)"
    },
    "old":[],
    "pkNames":[],
    "sql":"alter table table_test add col2 varchar(22) null",
    "sqlType":{
        "col1":12,
        "col2":12,
        "col_pk":12
    },
    "table":"table_test",
    "tableChanges":{
        "table":{
            "columns":[
                {
                    "jdbcType":12, // jdbc 类型。
                    "name":"col1", // 字段名称。
                    "position":0,  // 字段的顺序。
                    "typeExpression":"varchar(22)", // 类型描述。
                    "typeName":"varchar" // 类型名称。
                },
                {
                    "jdbcType":12,
                    "name":"col2",
                    "position":1,
                    "typeExpression":"varchar(22)",
                    "typeName":"varchar"
                },
                {
                    "jdbcType":12,
                    "name":"col_pk",
                    "position":2,
                    "typeExpression":"varchar(22)",
                    "typeName":"varchar"
                }
            ],
            "primaryKeyColumnNames":["col_pk"] // 主键名列表。
        },
        "type":"ALTER"
    },
    "ts":1669790952584,
    "type":"ALTER"
}

Aliyun DTS Avro

该消息类型需要根据 DTS Avro 的 SCHEMA 定义进行数据解析,DTS Avro 定义详情参见 DTS Avro 的 SCHEMA 定义

Debezium Envelope

该消息类型主要由 SCHEMA 和 PAYLOAD 构成,SCHEMA 是数据的元信息,PADYLOAD 是记录数据变化的内容。

SCHEMA 定义详情参见 Debezium 官方文档

Kafka 源端使用该消息格式,参见:源端 Kafka Debezium Json 使用说明

参数说明:

参数类型说明
opString操作的类型,如:c(INSERT),u(UPDATE),d(DELETE),a(ALTER)。
ts_msLong操作发送的时间,13 位 Unix 时间戳,单位为毫秒。
afterJson变更前的数据。
beforeJson变更后的数据。
sourceJson事件的元信息,如:db,table。
ddlString源端执行的 DDL 语句。
tableChangesJson该消息为 DDL 时,携带的该表的元信息,如:主键,列。

DML操作示例如下:

{
  "schema":...,
  "payload":{
    "op":"c",
    "ts_ms":1669796261933,
    "after":{
      "col1":"11",
      "col2":"11",
      "col_pk":"11"
    },
    "before":{},
    "source":{
      "ts_ms":1669796261933,
      "db":"db_test",
      "table":"table_test",
      "connector":"MySQL",
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "server_id": 223344,
      ...
    }
  }
}

DDL操作示例如下:

{
  "schema":...,
  "payload":{
    "databaseName":"db_test",
    "ddl":"alter table table_test add col2 varchar(22) null",
    "ts_ms":1669797213247,
    "source":{
      "ts_ms":1669796261933,
      "db":"db_test",
      "table":"table_test",
      "connector":"MySQL",
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "server_id": 223344,
      ...
    },
    "tableChanges":{
      "type":"ALTER",
      "table":{
        "columns":[
          {
            "jdbcType":12, // jdbc 类型。
            "name":"col1",    // 字段名称。
            "position":0,  // 字段的顺序。
            "typeExpression":"varchar(22)", // 类型描述。
            "typeName":"varchar" // 类型名称。
          },
          {
            "jdbcType":12,
            "name":"col2",
            "position":1,
            "typeExpression":"varchar(22)",
            "typeName":"varchar"
          },
          {
            "jdbcType":12,
            "name":"col_pk",
            "position":2,
            "typeExpression":"varchar(22)",
            "typeName":"varchar"
          }
        ],
        "primaryKeyColumnNames":["col_pk"], // 主键名列表。
      }
    }
  }
}