当前位置: 首页 > 后端技术 > Java

平安保险基于SPI机制的RocketMQ定制化应用

时间:2023-04-01 22:51:36 Java

简介:本文讲解平安保险选择RocketMQ的原因,以及在决定使用消息中间件后如何选择消息中间件。作者:孙媛媛|平安寿险资深开发为什么选择RocketMQ首先说说为什么选择RocketMQ。在技??术选型的过程中,首先要考虑清楚应用场景。只有应用场景确定了,技术在选择过程中才能有明确的目标和衡量标准。异步、解耦、削峰填谷等消息中间件的共性特性不再一一介绍。这些特性决定了你的场景是否需要使用消息中间件。这里主要说一下消息中间件的使用,以及如何选择消息中间件。同步双写,保证业务数据安全可靠不丢失。我们在搭建消息中间件平台的时候,定位是把业务数据传递给业务系统。业务数据最重要的一个需求就是不允许数据丢失,所以我们选择RocketMQ第一点是它有同步双写机制,只有数据刷写成功才算发送成功主服务器和从服务器。在同步双写的情况下,MQ的写性能相对于异步刷写的异步赋值肯定会有所下降,相比异步情况会有20%左右的下降。在单主从架构下,1K消息的写入性能依然可以达到8W+的TPS,性能完全可以满足大部分业务场景的需求。此外,减少的业绩可以通过券商的横向扩张来弥补。因此,在同步双写的情况下,性能可以满足要求。业务需求。在多主题应用场景中,表现依然强劲。第二点,业务系统会有很多的使用场景。使用场景广泛带来的问题是会产生大量的话题。因此,有必要衡量消息中间件在多主题场景下的性能。性能是否满足需求。我在测试的时候用1K消息随机写数据到10000个topic,单broker状态下可以达到2W左右的TPS,比kafka强多了。所以在多topic的应用场景下,性能还是很强的,这是我们选择topic的第二个原因。这也是由底层文件存储结构决定的。Kafka、RocketMQ等消息中间件可以实现接近内存的读写能力,主要依赖于文件的顺序读写和内存映射。RocketMQ中所有的topic消息都写在同一个commitLog文件中,而Kafka中的消息是以topic为基本单元组织的,不同的topic之间是相互独立的。在多主题场景下,会产生大量的小文件。大量小文件读写时有一个寻址过程,类似于随机读写,影响整体性能。支持事务性消息、顺序消息、延迟消息、消息消费失败重试等。RocketMQ支持事务性消息、顺序消息、消息消费失败重试、延迟消息等,功能丰富,更适合复杂多变的业务场景.活跃的社区建设,阿里开源系统另外,在选择消息中间件的时候,还要考虑社区的活跃度和源码使用的开发语言。RocketMQ使用Java开发,对Java开发者更友好,无论是阅读源码排查还是在MQ的基础上做二次开发都比较容易社区的大部分同学都是国内的小伙伴,他们是比较接近大家参与RocketMQ开源贡献。在此,我们也希望更多的小伙伴能够参与进来,为国内的开源项目做出更多的贡献。SPI机制介绍及应用介绍完为什么选择RocketMQ,接下来介绍一下我们是如何基于SPI机制来应用RocketMQ的。SPI的全称是(ServiceProviderInterface),是JDK内置的一种服务发现机制。我个人的理解是面向接口编程,给用户留一个扩展点,比如springBoot中的spring.factories,也是SPI机制的一部分。应用。如图所示,是SPI在RocketMQ中的一个应用。我们基于SPI机制的RocketMQ客户端应用也是受到SPI机制在MQ中应用的启发。RocketMQ在实现ACL权限校验时,实现了AccessValidator接口。PlainAccessValidator是MQ中的默认实现。权限验证可能因组织结构不同而有不同的实现方式。通过SPI机制提供接口,为开发者定制开发提供扩展点。当有定制需求时,只需重新实现AccessValidator接口即可,无需与源码开战。下面给大家介绍一下我们的配置文件的一个简单模型。该配置文件中,除了sendMsgService、consumeMsgConcurrently、consumeMsgOrderly这三个配置项外,其余均为RocketMQ原生配置文件。配置项是SPI机制的应用,是为具体实现提供的接口。有的同学可能会有疑惑,SPI的配置文件不应该放在META-INF.service路径下吗?这里简单的和MQ的配置文件放在一起,方便配置文件的管理。前面说了,META-INF.service只是一个默认路径,为了方便管理做相应的修改,并不违背SPI机制的思想。我们再来看看这个配置文件模型。这里的配置项包括使用MQ时需要配置的所有选项。proConfigs支持所有MQ原生配置,从而实现配置与应用实现的解耦。应用端只有你需要关注的具体业务逻辑才能使用。可以通过配置文件指定生产者和消费者的实现以及消费者消费的主题。此外,该配置文件还支持在多个环境中使用多个名称服务器。在更复杂的应用中,支持向多个RocketMQ环境发送消息,消费多套不同环境的消息。消费者提供了两个接口,主要是为了支持RocketMQ的并发消费和顺序消费。下面给大家分享下如何根据这个配置文件来初始化生产者消费者。首先介绍一下我们抽象出来的一个客户端加载的核心流程。在客户端核心进程详图中可以看到,客户端的核心进程被抽象为三个部分,分别是启动期、运行期和终止期。首先加载配置文件就是加载刚刚介绍的配置文件模型。在配置和应用完全解耦的状态下,必须先加载配置文件,才能初始化后续流程。在初始化生产者和消费者之前,需要先创建应用程序实现的生产者和消费者的业务逻辑对象,供生产者和消费者使用。监控运行时配置文件的变化,根据变化动态调整生产者和消费者实例。这里还是要强调一下,配置和应用的解耦为动态调整提供了可能。终止期比较简单,就是关闭生产者和消费者,将他们从容器中移除。这里的终止期指的是生产者和消费者的终止,而不是整个应用程序的终止。动态调整过程中可能会发生生产者和消费者的终止,因此必须将终止的实例从容器中移除。方便初始化后续的生产者和消费者。介绍完基本流程,再介绍一下配置文件的加载过程。如何加载配置文件如果加载配置文件,过程比较简单。这里最主要的是如何兼容老项目。RocketMQ客户端支持的JDK最低版本为1.6,所以客户端打包时要考虑新老项目的兼容性。这里我们客户端的核心包支持JDK1.6。Spring早期的项目配置文件一般都放在resources路径下。我们实现了一套读取配置文件和监控配置文件的方法。具体可以参考acl中配置文件的读取和监控。springBoot在核心包的基础上封装了一套自动加载配置文件供微服务项目使用的包,以及spring用于读取和监控配置文件的一套。配置文件加载完成后,应用在RocketMQ的生产者和消费者关联的配置文件中是如何实现生产者和消费者的?接下来我就给大家分享一下这方面的内容。如何将生产消费者与业务实现关联起来首先,让我们看一下消费者是如何关联的。上图是MQ消费者的消息监听器,需要我们实现具体的业务逻辑处理。这里通过关联配置文件中实现的消费逻辑,可以将配置文件中的消费者关联到RocketMQ消费者。消费者的接口定义也很简单,就是消费消息。可以通过泛型指定消费消息的类型。在初始化消费者时,获取具体实现的参数类型,将MQ接收到的消息转化为具体的业务类型数据。消息类型的转换由客户端统一封装。您可以根据需要将消费消息的返回值与MQ提供的状态进行映射。这里的demo只是一个简单的展示。在获取具体的应用消费者实例时,如果你的消费逻辑中使用了spring管理的对象,那么你实现的消费逻辑对象也应该交给spring管理,通过spring上下文获取初始化的对象;如果你的Spring在消费逻辑上没有用来管理,可以通过反射创建具体的应用实例。与消费者不同的是,生产者需要将初始化后的生产者对象传递给应用代码,而消费者则是获取应用中实现的逻辑对象,那么如何将生产者传递给业务应用呢?业务代码中实现的生产者需要继承SendMessage,使得业务代码获取到RmqProducer对象,该对象是一个封装的生产者对象,规范了发送消息的方法,使其符合公司相应规范的System,方法在这个对象还会检查topic的命名规范,规范topic有统一的命名规范。如何动态调整生产和消费者首先说到动态调整,我们要说说发生动态调整的场景。如果没有合适的使用场景,实现动态调整就有点华而不实了。这里我列举四种配置文件变化的场景:当nameserver发生变化时,所有的生产者和消费者都需要重新初始化。一般在MQ迁移或者当前MQ集群不可用,需要紧急切换MQ时进行;在减少实例的场景下,只需要启动或关闭对应的实例即可。在增加应用实例的场景下,一般需要增加一个consumer来消费新的topic。consumer的减少一般需要在某个consumer出现异常时紧急关机。这位消费者应该及时止损。在调整消费者线程的场景中,我们对源码做了一些修改,让应用端可以获取到消费者的线程池对象,从而动态调整线程池的核心线程数。这个的应用场景一般是当一个consumer消费数据量大,占用CPU资源过多,导致优先级高的消息没有得到及时处理,可以先减少consumer的线程。应用优势原文链接本文为阿里云原创内容,未经允许不得转载。