当前位置: 首页 > 后端技术 > Java

只说不练假动作,一起练Kafka业务吧

时间:2023-04-01 21:22:38 Java

7.1顺序场景7.1.1场景概述假设我们要将一批订单传输到另一个系统,要求订单对应状态的演化是顺序的。已下单→已付款→已确认,不允许混淆!7.1.2订单级别1)全局订单:序列化。每条经过Kafka的消息都必须严格保证顺序。这就要求Kafka只有一个channel,每个groupid对消费者进行排序,对性能影响很大,这在实际业务中几乎是没有必要的。2)偏序:业务是偏序的。同一个订单可以顺序处理,不同订单可以并行处理。不同订单的顺序无关紧要。充分利用Kafka的多分区的并发性。你只需要想办法把一批需要排序的数据放到同一个分区中。7.1.3实现方案1)发送端:指定发送的key,key=order.id,案例回顾:4.2.3,PartitionProducer2)发送:为队列配置多个partition,保证并发。3)读者:单一消费者:显然不合理。吞吐量显然上不去。在Kafka中有多个分区有什么意义?所以开启多个消费者指定分区进行消费。理想情况下,每个分区都应该分配一个。但是,这个吞吐量还是有限的,怎么处理呢?解决方案:多线程是在每个消费者上开启多线程的解决方案。但是,请注意顺序被破坏!参考下图:线程处理完后,数据会变成2-1-3改进:接收后分发到二级内存队列。消费者拿到消息后不对消息进行处理,根据key分发到多个阻塞队列中。然后启动多个线程,给每个队列分配一个线程进行处理。提高吞吐量7.1.4代码验证1)新建一个2个分区的排序队列2)启动排序项目源码参考:SortedProducer(顺序发送者)SortedConsumer(顺序消费者-阻塞队列实现,让大家看懂设计idea)SortedConsumer2(顺序消费端-线程池实现,现实中推荐这种方式!)3)通过swagger请求,先根据不同的id发送,查看consolelog,id正确分配到对应的queue并且同一个key分配给同一个A队列,保证顺序7.2海量同步场景假设大数据部门需要大屏显示用户的打车订单状态,需要将订单数据发送给druid。这里不涉及下单,只要下单就会传输,但是对实时性和并发性要求高7.2.1常规架构下单完成mysql后,打印出程序代码,直接输入kafka或者logback与kafka集成,通过log发送。优点:更符合常规思维。发送数据到想要的部门缺点:耦合度高,将kafka发送消息嵌入到下单主业务中,形成代码入侵。我不关心下订单,也不应该关心发送给Kafka。一旦Kafka不可用,程序就会受到影响。7.2.2解耦使用canal监控订单表数据变化,不再影响主业务。7.2.3部署实现1)注意mysql部署,需要开启binlog,8.0默认开启#启动mysql8dockerrun--namemysql8-v/opt/data/mysql8:/var/lib/mysql-p3389:3306-eTZ=Asia/Shanghai-eMYSQL_ROOT_PASSWORD=thisisprizemysql8db-ddaocloud.io/mysql:8.0连接mysql,执行以下sql,添加canal用户CREATEUSERcanalIDENTIFIEDBY'canal';GRANTSELECT,REPLICATIONSLAVE,复制客户端*。*TO'canal'@'%';FLUSHPRIVILEGES;ALTERUSER'canal'@'%'IDENTIFIEDWITHmysql_native_passwordBY'canal';创建订单表CREATETABLE`orders`(`id`intunsignedNOTNULLAUTO_INCREMENT,`name`varchar(255)DEFAULTNULL,PRIMARYKEY(`id`));2)canal部署#canal.properties#包含在附带的信息中,放在服务器的/opt/data/canal/目录下#修改servers为你的kafka机器地址canal.serverMode=kafkakafka.bootstrap.servers=52.82.98.209:10903,52.82.98.209:10904#docker-compose.yml#附件中有canal.yml,随便找个目录重命名为docker-compose.yml#修改mysql链接信息的链接信息#然后执行docker-composeup在当前目录-dversion:'2'services:canal:image:canal/canal-servercontainer_name:canalresstart:alwaysports:-"10908:11111"environment:#mysql链接信息canal.instance.master.address:52.82.98.209:3389canal.instance.dbUsername:canalcanal.instance.dbPassword:canal#putintokafkawhichsubject?做好准备!canal.mq.topic:canalvolumes:-"/opt/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"3)数据通道验证进入kafka容器,使用上面3.2.4中的命令行监听canal队列。/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topiccanal在mysql上创建一个orders表,并增删数据。Trymysql>insertintoorders(name)values('ZhangSan');QueryOK,1rowaffected(0.03sec)在kafka控制台可以看到同步消息{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"intunsigned","名字":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}数据通道已经打开,还缺的是druid作为consumer接收消息4)druid部署#druid.yml#在附带资料中,#随便找一个目录,执行docker-compose-fdruid.ymlup-d5)验证druid配置的数据源,从kafka读取数据,验证数据可以正确输入druid注意:关于druid的详细使用,会在大数据章节详细讲解。7.3kafka监控7.3.1eagle介绍KafkaEagle监控系统是一款监控Kafka集群的工具,支持管理多个Kafka集群,管理Kafka主题(包括查看、删除、创建等),消费者组合消费者实例监控,消息阻塞告警、Kafka集群健康状态检查等7.3.2部署推荐docker-composestart复制提供数据中的eagle.yml到服务器任意目录下,修改对应ip地址为你服务器地址#注意ip地址:52.82.98.209,全部替换成版本你自己的服务器:'3'服务:zookeeper:图像:zookeeper:3.4.13kafka-1:container_name:kafka-1图像:wurstmeister/kafka:2.12-2.2.2端口:-10903:9092-10913:10913环境:KAFKA_BROKER_ID:1HOST_IP:52.82.98.209KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181#Docker部署必须设置外部可访问的ip和端口,否则注册到zk中的地址将无法访问,无法进行外部连接。KAFKA_ADVERTISED_HOST_NAME:52.82.98.209KAFKA_ADVERTISED_PORT:10903KAFKA_.management.jmxremote=true-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false-Djava.rmi.server.hostname=52.82。98.209-Dcom.sun.management.jmxremote.rmi.port=10913"JMX_PORT:10913volumes:-/etc/localtime:/etc/localtimedepends_on:-zookeeperkafka-2:container_name:kafka-2image:wurstmeister/kafka:2.12-2.2.2ports:-10904:9092-10914:10914environment:KAFKA_BROKER_ID:2HOST_IP:52.82.98.209KAFKA_ZOOKEEPER_CONNECT:zookeeper_HOSTVESNAME:2181KAFKA_HOSTVESNAME:52.82.98.209KAFKA_ADVERTISED_PORT:10904KAFKA_JMX_OPTS:“-Dcom.sun.management.jmxremote=true-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false-Djava.rmi.server.hostname=52.82.98.209-Dcom.sun.management.jmxremote.rmi.port=10914"JMX_PORT:10914volumes:-/etc/localtime:/etc/localtimedepends_on:-zookeepereagle:image:gui66497/kafka_eaglecontainer_name:ke重新启动:总是依赖于:-kafka-1-kafka-2ports:-"10907:8048"environment:ZKSERVER:"zookeeper:2181"执行docker-compose-feagle.ymlup-d7.3.3指令访问:http://52.82.98.209:10907/ke/default用户名密码:admin/123456如果要删除topic等操作,需要管理token:keadmin还是km选哪个?根据自己的习惯,个人认为:界面比km更漂亮,监控曲线更差,登录权限控制功能没有km简单明了,但是km需要配置一定的连接信息发布者教研组,转载请注明出处!如果本文对您有帮助,请关注并点赞;有什么建议也可以留言或私信。您的支持是我坚持创作的动力