当前位置: 首页 > 后端技术 > Java

kafka原理解析(四)-sender线程收发消息

时间:2023-04-01 14:57:13 Java

1总体流程(1)sender线程整体是一个while循环,不断轮询各个partition中是否有合适的batch数据发送-批。(2)如果有批量数据要发送,查看是否有元数据(如果没有元数据,必须标记,元数据只能稍后发送),只有有元数据才能发送知道将它发送到哪台特定机器。了解机器,还要建立连接。(3)聚合并分组要发送到统一机器的批次。聚合组封装成一个请求,方便一起发送,节省资源。(4)根据Kafka的协议规范对发送过来的数据进行二进制转换。(5)通过for循环,使用kafka的networkclient将每个request放入自己的inFlightRequests列表中。注意,它只是进入了发送队列,还没有被发送出去。(6)通过networkclient的网络读写事件,使用java的nio真正的收发数据。2收集可以发送的批次。对比上面accumulator组件的ready方法。该组件作为producer发送数据的缓冲区,按照topic-patition-batch队列的数据结构存储,遍历得到队列的第一个batch,然后判断各种条件:(1)如果批量是重试,没有达到重试间隔(默认100ms),则不会先发送。(2)只有一批,而且已经满了,可以发。(3)有多个批次,发送第一个。(多个是因为前面的都满了,所以会申请更多的内存)(4)没有满,但是达到了每批最大等待时间(配置为linger.ms),必须发送。如果linger.ms配置为0,那么只要这里有batch,就会发送。(5)内存不足、强制flush发送等场景需要立即发送。符合发送条件的批次将被收集。3连接建立过程元数据拉取过程这里单独介绍。下面是连接建立过程。是networkClient的ready方法。它主要是通过一些连接状态来观察连接的建立,并通过Kafka自身封装的selectable组件来发起连接。连接,注意连接是非阻塞的,keepalive,关闭nable算法(目的是加快发送速度)。connection在selectable中统一控制,组件关系为:NetworkClient持有Selectable,Selectable持有各机器的connection。4核心:NetworkClient的poll(1)networkClient的poll方法其实就是selectable的poll方法。上面说了,selectable持有每一个连接,所以也有一个selectKey,所以使用nio方法来处理各种网络事件,典型的是连接建立,读事件,写事件。核心组件是:inFlightRequests:正在发送或等待响应的消息。sender线程如上图,可以发送的batch调用了send方法。它没有被发送到那里,而是被放入这个队列中。socketSendBuffer,socketReveiceBuffer:发送/接收缓冲区(2)可选的nio的几个重要成员变量:achannels,nioselector:存储通道和处理nio的组件,封装在这个网络组件中,非常适合。bListconnected:已连接的节点实例列表ccompletedReveices:已接受的响应dcompletedSends:已发出的请求estagedReceive:临时接收的光球(3)selectable将轮询所有selectkeys,获取key跟事件,然后处理三个事件(4)ON_CONNECT连接事件,上面说了,发起的连接是非阻塞的,不管连接成功与否,直接往下走,这里需要调用finishConnect,这才是真正的阻塞等待连接建立,节省时间,这个思路值得学习。(5)OP_READ连接建立后,默认添加read事件,通过自定义的kafkachannel读取数据。response通过NetworkReceive抽象进行封装,Kafka的消息格式也是普通的消息长度(固定4字节)+消息内容。然后kafkachannel会有一个固定的4字节的buffer来读取长度,然后根据消息长度申请一个相应大小的contentbuffer来读取下一步的内容。采用这种方式,可以解决解包问题:消息长度为4字节的缓冲区未满,即解包完成。这次我们下次继续读取,只有缓冲区满了才考虑获取长度;那么如果contentbuffer还没有被读取,就说明它已经被解包了。同样,下次继续读,直到读满为止。读取到的数据会被放入stageReceives队列中,里面存放着channel和receive的对应关系,后面的方法会在receive之后立即添加到completedReceives中。请注意,这里一次只会将一个接收放入completeReceives。原因是如果一次读出多个receiveresponse,希望后面的逻辑能一个一个处理。(6)OP_WRITEop_write事件封装发送对象。这个send对象就是最外层send线程添加到networkclient的分组请求,然后kafkachannel使用nio发送。如果解包没有完成,则more一次发送,发送完后,去掉通道的write事件,然后将发送的请求加入selector的completeSends队列。所以总结一下,selectable的poll过程其实就是使用nio来处理对应的事件,然后将事件结果写入到selectable的多个队列中,这样networkclient在不同的场景下可以直接读取队列获取数据。这种封装解耦的思想值得学习。5networkclient的后续处理还是参考上图。其实后续就是调用返回的receive,调用成功或者失败的回调函数。(1)handleCompletedSend其实是立即返回ack=0的情况,因为不需要等待网络请求,只要累加器保存下来,基本可以认为发送成功,然后封装一个response和把它放在一个列表中。(2)handleCompletedReceive,这里将收集到的receive封装成response,放入list中(3)handleTimeoutRequests,标记超时请求的节点状态,下次再拉取metadata(4)最后遍历所有response并调用producer响应成功,正式发送完成。