Canal原理、安装和测试

Canal 是阿里巴巴开源的一款分布式增量数据同步工具,主要用于基于 MySQL 数据库的增量日志 Binlog 解析,提供增量数据的订阅和消费。

Canal github 地址:https://github.com/alibaba/canal,目前最新版本为 1.1.8,当前的 canal 支持源端 MySQL 版本包括 5.1.x、5.5.x 、5.6.x、5.7.x、8.0.x。

Canal 1.1.1版本之后,默认支持将canal server接收到的binlog数据直接投递到MQ。目前默认支持的MQ系统有:

Canal 原理

Canal 通过模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送 dump 协议。MySQL master 收到 dump 请求后,开始推送 binary log 给 slave(即 Canal)。Canal 解析 binary log 对象(原始为 byte 流),从而实现增量数据的订阅和消费。

Canal 架构说明,参考 文章

img

说明:

instance模块:

MySQL 的 Binlog

MySQL 的 Binlog(Binary Log)是 MySQL 数据库用于记录所有更改数据操作的日志文件,包括数据库的插入、更新、删除等操作。Binlog 主要用于数据库的增量备份、数据恢复、主从复制等场景。

Binlog 的作用:

MySQL 提供了三种主要的 Binlog 格式:

Binlog 的模式

可以通过以下命令查看 Binlog 文件的内容:

mysqlbinlog binlog_file_name

Binlog 的管理:

安装和配置

配置 mysql

通过 docker-compose 安装 MySQL,在 MySQL 中开启 Binlog,用于记录数据库的增量变化。

chensoul-mysql:
image: mysql:8
restart: always
container_name: chensoul-mysql
command: [
'mysqld',
'--log-bin=mysql-bin',
'--binlog_format=ROW',
'--default-time-zone=+8:00',
'--lower-case-table-names=1'
]
environment:
MYSQL_ROOT_HOST: "%"
MYSQL_ROOT_PASSWORD: root
healthcheck:
test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
interval: 5s
timeout: 2s
retries: 10
ports:
- "3306:3306

查看状态:

mysql -u root -p
> show variables like 'log_%';
> show variables like 'binlog_format';
> SHOW BINARY LOGS

创建 canal 用户

mysql -u root -p
> CREATE USER canal IDENTIFIED BY 'canal';
> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
> FLUSH PRIVILEGES;

安装 canal-server

通过 docker-compose 安装

chensoul-canal-server:
image: canal/canal-server
container_name: chensoul-canal-server
ports:
- "11111:11111"
environment:
- canal.auto.scan=false
- canal.instance.mysql.slaveId=123
- canal.instance.master.address=chensoul-mysql:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.filter.regex=.*\\..*
- canal.destinations=test
- canal.instance.tsdb.enable=false
- canal.instance.connectionCharset=UTF-8

注意:

安装 canal-admin

在 mysql 数据库里创建 canal_manager ,并执行 sql 语句

wget https://raw.githubusercontent.com/alibaba/canal/refs/heads/master/admin/admin-web/src/main/resources/canal_manager.sql;
mysql -u root -p
> create database canal_manager;
> GRANT ALL PRIVILEGES ON canal_manager TO 'canal'@'%';
> FLUSH PRIVILEGES;
> use canal_manager;
> source canal_manager.sql;

通过 docker-compose 安装 canal-admin:

chensoul-canal-admin:
image: canal/canal-admin
container_name: chensoul-canal-admin
ports:
- "8089:8089"
environment:
- canal.adminUser=admin
- canal.adminPasswd=123456
- spring.datasource.url=jdbc:mysql://chensoul-mysql:3306/canal_manager?autoReconnect=true&useUnicode=true
- spring.datasource.username=canal
- spring.datasource.password=canal

客户端配置

配置 CanalConnector:

@Configuration
@EnableScheduling
@EnableAsync
public class CanalConfig {
@Value("${canal.server.ip}")
private String canalServerIp;
@Value("${canal.server.port}")
private int canalServerPort;
@Value("${canal.server.username:canal}")
private String username;
@Value("${canal.server.password:canal}")
private String password;
@Value("${canal.destinations:test}")
private String destination;
@Bean("newSingleConnector")
public CanalConnector newSingleConnector() {
return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
canalServerPort), destination, username, password);
}
}

注意:

定时获取 canal 数据:

@Service
@Slf4j
public class CanalRunner {
int BATCH_SIZE = 5 * 1024;
@Resource
private CanalConnector connector;
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
//开启/关闭事务的实体类型,跳过
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
if (rowChage.getIsDdl()) {
log.info("binlog: {}:{}, table: {}.{}, eventType: {}, ddlSql: {}", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType, rowChage.getSql());
} else {
log.info("binlog: {}:{}, table: {}.{}, eventType: {}", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
}
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
log.info("eventType:{}", eventType);
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
printColumn(rowData.getBeforeColumnsList());
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + ": " + column.getValue() + ",updated=" + column.getUpdated());
}
}
@PostConstruct
public void connect() {
connector.connect();
connector.subscribe(); // 监听所有的表结构
connector.rollback();
}
@Async
@Scheduled(initialDelayString = "${canal.scheduled.initialDelay:2000}", fixedDelayString = "${canal.scheduled.fixedDelay:2000}")
public void processData() {
try {
if (!connector.checkValid()) {
log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");
this.connect();
} else {
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("本次[{}]没有检测到数据更新。", batchId);
} else {
log.info("本次[{}]数据共有[{}]次更新需要处理", batchId, size);
printEntry(message.getEntries());
connector.ack(batchId); / 提交确认
log.info("本次[{}]处理Canal同步数据完成", batchId);
}
}
} catch (Exception e) {
log.error("处理Canal同步数据失效,请检查:", e);
}
}
@PreDestroy
public void disConnect() {
connector.disconnect();
}
}

说明:

客户端适配器

Canal 的 ClientAdapter 是一个用于数据同步的组件,它提供了多种数据源的适配功能,支持将数据从 MySQL 同步到其他数据存储系统,如关系型数据库、HBase、ElasticSearch 等。

ClientAdapter 的下载链接在 github release 页面,比如 https://github.com/alibaba/canal/releases/tag/canal-1.1.8,目前官方没有提供 docker 镜像。

ClientAdapter 功能:

Canal 集群模式

Canal 集群模式是一种高可用的部署方式,用于确保在单个实例失效时服务仍能持续运行。

高可用性机制:

img

canal server HA 步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制。

以下是一个典型的 Canal 集群配置示例:

canal.zkServers = 192.168.56.121:2181
canal.destinations = test
canal.auto.scan = true

这个配置指定了 Zookeeper 服务器地址、实例名称,并启用了自动扫描功能。

客户端连接 canal server 时,可以使用下面 api:

// 创建带cluster模式的客户端链接,自动完成failover切换
CanalConnector newClusterConnector(List<? extends SocketAddress> addresses, String destination,String username, String password)
// 创建带cluster模式的客户端链接,自动完成failover切换,服务器列表自动扫描
CanalConnector newClusterConnector(String zkServers, String destination, String username, String password)

常见问题

Canal 如何保证顺序性?

Canal 通过以下机制保证顺序性:

Canal Server 机制:

Canal Client 机制:

与消息队列的结合:

数据同步机制:


使用 Docker 安装 Gitea
常见分布式 ID 解决方案