简介
canal项目背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
基于日志增量订阅&消费支持的业务:
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务cache刷新
- 价格变化等重要业务消息
canal定位
简单的说canal是基于数据库增量日志解析,提供增量数据订阅&消费。使用canal能做啥?可以用来做数据库备份、主从同步、数据变动监听等等。我刚使用canal是因为业务上有监听数据库数据变动的需求,所以使用了canal来实现。
工作原理
mysql主备复制实现
从上层来看,复制分成三步:
- master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
- slave将master的binary log events拷贝到它的中继日志(relay log);
- slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理
原理相对比较简单:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
canal Server安装与配置
canal需要先安装服务端,安装服务端前先修改mysql配置,配置一下canal需要用的东西。
mysql配置
修改/etc/mysql/mysql.conf.d/下的mysql配置文件mysqld.cnf,在[mysqld]下加入以下配置:
# canal相关配置
log-bin=mysql-bin
# 选择row模式,总共有三种模式:基于SQL语句的复制(STATEMENT),基于行的复制(ROW),混合模式复制(MIXED)
binlog-format=ROW
# 配置mysql replaction需要定义,不能和canal的slaveId重复
server-id=1
之后重启mysql,在mysql中新建一个供canal使用的用户,并赋予相关权限:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
mysql相关设置完成后,可以查看以下是否生效:
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set, 1 warning (0.00 sec)
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set, 1 warning (0.00 sec)
mysql> show grants for '刚创建的供canal使用的用户名';
+---------------------------------------------------------------------------+
| Grants for canal@% |
+---------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%` |
+---------------------------------------------------------------------------+
1 row in set (0.00 sec)
canal安装
下载
mysql相关配置完成后,下载canal server安装包安装到服务器上,这里我是安装到ubuntu 18.04的服务器上,因为canal对Windows的支持并不是很好。
在canal的GitHub release上可以看到目前发布的所有版本,选择相应版本进行下载。
GitHub releases地址:https://github.com/alibaba/canal/releases
在写这篇文章时,canal的版本为v1.1.3,下载安装包:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
安装
下载完成后,解压相应安装包:
tar -zxvf canal.deployer-1.1.3.tar.gz
解压后,会在当前目录生成相应文件:
drwxrwxrwx 1 wujx wujx 512 Jun 4 15:29 conf
drwxrwxrwx 1 wujx wujx 512 Jun 4 15:29 lib
drwxrwxrwx 1 wujx wujx 512 Jun 4 15:35 logs
drwxrwxrwx 1 wujx wujx 512 Jun 4 16:31 bin
启动和停止脚本在bin目录里,如果要启动或停止canal,直接执行命令:
sh bin/startup.sh
sh bin/stop.sh
canal配置
canal的配置方式有两种:
- ManagercanalInstanceGenerator: 基于manager管理的配置方式,目前alibaba内部配置使用这种方式。大家可以实现canalConfigClient,连接各自的管理系统,即可完成接入。
- SpringcanalInstanceGenerator:基于本地spring xml的配置方式,目前开源版本已经自带该功能所有代码,建议使用。
这里介绍单机版canal使用spring配置的使用姿势。
canal.properties
canal.properties是系统根配置文件,即主配置文件,在canal.properties中配置canal.destinations,然后在创建相应的instance配置。
进入canal目录下的conf文件夹,编辑canal.properties,在canal.destinations配置项上加上新的实例名称,默认已有一个example,对应conf下的example文件夹里的实例配置,可以将里面的配置文件拷贝出来当模板,然后删除掉。
编辑canal.properties文件,新增实例:
canal.destinations = my-db
编辑canal.properties文件,修改canal连接IP和端口:
canal.ip = 127.0.0.1
canal.port = 11111
instance.properties
conf目录下创建跟上面新建的实例名称一样的文件夹:
mkdir my-db
直接将example目录里给的示例配置复制到新的实例文件夹中:
cp -R example/* my-db/
编辑my-db里的instance.properties,主要需要修改的地方是数据库的连接配置:
canal.instance.master.address=mysql主库连接地址
# 刚开始时在MySQL中配置的专门给canal使用的用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
除了数据库的连接配置,还有一处配置mysql解析关注的表的配置canal.instance.filter.regex,可以修改成自己需要关注的表的正则匹配规则,不需要关注解析所有的表。
canal.instance.filter.regex如何配置:
mysql 数据解析关注的表,Perl正则表达式。多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:
- 所有表:.* or .*\\..*
- canal schema下所有表: canal\\..*
- canal下的以canal打头的表:canal\\.canal.*
- canal schema下的一张表:canal\\.test1
- 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
具体的每项配置项的含义可以查看canal官方文档:canal配置文档地址
修改完相关配置之后,使用bin目录下的startup.sh启动canal server:
sh bin/startup.sh
spring boot 客户端使用
引入依赖
使用maven直接引入如下依赖包:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
关于依赖包的版本可以到canal GitHub的release里查看,我这里使用的是和我的canal server版本一致的jar包依赖。
调用示例
spring boot配置文件中添加相关配置:
canal:
config:
address: 127.0.0.1
port: 11111
username:
password:
destination: my-db
subscribe: test.user
batchSize: 1000
使用spring bootd的@ConfigurationProperties注解将配置读取到实体类:
@Component
@ConfigurationProperties(prefix = "canal.config")
@Data
public class canalConfigProperties {
private String address;
private Integer port;
private String username;
private String password;
private String destination;
private String subscribe;
private Integer batchSize;
}
新建canalService,也就是主要处理canal逻辑的类:
@Service
@Slf4j
public class canalService {
@Autowired
private canalConfigProperties canalConfigProperties;
@Async
public void start() {
log.debug("---------------canal启动---------------");
log.debug("canal address: {}", canalConfigProperties.getAddress());
log.debug("canal port: {}", canalConfigProperties.getPort());
log.debug("canal destination: {}", canalConfigProperties.getDestination());
log.debug("canal batchSize: {}", canalConfigProperties.getBatchSize());
log.debug("canal subscribe: {}", canalConfigProperties.getSubscribe());
//获取连接
canalConnector connector = canalConnectors.newSingleConnector(new InetSocketAddress(canalConfigProperties.getAddress(), canalConfigProperties.getPort()),
canalConfigProperties.getDestination(), canalConfigProperties.getUsername(), canalConfigProperties.getPassword());
try {
//连接canal
connector.connect();
//订阅表
connector.subscribe(canalConfigProperties.getSubscribe());
//先回滚最后一次请求,防止数据丢失
connector.rollback();
//一直循环获取读取事件
long number = 0;
while (true) {
number++;
log.debug("第:{}次读取数据", number);
//读取指定数量的数据
Message message = connector.getWithoutAck(canalConfigProperties.getBatchSize());
//获取读取到的消息ID和数据数量
long batchId = message.getId();
int size = message.getEntries().size();
//根据消息ID和读取到的数据量判断是否有新的事件
if (batchId == -1 || size == 0) {
//如果没有新事件则线程休眠1秒在进行下次数据读取
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//否则读取到数据进行数据处理
dataDispose(message.getEntries());
}
//确认本批次数据已读取
connector.ack(batchId);
}
} finally {
//如果停止监听,则释放订阅,因为一个canal instance只能有一个canal client订阅,所有停止需释放
connector.disconnect();
}
}
/**
* 对数据变动进行业务处理
*/
private void dataDispose(List<canalEntry.Entry> entrys) {
for (canalEntry.Entry entry : entrys) {
//如果是开始或结束事务的一些信息,则不需要对数据进行处理,直接结束本次循环
if (entry.getEntryType() == canalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == canalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//解析每行变更的数据
canalEntry.RowChange rowChage;
try {
rowChage = canalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//判断是哪张表的数据,如果是user表的数据变动,则调用user表数据处理方法
if (entry.getHeader().getTableName().equals(TableType.ORDERS.getTableName())) {
userTableDataDispose(rowChage.getRowDatasList(), rowChage.getEventType());
}
}
}
/**
* user表数据变动处理
*/
private void userTableDataDispose(List<canalEntry.RowData> rowDatas, canalEntry.EventType eventType) {
for (canalEntry.RowData rowData : rowDatas) {
//如果是新增数据,则将一条完整的数据全部打印出来
if (eventType == canalEntry.EventType.INSERT) {
//将所有字段的名称和值放入map中
Map<String, Object> map = new HashMap<>();
for (canalEntry.Column column : rowData.getAfterColumnsList()) {
map.put(column.getName(), column.getValue());
}
log.debug("新的用户信息:{}", JSON.toJSONString(map));
} else if (eventType.equals(canalEntry.EventType.UPDATE)) {
//如果是更新用户操作,则判断是否更新了用户名称,若更新了用户名称,则将用户ID和名称打印出来
String userid = null;
String name = null;
boolean isStatusUpdate = false;
for (canalEntry.Column column : rowData.getAfterColumnsList()) {
if ("userid".equals(column.getName())) {
userid = column.getValue();
} else if ("name".equals(column.getName())) {
status = column.getValue();
isStatusUpdate = column.getUpdated();
}
}
if (isStatusUpdate) {
log.debug("更新的用户ID是:{},用户名是:{}", userid, name);
}
}
}
}
/**
* 要处理的数据表表名枚举
*/
enum TableType {
/**
* 用户表
*/
USER("user");
@Getter
private String tableName;
TableType(String tableName) {
this.tableName = tableName;
}
}
}
到这边,我们调用canalService里的start方法就可以启动canal客户端,去向canal服务端请求增量数据订阅和消费了,但是正常我们的canal服务应该是要随项目启动而启动,并一直执行,不是在某个地方手动去调用的,所以还要写一个初始化启动类,让canal能随着spring boot容器启动而启动。
新建InitService类,实现ApplicationRunner接口,重写run方法,实现这个接口会在spring boot容器初始化完成之后自动执行run方法中的代码逻辑:
@Service
public class InitService implements ApplicationRunner {
@Autowired
private canalService canalService;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
canalService.start();
}
}
通过这个方式,就可以让canalService在项目启动的时候直接执行,还有一点需要注意的是,在上面的canalService的start方法上,我加了@Async注解,也就是调用此方法时会新开一个线程去执行,如果不使用这种方式,那么canal将在spring boot主线程上执行,不利于后续我们对canal所在线程做一些其他的管理。使用@Async记得要在启动类上添加@EnableAsync注解。
数据格式
canal数据序列化采用了protobuff,取回来的数据格式具体如下,可根据自己需求将相应数据取出来使用:
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [发生的变更]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为文本]
对应的EntryProtocol.proto代码:
message Entry {
/**协议头部信息**/
Header header = 1;
///**打散后的事件类型**/ [default = ROWDATA]
oneof entryType_present {
EntryType entryType = 2;
}
/**传输的二进制数组**/
bytes storeValue = 3;
}
/**message Header**/
message Header {
/**协议的版本号**/ //[default = 1]
oneof version_present {
int32 version = 1;
}
/**binlog/redolog 文件名**/
string logfileName = 2;
/**binlog/redolog 文件的偏移位置**/
int64 logfileOffset = 3;
/**服务端serverId**/
int64 serverId = 4;
/** 变更数据的编码 **/
string serverenCode = 5;
/**变更数据的执行时间 **/
int64 executeTime = 6;
/** 变更数据的来源**/ //[default = MYSQL]
oneof sourceType_present {
Type sourceType = 7;
}
/** 变更数据的schemaname**/
string schemaName = 8;
/**变更数据的tablename**/
string tableName = 9;
/**每个event的长度**/
int64 eventLength = 10;
/**数据变更类型**/ // [default = UPDATE]
oneof eventType_present {
EventType eventType = 11;
}
/**预留扩展**/
repeated Pair props = 12;
/**当前事务的gitd**/
string gtid = 13;
}
/**每个字段的数据结构**/
message Column {
/**字段下标**/
int32 index = 1;
/**字段java中类型**/
int32 sqlType = 2;
/**字段名称(忽略大小写),在mysql中是没有的**/
string name = 3;
/**是否是主键**/
bool isKey = 4;
/**如果EventType=UPDATE,用于标识这个字段值是否有修改**/
bool updated = 5;
/** 标识是否为空 **/ //[default = false]
oneof isNull_present {
bool isNull = 6;
}
/**预留扩展**/
repeated Pair props = 7;
/** 字段值,timestamp,Datetime是一个时间格式的文本 **/
string value = 8;
/** 对应数据对象原始长度 **/
int32 length = 9;
/**字段mysql类型**/
string mysqlType = 10;
}
message RowData {
/** 字段信息,增量数据(修改前,删除前) **/
repeated Column beforeColumns = 1;
/** 字段信息,增量数据(修改后,新增后) **/
repeated Column afterColumns = 2;
/**预留扩展**/
repeated Pair props = 3;
}
/**message row 每行变更数据的数据结构**/
message RowChange {
/**tableId,由数据库产生**/
int64 tableId = 1;
/**数据变更类型**/ //[default = UPDATE]
oneof eventType_present {
EventType eventType = 2;
}
/** 标识是否是ddl语句 **/ // [default = false]
oneof isDdl_present {
bool isDdl = 10;
}
/** ddl/query的sql语句 **/
string sql = 11;
/** 一次数据库变更可能存在多行 **/
repeated RowData rowDatas = 12;
/**预留扩展**/
repeated Pair props = 13;
/** ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName **/
string ddlSchemaName = 14;
}
/**开始事务的一些信息**/
message TransactionBegin {
/**已废弃,请使用header里的executeTime**/
int64 executeTime = 1;
/**已废弃,Begin里不提供事务id**/
string transactionId = 2;
/**预留扩展**/
repeated Pair props = 3;
/**执行的thread Id**/
int64 threadId = 4;
}
/**结束事务的一些信息**/
message TransactionEnd {
/**已废弃,请使用header里的executeTime**/
int64 executeTime = 1;
/**事务号**/
string transactionId = 2;
/**预留扩展**/
repeated Pair props = 3;
}
/**预留扩展**/
message Pair {
string key = 1;
string value = 2;
}
/**打散后的事件类型,主要用于标识事务的开始,变更数据,结束**/
enum EntryType {
ENTRYTYPECOMPATIBLEPROTO2 = 0;
TRANSACTIONBEGIN = 1;
ROWDATA = 2;
TRANSACTIONEND = 3;
/** 心跳类型,内部使用,外部暂不可见,可忽略 **/
HEARTBEAT = 4;
GTIDLOG = 5;
}
/** 事件类型 **/
enum EventType {
EVENTTYPECOMPATIBLEPROTO2 = 0;
INSERT = 1;
UPDATE = 2;
DELETE = 3;
CREATE = 4;
ALTER = 5;
ERASE = 6;
QUERY = 7;
TRUNCATE = 8;
RENAME = 9;
/**CREATE INDEX**/
CINDEX = 10;
DINDEX = 11;
GTID = 12;
/** XA **/
XACOMMIT = 13;
XAROLLBACK = 14;
/** MASTER HEARTBEAT **/
MHEARTBEAT = 15;
}
/**数据库类型**/
enum Type {
TYPECOMPATIBLEPROTO2 = 0;
ORACLE = 1;
MYSQL = 2;
PGSQL = 3;
}
拓展
将cansl server注册成ubuntu服务
在上诉代码中,canal server的启动和停止都是使用canal/bin/里面的脚本进行的,不太方便,可以编写一个服务脚本来管理:
vim canal.sh
写入以下内容:
#!/bin/bash
### BEGIN INIT INFO
#
# Provides: canal-server
# Required-Start: $local_fs $remote_fs
# Required-Stop: $local_fs $remote_fs
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: initscript
# Description: This file should be used to construct scripts to be placed in /etc/init.d.
#
### END INIT INFO
## Fill in name of program here.
PROG="canal"
PROG_PATH="canal bin目录路径" ## Not need, but sometimes helpful (if $PROG resides in /opt for example).
PROG_ARGS=""
PID_PATH="canal.pid路径,默认是canal bin目录下,未修改直接填写与PROG_PATH一样的路径"
start() {
count=`ps -ef | grep canal | grep -v "grep" | wc -l`
echo "$count"
if [ "$count" -le "2" ]; then
rm -f "$PID_PATH/$PROG.pid"
fi
if [ -e "$PID_PATH/$PROG.pid" ]; then
## Program is running, exit with error.
echo "Error! $PROG is currently running!" 1>&2
exit 1
else
#调用启动脚本
sh /release/canal/bin/startup.sh
fi
}
stop() {
echo "begin stop"
if [ -e "$PID_PATH/$PROG.pid" ]; then
# 调用停止脚本
sh /release/canal/bin/stop.sh
rm -f "$PID_PATH/$PROG.pid"
echo "$PROG stopped"
else
## Program is not running, exit with error.
echo "Error! $PROG not started!" 1>&2
exit 1
fi
}
## Check to see if we are running as root first.
## Found at http://www.cyberciti.biz/tips/shell-root-user-check-script.html
if [ "$(id -u)" != "0" ]; then
echo "This script must be run as root" 1>&2
exit 1
fi
case "$1" in
start)
start
exit 0
;;
stop)
stop
exit 0
;;
reload|restart|force-reload)
stop
start
exit 0
;;
**)
echo "Usage: $0 {start|stop|reload}" 1>&2
exit 1
;;
esac
添加可执行权限:
chmod u+x canal.sh
软链到/etc/init.d目录下:
ln -s 上面编写的canal.sh目录/canal.sh /etc/init.d/canal
添加服务:
sudo update-rc.d canal defaults
之后就就可以使用service canal start|stop|restart来管理canal server了。