Python操作MySQL工具类
在上一篇文章中,简单对Python利用pymysql操作MySQL进行了封装,实现了多种功能的复用。接下来,在参阅pymysql的API后,对Python进行了更加详细的封装,实现了Python操作MySQL的工具类MySQLDBHelper,其代码主要如下:
import pymysqlimport reclass MySQLDBHelper(object): """ 定义构造方法,用于初始化数据库连接 """ def __init__(self, config): self.host = config['host'] self.user = config['user'] self.password = config['password'] self.port = config['port'] self.connection = None self.cursor = None try: self.connection = pymysql.connect(**config) self.connection.autocommit(1) # 调用cursor()方法创建一个用于操作的游标 self.cursor = self.connection.cursor() except: print("数据库连接失败,请检查数据库配置项!") # 定义关闭数据库连接的方法 def close(self): if not self.connection: self.connection.close() else: print("数据库连接已经关闭!") # 创建数据库 def createDatabase(self, DB_NAME): """ 创建数据库 """ self.cursor.execute( "CREATE DATABASE IF NOT EXISTS %s DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci" % DB_NAME) self.connection.select_db(DB_NAME) print(f"数据库{DB_NAME}创建成功!") # 选择数据库 def selectDatabase(self, DB_NAME): self.connection.select_db(DB_NAME) # 获取数据库的版本号 def getDatabaseVersion(self): self.cursor.execute('SELECT VERSION()') return self.queryOne() # 获取一行查询结果 def queryOne(self): return self.cursor.fetchone() # 创建数据库表 def createTable(self, table_name, attribute_dict, constraint): """ 创建数据库表 :param table_name: 表的名称 :param attribute_dict: 属性键值对 :param constraint: 主外键约束 """ # 判断要创建的表是否存在 if self.isExistTable(table_name): print(f"{table_name}已经存在!") return sql = '' sql_mid = '`id` bigint(11) auto_increment,' for attr, value in attribute_dict.items(): sql_mid = sql_mid + '`' + attr + '`' + ' ' + value + ',' sql = sql + 'CREATE TABLE IF NOT EXISTS %s (' % table_name sql = sql + sql_mid sql = sql + constraint sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8' print(f'创建表的语句是:{sql}') self.executeCommit(sql) def executeSql(self, sql=''): """ 执行sql语句,针对读操作返回结果集 """ try: self.cursor.execute(sql) records = self.cursor.fetchall() return records except pymysql.Error as e: print(f'SQL语句执行失败!ERROR({e.args[0]}): {e.args[1]}') def executeCommit(self, sql=''): """ 执行SQL语句,针对更新、删除等事务操作,失败时回滚 """ try: self.cursor.execute(sql) self.connection.commit() except pymysql.Error as e: self.connection.rollback() error = 'SQL语句执行失败!ERROR (%s): %s' % (e.args[0], e.args[1]) print(error) return error def insert(self, table_name, params): """ """ key = [] value = [] for k, v in params.items(): key.append(k) if isinstance(v, str): value.append("'" + v + "'") else: value.append(v) attrs_sql = '(' + ','.join(key) + ')' values_sql = ' values(' + ','.join(value) + ')' sql = 'insert into %s' % table_name sql = sql + attrs_sql + values_sql print(f'插入SQL语句: {sql}') self.executeCommit(sql) def select(self, table_name, cond_dict='', order='', fields='*'): """ 查询数据 args: table_name: 表的名字 cond_dict: 查询条件 order: 排序条件 示例: db.select(table) db.select(table, fields=['user_name]) db.select(table, fields=['user_name','income']) """ cond_sql = ' ' if cond_dict != '': for k, v in cond_dict.items(): cond_sql = cond_sql + '`' + k + '`' + '=' + '"' + v + '"' + ' and' cond_sql = cond_sql + ' 1=1 ' if fields == '*': sql = 'select * from %s where ' % table_name else: if isinstance(fields, list): fields = ",".join(fields) sql = 'select %s from %s where ' % (fields, table_name) else: print('传入的字段不正确,请以列表的形式传入字段信息!') sql = sql + cond_sql + order print(f'查询语句: {sql}') return self.executeSql(sql) def insertMany(self, table_name, attrs, values): """ 插入多条数据 args: table_name: 表的名字 attrs: 属性键 values: 属性值 示例: table_name ='mytable' key = ["id","name","age"] value = [[1,'张三',20],[2,'李四',25]] db.insertMany(table_name, key, value) """ values_sql = ['%s' for attr in attrs] attrs_sql = '(' + ','.join(attrs) + ')' values_sql = ' values(' + ','.join(values_sql) + ')' sql = 'insert into %s' % table_name sql = sql + attrs_sql + values_sql print(f'插入多条语句:{sql}') try: for i in range(0, len(values), 20000): self.cursor.executemany(sql, values[i:i + 20000]) self.connection.commit() except pymysql.Error as e: self.connection.rollback() print(f'插入多条数据遇到错误! ERROR({e.args[0]}):{e.args[1]}') def delete(self, table_name, cond_dict): """ 删除数据 args: table_name: 表的名字 cond_dict: 删除条件字典 示例: params = {'name': '风清扬', 'age':38} db.delete(table_name, params) """ cond_sql = ' ' if cond_sql != '': for k, v in cond_dict.items(): if isinstance(v, str): v = "'" + v + "'" cond_sql = cond_sql + table_name + "." + k + '=' + v + ' and ' cond_sql = cond_sql + ' 1=1 ' sql = 'delete from %s where %s ' % (table_name, cond_sql) print(f'生成的sql语句:{sql}') return self.executeCommit(sql) def update(self, table_name, attrs_dict, cond_dict): """ args: table_name: 表的名字 attrs_dict: 更新属性键值对字典 cond_dict: 更新条件字典 example: params = {'name': '风清扬', 'age': 40} """ attrs_list = [] cond_sql = ' ' for k, v in attrs_dict.items(): attrs_list.append("`" + k + "`" + "=" + "'" + v + "'") attrs_sql = ",".join(attrs_list) print(f"attrs_sql: {attrs_sql}") if cond_dict != '': for k, v in cond_dict.items(): if isinstance(v, str): v = "'" + v + "'" cond_sql = cond_sql + "`" + table_name + "`." + "`" + k + "`" + '=' + v + ' and ' cond_sql = cond_sql + ' 1=1 ' sql = 'update %s set %s where %s' % (table_name, attrs_sql, cond_sql) print(sql) return self.executeCommit(sql) def dropTable(self, table_name): """ 删除数据库表 args: table_name: 表名字 """ sql = 'DROP TABLE %s' % table_name self.executeCommit(sql) def deleteTable(self, table_name): """ 清空表数据 """ sql = 'DELETE FROM %s' % table_name print(sql) self.executeCommit(sql) def truncateTable(self, table_name): """ 清空表数据,不写日志 """ sql = 'TRUNCATE TABLE %s' % table_name print(sql) self.executeCommit(sql) def isExistTable(self, table_name): """ 判断表是否存在 """ sql = 'select * from %s' % table_name ret = self.executeCommit(sql) if ret is None: return True else: if re.search("doesn't exist", ret): return False else: return True
# 测试if __name__ == "__main__": # 定义数据库访问参数 config = { 'host': 'node05', 'user': 'root', 'password': 'Love88me', 'port': 3306, 'charset': 'utf8', 'cursorclass': pymysql.cursors.DictCursor } # 初始化打开数据库连接 db = MySQLDBHelper(config) # 打印数据库版本 print(db.getDatabaseVersion()) # 测试创建数据库 print("================测试创建数据库=========================") DB_NAME = input('输入要创建的数据库名称:') db.createDatabase(DB_NAME) # 测试选择数据库 print("================测试选择数据库=========================") db.selectDatabase(DB_NAME) # 测试创建表 print("================测试创建数据表=========================") table_name = input('请输入要创建的数据表名:') ''' 例如 create table `location` ( `id` bigint(11) not null auto_increment, `name` varchar(30) not null, `province` varchar(20), `city` varchar(50) ) engine=InnoDB default charset=utf8; ''' attrs_dict = { 'name': 'varchar(30) not null', 'province': 'varchar(30) not null', 'city': 'varchar(50)' } constraint = 'primary key(`id`)' db.createTable(table_name, attrs_dict, constraint) # 测试单条数据插入功能 params = {'name': '北京市文化局', 'province': '北京', 'city': '丰台区'} db.insert(table_name, params) # 测试批量插入数据 insert_values = [['江苏省无锡市文化局','江苏省','无锡市'], ['安徽省合肥市教育局','安徽省','合肥市'], ['浙江省杭州市旅游局','浙江省','杭州市']] insert_attrs = ['name','province','city'] db.insertMany(table_name, insert_attrs, insert_values) # 测试数据查询 print(db.select(table_name, fields=['id','name'])) print(db.select(table_name,cond_dict={'province':'江苏省'}, fields=['name','province','city'])) print(db.select(table_name, fields=['id','name', 'province', 'city'], order='order by id desc')) # 测试更新数据 update_params = {'name':'江苏省无锡市苏南国际鸡厂'} update_cond_dict = {'province':'江苏省'} db.update(table_name, update_params, update_cond_dict) # 测试删除数据 delete_params = {'province':'安徽省'} db.delete(table_name, delete_params) # 测试删除表数据 db.deleteTable(table_name) # 测试删除表 db.dropTable(table_name)
操作的中间结果