当前位置: 首页 > 科技观察

高效读取大文件,再也不用担心OOM了!

时间:2023-03-23 11:45:36 科技观察

最近阿粉接到一个请求,需要从文件中读取数据,业务处理后存入数据库。说实话,这个要求并不难,阿芬很快就完成了第一个版本。在第一版内存读取中,阿芬采用的是内存读取的方式,所有的数据都是先读入内存。程序代码如下:Stopwatchstopwatch=Stopwatch.createStarted();//读取所有行的内存Listlines=FileUtils.readLines(newFile("temp/test.txt"),Charset.defaultCharset());for(Stringline:lines){//pass}stopwatch.stop();System.out。println("readalllinesspend"+stopwatch.elapsed(TimeUnit.SECONDS)+"s");//计算内存使用量logMemory();logMemory方法如下:MemoryMXBeanmemoryMXBean=ManagementFactory.getMemoryMXBean();//堆内存使用情况MemoryUsagememoryUsage=memoryMXBean.getHeapMemoryUsage();//初始总内存longtotalMemorySize=memoryUsage.getInit();//已用内存longusedMemorySize=memoryUsage.getUsed();System.out.println("TotalMemory:"+totalMemorySize/(1024*1024)+"Mb");System.out.println("FreeMemory:"+usedMemorySize/(1024*1024)+"Mb");在上面的程序中,阿粉使用了ApacheCommon-Io开源的第三方库,FileUtils#readLines会将文件的所有内容读入内存。这个程序简单测试没有问题,但是当拿到真正的数据文件,运行程序的时候,程序很快就会出现OOM。OOM发生的主要原因是数据文件太大。假设上述测试文件test.txt共有200W行数据,文件大小为:740MB。通过上面的程序读取内存后,我电脑上的内存使用情况如下:可以看到一个实际大小700多M的文件,读取内存占用的内存有1.5G之多.在我之前的程序中,虚拟机设置的内存大小只有1G,所以程序出现了OOM。当然,这里最简单的方法就是加内存,将虚拟机的内存设置为2G,甚至更多。然而,机器的内存总是有限的。如果文件比较大,还是没有办法全部加载到内存中。但是仔细想想,真的需要一次性把所有的数据加载到内存中吗?显然,不!在上面的场景中,我们将数据加载到内存中,最后对数据进行一个一个的处理。所以下面我们修改读取方式为逐行读取。逐行阅读逐行阅读的方法有很多种。这里阿芬主要介绍两种方式:BufferReaderApacheCommonsIOJava8streamBufferReader我们可以使用BufferReader#readLine来逐行读取数据。try(BufferedReaderfileBufferReader=newBufferedReader(newFileReader("temp/test.txt"))){StringfileLineContent;while((fileLineContent=fileBufferReader.readLine())!=null){//processtheline。}}catch(FileNotFoundExceptione){e.printStackTrace();}catch(IOExceptione){e.printStackTrace();}ApacheCommonsIOCommon-IO有一个方法FileUtils#lineIterator可以实现逐行读取。代码如下:Stopwatchstopwatch=Stopwatch.createStarted();LineIteratorfileContents=FileUtils.lineIterator(newFile("temp/test.txt"),StandardCharsets.UTF_8.name());while(fileContents.hasNext()){fileContents.nextLine();//传递}logMemory();文件内容。关闭();stopwatch.stop();System.out.println("readalllinesspend"+stopwatch.elapsed(TimeUnit.SECONDS)+"s");这个方法返回一个迭代器,就是我们每次都能拿到的一行数据。其实我们看代码其实可以发现FileUtils#lineIterator,其实就是BufferReader。感兴趣的同学可以自行查看源码。由于公众号不能插入外链,关注“Java极客技术”,回复“20200610”获取源码.秒表stopwatch=Stopwatch.createStarted();//lines(Pathpath,Charsetcs)try(StreaminputStream=Files.lines(Paths.get("temp/test.txt"),StandardCharsets.UTF_8)){inputStream.filter(str->str.length()>5)//过滤data.forEach(o->{//passdosamplelogic});}logMemory();stopwatch.stop();System.out.println("readalllinessspend"+stopwatch.elapsed(TimeUnit.SECONDS)+"s");使用这种方式的好处是我们可以方便的使用Stream链式操作来做一些过滤操作。注意:这里我们使用try-with-resources方法来安全保证读取结束,可以安全关闭流。并发读取和逐行读取的方法解决了我们的OOM问题。但是如果数据很多的话,我们一行一行的处理起来会花费很多时间。在上面的方法中,只有一个线程在处理数据,所以我们其实可以增加几个线程来提高并行度。接下来,阿奋将在上述基础上抛砖引玉,介绍阿奋比较常用的两种并行处理方式。第一种方式是逐行打包批次。首先逐行读取数据,加载到内存中,等到积累了一定的数据量,再交给线程池进行异步处理。@SneakyThrowspublicstaticvoidreadInApacheIOWithThreadPool(){//创建线程池,最大线程数10,最大队列数100ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(10,10,60l,TimeUnit.SECONDS,newLinkedBlockingDeque<>(100));//使用Apache方式逐行读取数据LineIteratorfileContents=FileUtils.lineIterator(newFile("temp/test.txt"),StandardCharsets.UTF_8.name());Listlines=Lists.newArrayList();while(fileContents.hasNext()){StringnextLine=fileContents.nextLine();lines.add(nextLine);//当读到100000时if(lines.size()==100000){//分成两个50000,handover异步线程处理List>partition=Lists.partition(lines,50000);ListfutureList=Lists.newArrayList();for(Liststrings:partition){Futurefuture=threadPoolExecutor.submit(()->{processTask(strings);});futureList.add(future);}//等待两个线程完成任务的执行,然后再次读取数据。这样做的目的是防止任务过多,数据加载过多,导致OOMfor(Futurefuture:futureList){//等待执行结束future.get();}//清空内容lines.clear();}}//lines如果还有剩余,则继续执行Stringline:strings){//模拟业务执行try{TimeUnit.MILLISECONDS.sleep(10L);}catch(InterruptedExceptione){e.printStackTrace();}}}以上方法,当内存中的数据达到10000条时,解包这两个任务为异步线程执行,每个任务处理50,000行数据。后续使用future#get()等待异步线程完成,主线程才能继续读取数据。主要原因是线程池中的任务过多,再次导致OOM问题。大文件拆分成小文件第二种方法,首先我们把一个大文件拆分成几个小文件,然后使用多个异步线程逐行处理数据。publicstaticvoidsplitFileAndRead()throwsException{//先将大文件拆分成小文件ListfileList=splitLargeFile("temp/test.txt");//创建一个线程,最大线程数为10,线程数最大100个池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(10,10,60l,TimeUnit.SECONDS,newLinkedBlockingDeque<>(100));ListfutureList=Lists.newArrayList();for(Filefile:fileList){Future未来=线程池执行器。submit(()->{try(StreaminputStream=Files.lines(file.toPath(),StandardCharsets.UTF_8)){inputStream.forEach(o->{//模拟执行业务try{TimeUnit.MILLISECONDS.sleep(10L);}catch(InterruptedExceptione){e.printStackTrace();}});}catch(IOExceptione){e.printStackTrace();}});futureList.add(future);}for(Futurefuture:futureList){//等待所有任务完成future.get();}threadPoolExecutor.shutdown();}privatestaticListsplitLargeFile(StringlargeFileName)throwsIOException{LineIteratorfileContents=FileUtils.lineIterator(newFile(largeFileName),StandardCharsets.UTF_8.name());清单<斯特里ng>lines=Lists.newArrayList();//文件序号intnum=1;Listfiles=Lists.newArrayList();while(fileContents.hasNext()){StringnextLine=fileContents.nextLine();lines.add(nextLine);//每个文件10w行数据if(lines.size()==100000){createSmallFile(lines,num,files);num++;}}//如果还有剩余行,继续执行if(!lines.isEmpty()){//继续执行createSmallFile(lines,num,files);}returnfiles;}上面的方法先将一个大文件拆分成多个保存10W行数据的小文件,然后Small文件交给线程池进行异步处理。由于这里的异步线程每次都是一行一行的从小文件中读取数据,所以这种方式不需要像上面的方式那样担心OOM问题。另外,上面我们使用Java代码将大文件拆分成小文件。这里还有一个简单的方法,我们可以直接使用下面的命令将大文件直接拆分成小文件:#将大文件拆分成100000个小文件split-l100000test.txt后面的Java代码只需要直接读取小文件即可.总结当我们从文件中读取数据时,如果文件不是很大,可以考虑一次性读入内存,然后快速处理。如果文件太大,我们无法一次性加载到内存中,所以需要考虑逐行读取,然后再对数据进行处理。但是单线程处理数据毕竟是有限的,所以我们考虑使用多线程来加快数据处理速度。在本文中,我们只是简单介绍了几种从文件中读取数据的方法。数据读取出来之后,我们肯定需要对其进行处理,然后最后将其存入数据库或者输出到另一个文件中。这个过程说实话挺麻烦的,因为我们的数据源文件可能是txt或者excel,所以需要添加多种读取方式。同样,当数据处理完成后,也存在同样的问题。幸运的是,我们可以使用SpringBatch来完美解决以上问题。