当前位置: 首页 > 后端技术 > Node.js

基于nodejs实时同步mongodb数据到elasticsearch

时间:2023-04-03 19:31:53 Node.js

1.前言因为公司需要用elasticsearch做全文搜索,所以用mongodb做持久化存储,但是希望mongodb里面的数据变化可以实时同步到elasticsearch。一开始主要使用elasticsearchv1.7.2的版本,mongo-river可以解决这个问题。随着elasticsearch的升级,发现elasticsearch放弃了mongo-river。好乱。。。谷歌搜到神器mongo-connector,国外高手用python写的工具,MongoDB官网也很推荐。但是,我们需要将文档中的附件信息同步到elasticsearch,mongo-connector对附件同步的支持不是很好。能不能有nodejs版本的数据同步?在github上找到了一个高手写的node-elasticsearch-sync,很好用,但是功能太简单了,而且不支持复杂的数据过滤,也不支持附件同步。活人不能被尿憋死。参考node-elasticsearch-sync,写一个同步工具node-mongodb-es-connector。2、准备工作2.1安装mongodb要安装mongodb,可以到官网下载:https://www.mongodb.com/PS:关于如何搭建mongodb副本集群:https://www.cnblogs。com/ljhdo...2.2安装elasticsearch安装elasticsearch可以到官网下载:https://www.elastic.co/cn/dow...PS:elasticsearch-head的安装请自行google,kibana,logstash等2.3安装nodejs安装nodejs官网下载:http://nodejs.cn/PS:别忘了安装npm,请自行google如何安装。以上是使用node-mongodb-es-connector的前提2.4node-mongodb-es-connector下载地址github:https://github.com/zhr8521007...npm:https://www.npmjs.com/package...3.文件结构├──crawlerDataConfig    项目构建配置(这里添加你要同步数据的配置)│├──mycarts.json  一索引一配置文件(唯一的配置文件)需要自己添加或修改,此文件只是一个例子,如果不用可以删除)│└──……├──lib  │├──pool││├──elasticsearchPool.js  elasticsearch连接池││├──mongoDBPool.js      mongodb连接池│├──promise││├──elasticsearchPromise.jselasticsearch方法类(增删改查)││├──mongoPromise.js      mongodb方法类(增删改查)│├──util││├──fsWatcher.js        配置文件监控类(主要监控crawlerDataConfig目录下的配置文件)││├──logger.js          day日志类││├──oplogFactory.js      mongo-oplog触发事件触发后的执行方法(增删改查)││├──tail.js           监控mongodb数据是否变化 ││├─util.js工具类│├──main.js主要方法(主要是第一次启动后立即将mongodb中的数据同步到elasticsearch)├──logs│├──logger-2018-03-23.log同步数据打印日志│└──......├──test│├──img││├──elasticsearch.jpg  图片不解释││├──mongoDB.jpg        图片不解释││└──structure.jpg      图片不解释│└──test.js      测试类(没写)├──app.js       启动文件               ├──index.js      接口文件(仅用于增删改查配置文件)├──package-lock.json├──package.json├──ReadMe.md      英文文档(markdown)├──README.zh-CN.md  中文文档(markdown)└──LICENSEmycarts.json文件(此文件仅提供示例){"mongodb":{"m_database":"myTest","m_collectionname":“购物车”,“m_filterfilds”:{“版本”:“2.0”},“m_returnfilds”:{“cName”:1,“cPrice”:1,“cImgSrc”:1},“m_connection”:{“m_servers”“:["localhost:29031","localhost:29032","localhost:29033"],"m_authentication":{"username":"UserAdmin","password":"pass1234","authsource":"admin","replicaset":"my_replica","ssl":false}},"m_documentsinbatch":5000,"m_delaytime":1000},"elasticsearch":{"e_index":"mycarts","e_type":"carts","e_connection":{"e_server":"http://localhost:9200","e_httpauth":{"username":"EsAdmin","password":"pass1234"}},"e_pipeline":"mypipeline","e_iscontainattachment":true}}m_database-MongoDB中需要监控的数据库。m_collectionname-MongoDB中需要监控的集合。m_filterfilds-MongoDB中的查询条件,目前支持一些简单的查询条件。(默认值为空)m_returnfilds-MongoDB需要返回字段。(默认值为空)m_connectionm_servers-MongoDB服务器的地址。(副本结构,数组格式)m_authentication-如果需要MongoDB登录验证,使用如下配置(默认值为null)。-username-MongoDB连接的用户名。-密码-MongoDB连接的密码。-authsource-MongoDB用户认证,默认为admin。-replicaset-MongoDB副本结构的名称。-ssl-MongoDB的ssl。(默认值为假)。m_documentsinbatch-一次从mongodb导入Elasticsearch的数据条数。(你可以设置一个比较大的值,默认是1000)。m_delaytime-每次进入elasticsearch数据的间隔时间(默认值为1000ms)。e_index-ElasticSearch中的索引。e_type-ElasticSearch中的类型,这里的type主要是为了使用bulk.e_connectione_server-ElasticSearch连接字符串。e_httpauth-如果ElasticSearch需要登录认证使用下面的配置(默认值为null)。username-ElasticSearch连接用户名.password-ElasticSearch连接密码.e_pipeline-ElasticSearch管道名称。(如果没有管道,则填空)e_iscontainattachment-管道是否包含附件规则(默认值为false)。4、使用方法用户可以提前在/crawlerDataConfig目录下编辑自己的配置文件,文件必须以json格式存放。在文件根目录下,打开cmd命令窗口,输入如下信息:nodeapp.js项目启动后,修改配置文件(如:mycarts.json),数据会实时同步或者mongodb中的一条数据会被实时修改同步到elasticsearch。PS:如何同步mongodb的主文档和附件信息到elasticsearch?使用elasticsearch的pipeline来实现。首先,您需要在elasticsearch中创建一个管道:PUT_ingest/pipeline/mypipeline{"description":"Extractattachmentinformationfromarrays","processors":[{"foreach":{"field":"attachments","processor":{"attachment":{"target_field":"_ingest._value.attachment","field":"_ingest._value.data"}}}}]}然后修改配置文件中的节点(eg:mycarts.json)数据可以是"e_pipeline":"mypipeline"5.结果显示mongodb中的数据和elasticsearch中的数据