当前位置: 首页 > 后端技术 > Java

大数据量、高并发业务如何优化?(1)

时间:2023-04-02 09:12:22 Java

博主这里的大数据量高并发业务处理优化是基于博主在线项目实践和全网数据,在此分享给大家。1.大数据量上传写入优化线上业务后台项目有消息推送功能。通过上传包含用户id的文件,将系统消息推送给指定的用户。1.1上面的功能描述很简单,但是对于技术方面来说,要想做好这个功能,要保证用户数(比如达到百万级别),系统运行正常,以及功能正常。其实需要慎重考虑。博主在这里给出思路:上传文件类型选择。通常,大多数用户会使用excel文件,但有一个比excel文件更推荐的。文件格式为csv文件。与excel文件相比,可以直接在记事本中编辑。Excel也可以打开cvs文件,而且占用内存少(划重点)。对于上传的csv文件太大,也可以使用streamingRead,无论推送成功与否,都读一部分写一部分消息。状态已保存。由于大批量数据的插入是一个比较耗时的操作(可能几秒或者几分钟),所以需要保存批量插入是否成功的状态,可以在后台显示出来。这条消息推送记录是成功还是失败,方便操作追溯消息推送状态,批量写入启用或禁用事务博主这里有两种解决方案。优缺点:开启事务:好处是,比如在批量插入的过程中,异常可以保证原子性,但是性能比不开启事务要低一些,在大额的情况下会显着降低数据的。不开启事务:优点是写入性能高,大数据量的写入性能有明显提升,但是不能保证原子性,但是对于批量插入新加入的数据只会产生脏数据,它如果功能设计合理,不会影响业务,如下面第四点:在大数据量的情况下,如果追求极致性能,就不能开启事务。选择也需要你结合自己的业务情况推送异常失败的消息处理建议。功能设计上,可以在操作前屏蔽失败消息,这样就不需要处理之前推送失败写入的脏数据,直接添加消息推送即可1.2批量写入代码优化jdbc参数携带rewriteBatchedStatements=true在jdbc驱动上启动批量写入功能,如下spring.datasource.master.jdbc-url=jdbc:mysql://localhost:3306/test_db?allowMultiQueries=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&rewriteBatchedStatements=true启用insertintotable(id,name)values(1,'tom'),(2,'jack')模式,建议一次不要写太多,MySQL有限制lengthofsql,对于这种字段很少的表来说,一次写500-1000条问题不大。如果字段太多,需要减少写量。插入im_notice_app_ref(notice_id,app_id,create_time)values(#{item.noticeId},#{item.appId},#{item.createTime})一般第二项优化大家都知道,但是jdbc参数可能会被忽略携带rewriteBatchedStatements=true,这个参数可以在第二项的基础上开启批量执行SQL,进一步提高写入性能大家都知道,但是高并发下不建议使用。建议通过程序化事务手动控制事务提交或回滚,减少事务影响范围。下面是一段代码,用于回滚订单超时时未支付的业务数据,使用@Transactional事务注解@Transactional(rollbackFor=Exception.class)publicvoiddoUnPaidTask(LongorderId){//1.查询是否订单存在Orderorder=orderService.getById(orderId);if(order==null){thrownewBusinessException(String.format("订单不存在,orderId:%s",orderId));}if(order.getOrderStatus()!=OrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()){thrownewBusinessException(String.format("订单状态错误,order:%s",order));}//2.设置订单为取消order.setOrderStatus((byte)OrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus());order.setUpdateTime(newDate());if(!orderService.updateById(order)){thrownewBusinessException("更新数据已过期");}//3.items数量添加LambdaQueryWrapperqueryWrapper=Wrappers.lambdaQuery();queryWrapper.eq(OrderItem::getOrderId,orderId);ListorderItems=orderItemService.list(queryWrapper);for(OrderItemorderItem:orderifItems){getSeckillId()!=null){//秒杀项目处理LongseckillId=orderItem.getSeckillId();SeckillServiceseckillService=SpringContextUtil.getBean(SeckillService.class);if(!seckillService.addStock(seckillId)){thrownewBusinessException("秒杀商品库添加库存失败");}}else{//普通单商品项目处理LonggoodsId=orderItem.getGoodsId();IntegergoodsCount=orderItem.getGoodsCount();if(!goodsDao.addStock(goodsId,goodsCount)){thrownewBusinessException("秒杀商品增加库存失败");}}}//4.返回优惠券couponService.releaseCoupon(orderId);log.info("---------------orderorderId:{},未付超时取消成功",orderId);}使用程序化事务优化,代码如下:@ResourceprivatePlatformTransactionManagerplatformTransactionManager;@ResourceprivateTransactionDefinitiontransactionDefinition;publicvoiddoUnPaidTask(LongorderId){//开启程序化交易//1.开启交易前检查订单是否存在order=orderService.getById(orderId);if(order==null){thrownewBusinessException(String.format("订单不存在,orderId:%s",orderId));}if(order.getOrderStatus()!=OrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()){抛出新的BusinessException(String.format("订单状态错误,order:%s",order));}//2.打开事务TransactionStatustransaction=platformTransactionManager.getTransaction(transactionDefinition);try{//3.设置订单为取消订单.setOrderStatus((byte)OrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus());order.setUpdateTime(newDate());if(!orderService.updateById(order)){thrownewBusinessException("更新的数据已过期");}//4.增加项目数LambdaQueryWrapperqueryWrapper=Wrappers.lambdaQuery();queryWrapper.eq(OrderItem::getOrderId,orderId);ListorderItems=orderItemService.list(queryWrapper);for(OrderItemorderItems:){if(orderItem.getSeckillId()!=null){//秒杀项目处理LongseckillId=orderItem.getSeckillId();SeckillServiceseckillService=SpringContextUtil.getBean(SeckillService.class);RedisCacheredisCache=SpringContextUtil.getBean(RedisCache.class);if(!seckillService.addStock(seckillId)){thrownewBusinessException("秒杀商品增加库存失败");}redisCache.increment(Constants.SECKILL_GOODS_STOCK_KEY+seckillId);redisCache.deleteCacheSet(Constants.SECKILL_SUCCESS_USER_ID+seckillId,order.getUserId());}else{//普通单品处理LonggoodsId=orderItem.getGoodsId();整数goodsCount=orderItem.getGoodsCount();if(!goodsDao.addStock(goodsId,goodsCount)){thrownewBusinessException("库存增加失败");}}}//5.返回优惠券couponService.releaseCoupon(orderId);//6.所有更新操作完成后,提交事务platformTransactionManager.commit(transaction);log.info("----------------订单orderId:{},取消成功且支付超时",订单号);}catch(Exceptione){log.info("------------订单orderId:{},支付超时取消失败",orderId,e);//7.发生异常时,回滚事务platformTransactionManager.rollback(transaction);}}可以看到,采用程序化事务后,我们将查询逻辑排除在事务之外,降低其影响范围,提升性能,在高并发场景和性能优先场景下,甚至可以考虑不适用的事务3.客户端海量日志上报优化线上项目客户端,使用tcp协议与日志采集服务建立连接,业务高峰期上报日志数据,会有上千个客户端同时建立连接,实时上报日志数据。如上面的场景,在高峰期,会对日志采集服务造成很大的压力。服务处理不当会导致高峰期服务卡顿和CPU占用。太高,内存不足等这里针对高并发海量日志的优化点:异步处理的上报日志,普通版:使用阻塞队列ArrayBlockingQueue获取生产者-消费者模式,对日志进行异步批处理数据。缓存存储在内存中,然后在消费者中批量存储,存储在仓库中。进阶版:使用了Disruptor队列,也是基于内存队列的生产者消费者模型。消费速度相比ArrayBlockingQueue有一个数量级的性能提升,简单介绍:https://www.jianshu.com/p/bad...终极版:使用kfaka消息队列中间件持久化日志数据并消费慢慢地。虽然第三方依赖的引入会增加系统的复杂度,但相比于kfaka在大数据场景下所提供的优秀性能,还是值得的。以上三种方案:可以根据自己项目的实际量选择采集日志。压缩。如果要将上报的日志发送到其他服务,建议压缩它们以避免消耗过多的网络带宽和最终数据存储选项。类型:网络传输,通常是指Java中的序列化方式。Jdk的序列化方式与Protobuf、fst、Hession等相比,在序列化速度和大小上没有优势,甚至可以用垃圾来形容。博主这里有一个比较Java中几种序列化方式的直接链接:https://segmentfault.com/a/11...,传输大小要求高推荐使用Avro序列化,综合要求高推荐使用Protobuf存储类型选择,比如数据量大的日志,都是新增未修改的场景。推荐使用Clickhouse进行存储。与MySql相比,同等数据量下,占用存储空间更少,查询性能更高。最后附上博主。github地址:https://github.com/wayn111欢迎大家点赞、收藏、转发。您的支持将是博主更新文章的动力