当前位置: 首页 > 后端技术 > Java

Pulsar会重复消费吗?

时间:2023-04-01 14:04:47 Java

背??景好久没有分享Java相关的排查问题了。最近帮同事排查了一个问题:使用Pulsar消费时,重复消费同一条消息。当他告诉我这个现象时,我很怀疑。根据之前的经验,Pulsar在官方文档和API中都有说明:只有设置了消费ackTimeout,消费超时,才会重复投递消息。默认情况下next是关闭的,查看代码确实没有开启。会不会是调用了negativeAcknowledge()方法(调用这个方法也会触发重新投递),因为我们使用了第三方库https://github.com/majusko/pulsar-java-spring-boot-starter只有在抛出异常时才会调用此方法。查阅了代码,没有地方可以抛异常,甚至整个过程都没有看到异常;这有点奇怪。为了了解整个事情的来龙去脉,我对他的使用过程进行了详细的了解;实际上,业务中存在错误。他调试,然后在消费消息的时候进行单步调试。经过一次调试,他很快就关闭了得到了同样的信息。但是奇怪的是每次debug后都无法重复消费。我们都说一个bug如果能100%完全复现,基本上就解决了一大半。因此,我们调查的第一步是完全重现问题。为了排除IDEA的问题(虽然概率很小),由于是调试时的问题,转换成代码的时候其实是sleep,所以我们打算在消费逻辑中直接sleep一段时间看看现在是否可以重复。经过测试,sleep几秒到几十秒都无法复现,最后索性sleep了一分钟,神奇的事情发生了,而且每次都复现成功!既然能复现成功,那好说,因为我自己的业务代码也是用的Pulsar,打算在自己的项目中再复现一遍,方便调试。结果,又发生了奇怪的事情,我这里无法重现。这虽然符合预期,但也无法调整。本着相信现代科学的前提,我们两人唯一的区别就是项目不同。为此,我比较了双方的代码。@PulsarConsumer(topic=xx,clazz=Xx.class,subscriptionType=SubscriptionType.Shared)publicvoidconsume(Datamsg){log.info("consumemsg:{}",msg.getOrderId());锁定lock=redisLockRegistry.obtain(msg.getOrderId());如果(lock.tryLock()){尝试{orderService.do(msg.getOrderId());}catch(Exceptione){log.error("consumermsg:{}err:",msg.toString(),e);}最后{lock.unlock();}}}果不其然,同事这边的代码加了一把锁;一个基于Redis的分布式锁,这个时候我拍大腿不会因为解锁时超时,导致抛出异常。为了验证这个问题,在可复现性的基础上,我在框架的Pulsar消费处打了个断点:果然解决了,异常提示很明确:加锁超时时间已过。进入exception后,直接发送negativemessage,exception也同时被吃掉,所以之前没有发现。查看RedisLockRegistry的源码,默认的超时正好是一分钟,所以之前我们sleep了几十秒也无法重现这个问题。总结之后问同事为什么要加锁,因为我看到根本不需要加锁;原来他是因为抄别人的代码才加的,根本没想那么多。所以这件事也可以吸取一些教训:ctrlC/V虽然方便,但是也要充分考虑自己的业务场景。在使用某些第三方API时,您需要充分了解其功能和参数。你的点赞和分享是对我最大的支持