当前位置: 首页 > 后端技术 > Node.js

异步源码系列

时间:2023-04-03 13:49:21 Node.js

前言最近在看Node设计模式中异步编程的顺序异步迭代。简单的实现如下:functionseries(tasks,callback){letresults=[];functioniterate(index){if(index===tasks.length){returnfinish();}consttask=tasks[索引];任务(函数(错误,资源){结果。推(资源);迭代(索引+1);});}functionfinish(){//迭代完成的操作callback(null,results);}迭代(0);}series([callback=>{setTimeout(function(){console.log(456);callback(null,1);},500);},callback=>{console.log(123);callback(null,2);}],function(err,results){console.log(results);});//456//123//[1,2]异步库是处理Node中异步代码的非常流行的解决方案.js和JavaScript。它提供了一组函数,可以大大简化一组任务在不同配置下的执行,并为异步处理集合提供有用的帮助。async库可以极大地帮助我们实现复杂的异步控制流,但是一个难点是选择合适的库来解决问题。例如,对于顺序执行,大约有20种不同的函数可供选择。当我好奇的时候,我想看看一个成熟的库与我们简单的代码有何不同。series按顺序运行任务集合中的函数,每个函数在前一个函数完成后运行。如果系列中的任何函数将错误传递给其回调函数,则不再运行任何函数并立即使用错误值调用回调函数。否则,回调会在任务完成时收到一组结果。constasync=require('async');async.series({一:function(callback){setTimeout(function(){callback(null,1);},200);},二:function(callback){setTimeout(function(){callback(null,2);},100);}},function(err,results){console.log(results);//结果现在等于:{one:1,two:2}});我们看一下源码,找到系列的方法。可以看到:functionseries(tasks,callback){_parallel(eachOfSeries,tasks,callback);}除了我们传的这两个参数,我们还默认传了一个eachOfSeries,再往下看:function_parallel(eachfn,tasks,callback){//noop:空函数callback=callback||没有;//isArrayLike:检查'value'是否类似于数组varresults=isArrayLike(tasks)?[]:{};eachfn(tasks,function(task,key,callback){//wrapAsync:包装成异步wrapAsync(task)(function(err,result){if(arguments.length>2){result=slice(arguments,1);}结果[键]=结果;回调(错误);});},函数(错误){回调(错??误,结果);});}这里可以看出_parallel方法其实就是eachOfSeries方法的调用首先解释一下eachOfSeries的三个参数:一个参数是要执行的函数集合。第二个参数可以看成是各个函数的执行(wrapAsync可以先忽略,直接看成是这个函数)。第三个参数是所有函数执行完后的回调。让我们看看eachOfSeries是如何实现的:vareachOfSeries=doLimit(eachOfLimit,1);functioneachOfLimit(coll,limit,iteratee,callback){_eachOfLimit(limit)(coll,wrapAsync(iteratee),callback);}functiondoLimit(fn,limit){returnfunction(iterable,iteratee,callback){returnfn(iterable,limit,iteratee,callback);}};}让我们转换上面的内容以使其更清楚:soga,最后是调用_eachOfLimit完成的://limit:异步操作的最大次数,传1看成一个系列,执行完一个函数再进行下一个函数_eachOfLimit(limit){returnfunction(obj,iteratee,callback){//once:函数只运行一次callback=once(callback||noop);如果(限制<=0||!obj){返回回调(空);}//iterator:迭代器,按类型分类,这里简单使用数组迭代器createArrayIterator来分析varnextElem=iterator(obj);变种完成=假;变种运行=0;变种循环=假;函数oniterateeCallback(err,value){running-=1;如果(错误){完成=真;回调(错误);}elseif(value===breakLoop||(done&&running<=0)){done=true;返回回调(空);}elseif(!looping){replenish();}}functionreplenish(){looping=true;while(running{setTimeout(function(){console.log(456);callback(null,1);},500);},callback=>{console.log(123);callback(null,2);},callback=>{setTimeout(function(){console.log(789);callback(null,3);},0);}],function(err,results){console.log(results);});//456//123//789//[2,1,3]根据我自己的理解,主线程和EventLoop都是executedOneround:第一轮按照上面的流程,主线程去_eachOfLimit()调用replenish()。根据while循环(运行次数<一次异步操作的最大次数限制),running+=1,进入集合中第一个函数setTimeout的调用,setTimeout进入EventTable,注册回调函数。回到while循环,running=limit,结束循环,结束主线程。setTimeout事件完成,回调函数进入EventQueue。主线程从EventQueue中读取并执行回调函数,回调iterateeCallback,running-=1,调用replenish()。第二轮重复第一轮。唯一不同的是,集合中的第二个函数是同步的,一切都由主线程一路往下执行。第三轮重复第一轮。第四轮收集的三个函数都执行完了,通过iterator()闭包得到的结果为null,回调最终结果。wrapAsyncfunctionwrapAsync(asyncFn){返回isAsync(asyncFn)?asyncify(asyncFn):asyncFn;}varsupportsSymbol=typeofSymbol==='function';functionisAsync(fn){returnsupportsSymbol&&fn[StringTag]==to.'AsyncFunction';}wrapAsync()首先判断是否为异步函数,如果是es7AsyncFunctions,则调用asyncify,否则返回原函数。functionasyncify(func){returninitialParams(function(args,callback){varresult;try{result=func.apply(this,args);}catch(e){returncallback(e);}//如果结果是承诺对象if(isObject(result)&&typeofresult.then==='function'){result.then(function(value){invokeCallback(callback,null,value);},function(err){invokeCallback(callback,err.message?err:newError(err));});}else{callback(null,result);}});}varinitialParams=function(fn){返回函数(/*...args,callback*/){varargs=slice(参数);var回调=args.pop();fn.call(this,args,回调);};};接受一个同步函数并使其异步,并将其返回值传递给回调函数。如果传递给asyncify的函数返回一个Promise,则该Promise的已解决/拒绝状态将用于调用回调,而不仅仅是同步返回值。综上所述,平日习惯async-await和promise。好用,但也导致了思维的匮乏。但是尝试使用原生js来模拟和阅读源码可以带来更多的收获。Github地址,喜欢就支持star,谢谢?(?ω?)?。