当前位置: 首页 > 后端技术 > Python

mysql海量数据表归档

时间:2023-03-26 13:09:18 Python

在日常业务系统中,往往与用户相关的操作数据或日志记录越来越多,但前期数据查询需求不是那么强烈,所以可以进行归档——因为Colddata是从在线数据库中保存和删除。普通数据表归档简单的数据归档很容易实现。比如有这样一张表:CREATETABLE`operation_log`(`id`bigint(20)NOTNULLAUTO_INCREMENT,`create_date`dateDEFAULTNULL,`user_id`int(11)DEFAULTNULL,`type`varchar(20)NOTNULL,`amount`decimal(20,8)NOTNULL,PRIMARYKEY(`id`),KEY`ix_date`(`create_date`),KEY`ix_type_date`(`type`,`create_date`))可以查询根据时间,选择指定日期之前的数据,保存,然后从数据库中删除。示例代码如下(python+sqlalchemy):#已经获取到数据库连接,或者session#找出所有旧数据记录=connection.execute('select*fromoperation_logwherecreate_date<"2022-06-01"')#保存查看输出数据发送到文件,或者冷备数据库#......#删除输出数据connection.execute('deletefromoperation_logwherecreate_date<"2022-06-01"')海量数据表归档但是如果这张表存储的数据量很大,比如整个表有10亿条记录,1000万条结果集。显然,这种鲁莽的执行是不能执行的:将1000万个结果集全部加载到应用服务器是非常危险的,而且通常会持续到out-of-memory'create_date'字段中虽然有索引,但是搜索速度显然不如主键。对于问题1,解决方案是分段查询。每次迭代找出一个batch,处理一个batch,直到处理完所有符合条件的数据;对于问题2,可以利用主键id的自增特性(按记录生成顺序递增),将大部分查询转化为基于主键的条件。代码示例如下:#已经获取到数据库连接connection,或者sessionBATCH_SIZE=100000#设置批量数据为??100000FINAL_DATE=datetime.date(2022,6,1)records=connection.execute('selectidfromoperation_logorderbyidlimit1')batch_end_id=records[0]['id']#将迭代id初始化为batch_end_date=datetime.date(1980,1,1)#设置一个非常早的开始日期whilebatch_end_date=FINAL_DATE:#只有最后一次迭代,以date为条件,保证删除的数据不超过connection.execute('从operation_log中删除,其中id<{}和create_date<"{}"'.format(batch_end_id,FINAL_DATE.strftime('%Y-%m-%d')))breakelse:#正常迭代只有id作为查询条件,速度会快很多connection.execute('deletefromoperation_logwhereid<{}'.format(batch_end_id))