当前位置: 首页 > 网络应用技术

卡夫卡的消费者使用和基本原则

时间:2023-03-09 01:38:50 网络应用技术

  Kafka生产商:https://juejin.cn/post/7094178506199793672

  KAFKA消费者基本概念:https://juejin.cn/post/7096419302416318478

  消费者是负责KAFKA群集的消费者消息数据的应用程序。从Kafka 0.9版本,Java版本的消费者SDK版本

  Kafka正式支持语言SDK,还有更多SDK,由第三方社区维护。如果您需要使用相应语言的SDK,则需要下载其他。

  第三-Party库信息地址:https://docs.confluent.io/platform/currents/index.html

  与消费者的消费消息的完整守则如下:如下:

  要创建消费者实例需要以下步骤:

  使用先前的属性实例来构造kafkaconsume对象

  先前创建的对象可以通过构造函数传递给KafkaconSumer类,以转移到参数中。

  致电kafkaconsume.subscribe方法订阅主题

  一个消费者可以订阅多个主题,应该指出的是,它将执行多种订阅方法,这仅基于上次,即涵盖,而对主题的语法订阅如下:

  您也可以使用手动订阅主题和相应的分区,但不建议使用此方法。

  消费者的订阅延迟有效。订阅信息仅在下一次民意调查开始生效。如果您在民意调查前打印订阅信息,您会发现它是空的,因为它没有生效。

  消费者还可以以正则表达式的形式配置主题订阅,即动态订阅。当系统中发生规则条件时,它将一起阅读:

  订阅正则表达式必须指定ConsumerReerBalancelistener。该类是一个回调接口,用于编写Conleuser分区分配方案更改的逻辑。

  如果用户配置自动位移(enable.auto.commit = true),则不需要类,并将其直接设置为:

  但是,如果是手动提交的,至少您必须在“分区分配方案”中更改Onpartitionsrevoked方法时处理位移提交。

  骑自行车致电kafkaconsume.poll方法读取消息

  民意测验方法使用类似于Linux的Selecti/O机制。所有相关事件(重新平衡,获取消息)都发生在事件循环中,

  通用事件循环消息的写作如下:

  持续时间(1)参数还传达了上述持续时间(1)参数。此参数表示超时为1秒。

  在正常情况下,Consuser获得足够的数据以立即返回,但是如果数据不够,消费者将处于阻塞状态。为了防止阻塞时间,上述参数曲折块1秒钟,然后立即返回。

  处理获得的消息对象consuuumerrecord

  使用轮询方法获取消息收集后,您需要相应地处理消息。应当指出的是,从Kafka消费者的角度来看,在返回民意调查方法之后,即使消费成功,它也是成功的。

  但是,从业务角度来看,收到消息后需要处理一系列消息。处理完成后,它被认为是成功的消费。当业务处理逻辑相对较重时,应使用新线程来处理消息,以避免自行车的沉重业务逻辑导致新闻缓慢消费,

  如果轮询的参数配置未正确配置并且消耗速度很慢,则应适当调整轮询的参数,例如超时。

  关闭kafkaconsumer

  消费者程序结束后,应执行关闭操作以释放操作占用的系统资源,例如线程,内存插座等。遵循关闭方法:

  以上是使用Java语言进行消费,但是KAFKA程序本身还提供了一个控制台脚本来验证和调试消费者。脚本名称是

  在KAFKA安装目录(在bin/windows下的Windows)中的bin目录中,脚本启动参数很常见:

  使用以下方式使用演示:

  enable.auto.commit

  此参数用于指定消费者是否自动提交位移。如果将其设置为True,则消费者将自动在后台提交位移,否则用户需要手动提交位移。对于丢失消息,可以将其设置为false,用于手动手动提交。

  fetch.max.bytes

  它用于指定最大字节数以获取消费者终端的数据。如果信息在实际业务方案中非常好,则需要调整参数,否则消费者将无法消耗这些消息。

  max.poll.records

  用于指定每次返回的消息的最大数量,您可以根据实际情况调整参数的值。

  heartbeat.interval.ms

  该参数用于通知消费者组中的成员以进行新的重新平衡。假设该小组协调员决定开始新的一轮刷新,则

  它将以Rebalance_in_progress的形式以Conunuser的心跳请求的形式将决定以简历的形式放在简历中。收到简历类型后,消费者知道他需要重新加入该小组。

  该值的值越小,需要重新平衡时需要重新刷牙,并且该值必须小于session.time.ms。,协调员会认为他失败了,因此无需通知他。

  connections.max.idle.ms

  此参数用于定期指定卡夫卡的自由套接字的长度。默认值为9分钟。此参数可能会导致消费者在要求下一个请求减少-1时申请套接字资源。

  消费者需要为订阅的分区(即在哪里处理)节省消费进度,并定期将当前的消费位置提交给Kafka经纪人以保持持久性。该消费位置称为位移。

  位移还表示下一条消费者消息的位置。假设消费者已经阅读了分区中的NI,则应提交位移值N,因为位移从0开始,位移的位置为n+1 The News,下次要消耗的新闻。

  提交时间的时间是确保消息传递语义语义的基石。有三个常见的消息语义:共有三个:常见消息语义:

  消费者在消费业务处理之前提交流离失所,这可以实现第一语义,因为即使它崩溃,它也会在恢复后消耗。

  相反,如果通过业务流程移动提交的位置,则可以实现第二种语义。因为在正常情况下,不可能确保业务处理和流离失所的提交符合原子质,因此只能保证它不会丢失,但不能保证在没有提交后不提交业务处理提交疾病导致恢复后重复治疗,

  卡夫卡(Kafka)自0.11版以来一直支持交易,并且通过事务可以实现第三种语义。

  除偏移外,还有一些与消费者有关的位置信息,如下:

  之所以要有水位,是因为Kafka希望确保消息成功地由所有副本编写,然后允许处理消息。

  消费者将在代理列表中选择一个经纪人作为消费者组的协调员,该协调员用于实施诸如组成员管理,消费分配方案和流离失所等功能。

  当消费者组首次启动时,由于没有排量信息,因此反映了auto.offset.reset的作用。通常,它要么是从最新的地方读到的,要么是从最早的独特之处开始的。

  当消费者运行一段时间时,它必须提交自己的位移值。如果消费者崩溃或关闭,则负责的分区将分配给其他消费者。因此,会出现反复的消费,

  消费者提交的位移的主要机制是实施将请求提交其归属人位移的请求。每个提交将向与__consume_offsets相对应的分区添加消息。

  默认情况下,消费会自动提交它,并且自动提交间隔为5秒。参数参数可以控制自动提交之间的间隔。

  通常建议手动提交消息,因为尽管不需要处理自动提交,但消息可能会丢失。在构造Kafkaconsume时,提交配置参数将更改为手册。

  并在代码中使用或提交。业务处理完成后,提交可以确保消息不会丢失,但不能保证消费。

  这两种方法同时提交,这将阻止用户线程继续运行。

  提交方法还提供了带有参数的重载方法,该方法可以在提交中提供更多良好的控制。如下所示,每个过程都被处理以获取记录:

  重新平衡是一组协议,它定义了消费者组中的所有消费者如何在主题中访问所有分区。

  例如,总共有一百个砖块和5名工人,因此如何移动这五名工人达成协议,并且不能同时同时移动砖块,这将造成不必要的资源浪费。

  例如,一个名为A的主题有100个分区。现在有一个小组可以订阅一个主题,该主题中有5个消费。默认情况下,此分发过程称为重新启发,

  当消费者成功执行重新平衡时,应用于主题的每个分区只能将其分配给组中的消费者实例。

  KAFKA具有构建的组坐标协议(组协调器协议)。

  协调员负责管理小组的状态。主要责任是,当小组中有新成员时,该组将为小组进行,即协调主题分区订阅以重新分配。

  小组重新平衡的触发条件是以下3:

  常用的触发条件是第一种类型,即消费者崩溃。这里的崩溃不一定是指消费者流程悬挂或消费者流程下降的机器。

  相反,当消费者无法在指定的时间内完成该消息时,协调员将相信消费者已经崩溃,这将触发新一轮的恢复。

  在实际的业务处理中,有必要避免在民意测验的主线程中执行较重的逻辑处理,这将导致过度处理时间并由协调员考虑

  频繁的重新刷量将大大减少消费者的吞吐量。在生产环境中,需要与业务结合使用一些消费者的参数:

  ,避免不必要的重新平衡。

  分区分配策略确定主题方法分配给消费者以订阅消费者。

  消费者默认情况下有三种类型的分销策略:

  round-robin

  循环策略将所有主题的分区顺序列出,然后分配给每个章节中的每个消费者。

  假设我们有一个名为T1的主题,其中包含10个分区,然后我们有两个消费者(C1,C2)来消耗这10个分区中的数据。

  然后,对C1的最终分配为:0、2、4、6、8

  对C2的分配是:1、3、5、7、9

  黏

  粘性策略是粘度策略。根据先前的分配方案,将重新平衡分配给每个消费者。

  假设我们有两个名为T1和T2的主题,每个主题的每个主题,然后我们有三个消费者(C1,C2,C3)来消耗这6个分区的数据。

  当前的分配如下

  C1:T1-0,T2-0

  C2:T1-1,T2-1

  C3:T1-2,T2-2

  突然,C2崩溃了,然后重新平衡,重新分配了:

  C1:T1-0,T1-1,T2-0

  C3:T1-2,T2-1,T2-2

  也就是说,在维持统一分布的情况下,最初属于消费者的分区也被分配给消费者。

  Kafka消费者的默认分配策略是范围。如果小组中所有消费者订阅的所有主题都是相同的,那么使用旋转策略将更加均匀地分发。

  通过设置ALL -IN -IN分配策略,除了KAFKA的分配策略之外,用户还可以自定义分配器(分配器)。

  消费者群体可以反复执行重新平衡。一代人的引入是为了保护消费者群体的官方抵销。

  一代代表重新平衡的分裂。起初是0。进行重新平衡时,值将增加。假设以前的消费者成员由于某些原因延迟了偏移,则偏移被推迟了。

  当提交以抵销时,携带了旧一代信息,因此消费者群体拒绝了提交信息。很多时候,消费者投掷illegal_generation例外是原因。

  重新刷新本质上是一组协议。组和协调员使用这组协议来完成组的协调员。协议如下:

  在重新平衡过程中,协调员主要处理消费者的Jayingroup和Syncroup请求,以在消费者积极离开小组时向协调员发送离开组请求。

  恢复成功之后,该小组的所有消费者都会定期将心跳请求发送给协调员。每个消费者确定rebrance_in_process是否包含在心跳请求的响应中。

  消费者组必须在执行重新平衡之前确定协调员所在的经纪人,并创建与经纪人通信的套接字连接。

  确定协调员的算法与办公室提交给__consumer_offsets目标分区相同,如下:

  成功连接协调器后,您可以执行斥责操作。重新平衡主要分为两个步骤:

  消费者默认情况下将排量提交给__consumer_offsets。实际上,KAFKA还支持用户将位移提交到外部存储中,例如数据库,例如数据库,例如数据库,例如数据库。

  如果要实现此功能,则用户必须使用Rebalance侦听器。使用Rebarance侦听器的前提是用户使用消费者组。

  rebalarance侦听器主要是接口回调类。有两种方法需要实现。前者在新一轮重新平衡之前的呼叫,后者在重大后的呼叫完成后。

  重新平衡听众最常见的用法是手动将位移提交给第三方存储库,并在重大之前和之后执行一些审计操作。演示如下:

  一般思维:

  如果启用了启动 - UP位移,则用户可以在监视器中手动提交唯一手册。每次消费者重新平衡时,都会检查用户是否启用自动位移。

  消费者要求重新平衡在短时间内完成它,因此请勿将长期执行时间的逻辑重新平衡,尤其是某些阻塞方法。

  解码序列化和生产者发件人的序列化是反向操作,另一方序列化后的字节数组将恢复为原始外观。

  与生产者的序列化相呼应,常用的求职者如下:

  如果用户的序列化需求更为复杂,则可以自行定义求职者。

  构造消费者对象时,指定相应的序列化值以使用序列化:

  首先定义一个类以实现Deserializer界面:

  指定刚刚创建的类的消费者解码序列化的值:

  结合以前的生产者计划以获得消费结果:

  与kafkapropucer不同,kafkaconsume是非线程安全性。因此,在练习过程中,建议使用Kafkaproducer单个实例进行多线程使用。

  Kafkaconsumer的非线程安全有两种实用方法:

  由于KafkaconSumer实例是非线程安全性的,因此创建每个线程时,它可以创建一个仅利用自己来避免问题的kafkaconsume。

  样本设计设计,首先定义三类:

  大量线程和相应的KafkaconSumer创造了更多资源来占用大量资源,因此您可以选择带有消息的处理逻辑的消息获取消息以将消息删除,以使一个或多个消费者实例在全球情况下获得,然后使用消息获取消息。只是处理逻辑,然后将其放入单独的工作线程中。

  样本设计设计,首先定义三类:

  两种方式是有益的和缺点。用户可以根据实际业务方案做出相应的选择:

  方法1(每个线程保持独特的kafkaconsumer)很简单;无线时间表之间的交互式开销,快速速度;方便的位移管理;易于维护分区中信息消耗的顺序,插座数量很大;性别差;由于插座连接很多,因此会有更多的请求,因此代理终端负载相对较高。重新平衡2(全球消费者 +多工人线程)的可能性的增加消息采集和处理与工人人数解耦,良好的射精需要实现;分区间隔中的消息消耗顺序难以维护;处理链路变长,位移管理很困难。异常的工人线程可能会导致消费者数据失去消费者组,因为使用该组最合适。

  如果用户需要严格控制消费者固定消耗和某些分区,则场景如下:

  在这种情况下,食用组不适用。有必要应用独立消费者(独立消费者)和独立消费者独立工作。任何消费者崩溃都不会影响其他消费者。

  当使用消费者群以消息消费消息时,我们使用直接订阅主题,独立消费者用来消费,

  如果发生多次呼叫,则仅上次生效,并且将涵盖前一个电话。同时,分配和订阅不能混合在同一消费者中。

  接收分区列表的方法,该列表直接使消费者访问这些分区的功能。代码如下:

  简而言之,不建议使用旧版本。请在生产环境中使用新版本的SDK。

  原始:https://juejin.cn/post/7098920723602898958