欢迎来到思维库

思维库

数据异构就该这么玩儿

时间:2025-11-04 00:11:04 出处:系统运维阅读(143)

互联网背景下的数据数据同步需求

在当今互联网行业,尤其是异构现在分布式、微服务开发环境下,该玩为了提高搜索效率,数据以及搜索的异构精准度,会大量使用Redis、该玩Memcached等NoSQL数据库,数据也会使用大量的异构Solr、Elasticsearch等全文检索服务。该玩那么,数据这个时候,异构就会有一个问题需要我们来思考和解决:那就是该玩数据同步的问题!如何将实时变化的数据数据库中的数据同步到Redis/Memcached或者Solr/Elasticsearch中呢?

例如,我们在分布式环境下向数据库中不断的异构写入数据,而我们读数据可能需要从Redis、该玩Memcached或者Elasticsearch、Solr等服务中读取。那么,数据库与各个服务中数据的实时同步问题,成为了我们亟待解决的问题。

试想,由于业务需要,我们引入了Redis、亿华云Memcached或者Elasticsearch、Solr等服务。使得我们的应用程序可能会从不同的服务中读取数据,如下图所示。

图片

本质上讲,无论我们引入了何种服务或者中间件,数据最终都是从我们的MySQL数据库中读取出来的。那么,问题来了,如何将MySQL中的数据实时同步到其他的服务或者中间件呢?

注意:为了更好的说明问题,后面的内容以MySQL数据库中的数据同步到Solr索引库为例进行说明。

数据同步解决方案

1.在业务代码中同步

在增加、修改、删除之后,执行操作Solr索引库的逻辑代码。例如下面的代码片段。

复制public ResponseResult updateStatus(Long[] ids, String status){ try{ goodsService.updateStatus(ids, status); if("status_success".equals(status)){ List<TbItem> itemList = goodsService.getItemList(ids, status); itemSearchService.importList(itemList); return new ResponseResult(true, "修改状态成功") } }catch(Exception e){ return new ResponseResult(false, "修改状态失败"); } }1.2.3.4.5.6.7.8.9.10.11.12.

优点:

操作简便。

缺点:

业务耦合度高。

执行效率变低。

2.定时任务同步

在数据库中执行完增加、修改、b2b供应网删除操作后,通过定时任务定时的将数据库的数据同步到Solr索引库中。

定时任务技术有:SpringTask,Quartz。

哈哈,还有我开源的mykit-delay框架,开源地址为:https://github.com/sunshinelyz/mykit-delay。

这里执行定时任务时,需要注意的一个技巧是:第一次执行定时任务时,从MySQL数据库中以时间字段进行倒序排列查询相应的数据,并记录当前查询数据的时间字段的最大值,以后每次执行定时任务查询数据的时候,只要按时间字段倒序查询数据表中的时间字段大于上次记录的时间值的数据,并且记录本次任务查询出的时间字段的最大值即可,从而不需要再次查询数据表中的所有数据。

注意:这里所说的时间字段指的是源码库标识数据更新的时间字段,也就是说,使用定时任务同步数据时,为了避免每次执行任务都会进行全表扫描,最好是在数据表中增加一个更新记录的时间字段。

优点:

同步Solr索引库的操作与业务代码完全解耦。

缺点:

数据的实时性并不高。

3.通过MQ实现同步

在数据库中执行完增加、修改、删除操作后,向MQ中发送一条消息,此时,同步程序作为MQ中的消费者,从消息队列中获取消息,然后执行同步Solr索引库的逻辑。

我们可以使用下图来简单的标识通过MQ实现数据同步的过程。

图片

我们可以使用如下代码实现这个过程。

复制public ResponseResult updateStatus(Long[] ids, String status){ try{ goodsService.updateStatus(ids, status); if("status_success".equals(status)){ List<TbItem> itemList = goodsService.getItemList(ids, status); final String jsonString = JSON.toJSONString(itemList); jmsTemplate.send(queueSolr, new MessageCreator(){ @Override public Message createMessage(Session session) throws JMSException{ return session.createTextMessage(jsonString); } }); } return new ResponseResult(true, "修改状态成功"); }catch(Exception e){ return new ResponseResult(false, "修改状态失败"); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.

优点:

业务代码解耦,并且能够做到准实时。

缺点:

需要在业务代码中加入发送消息到MQ的代码,数据调用接口耦合。

4.通过Canal实现实时同步

Canal是阿里巴巴开源的一款数据库日志增量解析组件,通过Canal来解析数据库的日志信息,来检测数据库中表结构和数据的变化,从而更新Solr索引库。

使用Canal可以做到业务代码完全解耦,API完全解耦,可以做到准实时。

Canal简介

阿里巴巴MySQL数据库binlog增量订阅与消费组件,基于数据库增量日志解析,提供增量数据订阅与消费,目前主要支持了MySQL。

Canal开源地址:https://github.com/alibaba/canal。

Canal工作原理

MySQL主从复制的实现

图片

从上图可以看出,主从复制主要分成三步:

Master节点将数据的改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看)。Slave节点将Master节点的二进制日志事件(binary log events)拷贝到它的中继日志(relay log)。Slave节点重做中继日志中的事件将改变反映到自己本身的数据库中。Canal内部原理

首先,我们来看下Canal的原理图,如下所示。

图片

原理大致描述如下:

Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL Slave ,向 MySQL Master 发送dump 协议MySQL Master 收到 dump 请求,开始推送 binary log 给 Slave (即 Canal )Canal 解析 binary log 对象(原始为 byte 流)Canal内部结构

图片

说明如下:

Server:代表一个Canal运行实例,对应一个JVM进程。Instance:对应一个数据队列(1个Server对应1个或者多个Instance)。

接下来,我们再来看下Instance下的子模块,如下所示。

图片

EventParser:数据源接入,模拟Slave协议和Master节点进行交互,协议解析。EventSink:EventParser和EventStore的连接器,对数据进行过滤、加工、归并和分发等处理。EventSore:数据存储。MetaManager:增量订阅和消费信息管理。

Canal环境准备

设置MySQL远程访问 复制grant all privileges on *.* to root@% identified by 123456; flush privileges;1.2. MySQL配置

注意:这里的MySQL是基于5.7版本进行说明的。

Canal的原理基于MySQL binlog技术,所以,要想使用Canal就要开启MySQL的binlog写入功能,建议配置binlog的模式为row。

可以在MySQL命令行输入如下命令来查看binlog的模式。

复制SHOW VARIABLES LIKE binlog_format;1.

执行效果如下所示。

图片

可以看到,在MySQL中默认的binlog格式为STATEMENT,这里我们需要将STATEMENT修改为ROW。修改/etc/my.cnf文件。

复制vim /etc/my.cnf1.

在[mysqld]下面新增如下三项配置。

复制log-bin=mysql-bin #开启MySQL二进制日志 binlog_format=ROW #将二进制日志的格式设置为ROW server_id=1 #server_id需要唯一,不能与Canal的slaveId重复1.2.3.

修改完my.cnf文件后,需要重启MySQL服务。

复制service mysqld restart1.

接下来,我们再次查看binlog模式。

复制SHOW VARIABLES LIKE binlog_format;1.

图片

可以看到,此时,MySQL的binlog模式已经被设置为ROW了。

MySQL创建用户授权

Canal的原理是模式自己为MySQL Slave,所以一定要设置MySQL Slave的相关权限。这里,需要创建一个主从同步的账户,并且赋予这个账户相关的权限。

复制CREATE USER canal@localhost IDENTIFIED BY canal; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@localhost; FLUSH PRIVILEGES;1.2.3.

图片

img

Canal部署安装

下载Canal

这里,我们以Canal 1.1.1版本进行说明,小伙伴们可以到链接 https://github.com/alibaba/canal/releases/tag/canal-1.1.1 下载Canal 1.1.1版本。

图片

img

上传解压

将下载好的Canal安装包,上传到服务器,并执行如下命令进行解压

复制mkdir -p /usr/local/canal tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/1.2.

解压后的目录如下所示。

图片

各目录的说明如下:

bin:存储可执行脚本。conf:存放配置文件。lib:存放其他依赖或者第三方库。logs:存放的是日志文件。修改配置文件

在Canal的conf目录下有一个canal.properties文件,这个文件中配置的是Canal Server相关的配置,在这个文件中有如下一行配置。

复制canal.destinatinotallow=example1.

这里的example就相当于Canal的一个Instance,可以在这里配置多个Instance,多个Instance之间以逗号分隔即可。同时,这里的example也对应着Canal的conf目录下的一个文件夹。也就是说,Canal中的每个Instance实例都对应着conf目录下的一个子目录。

接下来,我们需要修改Canal的conf目录下的example目录的一个配置文件instance.properties。

复制vim instance.properties1.

修改如下配置项。

复制################################################################# ## canal slaveId,注意:不要与MySQL的server_id重复 canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName =canaldb canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = canaldb\\..* #################################################################1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.

选项含义:

canal.instance.mysql.slaveId : mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一;canal.instance.master.address: mysql主库链接地址;canal.instance.dbUsername : mysql数据库帐号;canal.instance.dbPassword : mysql数据库密码;canal.instance.defaultDatabaseName : mysql链接时默认数据库;canal.instance.connectionCharset : mysql 数据解析编码;canal.instance.filter.regex : mysql 数据解析关注的表,Perl正则表达式.启动Canal

配置完Canal后,就可以启动Canal了。进入到Canal的bin目录下,输入如下命令启动Canal。

复制./startup.sh1.

测试Canal

导入并修改源码

这里,我们使用Canal的源码进行测试,下载Canal的源码后,将其导入到IDEA中。

图片

接下来,我们找到example下的SimpleCanalClientTest类进行测试。这个类的源码如下所示。

复制package com.alibaba.otter.canal.example; import java.net.InetSocketAddress; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; /** * 单机模式的测试例子 * * @author jianghang 2013-4-15 下午04:19:20 * @version 1.0.4 */ public class SimpleCanalClientTest extends AbstractCanalClientTest { public SimpleCanalClientTest(String destination){ super(destination); } public static void main(String args[]) { // 根据ip,直接创建链接,无HA的功能 String destination = "example"; String ip = AddressUtils.getHostIp(); CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(ip, 11111), destination, "canal", "canal"); final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination); clientTest.setConnector(connector); clientTest.start(); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { logger.info("## stop the canal client"); clientTest.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal:", e); } finally { logger.info("## canal client is down."); } } }); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.

可以看到,这个类中,使用的destination为example。在这个类中,我们只需要将IP地址修改为Canal Server的IP即可。

具体为:将如下一行代码。

复制String ip = AddressUtils.getHostIp();1.

修改为:

复制String ip = "192.168.175.100"1.

由于我们在配置Canal时,没有指定用户名和密码,所以,我们还需要将如下代码。

复制CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(ip, 11111), destination, "canal", "canal");1.2.3.4.5.

修改为:

复制CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(ip, 11111), destination, "", "");1.2.3.4.5.

修改完成后,运行main方法启动程序。

测试数据变更

接下来,在MySQL中创建一个canaldb数据库。

复制create database canaldb;1.

此时会在IDEA的命令行输出相关的日志信息。

复制

分享到:

温馨提示:以上内容和图片整理于网络,仅供参考,希望对您有帮助!如有侵权行为请联系删除!

友情链接: