匹配拆分计算后,消费KAFKA订购的数据将被推到多个外部网络用户的HTTP服务器地址。该比例属于1:n的关系。
在压力测试过程中,发现在Kafka的顺序消耗方案中,由于超时而引起的总体积累压力在实际时间内相对较高,并且需要优化。
为了推到外部客户端场景,瓶颈主要在等待客户响应IO时间。尽管KAFKA在春季也是默认的批处理消耗,但批处理框架本身已转换为单线处理的一批消耗。因此,我们需要批处理消息以增加并行性。参考代码如下:
Apache的HTTPClient是单打模式,但是HTTPCLIENT有线池的概念可以同时重新播放此批次请求。
您需要了解单打模式和线程池之间的关系。尽管httpclient是单个类实例,但是在处理请求时,您可以使用poolinghtpclientConnectionManager线程池单独处理不同的连接。例如,同时执行10个HTTP请求。目前,您将访问poolinghtpclientConnectionManager获取连接池资源。成功地采购后,将执行HTTP请求。
同时,为了确保KAFKA消息消耗的可靠性,Kafka需要在HTTP成功重新发布后手动提交偏移量,并且需要在多线程编程中介绍CountDownLatch来协作。KAFKA。因此,修改了模式修改:n需要在消费者主线程的主线上重新启动的客户,并且主线程具有CountDownLatch等待所有任务重新播放的所有任务。消费速度太快了并导致客户积累压力。
在性能压力测试中,单个消息处理匹配查询的速度为2-4ms。对于100片数据,需要计算串行计算。通过平行计算Java8的平行流来优化此部分时间。请注意使用ParalleStream来注意线程安全性,并且返回的数据是无序的数据数据。
目前继续遇到多个问题:
1. JVM出现完整的GC [FLL GC(ALORACATION失败)507m-> 382m(512m),0.8299883秒] [EDEN:0.0B(25.0m) - > 0.0B(69.0m)幸存者:0.0B-> 0.0b-> 0.0B> 0.0B HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP HEAP:507.3m(512.0m) - > 382.7m(512.0m)],[Metaspace:109047K-> 109047K(1153024K)]]]
调试环境的GC 512MB太小,可以将其修改为2G。
完整的GC触发条件
2.转发队列,在重新发布之前重新发布之前,需要重新播放一些转换。继续在压力测试中重新发布,发现转发队列将显得卡住,并且在停顿后将继续进行任务。目前,推动任务尚未返回。没有占用线程的情况。这次,CPU,内存和GC是正常的。
原因:线程池执行的任务是密集的,其中大多数是空闲的,并且系统资源没有完全利用。如果大量请求立即出现,如果线程池的数量大于何时何时,则多余的请求将被放入等待队列中的等待等待线程,然后在等待队列中执行任务。
线程池中的代码执行顺序为:corepool-> workqueue-> maxpool
因此,修改了线程池的工作顺序。当核心线程已满时,它不会首先跳入队列,而是打开一个新的工作线程。这可以保证队列消息不会在内存中积累。
修改方法是记录执行过程中正在运行的任务数量。当线程是队列时,任务确定任务是否正常大于核心线程。如果它不大于核心,则创建和执行核心线程。然后创建一个新的线程执行任务。
如果您使用pollexecutor的referxecute方法来计数-1,则操作员是线程池中的线程。线程完成后,已通知A并接受新任务。如果此时有一个新的任务提交,则计数为时已晚,无法更新。目前,线程池被判断为核心线程没有空闲线程,并且将构建一个新线程。
发现:执行方法的多线程并发调整,线程池创建超支
观察线程名称,并发现执行方法的多线程并发调整,并且线程数继续增加。将不会重复旧线程,从而导致整个线程队列。
原则
线程池的状态不是由用户设置的,而是伴随着线程池的操作并从内部维护。线程池使用一个变量来维护两个值:运行状态和工作人员。
在特定的实现中,线程池放置了运行状态的两个关键参数和线程数(WorkerCount)的维护,如以下代码所示:
此原子类型类型是一个控制线程池的运行状态和线程池中有效线程数的字段。它还包含信息的两个部分:线程池的操作状态和线程池中有效线程的数量(WorkerCountt),3个高点可节省运行,29- bit savion workercount,并且之间不相互干扰两个变量。使用一个变量存储两个值,以避免做出相关决策时不一致的情况。您不必占据锁定资源即可维持两者的一致性。
也可以通过读取线程池的源代码来找到。通常有必要同时判断线程池的操作状态和线程数量。线程池还为用户提供了几种方法来获得当前的操作状态和线程池的线程。比基本操作快得多。
线程池需要管理线程的生命周期,并且在线程不长时间运行时需要回收。可以通过添加引用和删除引用来控制线程的内容。目前重要的是如何确定线程是否正在运行。
Worker使用AQ使用AQ来实现独家锁定。没有使用重新锁定重新输入锁,但是使用AQ,以实现不可用的特性来反映当前的执行状态。
1.一旦获得锁定方法,就意味着当前线程正在进行任务。
2.如果执行任务,则不应中断线程。
3.如果线程不是独家锁,即闲置的状态,则意味着它没有处理任务,并且此时可以中断。
4.当线程池执行关闭方法或trytermination方法时,interruptidis方法用于中断空闲线程。Interuptidis方法使用Trylock方法来确定线程池中的线程是否闲置。如果线程是空闲的,则可以安全地恢复。该特征在线程恢复过程中使用。恢复过程如下图所示:
线程池中线程的破坏取决于您需要引用JV的自动回收。创建工人后,它将继续查询,然后获得执行任务。核心线程可以无限制地等待完成任务。非核线程需要在有限的时间内获得任务。当工人无法获得任务时,即任务是空的,周期将结束,并且工作人员将积极地消除其线程池中的参考。
目前,可以看出,在任务结束后,线程池中完成的线程不会立即发布。
例如,核心线程为20,最大线程为40。当线程池膨胀到40.将被卡住。在回收一批线程后,将继续处理。这次,怀疑是多线程执行提交速度导致线程数量继续增加,并且没有重复使用。
发现的测试:
目前,可以判断提交速度与新线程的分配和重复使用旧线程有关。
取消CountDowLatch以制作射击协调机制,您可以使用线程池ActiveCount检查任务是否完成。所有线程池都释放了,效果等同于CountDownLatch。
源代码答案
目前,线程池中的线程仍在增加,旧线程没有重复使用。解释ActiveCount与JVM的实际发布线程无关。
线程池队列模型
ThreadPoolExecutor在以下4个情况下执行执行方法:
1)如果当前正在运行的线程小于CorePoolsize,请创建一个新线程以执行任务(请注意,您需要在此步骤中获得全局锁定)。
2)如果线程运行等于或超过CorePoolSize,请将任务添加到阻止类别。
3)如果无法将任务添加到BlockingQueue(队列已满),请创建一个新线程来处理任务(请注意,您需要在此步骤中获得全局锁定)。
4)如果创建新线程将使当前运行的线程超出maximumpoolsize,则将拒绝该任务,并且将调用拒绝的executionhandler.rejectexecution()方法。
ThreadPoolExecutor采用上述步骤的总体设计思想,以避免执行Execute()方法(将是一个严重的望远镜瓶颈)尽可能多地获得全局锁。比或等于CorePoolSize),几乎所有Execute()方法调用均执行步骤2,而步骤2不需要获得全局锁。
此时,检查线程池的任务调度机制。线程池是生产者消费者模型。生产和消费使用线程池设置的队列模型。
RunnableTaskQueue:用于保留执行任务的阻止队列。您可以选择以下阻塞队列:
l arrayblockingqueue:它是基于数组结构的有界阻塞队列。该队列由FIFO(高级第一)原理进行排序。
l linkedblockingqueue:基于链接列表的阻塞队列可以指定最大长度,但默认值是无限的。此队列按FIFO进行排序,吞吐量通常高于arrayblockingqueue.static Factory方法执行者。newfixedThreadPool()使用此queue。
在Synchronousqueue上:没有实际存储空间的同步阻塞队列。每个插入操作必须等到调用另一个线程才能删除操作,否则插入操作始终处于阻止状态。吞吐量通常高于链接障碍物。
y PriorityBlockingQueue:基于堆的无界阻塞优先级队列。
Lenovo的先前业务流程,Kafka每批消费100的重新发布消息,将线程池设置为100,并首次成功重新发布,然后消耗第二批。但是,目前尚未发布核心线程,新闻加入等待队列。因此出现了口吃。
考虑到口吃的场景,如果您降低等待队列,您将不断创建一个新线程,直到超过最大值线程的数量。如果控制的线程总数始终大于要执行的任务,它将不断创建新线程。旧线程被延期销毁。
考虑到此时更换线程池队列模型,Synchronousqueue没有实际的存储空间。您必须在输入下一个任务之前等待最后一项任务。这次,核心线程将继续消费任务,并且将不会有排队装满新线程的情况。
线程池提交休眠1NS,并使用shot弹枪进行协作。
线程池不会创建一个新线程,并且每分钟的请求量稳定在约6.6k。
线池倒计时,不休眠。
线程池将继续添加,在某些情况下,队列被排名到线程池中。固定速度。保存下降。
线程池使用同步队列,消耗250
稳定性约为13.2k。
CPU使用率为17K,稳定为73%
稳定在7K,CPU使用率很高。Location为85%
波动很大,但性能很高,平均为22k。认为推动队列受毛刺的影响,CPU利用率较低。50%
高性能,平均25,000。指出推杆队列受毛刺的影响,CPU使用率较低,28%。可以看出,对于IO强度的任务,CPU并未增加。
您可以看到,即使优化了线程池,httpclient的重新发布的性能约为400/S。性能不高,因此下一步是考虑使用Netty/vertx重写转发线程。