预览。先给出一个基础类代码。constEventEmitter=require('events')constdebug=require('debug')('transform')classTransformextendsEventEmitter{constructor(options){super()this.concurrency=1Object.assign(this,options)this.pending=[]this.working=[]this.finished=[]this.failed=[]this.ins=[]this.outs=[]}push(x){this.pending.push(x)这个.schedule()}pull(){让xs=this.finishedthis.finished=[]this.schedule()returnxs}isBlocked(){return!!this.failed.length||//被失败阻止!!this.finished.length||//被输出缓冲区阻塞(惰性)this.outs.some(t=>t.isBlocked())//被输出阻塞transform}isStopped(){return!this.working.length&&this.outs.every(t=>t.isStopped())}root(){returnthis.ins.length===0?this:this.ins[0].root()}pipe(next){this.outs.push(next)next.ins.push(this)returnnext}print(){debug(this.name,this.pending.map(x=>x.name),this.working.map(x=>x.name),this.finished.map(x=>x.name),this.failed.map(x=>x.name),this.isStopped())this.outs.forEach(t=>t.print())}schedule(){//如果阻塞则停止工作if(this.isBlocked())returnthis.pending=this.ins.reduce((acc,t)=>[...acc,...t.pull()],this.pending)while(this.working.length{this.working.splice(this.working.indexOf(x),1)if(err){x.error=errthis.failed.push(x)}else{if(this.outs.length){this.outs.forEach(t=>t.push(y))}else{if(this.root().listenerCount('data')){this.root().emit('data',y)}else{这个。完成的。推(y)}}}这个。计划()这个。根()。emit('step',this.name,x.name)})}}}module.exports=Transform此代码目前仍处于起步阶段。Transform类的设计类似于node中的stream.Transform,但其设计目的不是缓冲或流式性能,而是作为并发编程的基础模块。如果你熟悉流式编程,Transform的设计很容易理解;在内部,Transform维护着四个队列:pending是输入bufferworking是当前正在执行的tasksfinished是输出buffer,它的目的不是为了buffer输出,而是在没有其他输出方式的时候Make一个buffer。失败是一个失败的任务。Transform可以组合成DAG(DirectedAcyclicGraph)。输入和输出用于存储对转换前和转换后的引用。管道方法负责设置这个双向链接;最常见的情况是双向链表,即Bothins和outs只有一个对象。但是将它们设计为阵列允许扇入、扇出结构。推和拉相当于写和读。schedule是核心函数,它的任务是填充工作队列。构造函数的参数中应提供一个名为transform的异步函数。计划使用此功能来运行任务。操作完成后,根据结果将task推入failed队列,推入下一个Transformer,随根节点的emit输出。或者将其推送到您完成的队列。Transform设计的核心思想是不使用对象属性,只使用队列位置来编码并发任务的状态;任何子任务,在任何时候,只存在于一个Transform对象的某个队列中。换句话说,它相当于用资源对并发任务进行建模。如果您熟悉restfulapi模型过程或状态的方式,这很容易理解。在Transform中,任何transform异步函数的返回都是一个step;step是使用Transform实现并发组合的最重要的概念;transform函数每次返回,都会改变自己的queue或者push任务给后续的Transform对象,这个push动作会触发后续Transform的schedule方法;当步骤结束时,它自己的schedule方法也会被调用,它会重新填充任务。这些动作完成后,所有Transform队列的变化都是整个组合任务状态机的下一个状态。这个状态是显式的,可以打印出来,对调试很有帮助;虽然异步I/O会使这种状态变得不确定,但至少在处理并发问题时遵循了组合状态机模型的同步原则,在每一步结束时作为一个整体进行一次状态转换。这种状态转换可以很好地定义和观察。这是Event模型下的并发编程和Thread模型的一个重要区别。后者遇到并发逻辑引起的细微错误时,很难捕捉到现场分析,因为每个Thread都是一个黑盒子。从transform的返回到emit(step)的一系列链式动作是中间过程,最终实现了一个完整的状态转换。这个过程必须是同步的。此处不应该有async、setImmediate或process.nextTick调用,它们会引入额外的不确定性和极难查找和修复的错误。在之前长时间的并发编程实践中,指出了Promise在某些场景下race/settle和错误处理逻辑的难点。Promise的流程逻辑并不完整。我也花了不少心思在Processalgebra层面定义error,success,finish,race,settle,abort,pause,resume,以及它们的组合逻辑,但最后发现很难,因为各种都有太多的情况要处理。所以在Transform的设计中,这些逻辑都被抛弃了,因为实际上它们并不是真正的基本并发逻辑。Transform试图实现组合的基本并发逻辑只有一种:停止。stopped的定义很简单:在一个步骤结束时,所有Transform工作队列都为空,即(整体)停止。在这里我想再次强调在上述步骤结束时使用同步方法的必要性。如果在schedule中使用异步方法调用,那么stopped的判断可能是错误的,因为schedule可能会在eventloop中放置一个immediatecall。会产生一个新的workingtask动作,isStopped()的判断是错误的。停止时,整体组合状态可能是成功、错误、暂停等,不难判断,但代码还不稳定,不打算加语法糖。在blockingi/o和synchronous的编程模式下,因果链和代码编写形式是一致的,但是在异步编程中,因果关系是异步并发的,只能改变原因,再观察结果,这是一个很多程序员无法适应异步编程的根本原因在于它改变了思维习惯。使用Transform来处理并发编程,仍然是试图重建因果链,即使它们是并发的,但我们必须有一种方法将它们联系起来;前面提到的isStopped()是一个观察结果,可以影响它因为是isBlocked()函数,这个函数在schedule中被调用,如果值为true,会阻止schedule继续向工作队列。这里写的isBlocked()的代码实现只是一个例子;阻塞调度的原因可能有很多,比如错误,或者输出缓冲区已满,这可以由实现者定义。它们是策略,而isBlocked()本身是一种机制。该策略的粒度是每个Transform对象都可以有自己的策略。例如,删除临时文件的操作结果是无关紧要的,因此不应该因为错误而被阻塞。isBlocked()的逻辑可以像示例代码中那样链下来,即只要有后续任务块,前导任务就应该停止;在大多数情况下,这是合理的逻辑。因为虽然我们写的是流处理方法,但是我们处理的不是octet-stream。性能的缓冲和流量控制是没有意义的。如果复制文件后面的任务需要移动到目标文件夹,如果目标文件文件夹有问题,前面快速移动大量文件,最终没有成功。如果组合状态机停止,对任意一个Transform对象执行推或拉操作可以使整个状态机继续移动。从根节点推送是常见的,从叶节点推送也是常见的,也可以推送到中间节点;资源建模的好处之一是您可以将状态呈现给用户。如果由于文件名冲突导致复制文件的任务失败,您还可以让用户选择一种处理策略,例如覆盖或重命名。用户选择一个操作后,代码会从一个Transform对象的failed队列中取出一个对象,修改策略参数重新压入,然后这个状态机Execution可以继续;这种可处理的错误不应该成为阻止整个状态机工作(复制其他文件和文件夹)的原因,除非它们累积到相当数量,这在Transform模式下很容易实现,开发人员的策略是Blocked()可以写得很简单;和node的stream一样,Transform是惰性的,纯push机可能会在中间节点缓存大量的task,不适合将task作为stream来处理;同时,Lazy对于停止的组合状态机继续运行非常重要。pull方法就是为此目的而设计的。它的调度逻辑和push一样,只是方向相反;如果叶子节点由于输出缓冲而被设置为阻塞,它可以阻塞整个状态机(或其中的一部分),这在某些情况下也是一个有用的功能,如果整个状态机的输出不能立即被消费因为某些原因。代码中没有实现abort逻辑,但是很容易,遍历所有Transforms,如果工作队列中的对象有abort方法,就调用;这不是立即暂停,对象仍然必须通过回调返回才能停止。如果要全局阻塞,可以把所有的LeafNode都pipe到一个sink节点,强制这个sink节点被Blocked,就可以全部阻塞。暂停和恢复也是非常相似的逻辑。当然,你可能会遇到像finally这样必须执行的逻辑。即使发生错误,也意味着Transform会将isBlocked信息向前传递,但它的Schedule方法不必停止工作。它可以运行,直到处理完所有队列任务。也可以重载schedule方法;例如,如果您的任务具有上下文相关的逻辑,您可以重载schedule方法来实现您自己的调度方法。另外,这里的schedule代码只是基于transform函数。显然,如果transform本身是一个Transform对象,那么它应该也可以实现组合过程,包括Sequencer,Parallel等,都需要实现。总而言之,isBlocked和schedule是分开的逻辑。他们有不同的设计目的和使命。您可以重载它们以获得您想要的结果。因此,这里写的代码,重要的不是它们的实现,而是它的机制设计、接口设计、接口承诺;所有逻辑足够原子化,每个函数只做一件事,isBlocked是原因,可以根据需要选择策略,isStopped是结果,通过step观察并执行后续逻辑。您应该避免通过向基类添加新方法来扩展功能,因为Transform使用队列和任务来描述状态。这个描述是完整的,机制是完整的。正如我在另一篇介绍JavaScript语言的文章中所写,如果问题的模型是完整的,即使它是抽象的,你也可以通过结合基本操作和概念而不是向模型添加概念来获得更多的特征,除非你认为模型不够完整。在软件工程中,并不是处处都需要状态机(自动机)等严格的模型工具。项目软件写的bug数量已经够少了,但是如果你要写系统软件或者对正确性有严格要求的东西,如果你不使用状态机建模,那么你其实没有一个完整的设计。当然,设计完备并不代表软件没有bug,但是好的设计可以帮助你在遇到问题的时候理解问题,找到原因,帮助很大。在一个复杂的系统中,上面的同步方法状态机组合和Hierarchical状态机组合是我们目前已知的两种完整的模型方法。但两者不同。Transform的组合虽然看起来像Hierarchy,但实际上就像是在纸上画了一棵树。它仍然是二维的。每一步整体状态联动的迁移只在populatestate迁移范围内,而不是呈几何级数递增的状态组合;所以我们仍然可以构建一个线性因果链,每一步因果因果关系都这样延续下去,和没有并发的状态机是一样的。本质上这就是数学归纳法:如果我们能证明如果n是正确的,那么n+1也是正确的,这就证明状态链组合是正确的,即使它是无限的。第二段代码是使用示例。这个类不是必须要保证兼容老代码接口,因为有些项目中有其他代码的依赖就不解释了,大体逻辑很容易看懂;这里列出的只是显示使用Transform时管道过程的代码。constPromise=require('bluebird')constpath=require('path')constfs=Promise.promisifyAll(require('fs'))constEventEmitter=require('events')constdebug=require('debug')('dircopy')constrimraf=require('rimraf')constTransform=require('../lib/transform')const{forceXstat}=require('../lib/xstat')constfileCopy=require('./filecopy')classDirCopyextendsEventEmitter{constructor(src,tmp,files,getDirPath){super()letdst=getDirPath()letpipe=newTransform({name:'copy',concurrency:4,transform:(x,callback)=>(x.abort=fileCopy(path.join(src,x.name),path.join(tmp,x.name),(err,fingerprint)=>{deletex.abortif(err){callback(err)}else{callback(null,(x.fingerprint=fingerprint,x))}}))}).pipe(newTransform({name:'stamp',transform:(x,callback)=>佛rceXstat(path.join(tmp,x.name),{hash:x.fingerprint},(err,xstat)=>err?callback(err):callback(null,(x.uuid=xstat.uuid,x)))})).pipe(newTransform({name:'move',transform:(x,callback)=>fs.link(path.join(tmp,x.name),path.join(dst,x.name),err=>err?callback(err):callback(null,x))})).pipe(newTransform({name:'remove',transform:(x,callback)=>rimraf(path.join(tmp,x.name),()=>callback(null))})).root()letcount=0//排出数据pipe.on('data',data=>this.emit('data',data))pipe.on('step',(tname,xname)=>{debug('--------------------------------------')debug(`step${count++}`,tname,xname)pipe.print()if(pipe.isStopped())this.emit('停止')})files.forEach(name=>pipe.push({name}))pipe.print()this.pipe=pipe}}module.exports=DirCopy