当前位置: 首页 > 科技观察

Kafka封装-方法参数解析器,用起来真的爽!

时间:2023-03-22 10:34:15 科技观察

嗨,我叫詹森。一个想和大家一起打怪升级的程序员朋友。我们在写Kafka消费者的时候,有没有发现一件很麻烦的事情:在消费消息之前,我们每次都要手动解析Kafka消息,转换成我们想要的类型,然后进行业务操作,比如:/***order支付成功通知*@Author公众号:架构师实践记录*/@KafkaListener(topics="oms.orderPaySuccess",groupId="fms")publicvoidorderPaySuccess(ConsumerRecordconsumerRecord){//解析KafkaMessageOrderPaySuccessEvent事件=JSON.parseObject(consumerRecord.value(),OrderPaySuccessEvent.class);//TODO解析成功后的业务操作}执着于代码整洁度的同志们就比较难受了。每一个解析操作都差不多,但这不是业务代码……于是你想:有没有办法让系统自动解析成你想要的参数对象呢?但是也发现了一些比较难解决的问题,比如共享一个groupId,使用Java线程池进行管理,所以在项目中会存在性能瓶颈。专业的事交给专业的“人”。使用Kafka组件本身来管理不同组的消费会更可靠。Spring官方也发现了这个问题,并提出了解决方案——spring-messaging包下的HandlerMethodArgumentResolver接口。我们从官方文档中看到,Spring-kafka在2.4.2版本支持kafka消息的方法参数级转换。其实spring-web包下还有一个同名接口,用于自动解析Controller方法上的参数。这块资料网上资料很多,就不详细展开了,不过spring-messaging包下这个接口的非官方资料比较少。在这里给大家总结一下用法。HandlerMethodArgumentResolver方法参数处理器接口很简单,只有两个方法:publicinterfaceHandlerMethodArgumentResolver{booleansupportsParameter(MethodParametervar1);@NullableObjectresolveArgument(MethodParametervar1,Messagevar2)抛出异常;对于解析,resolveArgument方法是具体的解析逻辑,将MQ的传递参数转换为具体的类型参数。让我们看一个真实的案例。首先实现HandlerMethodArgumentResolver接口,它被定义为一个SpringComponent:@ComponentpublicclassKafkaListenerMethodArgumentResolverimplementsHandlerMethodArgumentResolver{@OverridepublicbooleansupportsParameter(@NonNullMethodParameterparameter){//默认类以com.xxx开头,所以不需要在参数@Payload注解前加上returnparameter.getParameterType().getName().startsWith("com.xxx")||parameter.hasParameterAnnotation(Payload.class);}@OverridepublicObjectresolveArgument(@NonNullMethodParameterparameter,@NonNullMessagemessage){ClassparameterType=parameter.getParameterType();StringmessageContent=(String)message.getPayload();对象主体;try{//在这里定义你自己的解析方法body=JsonUtils.fromJson(messageContent,parameterType);Objects.requireNonNull(body);}catch(Throwablecause){thrownewKafkaException("kafka消息解析失败:无效的JSON字符串",cause);}//可选,定义解析后的参数校验逻辑validate(parameter,body);返回身体;}privatevoidvalidate(MethodParameterparameter,Objecttarget){for(Annotationann:parameter.getParameterAnnotations()){ValidatedvalidatedAnn=AnnotationUtils.getAnnotation(ann,Validated.class);if(Objects.nonNull(validatedAnn)||ann.annotationType().getSimpleName().startsWith("Valid")){ValidationUtils.valid(target);}}}}至此,一个类自动解析了参数,下面看怎么用.orderPaySuccess",groupId="fms")publicvoidorderPaySuccess(@PayloadOrderPaySuccessEventorderPaySuccessEvent){//Kafka消息已经自动解析为orderPaySuccessEvent参数//解析成功后TODO业务操作}怎么样,代码是不是多了比原来简洁,香不香!如果你追求技术,不想一直写业务代码,不妨把项目中需要手动解析参数的地方全部换成自定义的方法参数解析器来实现~