当前位置: 首页 > 后端技术 > Java

Kafka事务介绍示例

时间:2023-04-01 22:49:39 Java

Kafka从0.2.11版本开始支持事务。本文档对Kafka事务进行了简单的描述,并给出了java代码示例,并对代码进行了一些简单的解释,以及相关的注意事项。希望对需要使用kafka事务的朋友有所帮助。2017年6月28日,Kafka正式发布0.11.0.0版本。从这个版本开始,Kafka支持事务。那么,Kafka中的事务是什么?Kafka事务支持生产者能够将一组消息作为单个事务发送,原子地成功或失败。例如,用户支付订单。订单支付后,需要通知库存模块减少库存,需要通知优惠券模型扣除优惠券。您需要使这两条消息都成功或失败。.这时候就可以使用kafka事务了。我们先看代码。第一步:引入依赖,在pom.xml中添加kafka客户端依赖org.apache.kafkakafka-clients2.7.0第二步:发送消息相关代码apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("transactional.id","my-transactional-id");KafkaProducerproducer=newKafkaProducer<>(props);producer.initTransactions();intc=0;while(true){if(c++>=3){中断;}尝试{producer.beginTransaction();ProducerRecordrecord=newProducerRecord<>("TEST1","消息a"+c);生产者.发送(记录);System.out.println("发送测试1"+c);制作人Recordrecord2=newProducerRecord<>("TEST2","消息b"+c);生产者.发送(记录2);System.out.println("发送测试2"+c);生产者.commitTransaction();}catch(RuntimeExceptione){System.out.println(e.getMessage());生产者.abortTransaction();}}前6行是构建KafkaProducer实例。在springboot项目中,通常会将这总点封装成一个方法,然后通过@Bean完成对象注入管理。bootstrap.servers对应你的kafka服务器地址,需要根据你本地实际情况修改。这类地址也应该放在配置文件中。.第8行producer.initTransactions();意思是初始化kafka事务,这个方法可以为每个生产者调用一次。接下来是一个循环控制,这里是为了表达producer对象可以被复用,发送多个事务消息。对于每笔交易,producer.beginTransaction()代表交易的开始,producer.send(record)代表本次交易需要发送的消息。在每个事务中,可以根据业务需求多次调用send方法,然后通过producer.commitTransaction()提交事务,如果出现异常,通过producer.abortTransaction()取消事务。注意:transactional.id的值不能重复。如果你的环境中只有一个节点,你可以直接使用一个固定的字符串作为这个值。但是如果你的程序需要支持横向扩展,比如两台或多台服务器同时运行你的代码,这时候就会出现问题。对于同一个transactional.id,新的KafkaProducer调用initTransactions后,原进程会报错。只有最新的过程才能正常工作。所以这时候需要保证在不同的节点运行时,获取到的transactional.id的值是不同的。可以使用UUID.randomUUID().toString()生成保证不重复的随机ID,也可以直接在不同实例的服务器配置不同的transactional.ids。对于同一个节点和多个事务的操作,可以使用同一个KafkaProducer,使用同一个transactional.id。参考文档:https://www.confluent.io/blog...