当前位置: 首页 > 后端技术 > Java

Java并发编程制作应用场景与实战

时间:2023-04-01 14:59:48 Java

背景介绍为什么要学习Java并发?从提高性能的角度,提高CPU的使用效率:目前生产的服务器大多是多核的,标准机都是8C/16G的。操作系统会将不同的线程分配给不同的核心进行处理。理论上,并行执行的线程数与核心数一样多。如果没有并发编程,CPU的利用率就会被大大浪费。假设当前正在处理耗时的I/O操作,那么整个CPU会处于阻塞空闲状态,后续的指令必须等待前面的执行完成才能继续执行。降低服务RT:大规模上网的次数很容易超过每秒10000次。如果没有并发处理,所有的用户请求都会排队。你能想象体验效果吗?这样的服务能力如何留住客户?通过并发编程,充分释放CPU算力。操作系统让每个客户轮流使用CPU计算,每个客户都能得到快速响应。容错率高:线程之间的执行不会相互干扰,一个线程异常退出也不会影响其他线程。从一个开发者的角度来看,Java基本面试技巧是必须考的:Java并发面试题基本都会出现。有大型项目开发经验的同学和处理过很多并发问题的同学往往会受到青睐。因为系统越复杂,并发请求越多,简单的业务+并发=这个业务不简单。并发离不开工作:多线程可以充分发挥CPU的计算能力,这就让我们不得不了解并发的原理,以免造成线程安全问题,给生产带来损失。很多并发知识都用在了常用的中间件上,比如MQ、RPC等,如果不熟悉原理,怎么调优中间件的使用。并发编程业务实践一:风控规则引擎-策略执行互联网企业的风控和安全部门需要时刻与黑灰产品作斗争,以保护企业免受不必要的经济损失。在交锋过程中,风控策略组沉淀出一系列风险识别策略,检测当前业务请求中是否存在高风险操作。风控和安全团队需要评估在业务运营过程中是否存在黑产可以获取收益的地方,即“风险点”。评估后,业务每次经过风险卡点时,需要将业务信息透传给风控服务。风控服务长时间进行大量的决策计算,返回业务方的决策结果(ACCEPT-通过/REVIEW-手动,需要进一步信息确认/REJECT-拒绝,高风险操作)。图为营销活动——裂变活动风险卡点。营销裂变流程风险卡点图一个业务请求一般需要300到500毫秒之间。如果超出这个范围,可能需要定位调优哪个节点耗时。大型互联网公司的系统架构相对复杂。一个完整的业务可能有几十个甚至上百个服务系统。您触发的请求可能会经过比您想象的更多的服务。前文提到,业务对风控业务的性能要求很高,一般控制在100ms以内。然而,风控业务请求的内部调查涉及大量的策略和规则。如何在不阉割策略的情况下,在短时间内执行?答案是并发成为。风控内部大量使用并发来满足海量请求和计算需求。我将以策略规则的执行为例来说明如何编写代码。下面是一个风控服务请求的大致执行流程:从风控流程执行图可以看出,一个风控请求的判断涉及到大量的规则判断。如果此时没有并发,会有什么影响呢?Serial&ParallelExecutionRuleDiagram如果所有的策略和规则都是串行执行的,可能几秒内都算不出来。这时候就需要使用Java并发来满足性能需求。核心代码如下:publicclassRuleSessionExecutor{//线程池privatefinalstaticExecutorServiceexecutor=newThreadPoolExecutor(Runtime.getRuntime().availableProcessors()*8,Runtime.getRuntime().availableProcessors()*8,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(),newCustomerThreadFactory("rule-executor"),newThreadPoolExecutor.AbortPolicy());/***规则执行*@paramrules*/publicvoidexecute(Listrules){finalCountDownLatchlock=newCountDownLatch(rules.size());for(CustomerSessionsession:rules){try{session.exec();}catch(Throwablee){}最后{lock.countDown();}}try{lock.await(50,TimeUnit.MILLISECONDS);}catch(InterruptedExceptione){log.error("CountDownLatcherror",e);}}}这里用到了CountDownLatch并发工具类,下面将介绍实践二:风控特征平台特征加载如上实践一,大量的规则需要大量的执行之前获取特征,在每个规则执行内获取特征是可行的,但是会造成重复获取特征的问题,浪费性能。例如:如果规则人员在做A/B测试,两个策略包有很多重叠的特征,此时如果在每条规则内获取相当于重复访问两次有交集的特征,这种浪费是没有必要的,此时我们先获取当前策略包下的所有去重特征,然后再进行规则执行,然后获取所有特征,然后执行规则。那么此时的问题是,如何批量获取特征呢?特征类型很多:输入型:不耗时-请求上下文携带,如订单金额衍生型:基本不耗时-基于输入型特征推导,如根据经纬度计算距离Real-时间统计特性:基本不耗时-有趣可以关注我的文章Flink在风控场景下的实时特性。详细介绍了查询类:耗时——比如根据订单号调整业务RPC接口获取订单详情,通信+业务本身耗时外部类:耗时——比如第三方同步获取&异步获取同盾、IPIP等风控公司产品特性。显然,我们需要并发来支持性能。核心代码如下:publicclassDataSourceExecutor{privatefinalstaticExecutorServiceexecutor=newThreadPoolExecutor(Runtime.getRuntime().availableProcessors()*8,Runtime.getRuntime().availableProcessors()*8,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(),newHamThreadFactory("ds-executor"),newThreadPoolExecutor.AbortPolicy());/***特征获取**@paramdataSources*/publicvoidexecute(ListdataSources){Listfutures=Lists.newArrayList();长时间超时=ApolloConfig.getLongProperty(ApolloConfigKey.DS_TIMEOUT_OUTER_KEY,150L);对于(数据aSourceds:dataSources){超时=ds.getExecutionTimeout()>超时?ds.getExecutionTimeout():超时;CompletableFuturefuture=CompletableFuture.runAsync(()->{ds.execute();},执行者);futures.add(未来);}CompletableFuturesummaryFuture=CompletableFuture.allOf(futures.toArray(newCompletableFuture[]{}));尝试{summaryFuture.get(timeout,TimeUnit.MILLISECONDS);}catch(InterruptedException|ExecutionException|TimeoutExceptione){log.error("DataSourceexecerror",e);}}}类似于规则的批量执行,但是这里使用了Java8的CompletableFuture并发工具类,其功能得到了增强,下面将使用和介绍实践3:分布式任务运行批量定时任务应该是一个工作中很常见的需求,比如订单状态流水检测,对账等。任务一般都是批量运行的,即包含多个子任务。这种场景非常适合线程池任务队列的并发执行。任务队列线程池图核心代码如下:publicvoidexecute(Listtasks){tasks.forEach(t->{executor.execute(()->{try{log.info("taskid:{}beginexec",t.getId());t.execute();}catch(Throwablee){log.error(String.format("taskexecuteerror,uid:%s",t.getId()),e);}finally{log.info("taskid:{}endexec",t.getId());}});});}并发编程常用工具线程池线程池(英文:线程池):一种线程使用模式。过多的线程会引入调度开销,从而影响缓存位置和整体性能。线程池维护着多个线程,等待主管分配可以并发执行的任务。这避免了在处理短期任务时创建和销毁线程的成本。线程池既可以保证核心的充分利用,又可以防止过度调度。可用的线程数应取决于可用的并发处理器、处理器内核、内存、网络套接字等的数量[1]。J.U.C提供的线程池:ThreadPoolExecutor类帮助开发者方便的管理线程和执行并行任务。了解并合理使用线程池是开发者必须掌握的一项基本技能。任务调度当用户提交任务时,任务的生命周期将由线程池控制。线程池内部实际上构建了一个生产者/消费者模型。线程和任务解耦,没有强关联,有利于任务的缓冲和复用。了解线程池的第一步必须知道任务的运行机制。任务执行图任务队列线程池的本质是任务和线程的管理,而这其中的关键是将任务和线程解耦,使两者不直接相关,从而实现后续工作的合理分配。线程池是通过阻塞队列以生产者消费者模式实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。阻塞队列(BlockingQueue)在队列的基础上增加了两个特性。当队列为空时,获取元素的线程会等待队列变为非空。当队列已满时,存储元素的线程将等待队列可用。阻塞队列常用于生产者和消费者的场景。生产者是将元素添加到队列的线程,消费者是从队列中获取元素的线程。阻塞队列是生产者存储元素的容器,消费者只从容器中获取元素。阻塞队列示意图阻塞队列可以选择如下:ArrayBlockingQueue是有界的;数组实现;先进先出;LinkedBlockingQueue是有界的(默认长度为Integer.MAX_VALUE,一不小心内存就会溢出);链表实现;先进先出;PriorityBlockingQueue是无界的;平衡二叉树实现;排序DelayQueue和PriorityBlockingQueue一样;对象只有在过期时才能从队列中移除SynchronousQueue不存储元素;没有put操作需要等待take操作LinkedTransferQueue是有界的;优点是与LinkedBlockingQueue相比,降低了锁的粒度,LinkedBlockingDeque的性能相对更高LinkedBlockingQueue实现了双端阻塞;锁粒度减小,性能更好。任务拒绝线程池的自我保护熔断部分。当任务有界缓存队列满了,证明线程池已经过载,无法处理。这时候需要拒绝新的任务,应该采用设置的拒绝策略来保护线程池。用户可以选择JDK提供的四种拒绝策略,也可以自定义RejectedExecutionHandler接口的实现。ThreadPoolExecutor.AbortPolicy()丢弃任务并抛出RejectedExecutionException;线程池的默认拒绝策略;关键业务应该使用这个异常策略来了解线程池ThreadPoolExecutor.CallerRunsPolicy()的健康状态被主线程用来执行当前任务。ThreadPoolExecutor.DiscardOldestPolicy()丢弃旧任务,重新提交任务;不建议用于生产,存在风险。ThreadPoolExecutor.DiscardPolicy()丢弃任务并且不抛出异常;生产不建议使用,不容易发现问题。CountDownLatch-同步计数器CountDownLatch由一个计数器在内部实现。计数器的数量等于初始化时要处理的等待线程的数量。每个线程执行完后,计数器需要减??一。当计数器为0时,意味着需要等待执行的线程全部执行完毕,主线程会被唤醒继续执行主线程任务。CountDownLatch流程图常见场景:1等N:结合线程池释放CPU算力,比如首页复杂的信息流,可以分发加载各个模块的信息,之后计数完成,返回结构化数据给主线程,返回最差的匹配:which线程执行完后,可以立即释放cnt量为0,通知主线程执行核心代码:publicvoiddemo(){Listtasks=...finalCountDownLatchcountDownLatch=newCountDownLatch(10);tasks.forEach(task->{try{//自己的子线程逻辑}catch(Throwablee){//有时收不到Exception,建议使用Throwable}finally{countDownLatch.countDown();}});尝试{countDownLatch.await(100,TimeUnit.MILLISECONDS);}catch(InterruptedExceptione){log.error("countDownLatchInterruptedException",e);}}CompletableFutureJava在1.8版本提供了CompletableFuture来支持异步编程。CompletableFuture的功能实在是太震撼了。他的复杂度也应该是我见过最复杂的之一。下面通过一个例子来直观感受CompletableFurureCompletableFuture番茄炒蛋的强大//第一步:准备番茄CompletableFuturef1=CompletableFuture.supplyAsync(()->{System.out.println("WashTomatoes");System.out.println("Cutit");return"Cuttomatoes";});//第二步:准备鸡蛋CompletableFuturef2=CompletableFuture.supplyAsync(()->{System.out.println("洗鸡蛋");System.out.println("Friedeggs");return"Friedeggs";});//第三步:煎鸡蛋CompletableFuturef3=f1.thenCombine(f2,(__,tf)->{System.out.println("Stirfry");return"西红柿炒蛋";});//等待任务3执行结果System.out.println(f3.join());CompletableFuture它的目的解决多线程之间复杂的实现逻辑。如上图,其实只包含了业务实现的逻辑。Lamda编程巧妙地避开了并发编程的逻辑。用最少的代码做最难的事情是完美的。CompletableFuture在这里我就不详细描述了。有兴趣的可以关注我,因为CompletableFuture的实现可能一篇文章都说不完。常见并发问题及解决死锁&定位死锁(deadlock),当两个或多个计算单元都在等待对方停止执行以获取系统资源,但任何一方都没有提前退出时,称为死锁。[1]死锁的四个条件是:无抢占:系统资源不能被强制退出进程。保持并等待:进程可以在等待时保持系统资源。互斥:资源同时只能分配给一个进程,不能被多个进程共享。循环等待:一系列进程互相持有其他进程需要的资源。locatejpsjstackpid//截取以上信息发现一个Java级别的死锁:==============================》线程-1":waitingtolockmonitor0x00007fcc68023f58(object0x0000000795ea0c00,ajava.lang.Object),whichisheldby"Thread-0""Thread-0":waitingtolockmonitor0x00007fcc68022ab8(object0x0000Object.javacl,1ang)which被“Thread-1”持有jps定位到正在运行的java程序,然后使用jstackpid打印线程信息,往下拉最下面发现有死锁提示,然后根据线程号找到对应的线程0x00007fcc68023f58分析是哪一段代码引发的问题。Java并发多线程编程的性能调优,我们首选的工具一定是线程池。使用线程池面临的核心问题是线程池的参数不好配置!你是否也遇到过根据经验对某个线上场景线程池的最小活跃线程数和最大活跃线程数估计不准确或错误的情况?其实线程池并没有一个通用的计算公式,因为一台机器上不只有一个服务,一个服务中也不只有一个线程池,如果按照I/O密集型或者CPU-密集,重复是不可避免的。调试很痛苦。那么我们是否可以降低修改线程池参数的成本,让至少在发生故障的时候能够快速调整,缩短恢复时间呢?动态线程池的架构设计本文不多说。有兴趣的可以关注我。稍后我会发一篇文章。这里我只给出大概的思路:动态参数调整:支持动态调整线程池核心参数,最小/最大核心线程数任务监控:主要监控阻塞队列的累积和单线程执行耗时95到99lines报警:有潜在压力,及时警告和纠正操作通知和认证:生产操作非常危险,需要认真加强监控。任何系统的运行都离不开监控,只是粒度问题。在并发场景下,我们需要特别关注服务线程的监控状态,尤其是活跃线程数、队列累计长度、平均耗时、吞吐量等重要指标。当出现预警时,可以及时通知相应的开发者进行降级处理,也可以自动断线保护主服务。精彩的性能调优——小日志,大坑,性能优化必不可少——火焰图Flink实现了风控场景下的实时特性。欢迎关注公众号:布谷鸡技术专栏个人技术博客:https://jifuwei.github.io/本文参与思维技术征文征文,正在阅读的欢迎加入.

猜你喜欢