生产者是负责向Kafka群集编写消息数据的应用程序。来自Kafka 0.9版本提供了Java版本的生产商SDK,供用户使用,
Kafka正式支持语言SDK,还有更多SDK,由第三方社区维护。如果您需要使用相应语言的SDK,则需要下载其他。
第三-Party库信息地址:https://docs.confluent.io/platform/currents/index.html
Kafka在内部具有一组二进制通信协议。用户可以根据协议使用任何语言来编程和消息。官员提供的官方SDK和第三方图书馆根据协议实施。在易用性和绩效方面没有其他东西。差异,
因为它是一种自定义二进制格式,所以消息的发送和接收将不需要依靠任何外部序列化框架,这似乎是轻量级和良好的扩展。
生产者比消费者在功能中更简单,因为每个生产商都独立工作,并且它们之间没有相交,并且不会涉及复杂的小组管理操作。
生产者的主要功能是将消息发送到主题的分区。每个主题都有几个分区。因此,生产者需要确定发送消息的位置。此确认过程是分区分区的功能。
默认分区KAFKA生产程序默认情况下提供了一个分区设备。对于每条消息,如果消息存在在密钥中,则分区者将根据密钥的哈希值选择目标分区;如果新闻没有密钥,则分区者使用旋转查询。方法确认目标分区(确保分区中消息的统一性)。
自定义区域用户还可以实现自定义分区策略,而不是使用默认分区。您可以根据自己的业务需要使用不同的分区策略,以将消息发送给适当分区的适当分区
生产商生产商的API允许用户跳过直接指定消息的区域,该区域应发送到哪个分区。
确定目标分区后,产品需要查找与该分区相对应的版权。只有版权的副本才能响应生产者发送的请求。
在产品中发送消息时,经纪人的响应有很多选择。它要求经纪人成功地返回响应,而无需等待成功编写任何副本,或者成功写作后的响应副本以及其他响应的副本。
Java版本的SDK的Java版本是正式提供和最广泛使用的。工作原理过程如下:
生产者的发展步骤如下:
属性(Java.util.properties)用于配置生产者对象以访问代理的参数配置。必须指定三个参数:
序列化方法的值必须是完整的限制类型,否则将丢弃错误。
根据上面提到的三个必要属性,可以创建一个产品对象
它也可以简化为
生成生产后,可以发送消息,因此我们需要构建一个消息对象。
在ProduceRrecord的构造函数中,您可以指定主题,键,价值和值的几个关键参数。
最好不要指定新闻时间。Kafka相对稳定地指定自身,因为时间戳索引文件中的索引项目是根据时间戳订单严格安排的。
消息的时间顺序将是混乱的。当使用时间戳查询位移功能时,将找不到消息。同时,KAFKA的消息保留策略也将受到影响。
发送产品发送消息的方法是发送的,该方法实现了底部的异步发送。Java提供的未来特征意识到同步的两种方式和异步+回调。
也有第三种发送被称为火和忘记的方式,即,将在发送后发送结果。在生产环境中不建议使用此方法,因为无法了解消息是否成功。
在onCompltion回调中,如果异常是空的,则否则元数据为空,并判断该消息是否成功通过IF发送。
通过未来,您一直在等待经纪人将结果归还给产品。当结果得到时,要么返回正常结果,要么丢弃异常。
如果没有异常,则get将返回recordMetadata类型的结果。结果包含已发送的所有元数据信息,即主题,分区,分区位移信息所在的位置等等。
不建议
无论是同步还是异步发送,发送失败都可能存在异常。发送例外分为两类:重试异常和非审判异常。
如果上述异常是在产品程序中配置的,则只要在指定的次数内恢复,该错误将不会在oncomplating之外出现,否则将其封装在异常中。
所有re -检验异常都从org.apache.kafka.comon.errors.retriable Exception抽象继承。从理论上讲,所有不继承此类别的异常属于属性。
上述异常无法通过重试解决,因此它们将被处理到生产者计划。
因为可以在例外替换,并且这两个异常可能需要不同的处理,因此可以对代码进行判断:
创建产品时,它将适用于系统的相关线程,内存和插座。当生产者完成已建立的任务时,应在不使用后及时关闭它。
调用普通非cinport关闭方法,生产者将等待直到发送之前才完成的消息,也就是说
关闭方法可以指定超时时间,即不再等待发送消息并强行退出。此方法将丢失所有未发送或未收到的消息。
ACKS参数用于控制产品之后消息的耐用性策略。
将消息发送给发送给主题分区的领导者经纪人时,制片人将等待写作结果将消息返回给领导者经纪人
当制作人确认当前消息已成功提交时,下一条消息将继续,卡夫卡保证消费者将永远不会阅读尚未提交的新闻。
当领导者经纪人将写作结果返回到问题时,这是一个问题。其响应结果的速度将直接影响产品的吞吐量和消息的持久质量。
收到领导者经纪响应的速度越快,您可以发送下一条消息的速度越快,吞吐量越大,ACKS参数用于控制响应速度。
ACKS参数的值表示,在领导者经纪人响应产品之前,领导者经纪人必须确保写入消息的副本数量。
Acks的值如下:
0最小允许的消息的最小值丢失,您不在乎该消息是否成功发送。默认值为32MB,因为Kafka的新闻是异步的,并且该消息将首先放置在缓冲区中。
如果向缓冲区写消息的速度超过了独家io线程发送消息的速度,则会导致缓冲空间增加。
如果生产者程序需要将消息发送到许多分区,则需要设置参数,以防止缓冲区太小降低生产者的整体吞吐量。
当产品发送消息时,如果消息被压缩,则网络IO的传输开销将大大减少以增加整体吞吐量,但同时,它将增加产品CPU的费用。
如果经纪人端上的压缩参数与生产者不同,那么当经纪人写消息时,消息压缩和消息的压缩的操作也将占据经纪人的CPU资源。
KAFKA支持的压缩算法包括GZIP,Snappy,LZ4,Zstandard等。根据实际经验,Zstandard算法的值最高。默认情况下,compression.type的值无,也就是说,没有压缩消息。
处理写作请求时,KAFKA经纪人可能因瞬时失败(瞬时领导选举或网络抖动)而失败。
因此,最好直接在产品中尝试。检索的值是重试的数量,默认值为0,即不重试。超过0的设置的价值可以很好地处理。
由于瞬时网络了解经纪人方面已成功写入消息,但是结果并未成功返回给生产者,因此生产者将相信发送消息的失败会失败,从而打开重试机制。
为了应对这种风险,Kafka要求消费者执行它以重新处理它。从0.11版本开始,Kafka已开始支持精确的处理语义语义,并使用诸如幂之类的功率来避免在消息审查时重复写作。
生产者将减少多个在内存中发送消息的请求(默认为5)。如果消息是重试的,则可能导致消息流的消息。
生产者SDK提供MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION参数,将值设置为1,也就是说,只有一条消息请求可以同时避免避免序列序列,但是设置将降低性能,并且可以通过设置诸如Power之类的功率来解决问题。
重试提出的两个问题实际上是合理的,最简单的解决方案是打开诸如enable.Idempotence = true处理之类的功率。
实施权力的原则可能如下:
两个关键术语是生产者ID(IE PID)和序列编号。
对于每个PID,产品都会向每个PID发送消息
同样,经纪人终端也将是每个
对于收到的每条消息,如果其序列号大于所维护的经纪人的序列号,则经纪人将接受它,否则将被丢弃,
如果消息序列号小于或等于经纪人维护的序列号,则意味着消息已保存,即重复消息,代理直接丢弃消息,
如果消息号的序列号之间的差异比经纪人的序列号大1,则意味着数据尚未写入中间,也就是说,即混乱的序列,此时,经纪人拒绝新闻,
该消息将在消息失败后检索,也就是说,保证每条消息都将发送给经纪人。当消费者从分区中获取数据时,必须有一个订单。
retry.backff.ms
生产商将有一个间隔,以重试未能防止频繁重试系统资源的失败。
默认值为100毫秒。通常,领导者选举是一个相对常见的瞬时误差。它可以通过计算平均领导者选举时间来合理地设置重试和重试的值。
生产者将将发送到同一分区的消息封装到批处理。当批处理中的消息大小达到批处理限制时,战斗中的消息将分批发送给经纪人。
但是,制片人不会等待新闻达到批量的大小。尺寸,否则,如果您无法达到这笔款项,那就不会等待。
因此,Linger.ms是在大多数控件等待之后发送的参数。
batch.size的值为16384,即16kb。如果值太小,则一次写的消息数将很少,这将减少产品的吞吐量。
如果此值太大,它也会对系统内存施加压力,因为是否可以填充,该产品将为批处理分配固定尺寸的内存。因此,在记忆和繁荣的情况下,该价值被适当增加以获得更高的吞吐量。
对于Linger.ms,默认值为0,即立即发送。在大多数情况下,我们希望该信息将
但是,这种方法将减少生产者的吞吐量。每个请求的信息越来越多,吞吐量就越高。因此,如果要增加吞吐量,则可以适当地调整Linger.ms的值。将其合适。
此参数用于设置消息的最大大小,默认值为1048576字节。如果您需要发送的消息的内容大于值,则需要调整参数,否则会发生异常。
当生产者将消息发送给经纪人时,经纪人需要在指定的时间内返回结果,也就是说,产品将在一定时间内等待结果。
这次是request.timeout.ms,默认情况下为30秒。如果经纪人尚未返回结果,则该产品将在回调函数中抛出超时Exception。
通常足够30秒。如果生产者负载较大,则可以对该值进行适当调整。
在发送消息的过程中,需要确定该消息将其发送到指定主题中的哪个分区,并提供分区策略和相应的分区者,供用户选择。
默认分区将尽力确保将相同的密钥消息发送到同一分区。如果没有指定的密钥,则将其均匀分配给主题的所有分区。
Java的默认分区版本使用MURMUR2算法计算消息密钥的哈希值,然后需要分区的总数才能获得需要发送的分区号。用户可以使用生产者提供的自定义分区策略来确定消息。
自定义分区机制需要做两件事:
假设现有业务需求,有一种专门用于审核功能的消息。当消息的关键是审核时,请将此类消息发送到主题的最后一个分区以促进后续处理,并在主题下使用其他消息。随机策略已发送到其他分区,代码实现如下:
首先创建自定义分区机制:
使用自定义分区类:
创建一个具有总数数量的主题:3:
运行KAFKAPRODUCERAPP主方法以发送消息,然后检查分区中的消息数:
可以看出,有7条消息,只有一条审核,最后一个分区只有一个消息:
其他两个分区是随机发送的。尽管第一个分区中只有一个分区,但我们可以继续运行kafkaproducerapp,然后查看结果:
显然,在再次运行后,消息是由消息随机发送的。最后一个分区增加了1。由于它运行了两次,因此只发送了两次审核的消息,这表明我们的自定义分区策略已生效。
KAFKA支持任何可以是字符串,整数,数组或其他对象类型的消息类型,但是在将数据发送给经纪人之前,生产者需要将数据转换为字节以在网络中传输。
换句话说,无论是哪种类型的消息,都需要在发送将消息转换为字节数组之前,需要序列化器,
接收器需要一个求职者将接收到的字节数组转换为相应的对象。序列化器与求职者之间的关系如下:
默认情况下,Kafka提供了更多的串行序列。常用的序列化器如下:
如果用户有更复杂的序列化需求,则可以自行定义序列化器。
构造产品对象时,请指定使用序列化的相应序列化值:
使用自定义类型,自定义序列化还需要编写相应的实现类。步骤如下:
因为我们发送的值是用户类型,请设置值。Serializerto我们自己编写的用户序列化器,然后发送。
如果序列化器指定错误,例如,下面的弦乐器不能序列化用户对象,这将引发异常。
生产者拦截器(Interceptor)可以在发送消息之前或日志还款逻辑处理逻辑处理之前对消息进行一些一般处理
可以在多个中指定拦截器,并在指定顺序(拦截器链)中形成同一消息中的拦截链
Interceptor的接口为:org.apache.kafkka.clients.producer.producerinterceptor。主要方法如下:
Interceptor不能保证线程安全性,并要求用户确保线程的安全性。多个拦截器以指定的顺序调用。同时,拦截器中捕获的异常将记录在错误日志中,而不是向上传输。
假设我们现在正在创建两个拦截器,一个是添加发送到消息正面的时间戳,另一个是成功的统计数据和失败的数量。
TimessTamppRependerInterceptor:
Countrinterceptor:
主运行类:
操作结果:
消费者听到的消息:
生产者使用异步消息。每次发送时,该消息都会放在批处理缓冲区中,并通过特殊的IO线程提取消息发送。该区域中的新闻丢失了,
其次,当没有性行为,例如转动力量和设置即时重试时,因为max_in_in_flights_requests_per_connection的值为5默认值时,也会有一个混乱的顺序。
对于消息的丢失,似乎避免了同步发送的使用,但这将大大降低性能。因此,我们考虑在异步情况下解决这个问题。
在不打开电源的情况下,可以使用两种情况来避免以上情况的相应配置:
在生产环境中,如果IO资源非常紧密,例如,生产者计划会消耗大量的网络带宽或经纪人方面的磁盘占用率,但是生产者的CPU资源非常丰富,那么您可以考虑开放产品的消息。设置消息压缩以保存CPU资源。
压缩的性能与批处理中消息的大小有关。批量的大小越大,压缩时间越长。如果压缩速度很慢,则意味着系统瓶颈在用户的主线程上,而不是IO发送线程。
在实际使用过程中,有两种使用产品实例的方法:
多线程单个KAFKAPRODUCER实例所有线程共享一个实例以实现简单,更好的性能1.所有线程共享内存缓冲区区域,更多内存2.一旦某个产品线程崩溃导致Kafkaproducer实例被销毁(例如,关闭),然后all生产者线程无法工作多线程多线程多线多线kafkaproducer实例每个线程使用单独的实例1.每个线程都有自己的实例,可以单独控制缓冲区空间和其他配置。2。某个实例崩溃不会不会影响其他生产者线程,随着时间的推移,工作需要大量的内存分配。如果具有错误数量分区数的Kafka群集,则可以使用第一个方法。如果分区的数量很大,则不同的线程具有自己的配置,并且可以使用第二种方法。
建议使用新版本的生产商SDK进行开发。新旧之间的最大区别是::
在上面的示例代码中,许多参数以字符串形式直接写入打印机。