当前位置: 首页 > 后端技术 > Java

一个诡异的PulsarInterruptedException_0

时间:2023-04-02 00:57:12 Java

背景今天收到业务组反馈,一个在线应用向Pulsar发送消息失败。查看日志后发现发送消息时抛出java.lang.InterruptedException异常。跟业务沟通后得知,消息的发送是在一个gRPC接口触发的。大约半个小时的异常后,才恢复正常。这就是整个问题的背景。预排查问题后,首先排查是否为通病,排查其他应用,未发现类似异常;同时查看Pulsarbroker的监控面板,这段时间仍然没有任何波动和异常;this可以初步排除是Pulsarserver端的问题。下一步是检查该时间段内应用程序的负载。从应用QPS到JVM,内存情况依然没有明显变化。Pulsar源码排查既然应用本身和Pulsarbroker似乎都没有问题,那么只能从异常本身排查。首先第一步要知道使用的是哪个版本的Pulsar-client,因为业务使用的是基于官方SDK的内部包springbootstarter,所以第一步要检查这个starter是否受到影响。通过查看源码,基本排除了起步者的嫌疑,简单封装了SDK的功能。org.apache.pulsar.client.api.PulsarClientException:java.util.concurrent.ExecutionException:org.apache.pulsar.client.api.PulsarClientException:java.lang.InterruptedException在org.apache.pulsar.client.api.PulsarClientException。解包(PulsarClientException.java:1027)在org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91)在java.base/java.lang.Thread.run(Thread.java:834)引起:java.util.concurrent.ExecutionException:org.apache.pulsar.client.api.PulsarClientException:java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)处的java.lang.InterruptedException。base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)atorg.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)...省略了49个常见帧Causedby:org.apache.pulsar.client.api.PulsarClientException:org.apache.pu中的java.lang.InterruptedExceptionlsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)在org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393)在org.apache.pulsar.client。org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(未知来源)在org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)在org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java)在org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)在org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)在org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191))在org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)在org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)在org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82)...49常见帧省略由:java.lang.InterruptedException:nullatjava.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)atjava.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)atorg.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)接下来分析栈,因为Pulsar-client的部分实现源码没有直接打包intothedependencies在反编译的情况下,很多代码行数不匹配,所以需要在本地拉取官方源码,切换到对应的分支查看这一步有点麻烦,首先,代码库相当大,如果没有准备好Pulsar的开发环境,估计有些人会望而却步;但实际上,大部分问题都是网络造成的。只要配置一些Maven镜像,多试几次,总会编译成功。我这里直接把分支切换到branch-2.8。从栈顶查看TypedMessageBuilderImpl.java:91:好像内部异步发送消息的时候抛出了异常。然后往下看这里:java.lang.InterruptedExceptionatorg.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)at这里好像是对的,但是代码行数明显不对;因为2.8这个分支也修复了好几个版本,所以中间改动导致代码行数与最新代码不匹配是正常的。信号量.get().acquire();不过乍一看应该是这行代码抛出的线程终端异常。看来他是这里最有可能的人了。为了确认是否真的是这行代码,这个文件翻了好几个版本,最后确认这行代码是正确的。我们打开java.util.concurrent.Semaphore#acquire()的源码,/***

  • 在这个方法的入口处设置了中断状态;或者*
  • 在等待*许可时被{@linkplainThread#interruptinterrupted}**然后抛出{@linkInterruptedException}并且当前线程的*中断状态被清除。**@throwsInterruptedException如果当前线程被中断*/publicvoidacquire()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted()||(tryAcquireShared(arg)<0&&acquire(null,arg,true,true,false,0L)<0))thrownewInterruptedException();}通过源码会发现acquire()函数确实响应中断,一旦检测到当前线程被中断,就会抛出InterruptedException。定位问题这样问题的原因就基本确定了。是Pulsar中的消息发送线程中断导致的,但是中断的原因还有待排查。我们知道线程中断需要调用Thread.currentThread().interrupt();API,先猜测Pulsar客户端内部是否有线程中断发送线程。于是在pulsar-client模块中搜索了相关代码:排除掉与producer无关的部分,其他所有中断线程的代码在异常后继续传递;所以最初,没有主动中断的操作。既然Pulsar自己不做,那只能靠业务来做?于是在业务代码中查找:果然找到了业务代码中唯一被中断的地方,通过调用关系得知这段代码是在消息发送之前执行的,并且与脉冲星发送功能。大致的伪代码如下:List.of(1,2,3).stream().map(e->{returnCompletableFuture.supplyAsync(()->{try{TimeUnit.MILLISECONDS.sleep(10);}catch(InterruptedExceptionex){thrownewRuntimeException(ex);}returne;});}).collect(Collectors.toList()).forEach(f->{try{Integerinteger=f.get();log.info("===="+integer);if(integer==3){TimeUnit.SECONDS.sleep(10);Thread.currentThread().interrupt();}}catch(InterruptedExceptione){thrownewRuntimeException(e);}catch(ExecutionExceptione){thrownewRuntimeException(e);}});MessageIdsend=producer.newMessage().value(msg.getBytes()).send();执行这段代码可以完全重现相同的堆栈。还好这里有日志:通过日志查找发现异常的时间和时间本次中断的日志时间点完全重合,才知道根源。因为业务线程和消息发送线程是同一个,在某些情况下,Thread.currentThread().interrupt();将被执行。其实简单的执行这行函数什么都不会发生,只要不响应中断,那就是Semaphore源码中判断线程中断的flag:publicfinalvoidacquireSharedInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted()||(tryAcquireShared(arg)<0&&acquire(null,arg,true,true,false,0L)<0))thrownewInterruptedException();但是正好这里业务中断后,我没有判断这个标记,导致Pulsar内部判断,最后抛出这个异常。总结所以归根结底还是这里的代码不合理导致的。首先,线程自己中断了却没有被使用,这可能会导致被其他基础库使用的可能性,因此会造成一些不可预知的后果。还有一个就是不建议在业务代码中使用Thread.currentThread().interrupt();这种代码一看就不知道干什么,也不好维护。其实线程中断本质上是一种线程间通信的手段。如果有这样的需求,完全可以用BlockQueue等内置函数代替。

  • 最新推荐
    猜你喜欢