我是一个奋斗在编程链条底层的pythoner。一边维护erp系统,一边处理数据,web网站的友好“交流”……时不时分享一下工作中遇到的,包括我认为值得记录的编程技巧,还有遇到的问题和解决办法,还有源码阅读等等,编程可能会有人生感悟,更何况,我要重构我的程序了很多转贴的答案都是给被忽悠的人(因为我也是)。一方面,我也给自己做个笔记,方便以后查需求的原因。sql存储过程执行数据更新和修改操作。由于数据量大,对数据库的压力太大。因此,需要重构程序,使用python读取文件来计算和处理数据,以减轻这部分压力。最后只需要调用aws的lambda服务,将计算结果重新更新数据库即可,大大降低了压力和成本。涉及数据库主要是insert和update操作版本库信息基于linux系统写的一个三方库>>>pandas1.0.5,pymysql0.9.3python版本>>>3.7标准库>>os逻辑梳理其实,最后一步是将写入数据库的File数据存入内存。因为读取文件后的计算都是在内存中进行的,不需要把计算结果写到本地,然后读取,再写入数据库,这样会影响程序的运行效率。逻辑如下读取文件的拼接计算,生成新的df,初始化数据库连接,将df需要的数据转换成tuple数据(取决于数据库第三方库的接口如何支持batch操作)将数据写入数据库查看数据库的内容来实现和分析读取文件并逐步给出文件路径,然后读取文件。我强调一下需要注意的地方。绝对路径:这个最简单,直接给路径字符串就行了,但是一旦文件夹目录结构发生变化,就需要经常更改相对路径:我一般喜欢先在脚本中定位到当前脚本,然后再查找它通过相对路径,所以只要你整个包里面的目录结构没有变化,你就不用改,即使你部署网上也是直接根据包的位置。非常方便的是pandas默认会将所有数字读取为float类型,所以对于看起来像数字但实际上需要作为字符串使用的字段,进行类型转换importpandasaspdimportnumpyasnp#当前脚本的位置current_folder_path=os.path。dirname(__file__)#你的文件所在位置your_file_path1=os.path.join(current_folder_path,"文件名1")your_file_path2=os.path.join(current_folder_path,"文件名2")#我正在取以读取csv文件为例,delimiter是我们内部约定的列之间的分隔符df1=pd.read_csv(your_file_path1,dtype={"column1":str,"column2":str},delimiter="/t")df2=pd.read_csv(your_file_path2,dtype={"column1":str,"column2":str},delimiter="/t")文件和计算文件的拼接,主要是merge和concat这两种语法的使用,强调小知识点合并语法主要对应SQL的innerjoin,outerjoin,leftjoin,rightjoin语言。Concat主要用于简单拼接具有相同结构的df(即列表的总行数增加了)#这里以leftjoin为例,假设只有两个文件拼接ret_df=pd.merge(df1,df2,left_on=["column_name"],right_on=["column_name"],how="left")初始化连接并导入第三方库pymysql,并初始化连接#pymysql接口获取链接defmysql_conn(host,user,password,db,port=3306,charset="utf8"):#pass参考版本conn=pymysql.connect(host=host,user=user,password=password,database=db,port=port,charset=charset)returnconn对应接口转换data数据插入要考虑写事务,因为如果失败了,必须保证对数据库没有影响。结构符合相应接口的数据格式,通过查询,pymysql有两个接口可以执行语句execute(singleinsertstatement)执行单条语句的接口类似这样:Insertintotable_name(column)values(价值);executemany(batchinsertstatement)执行多条语句的接口类似这样:Insertintotable_name(column1,column2,column3)values(value1,value2,value3);具体实现如下#首先创建一个游标来操作conn接口conn=mysql_conn("yourdbhost","yourusername","yourpassword","dbname")cursor=conn.cursor()#开启事务conn.begin()##############构造批量数据的过程##############构造需要的或者匹配数据库的列firstcolumns=list(df.columns)#可以删除不需要的列或数据库中没有的列名columns.remove("Columnname")#重构df,使用上面的column,这里要保证你所有的column都准备好写入数据库new_df=df[columns].copy()#构造符合sql语句的columns,因为sql语句是用逗号分隔的,(这个对应上面sql语句的(column1,column2,column3))columns=','.join(list(new_df.columns))#构造每列对应的数据,对应于上面的((value1,value2,value3))data_list=[tuple(i)foriingdsord_df.values]#每个tuple是一条数据,根据df的行数生成多少条tuple数据#计算一行需要多少个value值使用字符串占位符s_count=len(data_list[0])*"%s,"#构造sql语句insert_sql="insertinto"+"数据库表名"+"("+columns+")values("+s_count[:-1]+")"往数据库中写入数据很简单,直接上代码cursor.executemany(insert_sql,data_list)conn.commit()cursor.close()conn.close()Check数据库是否插入成功,如果没有问题,可以同时读写多个文件,计算,最后开启多线程同时向数据库写入数据,非常高效!完整代码importpandasaspdimportnumpyasnp#pymysqlinterfacedefmysql_conn(host,user,password,db,port=3306,charset="utf8"):conn=pymysql.connect(host=host,user=user,password=password,database=db,port=port,charset=charset)returnconn#当前脚本的位置current_folder_path=os.path.dirname(__file__)#你的文件的位置your_file_path1=os.path.join(current_folder_path,"文件名1")your_file_path2=os.path.join(current_folder_path,"文件名2")#我这里以读取csv文件为例,分隔符是列之间的分隔符我们在内部同意df1=pd.read_csv(your_file_path1,dtype={"column1":str,"column2":str},delimiter="/t")df2=pd.read_csv(your_file_path2,dtype={"column1":str"column2":str},delimiter="/t")#mergeret_df=pd.merge(df1,df2,left_on=["column_name"],right_on=["column_name"],how="left")#先创建cursor负责操作conn接口conn=mysql_conn("yourdbhost","yourusername","yourpassword","dbname")cursor=conn.cursor()#开启事务conn.begin()#首先构造需要的or匹配数据库的列columns=list(df.columns)#可以删除不需要的列或者数据库没有的列名columns.remove("列名")#重构df,使用上面的列,这里要保证你所有的专栏都准备写入数据库new_df=df[columns].copy()#构造匹配sql语句的列,因为sql语句是用逗号分隔的,(这个对应上面的sql语句(column1,column2,column3))columns=','。join(list(new_df.columns))#构造每一列对应的数据,对应上面的((value1,value2,value3))data_list=[tuple(i)foriingdsord_df.values]#每个tuple是一个数据,根据df行数生成多少个tuple数据#计算一行有多少个值,需要用到一个要占用空间的字符串s_count=len(data_list[0])*"%s,"#构造sql语句insert_sql="insertinto"+"数据库表名"+"("+columns+")values("+s_count[:-1]+")"try:cursor.executemany(insert_sql,data_list)conn.commit()cursor.close()conn.close()exceptExceptionase:#如果失败,需要回滚操作conn.rollback()cursor.close()conn.close()
