【.com快译】介绍MySQL是一个RDBMS平台,以表格格式以规范化的方式存储数据,将信息存储为集合中的文档分组。数据的表示方式完全不同,因此将MySQL表数据迁移到MongoDB集合听起来像是一项艰巨的任务。但是Python以其强大的连接性和数据操作能力让它变得轻而易举。在本文中,我将详细解释使用简单的Python脚本将MySQL表数据迁移到MongoDB集合所需的步骤。这些脚本是在Windows上使用Python3.9.5开发的。但是,它应该适用于任何平台上的任何Python3+版本。第1步:安装所需模块第一步是安装连接MySQL和MongoDB数据库实例所需的模块。我们将使用mysql.connector连接到MySQL数据库。对于MongoDB,使用pymongo,推荐用于从Python连接到MongoDB的模块。如果未安装所需模块,请运行以下pip命令进行安装。pipinstallmysql-connectorpipinstallpymongoPIP是Python包或模块的包管理器。第2步:从MySQL表中读取数据第一步是从源MySQL表中读取数据,并以可用于将数据加载到目标MongoDB数据库中的格式进行准备。MongoDB是一个NoSQL数据库,将数据存储为JSON文档,因此最好生成JSON格式的源数据。值得一提的是,Python具有强大的数据处理能力,可以轻松地将数据转换成JSON格式。importmysql.connectormysqldb=mysql.connector.connect(host="localhost",database="employees",user="root",password="")mycursor=mysqldb.cursor(dictionary=True)mycursor.execute("SELECT*fromcategories;")myresult=mycursor.fetchall()print(myresult)当脚本完成且没有任何错误时,输出如下:[{"id":4,"name":"Medicine","description":"
医学
","created_at":"","updated_at":""},{"id":6,"name":"Food","description":"
食物
","created_at":"","updated_at":""},{"id":8,"name":"Groceries","description":"
杂货
","created_at":"","updated_at":""},{"id":9,"name":"蛋糕&;Bakes","description":"
Cakes&Bakes
","created_at":d"","updated_at":""}]请注意,输出是一个JSON数组,因为我们通过dictionary=True参数给游标,否则结果会是列表格式,现在有了JSON格式的源数据,就可以迁移到MongoDB集合中了。第三步:写入MongoDB集合中获取源数据后JSON格式,下一步是往MongoDB集合中插入数据,集合就是一组文档,相当于RDBMS中的一个表(或者关系),我可以通过调用的insert_many()方法来实现集合类,它返回用于插入文档的对象ID列表。请注意,当将空列表作为参数传递时,此方法将抛出异常,因此在方法调用之前会进行长度检查。importpymongomongodb_host="mongodb://localhost:27017/"mongodb_dbname="mymongodb"myclient=pymongo.MongoClient(mongodb_host)mydb=myclient[mongodb_dbname]mycol=mydb["categories"]iflen(myresult)>0:x=mycol.insert_many(myresult)#myresultcomesfrommysqlcursorprint(len(x.inserted_ids))这一步之后,检??查MongoDB实例以验证数据库和集合已创建并插入了文档。请注意,MongoDB是无模式的,这意味着您不必定义模式来插入文档,模式是动态推断和自动创建的。MongoDB还可以创建数据库和集合(如果它们尚不存在)。第4步:将数据放在一起下面是从MySQL读取表并将其插入MongoDB中的集合的完整脚本。importmysql.connectorimportpymongodelete_existing_documents=Truemysql_host="localhost"mysql_database="mydatabase"mysql_schema="myschema"mysql_user="myuser"mysql_password="********"mongodb_host="mongodb://localhost:27017/"mongodb_dbname=“mymongodb”mysqldb=mysql.connector.connect(host=mysql_host,database=mysql_database,user=mysql_user,password=mysql_password)mycursor=mysqldb.cursor(dictionary=True)mycursor.execute(“SELECT*fromcategories;”)myresult=mycursor.fetchall()myclient=pymongo.MongoClient(mongodb_host)mydb=myclient[mongodb_dbname]mycol=mydb["类别"]iflen(myresult)>0:x=mycol.insert_many(myresult)#myresultcomesfrommysqlcursorprint(len(x.inserted_ids)))第5步:增强脚本以加载MySQL模式中的所有表此脚本从MySQL读取表并将结果加载到MongoDB集合中。然后,下一步是遍历源数据库中所有表的列表,并将结果加载到一个新的MySQL集合中。我们可以通过查询information_schema.tables元数据表来做到这一点,该表提供给定模式中的表列表。然后您可以迭代结果并调用上面的脚本来迁移每个表的数据。#Iteratethroughthelistoftablesintheschematable_list_cursor=mysqldb.cursor()table_list_cursor.execute("SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema=%sORDERBYtable_name;",(mysql_schema,))tables=table_list_cursor.fetchall()fortableintables:#Executetable也可以通过fortableintables抽象成迁移逻辑:#Executetable'函数来实现这一点。#Functionmigrate_tabledefmigrate_table(db,col_name):mycursor=db.cursor(dictionary=True)mycursor.execute("SELECT*FROM"+col_name+";")myresult=mycursor.fetchall()mycol=mydb[col_name]ifdelete_existing_documents:#deletealldocumentsinthecollection.delete_many({})#insertthedocumentsiflen(myresult)>0:x=mycol.insert_many(myresult)returnlen(x.inserted_ids)else:return0第六步:输出脚本进度,使其可读脚本的进度由使用打印语句来传达。使用颜色编码使输出易于阅读。例如,以绿色打印成功语句,以红色打印失败语句。classbcolors:HEADER='\033[95m'OKBLUE='\033[94m'OKCYAN='\033[96m'OKGREEN='\033[92m'WARNING='\033[93m'FAIL='\033[91m'ENDC='\033[0m'BOLD='\033[1m'UNDERLINE='\033[4m'print(f"{bcolors.HEADER}Thisaheader{bcolors.ENDC}")print(f"{bcolors.OKBLUE}Thisprintsinblue{bcolors.ENDC}")print(f"{bcolors.OKGREEN}Thismessageisgreen{bcolors.ENDC}")最终结果迁移后的源MySQL数据库目标MongoDB数据库Python脚本并在VSCode中输出完成脚本importmysql.connectorimportpymongoimportdatetimeclassbcolors:HEADER='\033[95m'OKBLUE='\033[94m'OKCYAN='\033[96m'OKGREEN='\033[92m'WARNING='\033[93m'FAIL='\033[91m'ENDC='\033[0m'BOLD='\033[1m'UNDERLINE='\033[4m'begin_time=datetime.datetime.now()print(f"{bcolors.HEADER}Scriptstartedat:{begin_time}{bcolors.ENDC}")delete_existing_documents=真;mysql_host="localhost"mysql_database="mydatabase"mysql_schema="myschhema"mysql_user="root"mysql_password=""mongodb_host="mongodb://localhost:27017/"mongodb_dbname="mymongodb"print(f"{bcolors.HEADER}初始化数据库连接...{bcolors.ENDC}")print("")#MySQLconnectionprint(f"{bcolors.HEADER}连接到MySQL服务器...{bcolors.ENDC}")mysqldb=mysql.connector.connect(host=mysql_host,database=mysql_database,user=mysql_user,password=mysql_password)print(f"{bcolors.HEADER}ConnectiontoMySQLServersucceeded.{bcolors.ENDC}")#MongoDBconnectionprint(f"{bcolors.HEADER}ConnectingtoMongoDBserver...{bcolors.ENDC}")myclient=pymongo.MongoClient(mongodb_host)mydb=myclient[mongodb_dbname]print(f"{bcolors.HEADER}ConnectiontoMongoDBServersucceeded.{bcolors.ENDC}")print(f"{bcolors.HEADER}数据库连接初始化成功。{bcolors.ENDC}")#Startmigrationprint(f"{bcolors.HEADER}迁移开始...{bcolors.ENDC}")dblist=myclient。list_database_names()ifmongodb_dbnameindblist:print(f"{bcolors.OKBLUE}数据库存在。{bcolors.ENDC}")else:print(f"{bcolors.WARNING}数据库不存在,正在创建。{bcolors.ENDC}")#Functionmigrate_tabledefmigrate_table(db,col_name):mycursor=db。游标(字典=真)mycursor.execute(“SELECT*FROM”+col_name+“;”)myresult=mycursor.fetchall()mycol=mydb[col_name]ifdelete_existing_documents:#deletealldocumentsinthecollectionmycol.delete_many({})#insertthedocumentsiflen(myresult)>0:x=mycol.insert_many(myresult)returnlen(x.inserted_ids)else:return0#Iteratethroughthelistoftablesintheschematable_list_cursor=mysqldb.cursor()table_list_cursor.execute("SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema=%sORDERBYtable_nameLIMIT15;",(mysql_schema.)表fetchall()total_count=len(tables)success_count=0fail_count=0fortableintables:try:print(f"{bcolors.OKCYAN}Processingtable:{table[0]}...{bcolors.ENDC}")inserted_count=migrate_table(mysqldb,table[0])print(f"{bcolors.OKGREEN}Processingtable:{table[0]}completed.{inserted_count}documentsinserted.{bcolors.ENDC}")success_count+=1exceptExceptionase:print(f"{bcolors.FAIL}{e}{bcolors.ENDC}")fail_count+=1print("")print("迁移完成。")print(f"{bcolors.OKGREEN}{success_count}of{total_count}表迁移成功。{bcolors.ENDC}")iffail_count>0:print(f"{bcolors.FAIL}Migrationof{fail_count}tablesfailed.Seeerrorsabove.{bcolors.ENDC}")end_time=datetime.datetime.now()print(f"{bcolors.HEADER}Scriptcompletedat:{end_time}{bcolors.ENDC}")print(f"{bcolors.HEADER}Totalexecutiontime:{end_time-begin_time}{bcolors.ENDC}")警告该脚本适用于中小型MySQL数据库,有数百个表,每个表有数千行对于具有数百万行的大型数据库,性能可能会受到影响。在开始实际迁移之前,在表列表查询和实际表选择查询上使用LIMIT关键字来检测有限的行。下载点此从GitHub下载整个脚本:https://github.com/zshameel/MySQL2MongoDB【翻译请注明原文译者及来源为.com,合作站点转载】