不常见,意味着它们在实际场景中的出现率不高,但仍然非常先进实用。让我们请来今天的主角上台:卡夫卡拦截器。什么是拦截器?如果你用过SpringInterceptor或者ApacheFlume,对拦截器这个概念应该不会陌生。基本思想是让应用程序在不修改逻辑的情况下动态地实现一组可插拔的事件处理逻辑。链。可以在主业务运行前后的多个时间点插入相应的“拦截”逻辑。下图展示了SpringMVC拦截器的工作原理:图片来源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial拦截器1和拦截器2分别在发送请求之前发送,之后和完成后在三个地方插入相应的处理逻辑。Flume中的拦截器也是如此。它们插入的逻辑可以是修改要发送的消息,创建新消息,甚至丢弃消息。这些功能通过配置拦截器类动态插入到应用程序中,因此可以在不影响主程序逻辑的情况下快速切换不同的拦截器。Kafka拦截器借鉴了这种设计思想。可以在消息处理前后的多个点动态植入不同的处理逻辑,比如消息发送前,消息消费后。Kafka拦截器作为一个非常小众的特性,从0.10.0.0版本推出以来,并没有得到太多的实际应用,我也从未在任何Kafka技术峰会上看到有公司分享其拦截器的成功使用。不过即便如此,把这么好用的东西放到自己的Kafka工具箱里还是值得的。今天我们将让它发挥作用并展示一些非常酷的功能。Kafka拦截器Kafka拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前和消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息之前和提交位移之后编写特定的逻辑。值得一提的是,这两个拦截器都支持chain方式,即可以将一组拦截器链成一个大的拦截器,Kafka会按照添加的顺序执行拦截器逻辑。例如,假设你想在产生一条消息之前做两个“pre-actions”:第一个是给消息添加一个header,封装消息发送的时间,第二个是更新sentmessagenumber字段,那么当你把这两个拦截器串联起来,赋值给Producer后,Producer会依次执行上面的动作,然后发送消息。Kafka拦截器当前的设置方式是通过参数配置来完成的。生产者和消费者两端都有一个相同的参数,名称为interceptor.classes,它指定了一组类的列表,每个类都是一个特定逻辑的拦截器实现类。还是上面的例子,假设第一个拦截器的全类路径是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么在在Producer端指定拦截器:Propertiesprops=newProperties();Listinterceptors=newArrayList<>();interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor");//拦截器1interceptors.添加(“com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor”);//Interceptor2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);...那么问题来了,AddTimeStampInterceptor和UpdateCounterInterceptor类应该怎么写呢?其实很简单。这两个类和所有自己写的Producer端拦截器实现类都必须继承org.apache.kafka.clients.producer.ProducerInterceptor接口。这个接口是Kafka提供的,里面有两个核心方法。onSend:该方法将在消息发送之前被调用。如果您想在发送前“美化”一条消息,此方法是您唯一的机会。onAcknowledgment:消息提交成功或发送失败后调用该方法。还记得上一期提到的发送回调通知回调吗?onAcknowledgment在调用回调之前被调用。值得注意的是,这个方法和onSend不是在同一个线程中调用的,所以如果在这两个方法中调用一个共享变量对象,一定要保证线程安全。还有一点很重要,这个方法在Producer发送的主路径中,所以最好不要往里面放一些重逻辑,否则你会发现你的ProducerTPS直线下降。同样,指定一个消费者拦截器也是同一个方法,但是具体的实现类必须实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,这个接口也有两个核心方法。onConsume:在消息返回给Consumer程序之前调用该方法。也就是说,在开始正式处理消息之前,拦截器会先阻塞一个,做一些事情,然后返回给你。onCommit:消费者在提交位移后调用该方法。通常你可以在这个方法中做一些记账的动作,比如记录日志等等。必须注意的是,在指定拦截器类时,必须指定它们的完全限定名,即全限定名。通俗地说,就是加上完整的包名,而不仅仅是一个类名,保证你的Producer程序能够正确加载你的拦截器类。Kafka拦截器典型的使用场景可以用在什么地方?事实上,和很多拦截器一样,Kafka拦截器可以应用于客户端监控、端到端系统性能检测、消息审计等场景。下面我以端到端的系统性能检测和消息审计为例进行介绍。现在Kafka默认提供的监控指标都是针对单个client或者Broker的,你很难从具体的消息维度去追踪集群间的消息流向。同时,如何监控一条消息从生产到最终消费的端到端延时也是很多Kafka用户急需解决的问题。从技术上讲,我们可以在客户端程序中加入这样的统计逻辑,但是对于使用Kafka作为企业级基础设施的公司来说,在应用代码中编写统一的监控逻辑其实是有难度的。事物是非常灵活的,不可能事先确定所有的计算逻辑。另外,在软件工程中不推荐将监控逻辑与主要业务逻辑耦合。现在,通过实现拦截器逻辑和可插拔机制,我们可以快速观察、验证和监控集群之间的客户端性能指标,尤其是从特定的消息级别收集这些数据。这是一个非常典型的Kafka拦截器的使用场景。我们再来看消息审计场景。想象一下,你的公司使用Kafka作为私有云消息引擎平台,为整个公司提供服务,其中必然涉及到多租户和消息审计功能。作为私有云PaaS提供商,你必须能够随时查看每条消息是哪个业务方发布的,哪些业务方又是在什么时候消费的。一个可行的做法是写一个拦截器类,实现相应的消息审计逻辑,然后强制所有访问你的Kafka服务的客户端程序设置拦截器。案例分享下面我用一个具体的案例来说明拦截器的使用。在这种情况下,我们写一个拦截器类来统计消息端到端处理的延迟,非常实用。我建议大家可以直接移植到自己的生产环境中。我曾经给一家公司做过Kafka培训。在培训过程中,那家公司的一个人提出了一个要求。他们的场景很简单。一个企业只有一个生产者和一个消费者。他们想知道从业务消息产生到最终消费的平均总时间。但是,Kafka目前并没有提供这种端到端的延迟。统计数据。了解了拦截器之后,我们现在知道拦截器可以用来满足这个需求。既然要计算总延迟,就必须有一个公共场所来保存它,而且这个公共场所必须是producer和consumer程序都可以访问的。在这个例子中,我们假设数据存储在Redis中。好吧,这个需求显然需要producer拦截器和consumer拦截器的实现。让我们先实现前者:publicclassAvgLatencyProducerInterceptorimplementsProducerInterceptor{privateJedisjedis;//省略Jedis初始化@OverridepublicProducerRecordonSend(ProducerRecordrecord){jedis.incr("totalSentMessage");退货记录;}@OverridepublicvoidonAcknowledgment(RecordMetadatametadata,Exceptionexception){}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Mapconfigs){}上面的重点代码是在发送消息之前更新发送消息的总数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能会导致发送总数不准确。好在处理思路一致,可以有针对性地调整代码逻辑。下面是消费者端的拦截器实现,代码如下:publicclassAvgLatencyConsumerInterceptorimplementsConsumerInterceptor{privateJedisjedis;//省略Jedis初始化@OverridepublicConsumerRecordsonConsume(ConsumerRecordsrecords){longlantency=0L;对于(ConsumerRecordrecord:records){lantency+=(System.currentTimeMillis()-record.timestamp());}jedis.incrBy("totalLatency",lantency);longtotalLatency=Long.parseLong(jedis.get("totalLatency"));longtotalSentMsgs=Long.parseLong(jedis.get("totalSentMessage"));jedis.set("avgLatency",String.valueOf(totalLatency/totalSentMsgs));退货记录;}@OverridepublicvoidonCommit(Mapoffsets){}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map