上面的问题其实是前段时间接到的一个真实的业务需求,将旧系统的历史数据通过离线文件的方式迁移到新的生产系统中。由于老板们已经敲定了新系统的上线时间,我只有一周的时间将历史数据导入生产系统。由于时间紧迫,数据量大,在设计过程中想到了一个解决方案:将文件拆分成多个线程,将拆分后的文件导入。首先,我们可以编写一个小程序,或者使用split命令split将大文件拆分成小文件。--将一个大文件拆分成几个小文件,每个10万行.假设程序在读取到一半突然崩溃,会直接丢失文件读取的进度,需要重新开始读取。文件拆分后,一旦读取到小文件,我们就可以将小文件移动到指定的文件夹中。这样即使应用崩溃重启,我们重新读取的时候,只需要读取剩下的文件即可。2、一个文件只能被一个应用程序读取,限制了导入速度。文件拆分后,我们可以使用多节点部署进行横向扩展。每个节点读取文件的一部分,这样导入速度可以提高一倍。多线程导入当我们拆分文件时,那么我们需要读取文件的内容并导入。之前拆分的时候,设置每个小文件包含10w行数据。因为担心一次性将10w条数据读入应用程序会导致堆内存占用过高,频繁“FullGC”,所以下面采用流式读取的方式逐行读取数据。当然,如果拆分后的文件很小,或者应用的堆内存设置很大,我们可以直接将文件加载到应用内存中进行处理。这个比较简单。逐行读取的代码如下:nextLine();convertToDB(line);}}上面的代码使用了commons-io中的LineIterator类,它使用BufferedReader来读取文件内容。它把它封装成迭代器的方式,这样我们就可以很方便的进行迭代和读取。如果目前使用的是JDK1.8,以上操作更简单。我们可以直接使用JDK原生类Files将文件转换成Stream读取。代码如下:Files.lines(Paths.get("文件路径"),Charset.defaultCharset()).forEach(line->{convertToDB(line);});事实上,仔细看看Files#lines的底层源代码。其实原理和上面的LineIterator类似,也是封装成了迭代器的方式。多线程引入的问题上面读取代码不难写,但是存在效率问题,主要是只有单线程导入,导入完上一行数据才能操作下一行.为了加快导入速度,我们再增加几个线程并发导入。对于多线程,我们自然会用到线程池的方式。相关代码转化如下:Filefile=...;ExecutorServiceexecutorService=newThreadPoolExecutor(5,10,60,TimeUnit.MINUTES,//文件个数,假设文件包含10W行newArrayBlockingQueue<>(10*10000),//guava提供newThreadFactoryBuilder().setNameFormat("test-%d").build());try(LineIteratoriterator=IOUtils.lineIterator(newFileInputStream(file),"UTF-8")){while(iterator.hasNext()){Stringline=iterator.nextLine();executorService.submit(()->{convertToDB(行);});}}在上面的代码中,每读到一行内容,就会直接交给线程池执行。我们知道线程池的原理是这样的:如果核心线程数未满,则直接创建线程执行任务。如果核心线程数已满,任务将被排队。如果队列已满,将创建另一个线程来执行任务。如果最大线程数已满且队列已满,则将执行拒绝策略。线程池执行流程图由于上面线程池我们设置的核心线程数为5,很快就会达到最大核心线程数,后续任务只能加入队列。为了防止后续任务被线程池拒绝,我们可以采用如下方案:将队列容量设大,包括整个文件的所有行,将最大线程数设大,它大于整个文件中所有行的数量。同样存在问题,第一种相当于把文件的内容全部加载到内存中,会占用太多内存。第二个创建了太多的线程,也占用了太多的内存。一旦内存过多,GC无法清理,就可能造成频繁的“FullGC”,甚至导致“OOM”,导致程序导入速度过慢。当然,我们也可以采用第三种方案,结合前两种方案,设置合适的队列长度和合适的最大线程数。但是,“适合”的程度真的很难把握,还是存在**“OOM”**的问题。所以为了解决这个问题,我日思夜想了两种方案:CountDownLatch批量执行扩展线程池CountDownLatch和批量执行JDK提供的CountDownLatch,让主线程等待子线程执行继续之前要完成的线程。接下来执行。利用这个特性,我们可以修改多线程导入的代码。主要逻辑如下:try(LineIteratoriterator=IOUtils.lineIterator(newFileInputStream(file),"UTF-8")){//存储每个任务执行的行数Listlines=Lists.newArrayList();//存放异步任务Listtasks=Lists.newArrayList();while(iterator.hasNext()){Stringline=iterator.nextLine();lines.add(line);//设置执行的行数每个线程if(lines.size()==1000){//创建一个新的异步任务,注意需要创建一个Listtasks.add(newConvertTask(Lists.newArrayList(lines)));lines.clear();}if(tasks.size()==10){asyncBatchExecuteTask(tasks);}}//文件读取结束,但可能还有内容tasks.add(newConvertTask(Lists.newArrayList(lines)));//最后执行asyncBatchExecuteTask(tasks);}这段代码中,每个异步任务会导入1000行数据,累积10个异步任务后,会调用asyncBatchExecuteTask线程池异步执行。/***批量执行任务**@paramtasks*/privatestaticvoidasyncBatchExecuteTask(Listtasks)throwsInterruptedException{CountDownLatchcountDownLatch=newCountDownLatch=newCountDownLatch(tasks.size());for(ConvertTasktask:tasks){task.setCountDownLatch(countDownLatch);执行服务。submit(task);}//主线程等待异步线程countDownLatch执行完毕,然后在主线程中调用await方法等待所有异步线程执行结束。ConvertTask异步任务逻辑如下:/***异步任务*数据导入完成后,必须调用countDownLatch.countDown()*否则会阻塞主线程,*/privatestaticclassConvertTaskimplementsRunnable{privateCountDownLatchcountDownLatch;privateList行;publicConvertTask(Listlines){this.lines=lines;}publicvoidsetCountDownLatch(CountDownLatchcountDownLatch){this.countDownLatch=countDownLatch;}@Overridepublicvoidrun(){try{for(Stringline:lines){convertToDB(line);}}finally{countDownLatch.countDown();}}}ConvertTask任务类的逻辑很简单。它遍历所有行并将它们导入数据库。当所有数据导入完成后,调用countDownLatch#countDown。一旦所有异步线程执行完毕,调用CountDownLatch#countDown,主线程就会被唤醒,继续执行文件读取。这种方法虽然解决了上述问题,但是在这种方法中,每次都需要积累一定数量的任务,然后才开始异步执行所有任务。另外,需要等待所有任务执行完毕,才能开始下一批任务。批处理执行消耗的时间等于最慢的异步任务消耗的时间。这样线程池中的线程就有了一定的空闲时间,那么有没有办法一直压榨线程池,让它一直工作呢?扩展线程池回到最初的问题,文件的读取和导入其实是一种“生产者-消费者”的消费模型。主线程作为生产者,不断读取文件并放入队列。作为消费者,异步线程不断从队列中读取内容,导入到数据库中。“一旦队列满了,生产者就应该阻塞,直到消费者消费完任务。”其实我们使用线程池也是一种“生产者-消费者”的消费模型,同样使用了阻塞队列。那为什么线程池在队列满的时候不阻塞呢?这是因为线程池内部使用了offer方法。该方法在队列满时“不阻塞”,直接返回。那有什么办法可以让我们在线程池队列满的时候阻塞主线程去添加任务呢?事实上,这是可能的。我们自定义线程池拒绝策略,当队列满时调用BlockingQueue.put来阻塞生产者。RejectedExecutionHandlerrejectedExecutionHandler=newRejectedExecutionHandler(){@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){if(!executor.isShutdown()){try{executor.getQueue().put(r);}catch(InterruptedException}}{}/s;所以一次线程池满了,主线程就会被阻塞,使用这个方法后,我们就可以直接使用上面提到的多线程导入代码了。ExecutorServiceexecutorService=newThreadPoolExecutor(5,10,60,TimeUnit.MINUTES,newArrayBlockingQueue<>(100),newThreadFactoryBuilder().setNameFormat("test-%d").build(),(r,executor)->{if(!executor.isShutdown()){try{//主线程会被阻塞executor.getQueue().put(r);}catch(InterruptedExceptione){//shouldnotbeinterrupted}}});Filefile=newFile("文件路径");try(LineIteratoriterator=IOUtils.lineIterator(newFileInputStream(file),"UTF-8")){while(iterator.hasNext()){Stringline=iterator.nextLine();executorService.submit(()->convertToDB(line)));}}总结一个非常大的文件,我们可以将文件拆分成多个文件,然后部署多个应用来提高阅读速度。另外,在读取过程中,我们也可以使用多线程并发导入,但需要注意的是,线程池加载满后,后续任务会被拒绝。我们可以扩展线程池,自定义拒绝策略,阻塞主线程读取。好了,今天的文章就这些了。不知道大家有没有其他更好的解决办法。