当前位置: 首页 > 网络应用技术

带您知道三个kafka消息发送模式

时间:2023-03-08 13:41:30 网络应用技术

  摘要:在KAFKA-0.8.2之后,生产者不再区分同步(同步)和异步方法(异步)。所有请求均发送异步异步方法,从而提高客户端效率。

  在KAFKA-0.8.2之后,生产者不再区分同步(同步)和异步方法(异步)。所有请求都是异步发送的,可以提高客户端效率。产品请求将返回响应对象,包括偏移或错误字母。此异步发送消息发送给Kafka Broker节点可以减少服务器的费用 - 侧面资源。新生产者。所有服务器网络通信都是异步的。当ACK = -1模式需要等待所有复制副本完成副本时,它可以大大减少等待时间。

  1. bootstrap.servers:此属性指定经纪人的地址列表,地址的格式是主机:po.无需包含列表中的所有经纪人地址,生产者将在给定的经纪人中找到其他经纪人由制片人。但是,建议至少提供两个制动器信息。一旦和其中一个,生产者仍然可以连接到群集。

  2. key.serializer:经纪人的密钥和所需消息的值是字节数组。生产者界面允许参数化类型,因此可以将Java对象作为密钥和值发送给经纪人的键。生产商需要知道如何将这些Java对象转换为字节数组。序列化器必须设置为实现org.apache.kafkka.common.serialization.stringserialializer的类。生产者将使用此类将序列排序到字节阵列中。KAFKA客户端默认情况下提供bytearrayserializer(这只是很小),stringserializer和integeserialializer。因此,如果您仅使用几种常见的Java对象类型,则无需实现自己的序列化器。注意,即使您打算仅发送值内容,也必须设置keerialializer。

  3. value.serializer:像key.serializer一样,value.serializer将序列化值。如果键和值是字符串,则可以使用序列化器与键相同。Serialializer.Serializer.semerializer.如果键是整数类型值是字符串,您需要使用不同的序列化器。

  1.火和尸体

  仅发送消息,不关心该消息是否成功。这也是一种异步发送方式。该消息首先存储在缓冲区中,并在批处理中满足设置条件。当然,这是kafka吞吐量的最高方法,并且使用参数acks = 0,因此生产者无需等待服务器的服务器响应,并将消息发送到网络可以支持的最大速度。但这也是新闻的最不可靠的方式,因为未处理发送失败的消息的新闻。

  在发送消息之前,诸如序列化exception之类的异常情况,例如序列化消息的失败,缓冲区区域中的BuffexHaustedException,TimeOutOutEfexception或发送超时的中断中断的中断,发送消息后没有异常。

  2.同步发送

  同步发送,send()方法返回futrue对象。通过调用futrue对象的get()方法,等待直到返回结果。根据返回的结果,您可以确定发送的发送是否成功。当发送失败时,申报表的发送。消息成功发送后,立即发送冲洗以控制消息顺序。

  调用send()方法后,调用get()方法并等待结果返回。如果发送失败会引发异常,如果发送成功,它将返回recordMetadata对象,然后您可以调用offset()方法以在当前分区中获得消息的偏移。

  Kafkaproducer中有两种异常。第一个是可以检索的。这种类型的异常可以通过重新发送消息来解决。超过了重新测试的数量。第二类是非审判的。例如,“消息大小也是laarge”(太多新闻),这种异常将立即返回错误。

  3.异步发送

  异步,在调用send()方法时指定回调函数。当经纪人收到退货时,将触发回调函数。如果业务需要知道消息是否已成功发送而不是关心消息的顺序,则可以以异步+回调方法发送消息,并与参数重试= 0,并记录发送到日志文件的失败;使用回调使用callbackfor函数,首先实现org.apache.kafka.clients.producer.callback接口,该接口仅具有一个onComplating方法。如果它发送了一个例外,则oncomplting参数异常e将是非empperty的。

  在发送异步发送时,当设置条件触发缓冲区池塘消息发送时,KAFKA将首先将消息存储在缓冲池中。

  (1)消息缓存到达批处理。

  (2)Linger.ms与上一条消息发送时间分开;

  (3)调用flush()方法,该方法将立即触发发送并将其阻止到当前缓冲区。

  (4)致电Close()触发发送并在完成后将其关闭。

  4.1 buffer.Memory

  此配置设置生产器可用于缓冲等待发送到经纪人消息的总内存字节,并且默认值为33554432 = 32MB。如果将消息发送到高速缓存区域的速度比发送给经纪人的速度更快,则生产者将被阻止(根据max.block.ms配置的时间,默认值为60000ms = 1分钟.. buffer.full配置),此后会引发例外。

  4.2 compression.type

  生产者使用的所有数据的默认值都不是(即没有压缩),而有效值无,GZIP,Snappy或LZ4。它可以减少CPU的使用并确保良好的性能,同时提供更好的压缩比。因此,建议同时使用性能和带宽。GZIP压缩技术通常使用更多的CPU和时间,但会产生更好的压缩比。因此,建议在网络带宽更加限制时使用它。通过启用压缩功能,它可以减少网络利用率和存储空间,这通常是向Kafka发送消息的瓶颈。

  4.3重试

  默认值为0。当将其设置为大于零的值时,客户端将发送任何消息以发送任何失败。注意,当客户端收到错误时,它与发送消息没有什么不同。。但是第二批成功以及第一批消息将再次发送,第二批新闻将首先写。注意,此参数可能会更改消息的顺序。

  4.4批次尺寸

  当多个消息发送到同一分区时,生产者将一起处理它们。此配置设置用于每批使用的内存字节数,默认值为16384 = 16KB。使用内存后,生产者将发送发送者当前批处理中的所有新闻。但是,这并不意味着生产者将始终等待内存完全使用。根据Linger.ms配置的时间,也将触发该消息。设置较小的值将增加发送频率,这可能会减少吞吐量。设置大价值将使用更多的内存。设置为0将关闭批处理处理功能。

  4.5 Linger.ms

  此配置设置为等待新消息,然后再发送当前批处理消息,而默认值为0.Kafkaproducer将在当前批处理中使用的内存已满或等待时间到达Linger.ms配置时间时发送消息。当Linger.ms> 0时,延迟将增加,但是由于消息发送的频率将减少,因此会增加吞吐量。

  4.6客户端

  用于识别消息发送消息的客户端通常用于日志和性能指标和配额。

  4.7 max.in.flight.requests.per.connection

  此配置设置可以在单个连接上发送的最大数量不令人满意的请求。默认值为5,这将导致该数字上的阻塞。设置大价值可以增加吞吐量,但增加了记忆使用的使用。但是,应该注意的是,当设置值大于1并且发送失败时,如果启用了重试配置,则可以更改消息的顺序。即使设置为1,即使再次发送消息,发送顺序与写作顺序一致。

  4.8 request.timeout.ms

  此配置为请求响应的最长时间设置客户端,默认值为30000ms = 30秒。如果此时未收到响应,客户端将重新发出请求。此配置应大于replica.lag.lag.time.max.ms(代理配置,默认为10秒),以减少重复重复该重复的可能性。由于对生产者的不必要审查,消息。

  4.9 max.block.ms

  当发送缓冲区已满或元数据不可用时,生产者将阻止()和partitionsfor()方法,并且默认阻塞时间为60000ms = 1分钟。使用用户定义的串行器和分区者引起的障碍物目前不会会有会计。

  4.10 max.request.size

  生产者可以在单个请求中发送的最大字节数,默认值为1048576字节= 1MB。接收消息的限制。使用的配置为message.max.bytes = 1000012字节(奇怪的数字,约为1MB)。

  4.11 receed.buffer.bytes and send.buffer.bytes

  RECEDE.BUFFER.BYTES:读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认值为32768字节= 32KB.IF设置为-1,将使用操作系统的默认值。Send.Buffer.buffer.buffer.buffer.bytes:发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认值为131072字节= 128KB.IF设置为-1,将使用操作系统的默认值。

  本文分享了华为云社区的诚意,作者:dayu_dls。