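I. Deployment environment
1. Base environment:

| Software | Version | Purpose |
| --- | --- | --- |
| Linux | CentOS 7.1, 8 GB | |
| JDK | 1.8.0_151 | |
| canal | 1.1.1 | canal server; interacts with MySQL and ZooKeeper |
| ZooKeeper | 3.4.5 | Coordinator between the canal server and the client; canal 1.1.1 depends on ZooKeeper 3.4.5 |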

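2. Machine environment: 2 canal client servers and 2 canal server machines (resources are limited, so ZooKeeper and the canal server are deployed on the same machines).

| Category | IP | Installed software |
| --- | --- | --- |
| canal-server | X.X.X.50:2100, X.X.X.54:2100 | canal-server |
| zookeeper | X.X.X.50:2181, X.X.X.54:2181 | zookeeper |
| canal-client | X.X.X.X:8999, X.X.X.X:8999 | business module acting as the canal client |

Note on ZooKeeper: a proper ZooKeeper ensemble should have an odd number of nodes, because leader election requires that the number of available nodes be greater than half of the total. Because of limited resources, only 2 ZooKeeper instances are started here. This weakens high availability: if either ZooKeeper instance goes down, the whole canal service becomes unavailable.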
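
The majority rule for ZooKeeper can be checked with a few lines of arithmetic. The sketch below is illustrative only; the class and method names are made up for this example and are not part of ZooKeeper or canal.

```java
final class ZkQuorum {
    /** Smallest number of live nodes that is strictly more than half of the ensemble. */
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** How many nodes may fail while a majority is still alive. */
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("nodes=%d majority=%d toleratedFailures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

With 2 nodes the majority is 2, so no failure is tolerated; 3 nodes tolerate one failure, which is why an even-sized ensemble adds no resilience over the next smaller odd size.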

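II. Installing the canal server

  1. Download the software
    Link: canal.deployer-1.1.1.tar.gz
  2. Extract the archive to the target directory
    /usr/local/etc
  3. Edit the canal.properties configuration file (system-level configuration)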
# Unique identifier of each canal server instance; currently has no practical meaning. Default: 1
canal.id = 2101
# Local IP the canal server binds to; if left empty, a local IP is chosen automatically at startup. Default: none
canal.ip =
# Port on which the canal server provides its socket service. Default: 11111
canal.port = 2100
# Connection string of the ZooKeeper cluster used by the canal server
canal.zkServers = X.X.X.X:2181,X.X.X.X:2181
# flush data to zk
# How often canal persists data to ZooKeeper, in milliseconds
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# Server mode; one of: tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
# binlog filter config
canal.instance.filter.druid.ddl = true
# Whether to ignore DCL query statements such as grant/create user. Default: false
canal.instance.filter.query.dcl = true
# Whether to ignore DML query statements such as insert/update/delete (in MySQL 5.6 ROW mode the binlog can still contain statement-mode query records). Default: false
canal.instance.filter.query.dml = true
# Whether to ignore DDL query statements such as create table/alter table/drop table/rename table/create index/drop index (the DDL types currently supported are mainly table-level operations; create databases/trigger/procedure are classified as DCL for now). Default: false
canal.instance.filter.query.ddl = true
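
One detail worth keeping in mind when annotating this file: Java's `.properties` format only treats `#` as a comment marker at the start of a line. Assuming canal reads canal.properties through `java.util.Properties` (an assumption, not something this article verifies), an annotation left on the same line as a key ends up inside the value. The small demo below uses only the JDK; the class name is made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class PropertiesCommentDemo {
    /** Parses the given .properties text and returns the value stored under key. */
    static String load(String text, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // Trailing annotation: the whole remainder of the line is the value.
        System.out.println(load("canal.port = 2100 #socket port\n", "canal.port"));
        // Annotation on its own line: the value is just the port number.
        System.out.println(load("# socket port\ncanal.port = 2100\n", "canal.port"));
    }
}
```

Keeping each annotation on its own line above the key avoids the problem.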

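  4. Edit the instance.properties configuration file (instance-level configuration)
# Corresponds to the serverId concept in MySQL replication; must be unique within the current MySQL cluster. Default: 1234
canal.instance.mysql.slaveId = 0
# position info
# Address of the MySQL master. Default: 127.0.0.1:3306
canal.instance.master.address = X.X.X.X:3306
# Binlog file to start from when connecting to the master. Default: none
canal.instance.master.journal.name =
# Binlog offset to start from when connecting to the master. Default: none
canal.instance.master.position =
# Binlog timestamp to start from when connecting to the master. Default: none
canal.instance.master.timestamp =
# username/password
# MySQL account
canal.instance.dbUsername = canal
# MySQL password
canal.instance.dbPassword = canal
# Character set used when parsing data
canal.instance.connectionCharset = UTF-8
# Default schema for the MySQL connection
canal.instance.defaultDatabaseName = kuaiche
# table regex
# Tables that binlog parsing should watch, written as Perl regular expressions. Separate multiple expressions with commas (,); escape characters need a double backslash (\\).
# Common examples:
# 1. All tables: .* or .*\\..*
# 2. All tables in the canal schema: canal\\..*
# 3. Tables in the canal schema whose names start with canal: canal\\.canal.*
# 4. A single table in the canal schema: canal.test1
# 5. Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
canal.instance.filter.regex = kuaiche.bill_transport
# table black regex
# Blacklist filter
canal.instance.filter.black.regex =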
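
To get a feel for how the comma-separated table rules listed for canal.instance.filter.regex behave, the sketch below approximates them with `java.util.regex`. It is not canal's own filter implementation, which may differ in details such as case handling; the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

final class TableFilterSketch {
    /**
     * Returns true if "schema.table" fully matches any of the comma-separated patterns.
     * In a Java string literal "canal\\..*" is the regex canal\..* , the same text the
     * properties file yields after its own backslash unescaping.
     */
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(accepts("canal\\..*", "canal", "test1"));
        System.out.println(accepts("canal\\..*", "mysql", "test1"));
        System.out.println(accepts("canal\\..*,mysql.test1,mysql.test2", "mysql", "test2"));
    }
}
```

An unescaped dot, as in canal.test1, matches any character, so it still accepts the intended table but is slightly looser than the escaped form.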
  5. Start command
    sh bin/startup.sh

  6. Stop command
    sh bin/stop.sh

  7. View the startup logs
    canal/logs/canal/canal.log
2018-12-05 17:58:26.174 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2018-12-05 17:58:26.216 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2018-12-05 17:58:26.226 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-12-05 17:58:26.636 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[X.X.X.X:X]
2018-12-05 17:58:26.832 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

canal/logs/example/example.log


III. The canal client
1. Framework: spring-boot
2. Dependencies

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
</dependency>
`Watch out for jar dependency conflicts: canal depends on ZooKeeper 3.4.5.`

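3. canal data structures
canal transfers data in two stages. First, when the binlog is subscribed to, binlog events are converted into the Message structure that canal defines. Second, the client and server exchange that data over a TCP-based protocol.
Entry data structure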

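Entry
    Header
        version         [protocol version, default = 1]
        logfileName     [binlog file name]
        logfileOffset   [binlog position]
        serverId        [serverId of the source server]
        serverenCode    [encoding of the changed data]
        executeTime     [execution time of the change]
        sourceType      [source of the change, default = MYSQL]
        schemaName      [schema name of the changed data]
        tableName       [table name of the changed data]
        eventLength     [length of each event]
        eventType       [insert/update/delete type, default = UPDATE]
        props           [reserved for extension]
        gtid            [GTID of the current transaction]
    entryType           [transaction begin BEGIN / transaction end END / row data ROWDATA / HEARTBEAT / GTIDLOG]
    storeValue          [byte data that can be expanded; the corresponding type is RowChange]
RowChange
    tableId             [table ID, generated by the database]
    eventType           [type of data change, default = UPDATE]
    isDdl               [whether this is a DDL statement, e.g. create table/drop table]
    sql                 [SQL text of the DDL/query]
    rowDatas            [the actual insert/update/delete changes; may hold several rows, since one binlog event can cover multiple changes, e.g. a batch operation]
        beforeColumns   [column data before the change (before update, before delete); an array of Column]
        afterColumns    [column data after the change (after update, after insert); an array of Column]
        props           [reserved for extension]
    props               [reserved for extension]
    ddlSchemaName       [schemaName of the DDL/query; cross-schema DDL exists, so the schema in effect when the DDL ran is kept]
Column
    index               [column index]
    sqlType             [JDBC type]
    name                [column name (case-insensitive); not present in MySQL itself]
    isKey               [whether the column is part of the primary key]
    updated             [whether the value changed]
    isNull              [whether the value is null]
    props               [reserved for extension]
    value               [column value; timestamp and datetime are text in a time format]
    length              [original length of the data object]
    mysqlType           [MySQL type of the column]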
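
To make the relationship between rowDatas, beforeColumns, afterColumns and the updated flag concrete, here is a minimal sketch built on simplified stand-in classes. These are not canal's generated protobuf classes; they carry only the fields needed here, and the sketch assumes before and after columns are aligned by index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RowChangeSketch {
    /** Simplified stand-in for canal's Column; only the fields used here. */
    static final class Column {
        final String name;
        final String value;
        final boolean updated;
        Column(String name, String value, boolean updated) {
            this.name = name;
            this.value = value;
            this.updated = updated;
        }
    }

    /** Simplified stand-in for one element of rowDatas. */
    static final class RowData {
        final List<Column> beforeColumns;
        final List<Column> afterColumns;
        RowData(List<Column> beforeColumns, List<Column> afterColumns) {
            this.beforeColumns = beforeColumns;
            this.afterColumns = afterColumns;
        }
    }

    /** Describes each column flagged as updated in the form "name: old -> new". */
    static List<String> describeChanges(RowData row) {
        List<String> changes = new ArrayList<>();
        for (int i = 0; i < row.afterColumns.size(); i++) {
            Column after = row.afterColumns.get(i);
            if (!after.updated) {
                continue;
            }
            // beforeColumns is empty for an insert, so guard the lookup.
            String old = i < row.beforeColumns.size() ? row.beforeColumns.get(i).value : null;
            changes.add(after.name + ": " + old + " -> " + after.value);
        }
        return changes;
    }

    public static void main(String[] args) {
        RowData update = new RowData(
                Arrays.asList(new Column("id", "7", false), new Column("status", "NEW", false)),
                Arrays.asList(new Column("id", "7", false), new Column("status", "PAID", true)));
        describeChanges(update).forEach(System.out::println);
    }
}
```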

4. What the canal client calls do

- Connect: connector.connect()
- Subscribe: connector.subscribe()
- Fetch data: connector.getWithoutAck()
- Business processing
- Acknowledge: connector.ack()
- Roll back: connector.rollback()
- Disconnect: connector.disconnect()
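
The fetch, process, acknowledge or roll back sequence from the list above can be sketched without a running canal server. The `BatchSource` interface and the in-memory implementation below are hypothetical stand-ins that mimic only the shape of the three calls; the real connector returns a Message object rather than filling a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

final class AckFlowSketch {
    /** Hypothetical stand-in exposing only the calls used below. */
    interface BatchSource {
        long getWithoutAck(List<String> out); // returns the batch id, or -1 if nothing is pending
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Handles one batch: ack on success, rollback when the handler throws. */
    static boolean processOnce(BatchSource source, Consumer<List<String>> handler) {
        List<String> rows = new ArrayList<>();
        long batchId = source.getWithoutAck(rows);
        if (batchId == -1) {
            return false;
        }
        try {
            handler.accept(rows);
            source.ack(batchId);      // the batch will not be delivered again
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId); // the same batch is delivered on the next fetch
            return false;
        }
    }

    /** In-memory source: a batch stays at the head of the queue until it is acked. */
    static final class InMemorySource implements BatchSource {
        private final Deque<List<String>> pending = new ArrayDeque<>();
        private long nextId = 1;
        int acked = 0;
        int rolledBack = 0;

        InMemorySource(List<List<String>> batches) {
            pending.addAll(batches);
        }
        public long getWithoutAck(List<String> out) {
            if (pending.isEmpty()) {
                return -1;
            }
            out.addAll(pending.peekFirst());
            return nextId;
        }
        public void ack(long batchId) {
            pending.pollFirst();
            nextId++;
            acked++;
        }
        public void rollback(long batchId) {
            rolledBack++;
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(Arrays.asList(Arrays.asList("row1", "row2")));
        System.out.println(processOnce(source, rows -> { throw new IllegalStateException("boom"); }));
        System.out.println(processOnce(source, rows -> System.out.println(rows)));
    }
}
```

The point of the split between getWithoutAck and ack is that a batch whose processing failed can be fetched again instead of being lost.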

5. application.yml configuration

canal:
  server:
    url:   # standalone setup; used in local/dev/test environments
    port:   # standalone setup; used in local/dev/test environments
    destination: example # shared by all environments
    username:
    password:
    subscribe: # MySQL tables to monitor
      bc_address_library,bc_contact,bc_contact_company,bc_customer,
      bill_entrust,bill_transport,
      consigner_info,fleet_info,
      fleet_vehicle_info,vehicle_info_temp,fleet_vehicle_mapping,
      bc_goods,queue_appoint_query,queue_appoint_record_log,
      queue_appoint_trucker,queue_appoint_vehicle,queue_warehouse_notice,
      queue_warehouse_notice_trucker_mapping,queue_warehouse_notice_vehicle_mapping
    refreshSeconds: 10 # unit: seconds
    zkServers: X.X.X.X:2181,X.X.X.X:2181 # ZooKeeper HA configuration; used in forecast/prod environments
    dbname: kuaiche # database name
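
Because the subscribe value is a multi-line plain YAML scalar, it is folded into a single string with a space at each line break. The helper below is one possible way to turn that string plus dbname into a schema-qualified filter expression. It assumes the subscription filter uses the same comma-separated schema.table regex format as canal.instance.filter.regex; the class and method names are hypothetical, and this is not necessarily how the article's client builds its filter.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeFilterSketch {
    /** Builds "db\.t1,db\.t2,..." from a database name and a comma-separated table list. */
    static String build(String dbname, String tables) {
        List<String> parts = new ArrayList<>();
        for (String table : tables.split(",")) {
            String name = table.trim(); // drops the spaces introduced by YAML line folding
            if (!name.isEmpty()) {
                parts.add(dbname + "\\." + name);
            }
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        System.out.println(build("kuaiche", "bill_entrust,bill_transport, consigner_info,fleet_info"));
    }
}
```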

6. Building the connection instance

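import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CanalClusterClient {

    private static final Logger log = LoggerFactory.getLogger(CanalClusterClient.class);

    /** Wait time after an exception, in seconds. */
    private static final long EXCEPTION_SECONDS = 10;
    /** Maximum sleep time of the polling thread, in seconds. */
    private static final long MAX_SLEEP_SECONDS = 30;
    /** Entries fetched per batch. The original listing does not show this value; 1000 is an assumption. */
    private static final int BATCH_SIZE = 1000;

    @Value("${canal.server.zkServers}")
    private String zkServers;

    @Value("${canal.server.subscribe}")
    private String subscribe;

    @Value("${canal.server.destination}")
    private String destination;

    @Value("${canal.server.refreshSeconds}")
    private long refreshSeconds;

    @Value("${canal.server.dbname}")
    private String dbname;

    // The author's own helper that handles the entries; not shown in this article.
    private final JsonUtility jsonUtility;

    @Autowired
    public CanalClusterClient(JsonUtility jsonUtility) {
        this.jsonUtility = jsonUtility;
    }

    private CanalConnector connector;

    /** Polling loop. The method name is illustrative; the original listing shows only the body. */
    public void start() {
        try {
            while (true) {
                // Connect on first use or when the connection has become invalid.
                // Checked before every fetch to make sure the connection is usable.
                if (null == connector || !connector.checkValid()) {
                    try {
                        connector = CanalConnectors.newClusterConnector(zkServers, destination, "", "");
                        connector.connect();
                        connector.subscribe(subscribe);
                        log.debug(">>>> Connection canal server successful,zkServers:【{}】,subscribe:【{}】 <<<<<", zkServers, subscribe);
                    } catch (Exception e) {
                        log.debug(">>>> Connection canal server failed,zkServers:【{}】,subscribe:【{}】, exception:【{}】...try again after 10s <<<<<", zkServers, subscribe, e.getMessage());
                        // Thread.sleep takes milliseconds, so convert from seconds.
                        Thread.sleep(EXCEPTION_SECONDS * 1000);
                        continue;
                    }
                }
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                if (null == message) {
                    log.debug(">>>> Canal client connect zookeeper server is running, get Message is null! <<<<<");
                    Thread.sleep(EXCEPTION_SECONDS * 1000);
                    continue;
                }
                // Keep the idle interval at or below 30s; once it exceeds the cap, fall back to 10s.
                if (refreshSeconds > MAX_SLEEP_SECONDS) {
                    refreshSeconds = EXCEPTION_SECONDS;
                }
                // Batch id used for the acknowledgement.
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing to process: sleep, then lengthen the next idle interval by one second.
                    Thread.sleep(refreshSeconds * 1000);
                    refreshSeconds++;
                    if (refreshSeconds < MAX_SLEEP_SECONDS) {
                        log.debug(">>>> Canal client connect zookeeper server is running, get Message size is empty, ...try again after {}s! <<<<<", refreshSeconds);
                    } else if (refreshSeconds == MAX_SLEEP_SECONDS) {
                        log.debug(">>>> Canal client connect zookeeper server【{}】,subscribe:【{}】is running, get Message size is empty, ...try again after {}s! <<<<<", zkServers, subscribe, refreshSeconds);
                    }
                } else {
                    try {
                        // Start time of the processing call.
                        long asyncBeginTime = System.currentTimeMillis();
                        jsonUtility.printEntry(message.getEntries(), batchId);
                        // End time of the processing call.
                        long asyncEndTime = System.currentTimeMillis();
                        log.debug(">>>>Canal client batchId:{},Async method time-consuming:{}ms<<<<<", batchId, asyncEndTime - asyncBeginTime);
                    } catch (Exception e) {
                        // Pass the exception as the last argument so the stack trace is logged.
                        log.error(">>>> PrintEntry Exception <<<<<", e);
                    }
                }
                // Acknowledge the batch. Note that this also runs when processing failed above.
                connector.ack(batchId);
            }
        } catch (Exception e) {
            log.error(">>>> get message from canal zookeeper server:【{}】,subscribe:【{}】error:【{}】 <<<<<", zkServers, subscribe, e.getMessage());
        }
    }
}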
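
The idle handling in the loop above mixes the sleep interval into a configuration field. As a sketch of an alternative, the small helper below keeps the interval in its own object. It is a variant, not the author's exact logic: it holds at the cap instead of falling back to the initial value, and it offers a reset for when data arrives again. All names are hypothetical.

```java
final class IdleBackoff {
    private final long initialSeconds;
    private final long maxSeconds;
    private long current;

    IdleBackoff(long initialSeconds, long maxSeconds) {
        this.initialSeconds = initialSeconds;
        this.maxSeconds = maxSeconds;
        this.current = initialSeconds;
    }

    /** Delay to use after an empty poll; grows by one second per call up to the cap. */
    long nextDelaySeconds() {
        long delay = current;
        if (current < maxSeconds) {
            current++;
        }
        return delay;
    }

    /** Call after a non-empty batch so the next idle period starts short again. */
    void reset() {
        current = initialSeconds;
    }

    public static void main(String[] args) {
        IdleBackoff backoff = new IdleBackoff(10, 30);
        for (int i = 0; i < 3; i++) {
            System.out.println(backoff.nextDelaySeconds());
        }
    }
}
```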

IV. Installing ZooKeeper
1. For installation details, see the separate article: zookeeper
Note: canal depends on ZooKeeper 3.4.5, so make sure to download the matching version.


V. Testing HA mode
1. Prerequisites:

`All of the following services are up and running`
- canal-server: X.X.X.50:2100, X.X.X.54:2100
- canal-client: X.X.X.107:8999, X.X.X.85:8999
- zookeeper: X.X.X.50:2181, X.X.X.54:2181

2. Status check

  • Connect to either ZooKeeper node with the client
    ./zkCli.sh -server X.X.X.50:2181
  • Get the canal server that is currently syncing data
    get /otter/canal/destinations/example/running
{"active":true,"address":"X.X.X.50:2100","cid":2101}
cZxid = 0x100028621
ctime = Wed Dec 05 17:56:01 CST 2018
mZxid = 0x100028621
mtime = Wed Dec 05 17:56:01 CST 2018
pZxid = 0x100028621
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16772fcc6300017
dataLength = 57
numChildren = 0
  • Get the canal client that is currently connected
    get /otter/canal/destinations/example/1001/running
{"active":true,"address":"X.X.X.X:15586","clientId":1001}
cZxid = 0x10002a72c
ctime = Wed Dec 26 17:13:25 CST 2018
mZxid = 0x10002a72e
mtime = Wed Dec 26 17:13:25 CST 2018
pZxid = 0x10002a72c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x16772fcc6300024
dataLength = 63
numChildren = 0

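3. HA failover test

  • a) In the ZooKeeper client, run get /otter/canal/destinations/example/running to find the IP address of the canal server that is currently syncing data;
  • b) Log in to that server and stop the canal server with sh bin/stop.sh. A normal shutdown releases all resources of the instance, including deleting the running node;
  • c) Repeat step a; the other canal server should now be the running server;
  • d) In the ZooKeeper client, run get /otter/canal/destinations/example/1001/running to find the IP address of the canal client that is currently syncing data;
  • e) Log in to that server and kill the canal client process;
  • f) Repeat step d; the other canal client should now be the connected client.
    During the switch you can modify data in the MySQL database in real time and watch the log output of the corresponding canal client.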
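
When repeating the check before and after a failover, only the address field of the running node matters. The sketch below pulls it out of the JSON text printed by zkCli with a regular expression and compares two snapshots. It is a convenience for eyeballing results, not a ZooKeeper client; the class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RunningNodeSketch {
    private static final Pattern ADDRESS = Pattern.compile("\"address\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the address field of a running-node JSON string, or null if it is absent. */
    static String address(String runningJson) {
        Matcher matcher = ADDRESS.matcher(runningJson);
        return matcher.find() ? matcher.group(1) : null;
    }

    /** True when both snapshots carry an address and the two addresses differ. */
    static boolean failedOver(String before, String after) {
        String first = address(before);
        String second = address(after);
        return first != null && second != null && !first.equals(second);
    }

    public static void main(String[] args) {
        String before = "{\"active\":true,\"address\":\"X.X.X.50:2100\",\"cid\":2101}";
        System.out.println(address(before));
    }
}
```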
