Kafka-多线程代码赏析2020年8月13日IGORBUZATOVI?此人写于https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/下载了这个博客。以下内容纯属学习。源码路径https://github.com/inovatrend/mtc-demoMultithreadedKafkaConsumerpackagecom.inovatrend.mtcdemo;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.errors.WakeupException;导入org.apache.kafka.common.serialization.StringDeserializer;导入org.slf4j.Logger;导入org.slf4j.LoggerFactory;导入java.time.Duration;导入java.time.temporal。ChronoUnit;导入java.util.*;导入java.util.concurrent.ExecutorService;导入java.util.concurrent.Executors;导入java.util.concurrent.atomic.AtomicBoolean;公共类MultithreadedKafkaConsumer实现Runnable,ConsumerRebalanceListener{privatefinalKafkaConsumerconsumer;privatefinalExecutorServiceexecutor=Executors.newFixedThreadPool(8);privatefinalMapactiveTasks=newHashMap<>();privatefinalMapoffsetsToCommit=newHashMap<>();privatefinalAtomicBooleanstopped=newAtomicBoolean(false);privatelonglastCommitTime=System.currentTimeMillis();privatefinalLoggerlog=LoggerFactory.getLogger(MultithreadedKafkaConsumer.class);publicMultithreadedKafkaConsumer(字符串主题){Propertiesconfig=newProperties();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);config.put(ConsumerConfig...);consumer=newKafkaConsumer<>(config);newThread(this).start();}@Overridepublicvoidrun(){try{consumer.subscribe(Collections.singleton("topic-name"),this);while(!stopped.get()){ConsumerRecordsrecords=consumer.poll(Duration.of(100,ChronoUnit.MILLIS));handleFetchedRecords(records);checkActiveTasks();commitOffsets();}}catch(WakeupExceptionwe){if(!stopped.get())throwwe;}finally{consumer.close();}}privatevoidhandleFetchedRecords(ConsumerRecordsrecords){if(records.count()>0){ListpartitionsToPause=newArrayList<>();records.partitions().forEach(partition->{List>partitionRecords=records.records(partition);Tasktask=newTask(partitionRecords);partitionsToPause.add(partition);executor.submit(task);activeTasks.put(partition,task);});consumer.pause(partitionsToPause);}}privatevoidcommitOffsets(){try{longcurrentTimeMillis=System.currentTimeMillis();if(currentTimeMillis-lastCommitTime>5000){if(!offsetsToCommit.isEmpty()){consumer.commitSync(offsetsToCommit);offsetsToCommit.clear();}lastCommitTime=currentTimeMillis;}}catch(Exceptione){log.error("提交偏移量失败!",e);}}privatevoidcheckActiveTasks(){ListfinishedTasksPartitions=newArrayList<>();activeTasks.forEach((partition,task)->{if(task.isFinished())finishedTasksPartitions.add(partition);longoffset=task.getCurrentOffset();if(offset>0)offsetsToCommit.put(partition,newOffsetAndMetadata(offset));});finishedTasksPartitions.forEach(partition->activeTasks.remove(partition));consumer.resume(finishedTasksPartitions);}@OverridepublicvoidonPartitionsRevoked(Collectionpartitions){//1.停止所有任务处理来自已撤销分区的记录MapstoppedTask=newHashMap<>();for(TopicPartitionpartition:partitions){Tasktask=activeTasks.remove(partition);if(task!=null){task.stop();stoppedTask.put(partition,task);}}//2.等待停止的任务完成当前rec的处理ordstoppedTask.forEach((partition,task)->{longoffset=task.waitForCompletion();if(offset>0)offsetsToCommit.put(partition,newOffsetAndMetadata(offset));});//3.收集撤销分区的偏移量MaprevokedPartitionOffsets=newHashMap<>();partitions.forEach(partition->{OffsetAndMetadataoffset=offsetsToCommit.remove(partition);if(offset!=null)revokedPartitionOffsets.put(partition,offset);});//4.撤销分区的提交偏移量try{consumer.commitSync(revokedPartitionOffsets);}catch(Exceptione){log.warn(“无法提交已撤销分区的偏移量!”);}}@OverridepublicvoidonPartitionsAssigned(Collectionpartitions){consumer.resume(partitions);}publicvoidstopConsuming(){stopped.set(true);consumer.wakeup();}}Task下面查看分析线程序代码packagecom.inovatrend.mtcdemo;importorg.apache.kafka.clients.consumer.ConsumerRecord;导入org.slf4j.Logger;导入org.slf4j.LoggerFactory;导入java.util.List;导入java.util.concurrent.CompletableFuture;导入java.util.concurrent.ExecutionException;导入java.util.concurrent.atomic.AtomicLong;importjava.util.concurrent.locks.ReentrantLock;publicclassTaskimplementsRunnable{privatefinalList>records;privatevolatilebooleanstopped=false;privatevolatilebooleanstarted=false;privatevolatilebooleanfinished=false;privatefinalCompletableFuturecompletion=newCompletableFuture<>();privatefinalReentrantLockstartStopLock=newReentrantLock();privatefinalAtomicLongcurrentOffset=newAtomicLong();privateLoggerlog=LoggerFactory.getLogger(Task.class);publicTask(List>records){this.records=records;}publicvoidrun(){startStopLock.lock();if(stopped){return;}started=true;startStopLock.unlock();for(ConsumerRecordrecord:records){if(stopped)break;//在这里处理记录并确保捕获所有异常;currentOffset.set(record.offset()+1);}finished=true;completion.complete(currentOffset.get());}publiclonggetCurrentOffset(){returncurrentOffset.get();}publicvoidstop(){startStopLock.lock();this.stopped=true;if(!started){finished=true;completion.complete(currentOffset.get());}startStopLock.unlock();}publiclongwaitForCompletion(){try{returncompletion.get();}catch(InterruptedException|ExecutionExceptione){return-1;}}publicbooleanisFinished(){returnfinished;}}分析1、手动提交offset属性:enable.auto.commit设置为true;如果设置为false则自动提交轮询方法后的数据偏移量;需要以下两种:commitSync()在记录处理完成和调用下一次轮询方法之前实现ConsumerRebalanceListener接口,并重写中的方法它。比如撤销partition,此时commitoffset2.处理速度慢问题是轮询获取到的消息时,后续处理逻辑复杂。如果消费者在这个时间间隔内没有调用轮询方法,则消费者将被从监听器中移除。Kafka的max.poll.interval.ms配置默认是5分钟。在使用线程消费模型时,可以根据以下两种配置来处理这个问题。max.poll.recoreds设置较小的值max.poll.interval.ms设置较高的值来执行两者的结合;看逻辑执行时间,如果轮询记录大小为50条,每次逻辑处理6秒,则为300秒(5分钟);这可以减少50,时间间隔可以增加300多秒。3、处理消息异常对于程序中的异常处理,有三种选择:停止处理和关闭消费(在这个操作之前,可以选择重试几次)将记录发送到死信队列,继续下一条记录(在此操作之前,您可以选择重试几次)重试直到记录被成功处理(这可能需要很长时间)第三种选择,无限重试,在某些场景下是可取的。例如,如果外部系统离线并涉及写入操作,您可能希望不断重试,直到外部系统可用,无论需要多长时间。当然,在kafka中,由于max.poll.interval.ms的原因,每个线程在执行消费模型时,必须在一个时限内完成对每条记录的处理。否则会超过指定时间,被消费组移除。为此,必须为重试实现相当复杂的逻辑。4.多线程下的不良影响1.offset可能在处理一条记录之前提交2.从同一个partition获取的消息可能会被并行处理(同一条记录处理出现多次),消息处理的顺序不能得到保证。当然我们希望多线程能像单线程一样保持执行顺序,不要重复获取同一个分区的同一条记录。对于本文中的任务和消费者,仅提供一种解决问题的思路,并不适用于所有场景。实现线程池,配置分区轮询获取的记录大小和数据量。5.保证处理顺序由于轮询采用多线程的方式处理逻辑,因此在线程模型中分区处理完成后,可以暂停为消费者设置的记录分区。主线程全部执行完毕后,消费者会解除分区限制。这是一般的想法。这里用到了KafkaConsumer的两个API:pause(Collectionpartitions)resume(Collectionpartitions)这里不是放过所有的任务分区。而是放开完成子线程任务的分区。6.Processinggrouprebalancing由于是多线程,消费者可能会被rebalance,部分分区可以重新分配给其他消费者。这个时候还有一些线程在执行那些分区的记录,所以可能会删除一些记录。多个消费者处理。出现重复数据等。当然,通过处理undopartition的recordcompletion,并在repartition重新分配之前commit相应的offset,可以最大限度地减少由于grouprebalancing带来的重复处理。ConsumerRebalanceListener的实例被设置为KafkaConsumer.subscribe()方法的参数,从而覆盖onPartiionsRevoke()方法。由于这是从消费者的轮询方法调用的,因此它会在主线程上再次发生。所以consumer.commitSync()同步提交,不用担心报ConcurretnModificationException。如果某些线程任务当前正在处理来自撤消分区的记录。有两种选择可以处理这种情况。1.等待所有线程任务完成。2.停止这些线程任务,等待当前处理的记录。完成上述操作后,就可以提交这些分区的偏移量了。在onPartitionsRevoked()方法中等待结果是阻塞消费者主进程的,所以要注意等待时间过长会超过max.poll.interval.ms时间间隔,导致消费者被群组移除。所以,第二个稍微好一点,因为它花费的时间更少。因此,在与其他系统交互时,应选择小于max.poll.interval.ms的时间间隔,以防止出现上述情况。如果发生会话超时,则不应提交相应的偏移量,因为请求不会被视为成功提交。这意味着已经处理过的记录在分区被重新分配后会被再次处理,这将导致系统中的重复操作,除非写操作是幂等的(任意数量的操作与一次操作的效果相同)。总结在这些用例中,实现多线程消费模型比每个消费者线程模型具有显着优势。虽然有很多方法可以做到这一点,但相应的关键点总是相同的:确保来自分区的记录只被一个线程处理一次,在记录被处理后,主线程提交偏移量适当处理的组在CONFLUENT中是平衡的有其他一些值得推荐的博客。例如:GitHub存储库。源代码观看我的Kafka峰会演讲,阅读有关ConfluentParallelConsumer的内容