当前位置: 首页 > 科技观察

如何设计实时同步百万商品数据的秒级搜索系统?

时间:2023-03-21 13:54:24 科技观察

前段时间老板布置了一个新任务,为商家搭建商品搜索系统,可以为用户提供快速准确的搜索能力。图片来自Pexels。设计要求用户在输入搜索内容时,必须能够从商家名称和产品名称两个维度进行搜索。搜索结果按照准确率排序,数据结构按照业务的产品之间的关系进行组合。同时提供API,供业务系统调用。背景很简单,但现实却相当复杂!我们面临以下问题:商户数据库和商品数据库是多个不同的服务器,数据量达到百万级别。如何实现跨数据库的数据同步?商家和商品的数据是有隶属关系的,不然肯德基的麻辣鸡腿堡就贴在麦当劳了,尴尬!商家商品数据经常更新,比如修改价格、库存、上架等,所以搜索服务搜索不到一堆过时的数据。如果客户明明搜索出来的商品点进去后就被下架了,那么客户就会投诉!如何实现搜索数据与源库增删改查的实时同步?带着以上三个问题,我们开始了搜索服务的整体架构设计。系统架构设计思路为了设计出合适的系统架构,我们分析了目前的情况:首先,商户数据和商品数据存储在两个独立的MySQL8数据库中。为了满足商户数据和产品数据的关联,我们需要将所需表的两个实时ETL合并到我们的搜索系统数据库中。其次,数据从商户商品数据库ETL传输到搜索系统数据库后,需要实时组合成商户相关的商品数据结构,并以父子格式存储在ES中文档。最后,商户和商品数据库的增删改查操作需要实时同步到ES,即ES中的数据需要支持实时增删改查。为此,我们设计了两个Canal组件。第一个Canal实现数据ETL,从商户和产品数据库中提取某些表和字段到搜索服务数据库。然后使用第二个Canal读取搜索服务MySQL数据库的Binlog,实时传输到Kafka消息队列,然后使用canaladapter进行数据关联,映射父子文档等,以及将处理后的数据存储在ElasticSearch中。具体系统架构设计如下图:商户商品搜索系统架构设计项目实战环境及软件说明操作系统:CentOS7canal:canal.adapter-1.1.4,canal.deployer-1.1.4kafka:kafka_2.12-2.3.0ElasticSearch:elasticsearch-6.3.2kibana:kibana-6.3.2使用Canal实现数据ETL到MySQL8这一步是使用Canal从两个独立的MySQL8数据库中提取需要的表到搜索服务的MySQL数据库中。①安装canaldeployer,解压canal.deployer-1.1.4.tar.gz,配置canaldeployer。进入canaldeployer/conf目录,修改canal.properties文件,主要配置三个部分:serverMode、MQ和destination。首先,我们将serverMode改为Kafka模式,增加系统缓冲能力,提高系统稳定性:serverMode接下来配置Kafka的MQ信息(Kafka请自行安装):KafkaMQ信息最后配置需要实例化的实例,这里有3个configurations,也就是说canaldeploy会启动这3个实例,同步MySQL的Binlog到Kafka的Topic。如下图:destinationsinstanceconfiguration配置canaldeployerinstance:进入canaldeployer/conf/example目录,找到一个instance.properties文件,这是Canal给的一个例子,我们可以参考它的配置。我们复制整个示例目录并将其重命名为上一步中配置的目标之一,例如xxxsearch。进入xxxsearch目录,编辑instance.properties文件,主要配置源数据库信息,需要的数据表和字段,指定Kafka的topic名称。这样源库的Binlog会被转换成Json数据,通过canaldeployer实时传输到kafkatopic中。如下图:Canaldeploy实例源数据库配置canaldeploy实例kafka主题配置进入canaldeployer/bin目录,执行./startup.sh,启动canaldeployer及其实例。至此,canaldeployer已经搭建完成。②安装canal.adapter,我们需要使用canal.adapter将binlogjson数据在MySQL8清洗转换后存放在KafkaTopic中。由于Canal原生不支持MySQL8,我们需要做一些调整。添加MySQL8连接驱动:解压canal.adapter-1.1.4.tar.gz,进入canaladapter/lib目录,去掉mysql-connector-java-5.1.40.jar,导入mysql-connector-java-8.0.18.jar.配置canaladapter向MySQL8输出数据:进入canaladapter/conf目录,编辑application.yml文件,主要配置消费Kafka、源数据库信息、搜索系统数据库信息。如下图:ETL转MySQL8配置接下来进入canaladapter/conf/rdb目录,以官方mytest_user.yml为例,配置KafkaTopic名称、源数据库名称、源数据表名称、目标数据库名称和目标数据表名,建议一张表对应一个yml文件。ETL表结构映射配置启动canaladapter:进入canaladapter/bin目录,执行./startup.sh,启动canaladapter,观察logs/adapter/adapter.log日志文件,在搜索系统数据库中手动添加一条记录,并看看会不会打印如果下面的log中有2条记录,一条INFO,一条DEBUG,说明配置成功。Canaladapterlog至此,数据ETL阶段搭建完成,可以将数据从两个不同的MySQL8数据库实时同步到搜索服务的MySQL数据库中。实现数据多表关联和父子文档映射①配置第二个Canal的canaladapter进入canaladapter/conf目录,编辑application.yml文件,主要配置消费Kafka,搜索系统数据库,ES连接信息。如下图:canaladapterMQ和MySQL配置canaladapterES配置②配置多表关联进入canaladapter/conf/es目录,vimmytest_user.yml,编辑多表关联配置:多表关联配置注意sql支持自由组合多表关联,但是有一定的限制:主表不能是子查询语句。只能使用leftouterjoin,即最左边的表必须是主表。如果关联的从表是子查询,则不能有多个表。主sql中不能有where查询条件(从表的子查询中有where条件,但不推荐,可能会造成数据同步不一致,比如在where条件中修改字段内容).关联条件只允许主键和外键的'='操作,不允许其他常量判断如:ona.role_id=b.idandb.statues=1。关联条件必须有一个出现在主查询语句中的字段,例如:ona.role_id=b.id其中a.role_id或b.id必须出现在主查询语句中。ElasticSearch的mapping属性会对应SQL的查询值(不支持select*)。例如:selecta.id作为_id,a.name,a.email作为_emailfromuser,其中name会映射到es映射的name字段,而_email会映射到mapping的_email字段,这里是一个别名(如果有别名)作为最终的映射字段。这里的_id可以填写在配置文件的_id:_id映射中。③配置父子文档以官方biz_order.yml为例,vimbiz_order.yml,配置父子文档映射:配置父子文档映射④在ElasticSearch6中,建立索引与父子文档的映射关系,输入Kibana页面,点击DevTools,执行以下命令,即可创建索引和父子文档映射:createanindexandparent-childdocumentmapping。其中,ES6和Kibana的安装这里没有特别配置,不再赘述。⑤启动canaladapter进入canaladapter/bin目录,执行./startup.sh,启动canaladapter,观察logs/adapter/adapter.log日志文件,在搜索系统数据库中手动添加一条记录,看是否会打印如下日志,如Printing表示配置成功。正确配置适配器日志示例运行结果现在,我们可以使用Kibana执行DSL语句查询查看。我们预先在商家系统中添加了一个“肯德基”店铺,然后在商品系统中添加了两个商品“西红柿”和“新鲜西红柿”,并将商品与“肯德基”关联起来。然后我们查询“KFC”或者“tomato”,得到如下查询结果(去掉ES默认字段):通过DSL查询的结果如图所示。我们可以通过商家名称查询商品,也可以查询店铺和商品,而Canal支持数据的实时增删改查,所以ES数据也会和商户系统和商品系统保持一致,数据结构包括商户和相应的商品以满足业务需求。综上所述,基于Canal、Kafka、MySQL8、ElasticSearch6技术的商户商品搜索系统基本框架搭建完成。我们使用canaldeployer实时读取商户和产品系统的MySQL数据库Binlog,并发送给Kafka。然后canaladapter消费Kafka,对binlogjson数据进行多表关联和父子文档映射,最后存储到ES6中,供上层搜索服务调用。搜索服务系统终于成功上线,为公司百万级商户的产品提供实时数据同步,秒级展示搜索结果,满足业务需求。老板说,给研发组每人加一个鸡腿!想想就有点小激动,嘿嘿!