当前位置: 首页 > 后端技术 > Java

Java是如何实现消费数据隔离的?_0

时间:2023-04-01 17:28:13 Java

今天我们要实现的是handler模??块的消费数据隔离。austin-api收到请求后,将请求发送给Kafka,topicName为austin。在austin-handler中设置了一个名为austinGroup的groupName,用于监听主题austin的数据,进而实现消息发送。从系统架构来看,Austin项目可以发送各种类型的消息:短信、微信小程序、邮件等。如果是单话题、单群,大家有没有想过一个问题:如果某个发送通道接口存在异常,超时,这时候会发生什么?没错,消息会因为消费相同的topic,使用相同的consumer而被阻塞。01.数据隔离会被打破吗?很简单。多个主题和多个组都可以。以上是否解决了所有问题?不是。即使是同一个通道,不同类型消息的发送特性也是不同的。比如我要发一条推送营销信息,可能某个时刻推送给4000W人。短时间内把这4000W人送出去是不现实的。这可能意味着影响通知类的推送消息将被破坏?很简单。毕竟我们在设计消息模板的时候就已经考虑到了这一点。消息模板有一个msgType字段来标识当前模板属于哪种类型,那么我们就可以根据不同的消息类型划分对应的组。理论上,我们可以为每个频道的每种消息类型设置一个单独的主题和组。因为topic之间的数据是隔离的,不同组之间的消费也是隔离的,那么我们消费的时候数据就必须是隔离的。但是,我目前的做法是:每个主题多个小组。消费是孤立的,但生产话题是共享的。我认为代码会更清晰,更容易理解。如果后期有瓶颈,我们可以继续修改。02.上面已经确定了consumer端的设计,通过单topic多group实现数据隔离。比如我目前定义了6个通道(im/push/email/SMS/小程序/微信服务号),3个消息类型(通知/营销/验证码),相当于18个消费者。从kafka拿到消息后,我的设想是分几个步骤:消息丢弃->去重->真正发送。本质上,去重和发送消息是网络IO密集型的。因此,为了提高吞吐量,我决定消费Kafka,存储在缓存中,做一层缓冲。做一层缓冲区可以提高吞吐量,但也会产生其他问题。例如:当应用程序重启时,缓冲区中的数据还没有被消费,会不会丢失?后面可以看看如何解决由此带来的问题(持续关注,后面会有更多的项目优化)。现在我还是觉得buffer利大于弊,所以又回到了buffers。buffer给我的第一反应是实现生产者消费者模式。要实现这种模式,我觉得很简单:作为生产者消费Kafka消息,然后把数据丢到阻塞队列中,开启多个线程消费阻塞队列中的数据就完事了。后来又想了想,直接线程池不就完了吗?线程池不就是生产者和消费者的实现吗?于是乎,架构就变成了下图:03.代码设计在消费者端,先看Receiver的代码。这个类看起来很简单。@KafkaListener注解修改方法只有一种,从Kafka消费,然后交给pending处理。我使用@KafkaListener注解从Kafka中拉取消息,而不是低级的Kafkaapi,没有别的原因:项目前期没必要完美,有的时候想办法就好了一个瓶颈。话虽如此,写的时候还是给我带来了很大的麻烦。第一个问题:@KafkaListener是注解。从源码注解来看,其传值只能使用SpringEL表达式,读取某种配置。但你需要知道的是,我的目的是让多个小组消费同一个话题。而且我也不能说给每个群体定义一个消费方式吧?(写这种破代码,我都睡不着)翻了一晚上技术博客也没找到解决办法,还发朋友圈吐槽。第二天仔细翻阅了Spring的官方文档,终于找到了适合我的解决方案。还是官方文件!有了解决办法,事情就好办了。由于我要隔离每个消息通道的每个消息类型,所以我将枚举这个,我就完成了!我的Receiver是多实例的,所以只要遍历List就可以了(在ReceiverStart类上初始化消费者)。解决了使用@KafkaListener注解动态传入groupId然后创建多个消费者的问题后。遇到第二个问题:Spring有@Aysnc注解可以优雅的实现线程池的方法调用。之前没用过@Aysnc注解,看了下原理和使用姿势。我觉得它很优雅(优雅永远不会过时)。但是使用@Aysnc是肯定要自己创建线程池的,我想为每个消费者创建自己独有的线程池。而且我也不能说要定义一个为每个组创建线程池的方法吧?(写这种破代码,睡不着觉)这次翻了官网和各种技术博客,也没能解决我的问题:在@Async注解上动态传入线程池实例Spring环境,创建线程池实例可以支持按条件传参。最后只能放弃@Aysnc注解,用编程的方式实现:下面是TaskPendingHolder的实现(无非就是为每个消费者创建一个对应的线程池):而Task的实现是目前比较简单,直接调用对应的Handler发送消息即可:04.总结代码看似简单,业务看似简单易懂,但要知道的是,即使很多小公司的生产项目也没有这种设计.一梭子真的太普通了(功能不是不能实现,代码也不是不能跑,最重要的:人不是不能跑)这篇文章主要描述一个思路:消费MQ的时候,多组它可以实现数据隔离。如果想提高消费的吞吐量,可以再加一层缓冲(前提是消费是IO密集型的)。最后,如果您觉得本文对您有帮助,请点个赞。或者可以加入我的开发交流群:1025263163互相学习,我们会有专业的技术解答。如果您觉得这篇文章对您有用,请给我们的开源项目一个小星星:http://github。crmeb.net/u/defu非常感谢!PHP学习手册:https://doc.crmeb.com技术交流论坛:https://q.crmeb.com