
Practical Notes: Importing Data into Elasticsearch with Java Multithreading

Posted: 2023-03-22 16:06:35, 科技观察

Preface

I recently took on a task to modify our existing MTE (mysqlToEs) tool, which imports data from MySQL into Elasticsearch. The old importer was single-threaded, and loading 100 billion rows took about two weeks, which was far too slow. So I spent three days rewriting the MTE importer around the FixedThreadPool from Java's Executors thread-pool framework. Import throughput on a single server improved more than tenfold (and with sensible tuning of the thread count it can go even higher).

Key technology stack

Elasticsearch, JDBC, ExecutorService / Thread, SQL

Tool description

Maven dependencies:

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>
```

Java thread pools

The pool size defaults to 21 and can be adjusted. POR is the pool that consumes pending data; ROR is the pool that consumes read data.

```java
private static int THREADS = 21;
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
```

Defining the pend/read producer threads: ZlPendProducer / ZlReadProducer

```java
public class ZlPendProducer implements Runnable {
    ...
    @Override
    public void run() {
        System.out.println(threadName + "::start...");
        try {
            for (int j = 0; ...; j++) {
                ...
                if (i + size > count) {
                    // if fewer than 100 rows are left, take only the remainder
                    size = count - i;
                }
                String sql = "select * from " + tableName + " limit " + i + "," + size;
                System.out.println(tableName + "::sql::" + sql);
                rs = statement.executeQuery(sql);
                List<HistPendingEntity> lst = new ArrayList<>();
                while (rs.next()) {
                    HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
                    lst.add(p);
                }
                MteExecutor.POR.submit(new ZlPendConsumer(lst));
                Thread.sleep(2000);
            }
            ...
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class ZlReadProducer implements Runnable {
    // ... the read producer follows the same logic as the pend producer
}
```

Defining the pend/read consumer threads: ZlPendConsumer / ZlReadConsumer

```java
public class ZlPendConsumer implements Runnable {
    private String threadName;
    private List<HistPendingEntity> lst;

    public ZlPendConsumer(List<HistPendingEntity> lst) {
        this.lst = lst;
    }

    @Override
    public void run() {
        ...
        lst.forEach(v -> {
            try {
                String json = new Gson().toJson(v);
                EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index,
                        Const.ES.HistPendDB_type, v.getPendingId(), null);
                Const.COUNTER.LD_P.incrementAndGet();
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("err::PendingId::" + v.getPendingId());
            }
        });
        ...
    }
}

public class ZlReadConsumer implements Runnable {
    // the read consumer follows the same logic as the pend consumer
}
```

Defining the import monitor thread: Monitor

The Monitor thread totals the number of documents imported into Elasticsearch per minute. Using its output, you can adjust the number of threads in the pools so the multithreaded import runs even faster.

```java
public void monitorToES() {
    new Thread(() -> {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append("pend tables done::").append(Const.TBL.TBL_PEND_COUNT)
              .append("::pend total::").append(Const.COUNTER.LD_P_TOTAL)
              .append("::pend indexed::").append(Const.COUNTER.LD_P);
            sb.append("~~~~read tables done::").append(Const.TBL.TBL_READ_COUNT);
            sb.append("::read total::").append(Const.COUNTER.LD_R_TOTAL)
              .append("::read indexed::").append(Const.COUNTER.LD_R);
            if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
                ldPrevPendCount = Const.COUNTER.LD_P.get();
                ldPrevReadCount = Const.COUNTER.LD_R.get();
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                if ((end - start) / 1000 >= 60) {
                    start = end;
                    sb.append("\n##########################################\n");
                    sb.append("pend TPS per minute::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + " docs");
                    sb.append("::read TPS per minute::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + " docs");
                    ldPrevPendCount = Const.COUNTER.LD_P.get();
                    ldPrevReadCount = Const.COUNTER.LD_R.get();
                }
            }
            System.out.println(sb.toString());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
```

Initializing Elasticsearch: EsClient

```java
String cName = meta.get("cName");     // ES cluster name
String esNodes = meta.get("esNodes"); // ES cluster node addresses
Settings esSetting = Settings.builder()
        .put("cluster.name", cName)
        .put("client.transport.sniff", true) // enable sniffing to discover the ES cluster
        .put("thread_pool.search.size", 5)   // raise the search thread pool size, tentatively 5
        .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
    if (node.length() > 0) {
        String[] hostPort = node.split(":");
        client.addTransportAddress(new TransportAddress(
                InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
    }
}
```

Initializing the database connection

```java
conn = DriverManager.getConnection(url, user, password);
```

Launch command

```shell
nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 &
```

Parameter notes: ES-Cluster2019 is the Elasticsearch cluster name; node1:9300,node2:9300,node3:9300 are the ES node addresses; root and 123456! are the MySQL user and password; jdbc:mysql://ip:3306/mte is the JDBC URL; 130 130 are the numbers of pend/read sub-tables to process.

Program entry point: MteMain

```java
// monitor thread
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// pend producer thread
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// read producer thread
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();
```
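The producer/consumer pattern above can be boiled down to a small self-contained sketch: a producer pages through a source in fixed-size batches (shortening the last batch, as the `size = count - i` branch does) and submits each batch to a `FixedThreadPool`, while an `AtomicInteger` stands in for the article's `Const.COUNTER.LD_P` counter. The class name `PipelineSketch`, the row count 1050, and the pool size 4 are illustrative assumptions, not part of the original tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineSketch {
    // stands in for Const.COUNTER.LD_P in the article
    static final AtomicInteger INDEXED = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        // the article uses 21 threads; 4 is enough to show the pattern
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int count = 1050;     // pretend row count of one table
        int batchSize = 100;  // page size, as in the producer's LIMIT clause
        for (int i = 0; i < count; i += batchSize) {
            int size = Math.min(batchSize, count - i); // last page may hold fewer than 100 rows
            List<Integer> batch = new ArrayList<>();
            for (int k = i; k < i + size; k++) batch.add(k);
            // consumer task: "index" each row by bumping the shared counter
            pool.submit(() -> batch.forEach(row -> INDEXED.incrementAndGet()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("indexed=" + INDEXED.get()); // prints indexed=1050
    }
}
```

Because the counter is atomic and every batch is a separate task, the total comes out exact regardless of how the pool interleaves the batches, which is the same property the real tool relies on for its per-minute TPS numbers.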
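The `esNodes` launch argument is a comma-separated list of `host:port` pairs, split exactly as in the EsClient initialization. The sketch below mirrors that parsing loop, with the `addTransportAddress` call replaced by a print so it runs without the Elasticsearch client on the classpath; the class name `NodeParseSketch` is assumed for illustration.

```java
public class NodeParseSketch {
    public static void main(String[] args) {
        // same form as the launch argument passed to mte.jar
        String esNodes = "node1:9300,node2:9300,node3:9300";
        StringBuilder parsed = new StringBuilder();
        for (String node : esNodes.split(",")) {
            if (node.length() > 0) {                 // skip empty entries, as EsClient does
                String[] hostPort = node.split(":");
                String host = hostPort[0];
                int port = Integer.parseInt(hostPort[1]);
                parsed.append(host).append('/').append(port).append(' ');
            }
        }
        System.out.println(parsed.toString().trim());
        // prints: node1/9300 node2/9300 node3/9300
    }
}
```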