082022.07

技术干货 | 数据中间件如何与GreatSQL数据同步?

2022.07.08
  • 1.引入2.传统方案介绍3.监控binlog实现"同步"更新4.总结


1.引入

先前介绍了ElasticSearch,以及ES配合MySQL的问题,这种方案是让ES上的数据根据MySQL的数据做对照从而形成对应的索引,再将数据通过处理和封装存放在ES当中。(可回顾:技术分析 | 浅析MySQL与ElasticSearch的组合使用

回到生产环境,比如,我们如何保证MySQL系开源数据库GreatSQL中与ES对照的数据,发生更新的时候ES也进行更新呢?下面以ES为例进行分析。


2.传统方案介绍


2.1直接的"同步"更新

第一种方式十分直接,当发生对GreatSQL数据的更新操作时,由服务器对GreatSQL和ES同时进行更新操作,如图:

图片

这种方式实现起来十分“简单粗暴”,容易理解,显然可以解决问题,但绝不是最优解,原因如下:

  • 首先,这种方法使得我们进行数据库的数据写入、修改、删除等操作,后面都要跟上ES的同步操作,代码书写也过于冗长,且大大加大了业务的耦合度;

  • 其次,这种方法不能很好解决“同步”的问题,如果在执行对应操作的时候发生了断电等情况,就可能导致数据不同步的问题;

  • 最后,为了保证两者的更新要么同时完成要么都不完成,需要开启事务来处理,系统的性能有所降低。并且,在高并发情况下,有可能造成服务的“雪崩”。


2.2异步的"同步"更新

针对前面的方案,可以考虑加入消息队列的中间件来优化,与第一种方法不同的是,当发生对GreatSQL数据的更新操作时,服务器会完成GreatSQL数据的更新,并通过MQ的队列设置好的交换机发送更新ES的消息,给对应的接收更新消息的队列,进而完成对应ES数据更新的实现。如图:

图片


这种方案将直接的更新方式转换为异步的更新方式,性能显然提高了,同时降低了业务耦合度,也优化了数据“同步”的问题。但是,这种方案会出现MQ的消费者在消费时可能因为网络等原因导致用户数据有延时。同时,从编码角度看,每次系统要进行同步时都要编写MQ代码,仍然存在业务的耦合,且系统架构的设计也因为加入新的中间件要重新考虑维护的问题。


3.监控binlog实现"同步"更新

上面两种方案中都存在硬编码问题,同时存在强的业务耦合,以至于实现GreatSQL数据更新后的数据同步问题的代价要么是植入ES更新代码,要么替换为MQ代码,代码的侵入性太强,且性能降低。因此可以通过监控GreatSQL的binlog来实现数据的同步。


3.1问题分析

binlog,该日志存在于Server层次中,是使用存储引擎都可以使用的日志模块,binlog是逻辑日志,记录的是这个语句的原始逻辑,比如“给test表id=5这一行的col1字段值加1”。binlog的日志文件是可以追加写入的。“追加写入”是指binlog日志文件写到一定大小后会切换到下一个文件进行写入,可以设置sync_binlog为1,让每次事务的binlog都持久化保存到磁盘中。binlog在ROW模式下会记录每次操作后每行记录的变化。虽然此模式下所占用的空间较大,但此模式可以保持数据的一致性。因此不管SQL是什么,引用了什么函数,他记录的是执行后的效果。


3.2使用Canal来监控binlog

Canal是阿里用Java开发的基于数据库增量的日志解析,是提供增量数据订阅&消费的中间件。目前,Canal主要支持了 MySQLbinlog 解析,解析后可利用 Canal Client 来处理获得的相关数据。

详细可参考:https://github.com/alibaba/canal/wiki


图片

Canal的实现原理基于MySQL主从复制进行设计:

  • Master主库将改变记录到逻辑日志(binary log)中(这些记录叫做逻辑日志事件,binary log events,可以通过show binlog events进行查看);

  • Slave从库将Master主库的binary log events拷贝到它的中继日志(relay log);

  • Slave从库读取从重做中继日志中的事件,将改变反映它自己的数据同步到数据库中。

图片

(源自canal官方文档)


而Canal就是将自身伪装成一个Slave从库,假装从Master主库复制数据:

  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议;

  • MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal);

  • Canal解析binary log对象(原始为byte流)。

(源自canal官方文档)


这种方案的好处是程序中没有代码侵入、没有硬编码。同时,原有系统不需要任何变化对原方案的高耦合进行了业务解耦,不需要关注原来系统的业务逻辑。

图片



对于自建 MySQL如GreatSQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:


[mysqld]
# 开启 binlog
log-bin=mysql-bin 
# 选择 ROW 模式
binlog-format=ROW 
# 指定开启binlog的数据库,不指定则全部数据库开启
binlog-do-db=databasename
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1


创建canal账户,授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限


CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;


下载并启动Canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz


mkdir /tmp/canal
tar zxvf canal.deployer-1.1.2.tar.gz  -C /tmp/canal


修改Canal的配置文件


vi conf/example/instance.properties


## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*


Canal操作


# 启动
sh bin/startup.sh
# 查看server日志
vi logs/canal/canal.log

# 查看 instance 的日志 vi logs/example/example.log # 关闭 sh bin/stop.sh


以Java为例,创建测试项目Maven工程,导入应用开发场景:


                    com.alibaba.otter            canal.client            1.1.2                            org.apache.kafka            kafka-clients            2.4.1


编写日志监视类CanalClient来从日志中抓取信息,首先,获取canal的连接对象并连接:


//获取 canal 连接对象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "canal","canal");
//连接
canalConnector.connect();


指定需要监控的数据库,并根据数据量来获取 Message :


//指定要监控的数据库
canalConnector.subscribe("databasename.*");
//获取 Message
Message message = canalConnector.get(100);


接着就可以通过处理 Message 来得到监控信息内容了:


Listentries = message.getEntries();
    if (entries.size() > 0) {
    for (CanalEntry.Entry entry : entries) {
        //获取表名
        String tableName = entry.getHeader().getTableName();
        //Entry 类型
        CanalEntry.EntryType entryType = entry.getEntryType();
        //判断 entryType 是否为 ROWDATA
        if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
            //序列化数据
            ByteString storeValue = entry.getStoreValue();
            //反序列化
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
            //获取事件类型
            CanalEntry.EventType eventType = rowChange.getEventType();
            //获取具体的数据
            List rowDatasList = rowChange.getRowDatasList();
            //遍历并打印数据
            for (CanalEntry.RowData rowData : rowDatasList) {
                List beforeColumnsList = rowData.getBeforeColumnsList();
                JSONObject beforeData = new JSONObject();
                for (CanalEntry.Column column : beforeColumnsList) {
                    beforeData.put(column.getName(), column.getValue());
                }
                JSONObject afterData = new JSONObject();
                List afterColumnsList = rowData.getAfterColumnsList();
                for (CanalEntry.Column column : afterColumnsList) {
                    afterData.put(column.getName(), column.getValue());
                }
                System.out.println("TableName:" + tableName
                        +
                        ",EventType:" + eventType +
                        ",Before:" + beforeData +
                        ",After:" + afterData);
            }
        }
    }
}


从代码中可以看出,当系统与Canal建立连接后可以获取Message来监控数据库的操作,Message是一次Canal从MySQL的 bin log 中抓取的信息,一个Message中可以有多个SQL执行的结果,每个SQL执行结果(SQL命令)称为Entry,如图:



Entry中包含 TableName 、EntryType和StoreValue,其中StoreValue 包含了数据变化的内容。如下:


要想进行使用还需要进行反序列化操作才可以进行使用,如下:



当然,实际生产环境Canal可以配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理。首先需要修改canal.properties文件,这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出model,默认 tcp,改为输出到 kafka,instance.properties文件输出得到主题为kafka,可配置集群,再次启动canal就可以启动 Kafka 消费客户端测试,查看消费情况了。


4.总结

本文介绍了三种方式使得中间件的数据与GreatSQL的数据保存同步,前两种方法在使用性能和设计上都存在较大漏洞,而第三种通过读取GreatSQL的bin log日志,获取指定表的日志信息来实现数据同步的方法,在编码上看没有代码侵入,业务耦合度低,且原有系统不需要任何变化。但构建bin log监控系统需要做好规划,不过多赘述了。


EnjoyGreatSQL :)