当前位置: 首页 > 后端技术 > Python

这么一搞,再也不怕线程打架了_0

时间:2023-03-26 12:40:45 Python

这样我们就不会再害怕线程打架了。在另一个文件中。最简单的方法是打开文件并逐行读取。读取一行后,处理这一行,添加到目标文件中,然后再回来读取下一行。这就是线性处理方法。如果处理一行数据需要0.1秒,那么线性处理方式需要:10万秒,也就是28小时左右。显然,这个时间对我们来说有点长。有什么办法可以缩短吗?当然有办法,那就是使用多线程!为什么?因为多线程是提高效率、实现更有效程序的必然状态。比如你需要处理大量的数据,你需要响应各种请求,你需要与处理缓慢的进程进行交互等等,你需要使用线程编程。但是,线程的概念并不好理解,使用起来总是不方便,而且容易出错。一方面,我们的思维是线性的,另一方面,多线程本身就有很多概念需要掌握。学习理解难度比较高。今天分享一下我是如何在工作中使用多线程技术来提高速度和效率的。对于前面的例子,原来的处理流程可以分解成多个。比如前面的处理可以分解成三个自流:读行、做计算、存文件。在这种情况下,相当于让更多的人去做了只能由一个人完成的工作,从而形成了类似的管道效应。那么使用多线程可以让我们的三个作业同时运行,提高效率,比如先读一行,然后边处理数据边读下一行,等等。感觉还好吗?别着急,首先你需要解决一个问题——如何避免重复阅读和跳读。重复读是指多个线程读取了同一条数据;跳过意味着一些数据行没有被任何线程处理。这里有一个帮助我处理很多多线程问题的方法,一个数据源类。多线程数据源类数据源类对数据进行集中管理,然后以线程安全的方式为多线程程序提供数据。注意:这不是最好的方法,但很实用。废话不多说,直接看代码:importthreadingclassDataSource:def__init__(self,dataFileName,startLine=0,maxcount=None):self.dataFileName=dataFileNameself.startLine=startLine#第一行的行号为1self.line_index=startLine#当前读取位置self.maxcount=maxcount#读取最大行数self.lock=threading.RLock()#同步锁self.__data__=open(self.dataFileName,'r',encoding='utf-8')foriinrange(self.startLine):l=self.__data__.readline()defgetLine(self):self.lock.acquire()try:ifself.maxcountisNoneorself.line_index<(self.startLine+self.maxcount):line=self.__data__.readline()ifline:self.line_index+=1returnTrue,lineelse:returnFalse,Noneelse:returnFalse,NoneexceptException作为e:归法lse,"处理错误:"+e.argsfinally:self.lock.release()def__del__(self):ifnotself.__data__.closed:self.__data__.close()print("关闭数据源:",self.dataFileName)init初始化方法,接受3个参数lock属性是同步锁,这样多线程读取不会有冲突待分配处理,maxcount读取的最大行数。配合startLine,可以读取指定部分的数据。默认是读取所有getLine方法。每次调用都会返回一个元组,包括状态和获取的数据。对象销毁时会调用del方法,在这里记录当前的处理位置,以便在多线程程序中应用,承担读取挂起记录的任务例如业务处理的核心处理程序如下:importtimedefprocess(worker_id,datasource):count=0whileTrue:status,data=datasource.getLine()ifstatus:print(">>>Thread[%d]isgettingdataandcurrentlyProcessing..."%worker_id)time.sleep(3)#等待3秒模拟处理过程print(">>>thread[%d]processingdatacompleted"%worker_id)count+=1else:break#退出循环print(">>>线程[%d]结束,处理[%d]条数据"%(worker_id,count))参数worker_id为线程号,即用于区分输出消息。参数datasource是DataSource的实例,每个线程共享数据源count,用于记录当前线程处理的记录数。使用死循环,驱动反复处理,直到没有数据可读为止。线程组装部分也很简单:importthreadingdefmain():datasource=DataSource('data.txt')workercount=10#开启的线程数,注意:越多越快workers=[]foriinrange(workercount):worker=threading.Thread(target=process,args=(i+1,datasource))worker.start()workers.append(worker)forworkerinworkers:worker.join()先初始化一个DataSourceworkercount为根据需要创建线程数在实际应用中可以通过配置或参数来提供。另外,线程越多越好,一般设置为CPU核心数的2倍。threading.Thread是一个线程类,可以实例化一个线程。target参数是Thread的处理方法,这里是前面定义的process方法,args是提供给处理方法的参数。线程的start方法就是启动线程,因为创建不等于start,start是异步方法,调用会瞬间完成。join方法是等待线程处理完成,是一个同步方法,只有线程真正处理完才会结束extensionDataSource的getLine实现就可以了,比如数据源来自数据库。另外,上面的DataSource也不是最优的,只是起到了规范读取接口,防止数据误读的作用,性能上根本算不上最优。那么如何实现更好,这里提出一个思路,使用生产者消费者模型,使用队列,预读技术,实现更好的数据源类。比如在DataSource中,是逐行读取的,可以使用预读,即提前读取一些数据。当线程需要数据时,先给出预读,当预读数据消耗到一定量时,再异步读取一部分。这样做的好处是每个线程不用等待IO时间(简单理解为从文件或网络读取的等待时间)。如何实现,大家可以了解队列(queue)的概念,Python提供了两种队列,同步队列queue和queueset。想想怎么办?欢迎在留言区写下你的方法和建议。小结今天分享一个在实际工作中使用的多线程数据处理的例子。例子虽然简单,但是非常实用,帮助我处理了很多重要的任务。说说一些感悟,Python的应用不仅仅局限于数据分析、AI等热门领域,更多的可以应用到日常工作中,比如处理数据、替代人工操作、简单计算等。我们知道最好的学习东西的方法就是使用它。Python技能也是如此。多用于日常工作,用于解决实际问题。以上就是本次分享的全部内容。想了解更多python知识,请前往公众号:Python编程学习圈,每日干货分享