当前位置: 首页 > 后端技术 > Node.js

PromiseisnotCallback

时间:2023-04-03 23:56:17 Node.js

这篇文章是实际工程中难得遇到的例子;它反映了Node.js中两种编程范式之间的设计冲突。这种冲突是普遍的,但本文只分析问题的本质,不讨论更高层次的抽象。我正在写一个类似于HTTP的资源协议,叫做RP,ResourceProtocol,它和HTTP的区别在于RP建立在一个中立的传输层上;该传输层中的最小数据单元消息是一个JSON对象。该协议内置了对多路复用的支持,即一个传输层连接可以同时维护多个RP请求响应过程。考虑客户端请求类设计,类似于Node内置的HTTPClient,或者流行的npm包,如request或superagent;可以使用EventEmitter来发出error和responses事件,也可以使用NodeCallback,用户需要提供一个接口,形式为(err,res)=>{}回调函数。随着async/await的流行,request类也可以提供.then接口,实现方式如下(其实superagent就是这样实现的):classRequestextendsDuplex{constructor(){super()...this.promise=newPromise((resolve,reject)=>{this.resolve=resolvethis.reject=reject})}then(...args){returnthis.promise.then(...args)}}RP的实际设计和形式与我们熟悉的HTTPClient略有不同。响应对象本身不是流,而是将流作为属性提供。也就是说,回调函数的形式为:(err,{data,chunk,stream})=>{}如果请求返回的不是stream,则data或chunk有值;如果返回的是流,只有流有值,并且是stream.Readable类型。这种形式上的区别与本文讨论的问题无关。RP底层从传输层获取二进制数据,解析出报文,然后发送给上层;它采用一种简单的方法循环解析接收到的数据块,直到没有完整的消息。这意味着一次可以发送多条消息。请求对象还必须能够一次处理来自服务器的多条消息。我们要讨论的具体情况是服务器连续发送了两条消息:status200withstreambort。第一条消息表示后面还有消息流,第二条消息abort表示服务器出现意外,无法继续发送。当请求对象收到第一条消息时,它创建一个响应对象,其中包含流对象:this.res={stream:newstream.Readabe({...})}//this.emit('response',this.res)//this.callback(null,this.res)this.resolve(this.res)像评论中emit或trigger用户提供的回调,没有问题;但是如果你调用resolve,注意Promise保证是异步的,这意味着用户通过then提供的onFulfilled不会在当前tick被调用。下一个第二个消息,abort,在同一个tick中被处理;但是此时,因为用户还没有来得及挂载任何监听器,包括错误处理程序,如果设计要求这个流发出错误-非常合理的设计要求-此时,根据Node的约定,错误没有处理程序,整个程序崩溃。有很多方法可以解决这个问题。首先是request.handleMessage方法,如果不能同步完成消息的处理,又需要保证消息的处理顺序,应该对消息进行缓冲。这是node中最常见的同步方式,代表实现是stream.Writable。但是这里有个难点,this.resolve函数不提供回调,必须提前知道运行环境的Promise实现方法;在node中,是nextTick,所以nextTick在this.resolve之后,同时buffer中其他后续消息的处理可以让用户在onFulfilled函数中给stream挂载一个handler。这里可以看出callback和emitter其实是同步的。当callback或者listener被调用的时候,request和用户做一个约定,你在这个函数里要做什么(把所有的listeners挂载到对象上),然后我继续做什么(处理下一条消息,发出数据或者错误);这相当于接口协议对命令的约定。我们可以称之为同步顺序组合,这是程序语义意义上的。对应的异步版本呢?如果我们不假设运行时环境的Promise实现呢?它应该与同步版本具有相同的语义,对吗?回过头来看问题,如果streamemiterror没有导致系统崩溃,用户在onFulfilled中拿到{stream}对象时看到的是什么?发生错误后结束的流。这个可能有点难用,需要判断,但我还是觉得不是什么大问题。更进一步,如果是另外一种情况呢?服务器在一个块中发送了3条消息;status200withstreamdataabort这时候用户看到的还是erroredstream,数据去哪了?你还能说异步顺序组合的语义和同步的一致吗?不能,对吧?synchronized版本对数据进行了处理,很可能会影响结果。理想情况下,顺序组合的语义(执行结果),无论是同步还是异步,都应该是一致的。那么我们就来看看如何实现一个与PromiseA+的实现无关的方法来保证异步和同步行为一致。如果你愿意用“通信”来理解计算,这个问题的答案很容易思考:假设这个异步处理程序位于半人马座阿尔法星上,我们唯一能做的就是老老实实地按照事件的顺序和将它们发送给它,不能出现故障,就像我们收到它们一样。但是当我们将传入的消息翻译成流时,我们无法保证顺序,包括:中止消息被抢占/乱序数据消息丢失。这是问题的根本原因。当我们异步处理一个消息序列时,前面的write和break的实现保护了序列和内容的完整性。从数学思维上来说,我们说Promise增加了一个callback/EventEmitter没有的属性,deferredevaluation,这是编程中少见的时态属性;当然,这并不奇怪,因为这就是Promise的目的。同时,Promise->Value还有一个属性,它可以被不同的用户多次访问,维护了Value的属性。这并不奇怪。只是Stream作为一个可以无限大的值,在实践中不可能把所有的值都缓存起来,把它当成一个整体值,所以可以无限提取的“值”属性就消失了。但这并不意味着流作为一个对象,它的行为,不能等到它被构造并用于开始处理消息。一种方法是编写一个具有这种能力的流;stream.Readable有一个flow属性,必须通过readable.resume启动,这是一个触发方法;另一种方式有点tricky,可以拦截response.stream的getter,在Triggers中第一次访问缓冲消息的异步处理。这种方式不需要依赖PromiseA+的实现;但它不是100%异步顺序组合,因为流的处理程序必须是同步的。完全异步可以参考Dart使用await消费流的方式。它的逻辑可以这样理解:把所有的Events,不管来自哪里,包括错误,都写成一个stream,然后用await来消费这个stream;但实际上,当await返回时,它仍然面临着一个状态机。优点是Throw威力大;进程等待方便,即在处理流输出的对象时可以有await语句,相当于在取下一个流输出的对象之前的一种阻塞;但是这种阻塞需要谨慎,它是反并发的;summary:Node的Callback和EventEmitter结合时,handler/listener是同步的;Promise反过来确保每个处理程序/侦听器都是异步组合,这是两者之间的根本区别。在顺序组合函数(或过程代数意义上的过程)中,同步组合是紧密耦合的;体现在函数中一旦某个同步逻辑因为任何原因需要修改为异步,就需要开战。拿内存,后来变成了读文件。如果程序自然写成异步组合,类似的改动不会对实现逻辑产生太大的影响;但是细粒度的异步组合有巨大的性能损失,这与现代处理器和编译器的设计和实现有关。理想的情况应该是开发者只表达“顺序”,不表达是同步实现还是异步实现;如上所见,其实同步实现也有对应的异步实现,区别仅在于执行效率和内存使用上(buffer有更多的内存开销,而同步处理其实更多的是“读完即烧”);但是我们使用的命令式语言不是这样的,它是强迫你表达命令;而另一种所谓的future其实就是狗屎语言,反过来强制你不能表达命令。都是神经病。学术界不会真正了解行业的实际问题。