Python+Sqlalchemy批量插入或更新(Upsert)到数据库插入)。由于不同的数据库对这种upsert有不同的实现机制,因此SQLAlchemy不再尝试做一致的封装,而是提供了自己的dialectAPI。具体来说,对于Mysql,它在insert语句中增加了on_duplicate_key_update方法。基本用法假设表数据模型如下:Column(db.Integer,primary_key=True)a=db.Column(db.Integer)b=db.Column(db.Integer)c=db.Column(db.Integer)其中id是自增主键,a和b构成唯一索引。那么对应的upsert语句如下:fromsqlalchemy.dialects.mysqlimportinsertinsert(TableA).values(a=1,b=2,c=3).on_duplicate_key_update(c=3)复用值类似SQL语句,我们可以不用每次都重复insert和updatevalues:update_keys=['c']insert_stmt=insert(table_cls).values(a=1,b=2,c=3)update_columns={x.name:xforxininsert_stmt.insertedifx.nameinupdate_keys}upsert_stmt=insert_stmt.on_duplicate_key_update(**update_columns)db.session.execute(upsert_stmt)注意最后一句on_duplicate_key_update的参数需要展开,dict不接受作为批处理的参数。insert语句支持传递一组数据作为参数:records=[{'a':1,'b':2,'c':3},{'a':10,'b':20,'c':4},{'a':20,'b':30,'c':5}]update_keys=['c']insert_stmt=insert(table_cls).values(records)update_columns={x.name:xforxininsert_stmt.insertedifx.nameinupdate_keys}upsert_stmt=insert_stmt.on_duplicate_key_update(**update_columns)db.session.execute(upsert_stmt)可以实现整体upsert。封装观察上面的代码,其实upsert部分与业务无关,所以可以封装一个更方便的通用函数来调用:fromsqlalchemy.dialects.mysqlimportinsertdefupsert(table_cls,records,except_cols_on_update=[]):update_keys=[keyforkeyinrecords[0].keys()ifkeynotinexcept_cols_on_update]insert_stmt=insert(table_cls).values(chunk)update_columns={x.name:xforxininsert_stmt.insertedifx.nameinupdate_keys}upsert_stmt=insert_stmt.on_duplicate_key_update(**update_columns)db.session.execute(upsert_stmt)批量生成上面的封装,可以做一些改进:为了防止记录数据集过大,可以批量执行sql语句,通过参数决定是否提交:fromsqlalchemy.dialects.mysqlimportinsertdefupsert(table_cls,records,chunk_size=10000,commit_on_chunk=True,except_cols_on_update=[]):update_keys=[keyforkeyinrecords[0].keys()如果键不在except_cols_on_update]foriinrange(0,len(records),chunk_size):chunk=records[i:i+chunk_size]insert_stmt=insert(table_cls).values(chunk)update_columns={x.name:xforxininsert_stmt.insertedifx.nameinupdate_keys}upsert_stmt=insert_stmt.on_duplicate_key_update(**update_columns)db.session.execute(upsert_stmt)ifcommit_on_chunk:db.session.commit()调用方法如下:upsert(TableA,records,chunk_size=50000,commit_on_chunk=True,except_cols_on_update=['id','a','b'])这时候记录数可以很大,比如作为1000万。生成一条5万条的sql语句,执行后commit(若参数commit_on_chunk=False,则函数不会一直提交,结束后统一提交即可),update语句中,避免更新'id','a','b'这三个字段链接到我的原文
