项目需要实现一个上传功能,每次上传20个视频,如果不处理直接上传,会出现阻塞问题。因为一个网页最多只能有6个同域名的tcp连接。不控制6条数据同时并发传输,但后续用户调用其他接口的所有操作都会被挂起。网络不好的时候就更灾难了,所以要控制并发,一次最多只转3个。正好又出一道这种类型的面试题,写个demo说明一下原理。后端代码后端需要有基础的返回,随机延迟,随机错误,能够跨域consthttp=require('http')http.createServer((req,res)=>{constdelay=parseInt(Math.random()*5000)console.log(delay);setTimeout(()=>{constreturnererror=Math.random()>0.8//if(returnerror){//res.writeHead(500,{////'内容-Type':'text/plain',//'Access-Control-Allow-Origin':'*',//'Access-Control-Allow-Headers':'Content-Type,Content-Length,Authorization,Accept,X-Requested-With,yourHeaderFeild',//'Access-Control-Allow-Methods':'GET',//}).end('error,from'+req.url+'after'+delay+'ms')//}else{res.writeHead(200,{//'Content-Type':'text/plain','Access-Control-Allow-Origin':'*','Access-Control-Allow-Headers':'Content-Type,Content-Length,Authorization,Accept,X-Requested-With,yourHeaderFeild','Aaccess-Control-Allow-Methods':'GET',}).end('OK,from'+req.url+'after'+delay+'ms')//}},delay)}).listen(3000,()=>{console.log('服务从3000开始!');})前端代码html主要是一个动画的展示,就不细说了。js分为两部分,一部分控制dom渲染,详见总结源码地址,这个不详述,第二部分是并发控制器的输入数组中的请求地址constinput=newArray(10.fill('http://localhost:3000/').map((item,index)=>`${item}${index+1}`)基础版本的并发控制主要是两个函数和一个承诺队列。一个负责不断递归地向队列中添加承诺。另一个负责产生promise和处理异步请求逻辑functionhandleFetchQueue(input,max){constrequestsQueue=[];//请求队列addReactive(requestsQueue)//给requestsQueue添加响应性,重写push、splice方法,可以同时改变页面上的队列dom。让我=0;constreq=(i)=>{//如果一个promise请求成功,删除队列中的promise,加入另一个请求returnfetch(input[i]).then(res=>res.text()).then(res=>{addEl(res)//结果渲染页面列表constindex=requestsQueue.findIndex(item=>item===req)requestQueue.splice(index,1)checkAddReq()})}constcheckAddReq=()=>{if(i>=input.length)return//请求数不能超过限制if(requestsQueue.length+1<=max){//并发数不能超过限制setTimeout(()=>{//添加延迟为了提高动画效果,requestsQueue.push(req(i++))checkAddReq()},50)}}checkAddReq();}handleFetchQueue(input,3)处理异常要考虑的问题是如果发生异常怎么办。异常和并发控制应该没有什么区别,也就是计数原则,只是这里需要在promise生成中添加额外的业务逻辑。我们在后端代码中打开随机错误的注释,然后更改前端核心逻辑constreq=(i)=>{returnfetch(input[i]).then(async(res)=>{if(res.status===500){//这里异步改变,让catchconsttext=awaitres.text()thrownewError(text);}else{returnres.text()}}).then(res=>addEl(res),error=>addEl(error,true)).finally(()=>{//无论成功还是失败,并发逻辑都是一样的,但是then里面的业务逻辑不一样constindex=requestsQueue.findIndex(item=>item===req)requestsQueue.splice(index,1)checkAddReq()})}总结全地址https://github.com/fyy92/code...index.html不考虑asynchronousindex1.htmlconsiderexceptionserve.jsbackend代码总结并发控制的关键在于两个函数,一个是控制业务Promise的生成,一个是迭代控制并发队列添加业务Promise。对于异常,需要保证并发逻辑相同,不同的是业务逻辑的处理。补充最近发现一个库asyncpool模仿一下consttimeout=i=>newPromise(resolve=>setTimeout(()=>resolve(i),i));constresults=awaitasyncPool(2,[1000,5000,3000,2000],timeout);//调用迭代器(i=1000)//调用迭代器(i=5000)//达到池限制2,等待更快完成...//1000完成1秒后//调用迭代器(i=3000)//达到池限制2,等待更快完成...//30004秒后完成//Calliterator(i=2000)//Itaration完成,等到运行完成...//5000在5秒后完成//2000在6秒后完成//解决,结果按给定的数组顺序传递`[1000,5000、3000、2000]`。网上的版本一般是使用Promise.settlePromise.all,所以比如生成一个完整的Promse数组,而我们前面介绍的是分阶段生成Promise,所以晚上,相当于封装了我们生成promise的操作,然后就可以使用Promise.settlePromise.all了。这里直接使用Promise.settlePromise.all的原理还是按照前面的思路做unctionasyncPool(poolLimit,array,iteratorFn){returnnewPromise((resolve,reject)=>{constqueue=[]constresult=[]letnum=0leti=0constadd=()=>{//console.log(i,queue.length)if(i>=array.length)returnif(queue.length>=poolLimit)returnconstp=iteratorFn(array[i]).then((res)=>{//这里最好用finally但是注意finallyres是undefined//console.log(i)result.push(res)queue.splice(queue.findIndex(i=>i===p),1)add(i)//i已经添加了返回的num++console.log(num)if(num===array.length){resolve(结果)}})queue.push(p)add(++i)}add(0)})}letoldtime=newDate().getTime()consttimeout=i=>newPromise(resolve=>setTimeout(()=>{resolve(i)//console.log(i,newDate().getTime()-oldtime)},i));asyncPool(2,[1000,5000,3000,2000],超时).then(res=>{console.log(res)//1000300050002000})
