如果你在Flask中开启了一个子线程,然后在子线程中读写g对象或者试图从request对象中读取url参数,那么你一定对下面的错误不陌生:RuntimeError:在请求上下文之外工作。例如,以下Flask代码:(f'userIDis:{doc_id}')@app.route('/start_thread')defstart_thread():thread=threading.Thread(target=inner_func)thread.start()return{'success':True,'msg':'获取用户ID成功!'}request/start_thread接口会报错,如下图:如果搜索flaskthreadRuntimeError:Workingoutsideofrequestcontext。在网上,那么你可能会看到官方文档或者StackOverFlow提供的一个装饰器@copy_current_request_context。如下图:这么写确实可以解决问题,如下图:但是不管是官网还是StackOverFlow,它的例子都很简单。但是我们知道启动线程的方式有很多种,例如:#方法一,启动一个简单的线程importthreadingjob=threading.Thread(target=函数名,args=(参数1,参数2),kwargs={'parameter3':xxx,'参数4':yyy})job.start()#方法二、使用类定义threadimportthreadingclassJob(threading.Thread):def__init__(self,parameter):super().__init__()defrun(self):print('子线程开始运行')job=Job(parameter)job.start()#方法三,使用线程池frommultiprocessing.dummyimportPoolpool=Pool(5)#线程池5个线程池。map(functionname,parameterlist)网上的方法只能解决第一种写法的问题。如果要使用方法2和方法3来启动子线程,代码应该怎么写呢?如果在子线程中再次启动子线程,再使用@copy_current_request_context是不是就可以了?相信我,如果你在网上搜索一下午,结果只有两种:一种是你找不到答案,另一种是你找到的答案晚于2023年1月14日,因为别人看了我的文章再写。在回答上述问题之前,我先说明一下我对后台启动子线程行为的看法。比如有些人喜欢在后端挂一个爬虫。请求接口后,通过线程启动爬虫,爬虫开始爬取数据。或者,有人在后台挂了一些复杂的程序代码。请求接口后,后端启动一个子线程,在子线程中运行这些代码。我一向不建议在后端开启子线程来做复杂的操作。无论您使用的是Flask还是Django或FastAPI。正确的做法应该是使用消息队列,后端只是将触发任务的相关参数发送到消息队列中。下游真正运行的程序从消息队列中读取到触发参数后,开始运行。但有时候,你可能会综合考虑成本效益,觉得再增加一个消息队列成本太高;或者您只是因为截止日期而不得不暂时使用多线程来解决问题,那么本文将对您有很大帮助。.尽量不要在子线程中读取request相关的参数如果你的子线程不需要读写g对象,也不需要从request中读取各种参数,那么你可以关闭这篇文章。因为你的子线程可以直接运行,不会遇到任何问题,例如:所以最好的解决办法是在启动子线程之前,提前获取子线程需要的每一个参数,然后把这些参数放在在启动子线程时将其作为函数参数传入。如果您是从头开始编写代码,那么一开始就这样做可以为您省去很多麻烦。但是如果你是修改已有的代码,嵌套太深,没办法一层层传入参数,或者代码量太大,不知道在什么地方悄悄调用了g对象或者请求上下文是读写的,那么你可以继续读下去。装饰闭包函数,而不是在一级函数上面简单多线程的写法。有一个地方需要特别注意。@copy_current_request_context修饰的子线程入口函数inner_func必须是闭包函数,不能是一级函数。如下图所示:如果不小心修饰了一级函数,会报如下错误:threadpoolcopyrequestcontext当我们使用multiprocessing.dummy实现线程池时,代码如下:frommultiprocessing.dummyimportPoolfromflaskimportFlask,request,copy_current_request_context,gapp=Flask(__name__)@app.route('/start_thread',methods=['POST'])defstart_thread():@copy_current_request_contextdefcrawl(doc_id):url_template复制代码=request.json.get('url_template','')url=url_template.format(doc_id=doc_id)print(f'startcrawling:{url}')doc_id_list=[123,456,789,111,222,333,444]pool=Pool(3)pool.map(crawl,doc_id_list)return{'success':True,'msg':'抓取文章成功!'}运行效果如下图所示:整体写法类似于threading.Thread方法启动一个简单的线程。用类定义线程时复制请求上下文当我们额外定义一个线程类时,我们需要将修饰的闭包函数传入子线程,然后在子线程的run()方法中运行:importthreadingfromflask导入Flask,请求,copy_current_request_contextapp=Flask(__name__)classJob(threading.Thread):def__init__(self,func):super().__init__()self.func=funcdefrun(self):self.func()@app.route('/start_thread',methods=['POST'])defstart_thread():@copy_current_request_contextdefrunner():doc_id=request.json.get('doc_id','')print(valueoff'docIdYes:{doc_id}')job=Job(runner)job.start()return{'success':True,'msg':'阅读文章成功!'}运行效果如下图所示:嵌套子线程复制请求上下文有时候,我们先创建一个子线程,然后在子线程中,我们需要创建一个孙子线程。并读取孙子线程中的请求上下文。例如下面的代码:importthreadingfrommultiprocessing.dummyimportPoolfromflaskimportFlask,request,copy_current_request_contextapp=Flask(__name__)defdeep_func_runner(doc_id_list):@copy_current_request_contextdefdeep_func(doc_id):category=request.args.get('category','')url=f'https://www.kingname.info/{category}/{doc_id}'print(f'开始取:{url}')pool=Pool(3)pool.map(deep_func,doc_id_list)@app.route('/start_thread',methods=['POST'])defstart_thread():@copy_current_request_contextdefrunner():doc_id_list=[111,222,333,444,555,666,777,888,999]deep_func_runner(doc_id_list)job=threading.Thread(target=runner)job.start()return{'success':True,'msg':'读取文章成功!'}此时使用@copy_current_request_context就会报您的一个错误:ValueError:
