前言本文旨在解决不同Kafka主题之间存在一定数据关联时的顺序消费问题。如果存在Topic-insert和Topic-update分别插入和更新数据,当insert和update对同一个数据进行操作时,要保证先insert再update。一、问题引入kafka的顺序消费一直是一个难以解决的问题。Kafka的消费策略保证了相同topic和partition的消息顺序消费,其他的则不能保证。如果一个Topic只有一个Partition,那么消费者对应的Topic的消费一定是有序的。任何不同主题的情况下,都不能保证消费者的消费顺序与生产者的发送顺序一致。如果不同topic之间存在数据关联,并且有消费顺序的需求,如何处理?本文主要解决这个问题。2、解决思路现有Topic-insert和Topic-update,数据的唯一标识为id,对于id=1的数据,需要保证Topic-insert先消费,Topic-update后消费。两个topic的消费是由不同的线程处理的,所以为了保证同一时刻只有一个业务逻辑在处理同一个数据标识的消息,需要在业务上加一个锁操作。如果使用synchronized加锁,会影响不相关insert和update的数据消费能力。比如id=1的insert和id=2的update在synchronized的情况下不能并发处理。这是不必要的。我们要求的是同时处理id=1的insert和id=1的update中的一个,所以使用细粒度的锁来完成加锁操作。细粒度锁实现:https://blog.csdn.net/qq_3824...PS:如果是分布式系统,细粒度锁需要使用分布式锁对应的实现。加锁insert和update之后,还是没有解决消费订单的问题,但是一次只处理一个业务。针对消费顺序异常的问题,即先消费update再消费insert的情况。处理方式:消费更新数据,检查数据库中是否存在当前数据(即是否执行insert),如果不存在,将当前更新数据存入缓存,key为数据标识id,检查insert消费时id是否存在如果有对应的update缓存,则证明当前数据的消费顺序异常,需要先进行update操作,然后再移除缓存数据。3、实现scheme消息发送:kafkaTemplate.send("TOPIC_INSERT","1");kafkaTemplate.send("TOPIC_UPDATE","1");监控代码示例:KafkaListenerDemo.java@Component@Slf4jpublicclassKafkaListenerDemo{//消费privateMapUPDATE_DATA_MAP=newConcurrentHashMap<>();//数据存储privateMapDATA_MAP=newConcurrentHashMap<>();私有WeakRefHashLockweakRefHashLock;publicKafkaListenerDemo(WeakRefHashLockweakRefHash{this.weakRefHashLock=weakRefHashLock;}@KafkaListener(topics="TOPIC_INSERT")publicvoidinsert(ConsumerRecordrecord,Acknowledgmentacknowledgment)throwsInterruptedException{//模拟序列异常,即消费插入后,此处线程休眠Thread.sleep(1000);Stringid=record.value();log.info("Receivedinsert::{}",id);Locklock=weakRefHashLock.lock(id);lock。锁();try{log.info("开始处理{}的插入",id);//模拟插入业务处理Thread.sleep(1000);//从缓存中获取是否有更新数据if(UPDATE_DATA_MAP.containsKey(id)){//缓存数据存在,执行updatedoUpdate(id);}log.info("{}的插入处理结束",id);}最后{lock.unlock();}acknowledgment.acknowledge();}@KafkaListener(topics="TOPIC_UPDATE")publicvoidupdate(ConsumerRecordrecord,Acknowledgmentacknowledgment)throwsInterruptedException{Stringid=record.value();log.info("收到更新::{}",id);锁lock=weakRefHashLock.lock(id);锁.锁();try{//用于测试,不进行数据库校验if(!DATA_MAP.containsKey(id)){//没有找到对应的数据,证明消费顺序异常,将当前数据加入缓存log.info("消费序列异常,将更新数据{}加入缓存",id);UPDATE_DATA_MAP.put(id,id);}else{doUpdate(id);}}最后{lock.unlock();}acknowledgment.acknowledge();}voiddoUpdate(Stringid)throwsInterruptedException{//模拟更新log.info("Startprocessingupdate::{}",id);线程.睡眠(1000);log.info("进程更新::{}结束",id);}}log(代码中肯定模拟过当前消费订单异常):收到update::1消费订单异常,将更新数据1加入缓存,收到insert::1,开始处理1的insert,开始处理update::1processingupdate::1,endprocessing1'sinsert结束观察日志。该方案可以正常处理不同主题之间数据关联的消费顺序问题。版权声明:本文为CSDN博主“方片龙”原创文章,遵循CC4.0BY-SA版权协议,转载请附上原文出处链接及本声明。