简介

canal项目背景

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

canal定位

简单的说canal是基于数据库增量日志解析,提供增量数据订阅&消费。使用canal能做啥?可以用来做数据库备份、主从同步、数据变动监听等等。我刚使用canal是因为业务上有监听数据库数据变动的需求,所以使用了canal来实现。

工作原理

mysql主备复制实现

从上层来看,复制分成三步:

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据。

canal的工作原理

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

canal Server安装与配置

canal需要先安装服务端,安装服务端前先修改mysql配置,配置一下canal需要用的东西。

mysql配置

修改/etc/mysql/mysql.conf.d/下的mysql配置文件mysqld.cnf,在[mysqld]下加入以下配置:

# canal相关配置
log-bin=mysql-bin
# 选择row模式,总共有三种模式:基于SQL语句的复制(STATEMENT),基于行的复制(ROW),混合模式复制(MIXED)
binlog-format=ROW 
# 配置mysql replaction需要定义,不能和canal的slaveId重复
server-id=1

之后重启mysql,在mysql中新建一个供canal使用的用户,并赋予相关权限:

CREATE USER canal IDENTIFIED BY 'canal';  

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

mysql相关设置完成后,可以查看以下是否生效:

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set, 1 warning (0.00 sec)
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set, 1 warning (0.00 sec)
mysql> show grants for '刚创建的供canal使用的用户名';
+---------------------------------------------------------------------------+
| Grants for canal@%                                                        |
+---------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%` |
+---------------------------------------------------------------------------+
1 row in set (0.00 sec)

canal安装

下载

mysql相关配置完成后,下载canal server安装包安装到服务器上,这里我是安装到ubuntu 18.04的服务器上,因为canal对Windows的支持并不是很好。

在canal的GitHub release上可以看到目前发布的所有版本,选择相应版本进行下载。

GitHub releases地址:https://github.com/alibaba/canal/releases

在写这篇文章时,canal的版本为v1.1.3,下载安装包:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

安装

下载完成后,解压相应安装包:

tar -zxvf canal.deployer-1.1.3.tar.gz

解压后,会在当前目录生成相应文件:

drwxrwxrwx 1 wujx wujx      512 Jun  4 15:29 conf
drwxrwxrwx 1 wujx wujx      512 Jun  4 15:29 lib
drwxrwxrwx 1 wujx wujx      512 Jun  4 15:35 logs
drwxrwxrwx 1 wujx wujx      512 Jun  4 16:31 bin

启动和停止脚本在bin目录里,如果要启动或停止canal,直接执行命令:

sh bin/startup.sh 
sh bin/stop.sh

canal配置

canal的配置方式有两种:

  1. ManagercanalInstanceGenerator: 基于manager管理的配置方式,目前alibaba内部配置使用这种方式。大家可以实现canalConfigClient,连接各自的管理系统,即可完成接入。
  2. SpringcanalInstanceGenerator:基于本地spring xml的配置方式,目前开源版本已经自带该功能所有代码,建议使用。

这里介绍单机版canal使用spring配置的使用姿势。

canal.properties

canal.properties是系统根配置文件,即主配置文件,在canal.properties中配置canal.destinations,然后在创建相应的instance配置。

进入canal目录下的conf文件夹,编辑canal.properties,在canal.destinations配置项上加上新的实例名称,默认已有一个example,对应conf下的example文件夹里的实例配置,可以将里面的配置文件拷贝出来当模板,然后删除掉。

编辑canal.properties文件,新增实例:

canal.destinations = my-db

编辑canal.properties文件,修改canal连接IP和端口:

canal.ip = 127.0.0.1
canal.port = 11111

instance.properties

conf目录下创建跟上面新建的实例名称一样的文件夹:

mkdir my-db

直接将example目录里给的示例配置复制到新的实例文件夹中:

cp -R example/* my-db/

编辑my-db里的instance.properties,主要需要修改的地方是数据库的连接配置:

canal.instance.master.address=mysql主库连接地址
# 刚开始时在MySQL中配置的专门给canal使用的用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

除了数据库的连接配置,还有一处配置mysql解析关注的表的配置canal.instance.filter.regex,可以修改成自己需要关注的表的正则匹配规则,不需要关注解析所有的表。

canal.instance.filter.regex如何配置:

mysql 数据解析关注的表,Perl正则表达式。多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)

常见例子:

  1. 所有表:.* or .*\\..*
  2. canal schema下所有表: canal\\..*
  3. canal下的以canal打头的表:canal\\.canal.*
  4. canal schema下的一张表:canal\\.test1
  5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

具体的每项配置项的含义可以查看canal官方文档:canal配置文档地址

修改完相关配置之后,使用bin目录下的startup.sh启动canal server:

sh bin/startup.sh

spring boot 客户端使用

引入依赖

使用maven直接引入如下依赖包:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>

关于依赖包的版本可以到canal GitHub的release里查看,我这里使用的是和我的canal server版本一致的jar包依赖。

调用示例

spring boot配置文件中添加相关配置:

canal:
  config:
    address: 127.0.0.1
    port: 11111
    username:
    password:
    destination: my-db
    subscribe: test.user
    batchSize: 1000

使用spring bootd的@ConfigurationProperties注解将配置读取到实体类:

@Component
@ConfigurationProperties(prefix = "canal.config")
@Data
public class canalConfigProperties {

    private String address;
    private Integer port;
    private String username;
    private String password;
    private String destination;
    private String subscribe;
    private Integer batchSize;

}

新建canalService,也就是主要处理canal逻辑的类:

@Service
@Slf4j
public class canalService {

    @Autowired
    private canalConfigProperties canalConfigProperties;

    @Async
    public void start() {
        log.debug("---------------canal启动---------------");
        log.debug("canal  address:      {}", canalConfigProperties.getAddress());
        log.debug("canal  port:         {}", canalConfigProperties.getPort());
        log.debug("canal  destination:  {}", canalConfigProperties.getDestination());
        log.debug("canal  batchSize:    {}", canalConfigProperties.getBatchSize());
        log.debug("canal  subscribe:    {}", canalConfigProperties.getSubscribe());
        //获取连接
        canalConnector connector = canalConnectors.newSingleConnector(new InetSocketAddress(canalConfigProperties.getAddress(), canalConfigProperties.getPort()),
                canalConfigProperties.getDestination(), canalConfigProperties.getUsername(), canalConfigProperties.getPassword());
        try {
            //连接canal
            connector.connect();
            //订阅表
            connector.subscribe(canalConfigProperties.getSubscribe());
            //先回滚最后一次请求,防止数据丢失
            connector.rollback();
            //一直循环获取读取事件
            long number = 0;
            while (true) {
                number++;
                log.debug("第:{}次读取数据", number);
                //读取指定数量的数据
                Message message = connector.getWithoutAck(canalConfigProperties.getBatchSize());
                //获取读取到的消息ID和数据数量
                long batchId = message.getId();
                int size = message.getEntries().size();
                //根据消息ID和读取到的数据量判断是否有新的事件
                if (batchId == -1 || size == 0) {
                    //如果没有新事件则线程休眠1秒在进行下次数据读取
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //否则读取到数据进行数据处理
                    dataDispose(message.getEntries());
                }
                //确认本批次数据已读取
                connector.ack(batchId);
            }
        } finally {
            //如果停止监听,则释放订阅,因为一个canal instance只能有一个canal client订阅,所有停止需释放
            connector.disconnect();
        }
    }

    /**
     * 对数据变动进行业务处理
     */
    private void dataDispose(List<canalEntry.Entry> entrys) {
        for (canalEntry.Entry entry : entrys) {
            //如果是开始或结束事务的一些信息,则不需要对数据进行处理,直接结束本次循环
            if (entry.getEntryType() == canalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == canalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            //解析每行变更的数据
            canalEntry.RowChange rowChage;
            try {
                rowChage = canalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //判断是哪张表的数据,如果是user表的数据变动,则调用user表数据处理方法
            if (entry.getHeader().getTableName().equals(TableType.ORDERS.getTableName())) {
                userTableDataDispose(rowChage.getRowDatasList(), rowChage.getEventType());
            }
        }
    }

    /**
     * user表数据变动处理
     */
    private void userTableDataDispose(List<canalEntry.RowData> rowDatas, canalEntry.EventType eventType) {
        for (canalEntry.RowData rowData : rowDatas) {
            //如果是新增数据,则将一条完整的数据全部打印出来
            if (eventType == canalEntry.EventType.INSERT) {
                //将所有字段的名称和值放入map中
                Map<String, Object> map = new HashMap<>();
                for (canalEntry.Column column : rowData.getAfterColumnsList()) {
                    map.put(column.getName(), column.getValue());
                }
                log.debug("新的用户信息:{}", JSON.toJSONString(map));
            } else if (eventType.equals(canalEntry.EventType.UPDATE)) {
                //如果是更新用户操作,则判断是否更新了用户名称,若更新了用户名称,则将用户ID和名称打印出来
                String userid = null;
                String name = null;
                boolean isStatusUpdate = false;
                for (canalEntry.Column column : rowData.getAfterColumnsList()) {
                    if ("userid".equals(column.getName())) {
                        userid = column.getValue();
                    } else if ("name".equals(column.getName())) {
                        status = column.getValue();
                        isStatusUpdate = column.getUpdated();
                    }
                }
                if (isStatusUpdate) {
                    log.debug("更新的用户ID是:{},用户名是:{}", userid, name);
                }
            }
        }
    }

    /**
     * 要处理的数据表表名枚举
     */
    enum TableType {
        /**
         * 用户表
         */
        USER("user");

        @Getter
        private String tableName;

        TableType(String tableName) {
            this.tableName = tableName;
        }

    }

}

到这边,我们调用canalService里的start方法就可以启动canal客户端,去向canal服务端请求增量数据订阅和消费了,但是正常我们的canal服务应该是要随项目启动而启动,并一直执行,不是在某个地方手动去调用的,所以还要写一个初始化启动类,让canal能随着spring boot容器启动而启动。

新建InitService类,实现ApplicationRunner接口,重写run方法,实现这个接口会在spring boot容器初始化完成之后自动执行run方法中的代码逻辑:

@Service
public class InitService implements ApplicationRunner {

    @Autowired
    private canalService canalService;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        canalService.start();
    }
}

通过这个方式,就可以让canalService在项目启动的时候直接执行,还有一点需要注意的是,在上面的canalService的start方法上,我加了@Async注解,也就是调用此方法时会新开一个线程去执行,如果不使用这种方式,那么canal将在spring boot主线程上执行,不利于后续我们对canal所在线程做一些其他的管理。使用@Async记得要在启动类上添加@EnableAsync注解。

数据格式

canal数据序列化采用了protobuff,取回来的数据格式具体如下,可根据自己需求将相应数据取出来使用:

Entry
	Header
		logfileName [binlog文件名]
		logfileOffset [binlog position]
		executeTime [发生的变更]
		schemaName 
		tableName
		eventType [insert/update/delete类型]
	entryType 	[事务头BEGIN/事务尾END/数据ROWDATA]
	storeValue 	[byte数据,可展开,对应的类型为RowChange]	
RowChange
	isDdl		[是否是ddl变更操作,比如create table/drop table]
	sql		[具体的ddl sql]
	rowDatas	[具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
		beforeColumns [Column类型的数组]
		afterColumns [Column类型的数组]		
Column 
	index		
	sqlType		[jdbc type]
	name		[column name]
	isKey		[是否为主键]
	updated		[是否发生过变更]
	isNull		[值是否为null]
	value		[具体的内容,注意为文本]

对应的EntryProtocol.proto代码:

message Entry {
    /**协议头部信息**/
    Header header = 1;
    ///**打散后的事件类型**/ [default = ROWDATA]
    oneof entryType_present {
        EntryType entryType = 2;
    }

    /**传输的二进制数组**/
    bytes storeValue = 3;
}

/**message Header**/
message Header {
    /**协议的版本号**/ //[default = 1]
    oneof version_present {
        int32 version = 1;
    }

    /**binlog/redolog 文件名**/
    string logfileName = 2;

    /**binlog/redolog 文件的偏移位置**/
    int64 logfileOffset = 3;

    /**服务端serverId**/
    int64 serverId = 4;

    /** 变更数据的编码 **/
    string serverenCode = 5;

    /**变更数据的执行时间 **/
    int64 executeTime = 6;

    /** 变更数据的来源**/ //[default = MYSQL]
    oneof sourceType_present {
        Type sourceType = 7;
    }

    /** 变更数据的schemaname**/
    string schemaName = 8;

    /**变更数据的tablename**/
    string tableName = 9;

    /**每个event的长度**/
    int64 eventLength = 10;

    /**数据变更类型**/ // [default = UPDATE]
    oneof eventType_present {
        EventType eventType = 11;
    }

    /**预留扩展**/
    repeated Pair props = 12;

    /**当前事务的gitd**/
    string gtid = 13;
}

/**每个字段的数据结构**/
message Column {
    /**字段下标**/
    int32 index = 1;

    /**字段java中类型**/
    int32 sqlType = 2;

    /**字段名称(忽略大小写),在mysql中是没有的**/
    string name = 3;

    /**是否是主键**/
    bool isKey = 4;

    /**如果EventType=UPDATE,用于标识这个字段值是否有修改**/
    bool updated = 5;

    /** 标识是否为空  **/ //[default = false]
    oneof isNull_present {
        bool isNull = 6;
    }

    /**预留扩展**/
    repeated Pair props = 7;

    /** 字段值,timestamp,Datetime是一个时间格式的文本 **/
    string value = 8;

    /** 对应数据对象原始长度 **/
    int32 length = 9;

    /**字段mysql类型**/
    string mysqlType = 10;
}

message RowData {

    /** 字段信息,增量数据(修改前,删除前) **/
    repeated Column beforeColumns = 1;

    /** 字段信息,增量数据(修改后,新增后)  **/
    repeated Column afterColumns = 2;

    /**预留扩展**/
    repeated Pair props = 3;
}

/**message row 每行变更数据的数据结构**/
message RowChange {

    /**tableId,由数据库产生**/
    int64 tableId = 1;

    /**数据变更类型**/ //[default = UPDATE]
    oneof eventType_present {
        EventType eventType = 2;
    }

    /** 标识是否是ddl语句  **/ // [default = false]
    oneof isDdl_present {
        bool isDdl = 10;
    }

    /** ddl/query的sql语句  **/
    string sql = 11;

    /** 一次数据库变更可能存在多行  **/
    repeated RowData rowDatas = 12;

    /**预留扩展**/
    repeated Pair props = 13;

    /** ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName  **/
    string ddlSchemaName = 14;
}

/**开始事务的一些信息**/
message TransactionBegin {

    /**已废弃,请使用header里的executeTime**/
    int64 executeTime = 1;

    /**已废弃,Begin里不提供事务id**/
    string transactionId = 2;

    /**预留扩展**/
    repeated Pair props = 3;

    /**执行的thread Id**/
    int64 threadId = 4;
}

/**结束事务的一些信息**/
message TransactionEnd {

    /**已废弃,请使用header里的executeTime**/
    int64 executeTime = 1;

    /**事务号**/
    string transactionId = 2;

    /**预留扩展**/
    repeated Pair props = 3;
}

/**预留扩展**/
message Pair {
    string key = 1;
    string value = 2;
}

/**打散后的事件类型,主要用于标识事务的开始,变更数据,结束**/
enum EntryType {
    ENTRYTYPECOMPATIBLEPROTO2 = 0;
    TRANSACTIONBEGIN = 1;
    ROWDATA = 2;
    TRANSACTIONEND = 3;
    /** 心跳类型,内部使用,外部暂不可见,可忽略 **/
    HEARTBEAT = 4;
    GTIDLOG = 5;
}

/** 事件类型 **/
enum EventType {
    EVENTTYPECOMPATIBLEPROTO2 = 0;
    INSERT = 1;
    UPDATE = 2;
    DELETE = 3;
    CREATE = 4;
    ALTER = 5;
    ERASE = 6;
    QUERY = 7;
    TRUNCATE = 8;
    RENAME = 9;
    /**CREATE INDEX**/
    CINDEX = 10;
    DINDEX = 11;
    GTID = 12;
    /** XA **/
    XACOMMIT = 13;
    XAROLLBACK = 14;
    /** MASTER HEARTBEAT **/
    MHEARTBEAT = 15;
}

/**数据库类型**/
enum Type {
    TYPECOMPATIBLEPROTO2 = 0;
    ORACLE = 1;
    MYSQL = 2;
    PGSQL = 3;
}

拓展

将cansl server注册成ubuntu服务

在上诉代码中,canal server的启动和停止都是使用canal/bin/里面的脚本进行的,不太方便,可以编写一个服务脚本来管理:

vim canal.sh

写入以下内容:

#!/bin/bash
### BEGIN INIT INFO
#
# Provides:  canal-server
# Required-Start:   $local_fs  $remote_fs
# Required-Stop:    $local_fs  $remote_fs
# Default-Start:    2 3 4 5
# Default-Stop:     0 1 6
# Short-Description:    initscript
# Description:  This file should be used to construct scripts to be placed in /etc/init.d.
#
### END INIT INFO

## Fill in name of program here.
PROG="canal"
PROG_PATH="canal bin目录路径" ## Not need, but sometimes helpful (if $PROG resides in /opt for example).
PROG_ARGS=""
PID_PATH="canal.pid路径,默认是canal bin目录下,未修改直接填写与PROG_PATH一样的路径"

start() {
    count=`ps -ef | grep canal | grep -v "grep" | wc -l`
    echo "$count"
    if [ "$count" -le "2" ]; then
        rm -f  "$PID_PATH/$PROG.pid"
    fi
    if [ -e "$PID_PATH/$PROG.pid" ]; then
        ## Program is running, exit with error.
        echo "Error! $PROG is currently running!" 1>&2
        exit 1
    else
        #调用启动脚本
        sh /release/canal/bin/startup.sh
    fi
}

stop() {
    echo "begin stop"
    if [ -e "$PID_PATH/$PROG.pid" ]; then
        # 调用停止脚本
        sh /release/canal/bin/stop.sh
        rm -f  "$PID_PATH/$PROG.pid"
        echo "$PROG stopped"
    else
        ## Program is not running, exit with error.
        echo "Error! $PROG not started!" 1>&2
        exit 1
    fi
}

## Check to see if we are running as root first.
## Found at http://www.cyberciti.biz/tips/shell-root-user-check-script.html
if [ "$(id -u)" != "0" ]; then
    echo "This script must be run as root" 1>&2
    exit 1
fi

case "$1" in
    start)
        start
        exit 0
    ;;
    stop)
        stop
        exit 0
    ;;
    reload|restart|force-reload)
        stop
        start
        exit 0
    ;;
    **)
        echo "Usage: $0 {start|stop|reload}" 1>&2
        exit 1
    ;;
esac

添加可执行权限:

chmod u+x canal.sh 

软链到/etc/init.d目录下:

ln -s 上面编写的canal.sh目录/canal.sh /etc/init.d/canal

添加服务:

sudo update-rc.d canal defaults

之后就就可以使用service canal start|stop|restart来管理canal server了。