当前位置: 首页 > 后端技术 > PHP

Netty的核心概念ChannelHandler&Pipeline&ChannelHandlerContext

时间:2023-03-30 02:17:15 PHP

主流程作为Producer,核心就是梳理两个东西:Sender和RecordAccumulatorSender:是kafka发送流程的主要服务,负责接收数据,放入RecordAccumulator,或者从RecordAccumulator中取出数据发送给Kafka服务器,或者负责更新一些元服务等。RecordAccumulator:Kafka整个发送过程是异步的,主要目的是对一些数据进行批处理以提高吞吐量,而RecordAccumulator是负责管理数据缓存的主要对象,作为上面Sender单循环体的核心流程如图所示,我们可以按照图中的流程从上到下拆解每一步的细节。如何在Sender#sendProducerData中判断获取上述过程中可以发送的kafka节点首先,在RecordAccumulator内部,数据以Map的形式进行缓存:TopicPartition显然指的是topic-partionProducerBatch,需要同批发送的Record请求,ProducerBatch本身不是线程安全的,实际运行时会锁定在Deque粒度。在ProducerBatch中,实际的recrod是以MemoryRecordsBuilder的形式维护的。同时,ProducerBatch还会包含很多其他的数据,比如一些请求数据回调等,如果后面可以继续讲,这个阶段还是先回到主流程的分析finallongcreatedMs;finalTopicPartitiontopicPartition;finalProduceRequestResultproduceFuture;privatefinalListthunks=newArrayList<>();privatefinalMemoryRecordsBuilderrecordsBuilder;privatefinalAtomicIntegerattempts=newAtomicInteger(0);privatefinalbooleanisSplitBoleanfinalState=复制代码newAtomicReference<>(null);intrecordCount;intmaxRecordSize;privatelonglastAttemptMs;privatelonglastAppendTime;privatelongdrainedMs;privatebooleanretry;privatebooleanreopened;复制代码判断哪些数据准备好核心代码在kafka中RecordAccumulator的ready部分:首先,server需要满足一定的条件:要发送的partion的leader是已知的。如果包含未知的leader,则需要访问kafka服务器查询元数据,但这部分内容会被屏蔽。整体流程,所以它实际上会被做成一个当前正在发送并且还没有被静音的异步partion,也就是没有被设置为阻塞状态。当前分区不处于退避状态。这主要是说当前partion处于触发重试的状态。二是当前part的batch需要满足一定的条件。当前batch距离上次发送时间的等待时间>允许的等待延迟(如果是第一次尝试,使用lingerMs,如果是重试逻辑,使用retryBackoffMs)当前双端队列中是否有满batch,比如队列中原来的个数大于1,或者只有一个元素但大小满足发送条件。当前Producer已经处于关闭状态,整体内存已满:我们已经知道Producer的数据需要缓存一段时间,Producer有一个控制内存的内存池,即BufferPool。如果内存不够用,就会排队等待申请。如果pairofqueues不为空,说明总内存不够,还有正在刷新的线程:这里有点难理解,等有把握再补充。事务完成,(高版本kakfa支持的事务模型,暂不赘述)如何获取待发送的Batch数据主要逻辑总结如下:遍历ConcurrentMap在RecordAccumulator中,对每个TopicPartitionBatch列表尽量获取不超过maxRequestSize的,关闭这些batch,放入要发送的列表中。但是在实现上还是有一些逻辑需要注意的。我们都知道Kafka的broker和Kafka的topic-partion的基本概念。不同的分区可以分配给同一个代理。在kafka的实现中,每个drain进程只会从当前node节点调出一个partion来发送消息。为了避免每次投递都从0开始,导致序列化的大分区饿死,客户端虚拟了一个drainIndex,每次drain都会自增,实际节点从start开始。intstart=drainIndex=drainIndex%parts.size();复制代码但是这里有一点我不是很明白,为什么drainInde是全局的,如果我做的话可能会做nodeId维度,不知道这里考虑的点是什么如果是全局的drainIndex,其实是单个Node的分区太多,远远多于其他Node,导致饥饿?另一个有趣的问题是,当出现一些极端情况时,比如单个batch中只有一条消息,但是这条消息的大小大于请求大小限制,那么它会尝试将这条消息作为单个发送批。为了实现这一点,Kafka客户端只是在待发送列表不为空的情况下检测当前待发送大小+nextbatch之和是否大于请求大小,一部分是RecordAccumulator中缓冲数据的ConcurrentMap中的过期数据,另一部分是Sender中Map中待发送的数据。价值。失败的回调将被调用无效的数据。数据发送和打包完成上述数据过滤后,会将数据打包成一个ProduceRequest发送给客户端发送。Configurationandconstraints梳理完上面的条件,我们来看看是哪些配置控制了上面的一些流程:batch.size:这个指的是每个双端队列的ProducerBatch的大小buffer.memory:这个指的是它是RecordAccumulator的Buffer的大小max.request.size:这里指的是drain过程中发送给每个Node的大小。如果单个消息大于此值,将跳过检测,但会影响打包方式。linger.ms:在非重试场景下,从创建ProducerBatch到排到待发送区域之间,数据驻留在buffer中的时间。retry.backoff.ms:和上面基本一样,不同的是在重试场景下允许在buffer中驻留的最大时间和发送的区域,这个配置其实是为了避免一些极端的场景,比如在重试场景下,可能是服务器端的问题,如果不增加客户端驻留内存的时间,重试次数可能会在很短的时间内耗尽。delivery.timeout.ms:该配置是指从添加到Kafka客户端到客户端处理发送过程所花费的总时间,包括缓冲区中的时间和待发送列表中的时间。request.timeout.ms:这部分时间实际上是指客户端发送请求到收到响应之间的时间。比如我遇到的在线问题:e.kafka.common.errors.TimeoutException:Failedtoallocatememorywithintheconfiguredmaxblockingtime60000ms。从代码来看,卡在了申请内存的阶段,其实是buffer的大小不够。经过对比Producer的配置,发现batch.size设置过大,导致batch.size之后下游的topicpartions数量远超buffer.memory,即buffer只能容纳一部分至多分区的数据,进而导致整个Producer的生产过程。堵塞。最后,如果您觉得这篇文章对您有点帮助,请点个赞。或者可以加入我的开发交流群:1025263163互相学习,我们会有专业的技术解答。如果您觉得这篇文章对您有用,请给我们的开源项目一个小星星:http://github。crmeb.net/u/defu非常感谢!PHP学习手册:https://doc.crmeb.com技术交流论坛:https://q.crmeb.com