作者:谢世雯62956 | 来源:互联网 | 2024-12-24 21:56
本文详细介绍了如何利用Pandas直接读取和解析SQL脚本,提供了一种高效的数据处理方法。该方法适用于各种数据库导出的SQL脚本,并且能够显著提升数据导入的速度和效率。
在实际工作中,有时会遇到需要处理几百MB大小的SQL脚本文件的情况。通常的做法是先将SQL脚本导入数据库,再从数据库中读取数据,但这种方法速度较慢。本文介绍了一种更高效的方法:直接从SQL脚本中读取数据并加载到Pandas DataFrame中。
将SQL脚本文本解析为CSV格式并加载
SQL脚本本质上是一个文本文件,现代计算机可以轻松地将其一次性加载到内存中。通过Python解析SQL脚本并转换为CSV格式,可以快速生成Pandas DataFrame。
注意:本文提供的代码主要针对MySQL数据库(如SQLyog导出的脚本),对于其他类型的数据库,可能需要根据实际情况进行微调。
以下是具体的读取方法:
from io import StringIO
import pandas as pd
import re
def parse_sql_script(sql_file_path, quotechar="'") -> dict:
insert_pattern = re.compile(r"insert +into +`?(\w+?)`?\(", re.I | re.A)
with open(sql_file_path, encoding="utf-8") as f:
sql_cOntent= f.read()
end_pos = -1
dataframes = {}
while True:
match = insert_pattern.search(sql_content, end_pos + 1)
if not match:
break
table_name = match.group(1)
start_pos = match.span()[1] + 1
end_pos = sql_content.find(";", start_pos)
tmp = re.sub(r"\) (values |,)\(", "\n", sql_content[start_pos:end_pos])
tmp = re.sub(r"[()`]", "", tmp)
df = pd.read_csv(StringIO(tmp), quotechar=quotechar)
dfs = dataframes.setdefault(table_name, [])
dfs.append(df)
for table_name, dfs in dataframes.items():
dataframes[table_name] = pd.concat(dfs)
return dataframes
参数:
- sql_file_path:SQL脚本的文件路径
- quotechar:脚本中字符串的引号类型,默认为单引号
返回值:
一个字典,键为表名,值为对应的DataFrame对象。
例如,我们可以用以下代码读取名为index_test
的表:
df_dict = parse_sql_script("D:/tmp/test.sql")
df = df_dict['index_test']
df.head(10)
结果如下图所示:
如果只需要读取特定表的数据,可以修改上述函数以支持按表名读取:
def read_specific_table(sql_file_path, table_name, quotechar="'") -> pd.DataFrame:
insert_pattern = re.compile(r"insert +into +`?(\w+?)`?\(", re.I | re.A)
with open(sql_file_path, encoding="utf-8") as f:
sql_cOntent= f.read()
end_pos = -1
dfs = []
while True:
match = insert_pattern.search(sql_content, end_pos + 1)
if not match:
break
if match.group(1) != table_name:
continue
start_pos = match.span()[1] + 1
end_pos = sql_content.find(";", start_pos)
tmp = re.sub(r"\) (values |,)\(", "\n", sql_content[start_pos:end_pos])
tmp = re.sub(r"[()`]", "", tmp)
df = pd.read_csv(StringIO(tmp), quotechar=quotechar)
dfs.append(df)
return pd.concat(dfs)
参数:
- sql_file_path:SQL脚本的文件路径
- table_name:要读取的表名
- quotechar:脚本中字符串的引号类型,默认为单引号
返回值:
指定表对应的DataFrame对象。
读取代码示例:
df = read_specific_table("D:/tmp/test.sql", "index_test")
df.head()
结果如下图所示:
将SQL脚本转换为SQLite格式并通过本地连接读取
另一种方法是将SQL脚本转换为SQLite语法的SQL语句,然后通过SQLite连接读取数据。此方法同样适用于MySQL导出的SQL脚本,但对于其他数据库,可能需要根据具体情况进行调整。
from sqlalchemy import create_engine
import pandas as pd
import re
def convert_and_load_sql_to_sqlite(sql_file_path):
create_pattern = re.compile("create +table [^;]+;", re.I)
insert_pattern = re.compile("insert +into [^;]+;", re.I)
with open(sql_file_path, encoding="utf-8") as f:
sql_cOntent= f.read()
engine = create_engine('sqlite:///:memory:')
pos = -1
while True:
match = create_pattern.search(sql_content, pos + 1)
if match:
pos = match.span()[1]
sql = match.group(0).replace("AUTO_INCREMENT", "")
sql = re.sub(r"\).+;", ");", sql)
engine.execute(sql)
match = insert_pattern.search(sql_content, pos + 1)
if match:
pos = match.span()[1]
sql = match.group(0)
engine.execute(sql)
else:
break
tablenames = [t[0] for t in engine.execute(
"SELECT tbl_name FROM sqlite_master WHERE type='table';").fetchall()]
return tablenames, engine.connect()
参数:
sql_file_path:SQL脚本的文件路径
返回值:
一个包含两个元素的元组,第一个元素是表名列表,第二个元素是SQLite内存连接。
测试读取:
tablenames, cOnn= convert_and_load_sql_to_sqlite("D:/tmp/test.sql")
tablename = tablenames[0]
print(tablename)
df = pd.read_sql(f"select * from {tablename};", conn)
df
结果如下图所示: