当前位置: 首页 > 科技观察

微服务架构:使用事件驱动实现最终一致性

时间:2023-03-19 20:24:43 科技观察

事务一致性首先我们回顾一下ACID原则:Atomicity:原子性,改变数据状态要么一起完成,要么一起失败Consistency:一致性,数据的状态是完整一致Isolation:隔离线,即使有并发事务,也不会相互影响Durability:持久性,事务一旦提交,就无法撤销在单体应用中,我们可以利用关系数据库的特性来完全的事务一致性,但是一旦应用向微服务方向发展,就会按照业务拆分成不用的模块,每个模块的数据库已经分离。这时候我们要面对的就是分布式事务,需要在代码上去完成ACID。目前比较流行的方案有:两阶段提交、补偿机制、本地消息表(使用本地事务和MQ)、MQ事务消息(RocketMQ)。CAP定理1998年,加州大学计算机科学家EricBrewer提出分布式系统存在三个指标。Consistency:一致性Availability:可用性Partitiontolerance:分区容错性EricBrewer说这三个指标不能同时达到。这个结论被称为CAP定理。在微服务中,不同模块之间使用的数据库不同,不同模块之间部署的服务可能不会被使用,分区容错是不可避免的,因为服务之间的调用不能保证100%无缝。问题,所以系统设计必须考虑到这种情况。因此,我们可以认为CAP的P总是成立的,剩下的C和A不可能同时成立。实际上,根据分布式系统中的CAP原则,当P(PartitionTolerance)出现时,强行追求C(Consistency)会导致(A)可用性和吞吐量下降。这个时候我们一般会使用最终一致性来保证我们系统的AP能力。当然不是放弃C,而是放弃强一致性,一般情况下CAP可以保证,但是在分区容错的情况下,我们可以使用最终一致性来保证数据的一致性。事件驱动实现最终一致性事件驱动架构通过异步消息同步领域对象之间的状态。一些消息也可以同时发布到多个服务。该消息引起一个服务的同步后,可能又引起另一个消息。该事件将传播出去。严格意义上的事件驱动没有同步调用。例子:在电子商务中,当用户下订单时,他必须根据库存来判断订单是否售出。项目架构:SpringBoot2+Mybatis+tk-Mybatis+ActiveMQ【因为例子小,就不做成SpringCloud架构了】首先我们看下正常服务之间的调用:代码:@Override@Transactional(rollbackFor=Exception.class)publicResultplaceOrder(OrderQueryquery){Resultresult=newResult();//先远程调用Stock-Service减少库存RestTemplaterestTemplate=newRestTemplate();//请求头HttpHeadersheaders=newHttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);//封装创建请求对象HttpEntityentity=newHttpEntity(query,headers);//同步调用库存服务的接口ResultstockResult=restTemplate.postForObject("http://127.0.0.1:8081/stock/reduceStock",entity,结果类);如果(stockResult.getCode()==Result.ResultConstants.SUCCESS){Orderorder=newOrder();BeanUtils.copyProperties(query,order);order.setOrderStatus(1);IntegerinsertCount=orderMapper.insertSelective(order);if(insertCount==1){result.setMsg("下单成功");}else{result.setMsg("下单失败");}}else{result.setCode(Result.ResultConstants.FAIL);result.setMsg("下单失败下单:"+stockResult.getMsg());}returnresult;}我们可以看到这样的服务调用有很多缺点:1、订单服务需要同步等待库存服务的返回结果,接口结果的返回有延迟。2.订单服务直接依赖库存服务。一旦库存服务崩溃,订单服务就无法继续正常运行。3、订单服务需要考虑并发问题,库存可能最终为负数。下面我们使用事件驱动来实现最终一致性1、在订单服务中添加订单后,订单状态为“已开启”,然后向消息队列发布一个OrderCreated事件代码:@Transactional(rollbackFor=Exception.class)publicResultplaceOrderByMQ(OrderQueryquery){Resultresult=newResult();//先创建一个订单,状态为order0Orderorder=newOrder();BeanUtils.copyProperties(query,order);order.setOrderStatus(0);IntegerinsertCount=orderMapper.insertSelective(order);if(insertCount==1){//发送订单消息MqOrderMsgmqOrderMsg=newMqOrderMsg();mqOrderMsg.setId(order.getId());mqOrderMsg.setGoodCount(query.getGoodCount());mqOrderMsg.setGoodName(query.getGoodName());mqOrderMsg.setStockId(query.getStockId());jmsProducer.sendOrderCreatedMsg(mqOrderMsg);//此时订单只处于开启状态result.setMsg("下单成功");}returnresult;}2。库存服务在监听消息队列OrderCreated中的消息,从inventory表中的产品库存中减去订单数量,然后向消息队列发送一个StockLocked事件。代码:/***接收订单消息*@parammessage接收消息*@paramsession上下文*/@JmsListener(destination=ORDER_CREATE,containerFactory="myListenerContainerFactory")@Transactional(rollbackFor=Exception.class)publicvoidreceiveOrderCreatedMsg(Messagemessage,Sessionsession){try{if(messageinstanceofActiveMQObjectMessage){MqStockMsgresult=newMqStockMsg();ActiveMQObjectMessageobjectMessage=(ActiveMQObjectMessage)消息;MqOrderMsgmsg=(MqOrderMsg)objectMessage.getObject();IntegerupdateCount=stockMapper.updateNumByStockId(msg.getStockId(),msg.getGoodCount());if(updateCount>=1){result.setSuccess(true);result.setOrderId(msg.getId());}else{result.setSuccess(false);}//手动ack使消息出队列,否则会继续消费message.acknowledge();//发送股票锁消息给MQjmsProducer.sendStockLockedMsg(result);}}catch(JMSExceptione){log.error("收到订单创建消息错误:"+e.getMessage());}}细心的朋友可能看到:message.acknowledge(),即手动确认消息。因为在保证库存服务的逻辑可以正常执行后确认消息已经被消费,可以保证消息的投递可靠性。万一执行库存服务时出现异常,我们可以重新消费订单消息。3、订单服务收到StockLocked事件,将订单状态更改为“Confirmed”如果没有库存,则为2,并通知用户(WebSocket)*@parammessage*/@JmsListener(destination=STOCK_LOCKED,containerFactory="myListenerContainerFactory")@Transactional(rollbackFor=Exception.class)publicvoidreceiveStockLockedMsg(Messagemessage,Sessionsession){try{Active{if(messageinstanceofessActiveMQObjectMessage)ObjectMessage){ActiveMj消息;MqStockMsgmsg=(MqStockMsg)objectMessage.getObject();if(msg.isSuccess()){OrderupdateOrder=newOrder();updateOrder.setId(msg.getOrderId());updateOrder.setOrderStatus(1);orderMapper.updateByPrimaryKeySelective(updateOrder);log.info("订单["+msg.getOrderId()+"]下单成功");}else{OrderupdateOrder=newOrder();updateOrder.setId(msg.getOrderId());updateOrder.setOrderStatus(2);orderMapper.updateByPrimaryKeySelective(updateOrder);//通知用户库存不足,订单取消log.error("Order["+msg.getOrderId()+"]wascancelledduetostockunless");}//手动ack,制作消息出队列,否则继续消费message.acknowledge();}}catch(JMSExceptione){log.error("收到库存锁消息错误:"+e.getMessage());}}同理,我们这里也将使用手动确认消息来保证消息的传递可靠性至此,所有的事情都已经完成了。我们看一下它和普通的服务调用有什么区别:1.订单服务不再直接依赖库存服务,而是将订单事件发送给MQ进行库存监控。2、订单服务可以真正作为一个模块独立运行。3、解决了并发问题,MQ的队列处理效率非常高。但也存在以下问题:1.用户体验发生了变化:因为使用了事件机制,所以订单是立即生成的,但是很有可能过了一段时间,系统会提醒你缺货..就好比排队买东西,会提示没货了,不用再排队了。2.可能有很多订单没有入库。最后,如果真的要考虑用户体验,不想数据库中有很多无用的数据怎么办?然后聚合订单服务和库存服务。解决当前的问题应该是首要考虑的。设计微服务的目的是为了解决业务并发。但是我们现在面临的是用户体验的问题,所以架构设计也需要做出妥协。最重要的是,我们已经思考和分析了每个方案可以实现到什么程度,可以应用到哪些场景。俗话说技术要结合实际场景,不能为了追求新技术而生搬硬套。