
Turn Your SparkSQL Model into an Online Service in Half an Hour


SparkSQL has powered tens of thousands of AI applications at 4Paradigm (第四范式) across many industries: anti-fraud in finance, news recommendation in media, pipeline inspection in energy, and more, and it lets such applications be built quickly. Feature transformation is where it plays the key role, mainly in the following ways:

- Multi-table scenarios, for joining tables together, e.g. joining a transaction table with an account table.
- UDFs for simple feature transformations, e.g. extracting the hour from a timestamp.
- Time windows and UDAFs for time-series features, e.g. computing a person's total spending over the last day.

So far, SparkSQL has solved the feature-transformation problem for offline model training. But as AI applications develop, a model is expected to do more than perform well in offline research: it has to work in real business scenarios, which demand high performance and real-time inference. At that point we run into the following problems:

- How do offline multi-table inputs map to the online environment? Batch training takes many tables as input; in what form should those tables exist online? The answer shapes the whole system architecture: done well it improves efficiency, done badly it greatly increases the cost of extracting business value from the model.
- Converting the SQL for real-time execution is expensive. Online inference demands high performance, and data scientists may create thousands of features; translating each one by hand dramatically inflates the engineering cost.
- It is hard to keep online features consistent with offline ones. Manual translation invites inconsistency, and consistency is hard to maintain in practice, so a model that performs well offline fails to meet business requirements online.
- Performance requirements are strict. In a concrete anti-fraud scenario, the serving side must decide within a TP99 of 20 ms whether a transaction is fraudulent.

How does 4Paradigm's feature-engineering database (FeDB) solve these problems?

- It complements SparkSQL's capabilities in the form of a database, solving the offline-to-online table-mapping problem: it answers the question raised above of how offline tables and online tables are laid out.
- The same SQL code performs both offline and online feature transformation, guaranteeing that the model behaves online as it did offline.
- Data scientists and business development teams collaborate with SQL as the medium instead of hand-converted code, greatly improving model-iteration efficiency.
- LLVM-accelerated SQL runs 2 to 3 times faster than Spark 2.
- In-memory storage lets SQL return results with extremely low latency.

Demo: quickly turning a SparkSQL model into a real-time service

Model training scenario: predict how long a taxi trip will take to reach its destination. Using FeDB, PySpark, LightGBM and other tools, we will end up with an HTTP model-inference service, which is also a practical example of Spark in machine-learning scenarios. The whole demo is a little over 200 lines of code and takes no more than half an hour to build:

- train_sql.py: feature computation and training, 80 lines of code
- predict_server.py: model-inference HTTP service, 129 lines of code

Data and features

The training data looks like the following sample:

```
id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330
id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496
id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935
id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904
```

Feature-transformation SQL script:

```sql
select
    trip_duration,
    passenger_count,
    sum(pickup_latitude) over w as vendor_sum_pl,
    max(pickup_latitude) over w as vendor_max_pl,
    min(pickup_latitude) over w as vendor_min_pl,
    avg(pickup_latitude) over w as vendor_avg_pl,
    sum(pickup_latitude) over w2 as pc_sum_pl,
    max(pickup_latitude) over w2 as pc_max_pl,
    min(pickup_latitude) over w2 as pc_min_pl,
    avg(pickup_latitude) over w2 as pc_avg_pl,
    count(vendor_id) over w2 as pc_cnt,
    count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)
```

We chose two dimensions, vendor_id and passenger_count, for the time-series features.

Model training:

```python
train_df = spark.sql(train_sql)
# specify your configurations as a dict
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0
}
print('Starting training...')
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
gbm.save_model('model.txt')
```

Run the training process; at the end it produces model.txt.
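The excerpt above picks up after lgb_train and lgb_eval already exist; how the SparkSQL feature frame becomes those datasets is not shown. Here is a minimal sketch of that glue, assuming a Spark distribution that understands the ROWS_RANGE window extension used above (a FeDB/4Paradigm extension, not vanilla SparkSQL); the 80/20 train/eval split is our own choice, not taken from the demo:

```python
import lightgbm as lgb
from pyspark.sql import SparkSession

# The window-feature SQL from the previous section, parameterized on the table name.
feature_sql = """
select
    trip_duration, passenger_count,
    sum(pickup_latitude) over w as vendor_sum_pl,
    max(pickup_latitude) over w as vendor_max_pl,
    min(pickup_latitude) over w as vendor_min_pl,
    avg(pickup_latitude) over w as vendor_avg_pl,
    sum(pickup_latitude) over w2 as pc_sum_pl,
    max(pickup_latitude) over w2 as pc_max_pl,
    min(pickup_latitude) over w2 as pc_min_pl,
    avg(pickup_latitude) over w2 as pc_avg_pl,
    count(vendor_id) over w2 as pc_cnt,
    count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)
"""

spark = SparkSession.builder.appName("taxi-trip-features").getOrCreate()
spark.read.csv("data/taxi_tour_table_train_simple.csv",
               header=True, inferSchema=True).createOrReplaceTempView("t1")

train_df = spark.sql(feature_sql.format("t1"))
df = train_df.toPandas()  # the demo data is small enough to fit in memory

# trip_duration is the label; every other column selected by the SQL is a feature.
train = df.sample(frac=0.8, random_state=0)
eval_ = df.drop(train.index)
lgb_train = lgb.Dataset(train.drop(columns=["trip_duration"]),
                        label=train["trip_duration"])
lgb_eval = lgb.Dataset(eval_.drop(columns=["trip_duration"]),
                       label=eval_["trip_duration"], reference=lgb_train)
```

Note that the same feature_sql string is reused verbatim online later, which is exactly how FeDB keeps offline and online features consistent.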
Model inference

Importing the data:

```python
import datetime

def insert_row(line):
    row = line.split(',')
    # convert the two datetime columns to millisecond timestamps
    row[2] = '%dl' % int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)

with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    idx = 0
    for line in fd:
        if idx == 0:  # skip the header row
            idx = idx + 1
            continue
        insert_row(line.replace('\n', ''))
        idx = idx + 1
```

Note: train.csv is the training data in CSV format.

Inference code, predict.py:

```python
def post(self):
    row = json.loads(self.request.body)
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # the total length of all string columns must be declared before building the row
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # append each input column with the type the schema expects
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return
    # run the same feature SQL online, then feed the feature vector to LightGBM
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()
    ins = build_feature(rs)
    self.write("-----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration---------------\n")
    self.write("%ss" % str(duration[0]))
```

Final execution result:

```
# send an inference request and you will see output like the following
python3 predict.py
-----------------ins---------------
[[ 2.       40.774097 40.774097 40.774097 40.774097 40.774097 40.774097
  40.774097 40.774097  1.        1.      ]]
---------------predict trip_duration---------------
859.3298781277192s
```

To run the demo, go to https://github.com/4paradigm/SparkSQLWithFeDB.
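With predict_server.py running, the service can be exercised end to end. Below is a minimal client sketch; the host, port, and endpoint path are assumptions for illustration (check predict_server.py in the repo for the actual values), and the millisecond timestamps mirror the format produced by insert_row above:

```python
import json
import requests

# Hypothetical endpoint -- the real host/port/path are defined in predict_server.py.
url = "http://127.0.0.1:8887/predict"

row = {
    "id": "id0904926",
    "vendor_id": 1,
    "pickup_datetime": 1455995864000,   # 2016-02-20 19:17:44 as a millisecond timestamp
    "dropoff_datetime": 1455996799000,  # 2016-02-20 19:33:19 as a millisecond timestamp
    "passenger_count": 4,
    "pickup_longitude": -73.97344207763672,
    "pickup_latitude": 40.75189971923828,
    "dropoff_longitude": -73.98480224609375,
    "dropoff_latitude": 40.76243209838867,
    "store_and_fwd_flag": "N",
    "trip_duration": 0,  # unknown at request time; placeholder to satisfy the schema
}

resp = requests.post(url, data=json.dumps(row))
print(resp.text)  # echoes the feature vector and the predicted trip_duration
```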