SQL Expression Language对原生SQL语言进行了简单的封装
两大模块SQLAlchemy Core and ORM:
Core:提供执行SQL Expression Language的接口
ORM
安装:SQLAlchemy及相关数据库驱动pip install sqlalchemy pymysql
数据库连接字符串格式:请参考这里
mysql://username:password@hostname/database
postgresql://username:password@hostname/database
sqlite:////absolute/path/to/database
oracle://scott:tiger@127.0.0.1:1521/orcl
比如SQLite如下:
from sqlalchemy import create_engine
engine = create_engine('sqlite:///COOKIEs.db')
engine2 = create_engine('sqlite:///:memory:')
engine3 = create_engine('sqlite:////home/COOKIEmonster/COOKIEs.db')
engine4 = create_engine('sqlite:///c:\\Users\\COOKIEmonster\\COOKIEs.db')
注意:create_engine函数返回以一个engine实例,但是不会立即获取数据库连接,直到在engine上进行操作如查询时才会去获取connection
关于MySQL空闲连接8小时自动关闭的解决方案:传入 pool_recycle=3600参数
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://COOKIEmonster:chocolatechip@mysql01.monster.internal/COOKIEs', pool_recycle=3600)
create_engine其余的一些参数:
echo:是否log打印执行的sql语句及其参数。默认为False
encoding:默认utf-8
isolation_level:隔离级别
pool_recycle:指定连接回收间隔,这对于MySQL连接的8小时机制特别重要。默认-1
获取连接
from sqlalchemy import create_engine
Schema and Types
engine = create_engine('mysql+pymysql://COOKIEmonster:chocolatechip' \
'@mysql01.monster.internal/COOKIEs', pool_recycle=3600)
cOnnection= engine.connect()
四种类型集合:
• Generic
• SQL standard
• Vendor specific
• User defined
SQLAlchemy定义了很多generic types以兼容不同数据库。这些类型都定义在sqlalchemy.types
模块中,为了方便也可以从sqlalchemy直接导入这些类型。
类型对应表如下:
SQLAlchemy | Python | SQL |
---|---|---|
BigInteger | int | BIGINT |
Boolean | bool | BOOLEAN or SMALLINT |
Date | datetime.date | DATE (SQLite: STRING) |
DateTime | datetime.datetime | DATETIME (SQLite: STRING) |
Enum | str | ENUM or VARCHAR |
Float | float or Decimal | FLOAT or REAL |
Integer | int | INTEGER |
Interval | datetime.timedelta | INTERVAL or DATE from epoch |
LargeBinary | byte | BLOB or BYTEA |
Numeric | decimal.Decimal | NUMERIC or DECIMAL |
Unicode | unicode | UNICODE or VARCHAR |
Text | str | CLOB or TEXT |
Time | datetime.time | DATETIME |
如果这些类型不能满足你,比如有些数据库支持json类型,那么你需要用到sqlalchemy.dialects
模块中对应数据库的类型。比如from sqlalchemy.dialects.postgresql import JSON
Metadata为了快速访问数据库。可以看作是很多Table对象的集合,还有一些关于engin,connection的信息。可以通过MetaData.tables
访问这些表对象字典
定义表对象之前需要先实例化Metadata:
from sqlalchemy import MetaData
metadata = MetaData()
Table对象构建如下:第一个参数为名称,第二个参数为Metadata对象,后续参数为Column对象. Column对象参数为,名称,类型,及其余等
from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
COOKIEs = Table('COOKIEs', metadata,
Column('COOKIE_id', Integer(), primary_key=True),
Column('COOKIE_name', String(50), index=True),
Column('COOKIE_recipe_url', String(255)),
Column('COOKIE_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
from datetime import datetime
from sqlalchemy import DateTime
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, Onupdate=datetime.now)
注意:这里default,onupdate属性是一个callable对象而不是直接值,比如datetime.now(),因为这样的话,就永远是这个值,而不是每个实例实例化、更新时的时间了。
比较有用的就是onupdate
,每次更新时都会调用该方法或函数。
键和约束(Keys and Constraints)
键和约束既可以像上面那样通过kwargs定义在Column中,也可以在之后通过对象添加。相关类定义在基础的 sqlalchemy模块中,比如最常用的三个:from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint
PrimaryKeyConstraint('user_id', name='user_pk'),它也支持同时定义多个形成联合主键。
UniqueConstraint('username', name='uix_username')
CheckConstraint('unit_cost >= 0.00', name='unit_cost_positive')
索引(Index)
from sqlalchemy import Index
Index('ix_COOKIEs_COOKIE_name', 'COOKIE_name')
这个定义需要放置在Table构造器中。也可以在之后定义,比如Index('ix_test', mytable.c.COOKIE_sku, mytable.c.COOKIE_name))
关联关系和外键约束(Relationships and ForeignKeyConstraints)
from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('COOKIE_id', ForeignKey('COOKIEs.COOKIE_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
注意:这里ForeignKey用的是字符串参数(这些字符串对应的是数据库中的表名.列名),而非引用。这样隔离了模块间相互依赖
我们也可以使用:ForeignKeyConstraint(['order_id'], ['orders.order_id'])
创建或持久化表模式(Persisting the Tables)
通过示例代码我们知道所有的Table定义,以及额外的模式定义都会与一个metadata对象关联。我们可以通过这个metadata对象来创建表:
metadata.create_all(engine)
注意:默认情况下create_all不会
重新创建已有表,所以它可以安全地多次调用,而且也非常友好地与数据库迁移库如Ablembic集成而不需要你进行额外手动编码。
本节代码完整如下:
from datetime import datetime
SQLAlchemy-Core模块
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
DateTime, ForeignKey, create_engine)
metadata = MetaData()
COOKIEs = Table('COOKIEs', metadata,
Column('COOKIE_id', Integer(), primary_key=True),
Column('COOKIE_name', String(50), index=True),
Column('COOKIE_recipe_url', String(255)),
Column('COOKIE_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('customer_number', Integer(), autoincrement=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, Onupdate=datetime.now)
)
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id'))
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('COOKIE_id', ForeignKey('COOKIEs.COOKIE_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
ins = COOKIEs.insert().values(
COOKIE_name="chocolate chip",
COOKIE_recipe_url="http://some.aweso.me/COOKIE/recipe.html",
COOKIE_sku="CC01",
quantity="12",
unit_cost="0.50"
)
print(str(ins))
当然你也可以这么做:
from sqlalchemy import insert
ins = insert(COOKIEs).values(
COOKIE_name="chocolate chip",
COOKIE_recipe_url="http://some.aweso.me/COOKIE/recipe.html",
COOKIE_sku="CC01",
quantity="12",
unit_cost="0.50"
)
上述编译成预编译语句如下:
INSERT INTO COOKIEs
(COOKIE_name, COOKIE_recipe_url, COOKIE_sku, quantity, unit_cost)
VALUES
(:COOKIE_name, :COOKIE_recipe_url, :COOKIE_sku, :quantity, :unit_cost)
实际过程会是如下ins对象内部会调用compile()方法编译成上述语句,然后将参数存储到ins.compile().params字典中。
接下来我们通过前面获取的connection对象执行statement:
result = connection.execute(ins)
当然你也可以这么查询:
ins = COOKIEs.insert()
result = connection.execute(
ins,
COOKIE_name='dark chocolate chip',
COOKIE_recipe_url='http://some.aweso.me/COOKIE/recipe_dark.html',
COOKIE_sku='CC02',
quantity='1',
unit_cost='0.75'
)
result.inserted_primary_key
inventory_list = [
{
'COOKIE_name': 'peanut butter',
'COOKIE_recipe_url': 'http://some.aweso.me/COOKIE/peanut.html',
'COOKIE_sku': 'PB01',
'quantity': '24',
'unit_cost': '0.25'
},
{
'COOKIE_name': 'oatmeal raisin',
'COOKIE_recipe_url': 'http://some.okay.me/COOKIE/raisin.html',
'COOKIE_sku': 'EWW01',
'quantity': '100',
'unit_cost': '1.00'
}
]
result = connection.execute(ins, inventory_list)
注意:一定要确保所有字典参数拥有相同的keys
from sqlalchemy.sql import select
s = select([COOKIEs])
rp = connection.execute(s)
results = rp.fetchall()
当然我们也可以使用字符串来代替:
s = select("""SELECT COOKIEs.COOKIE_id, COOKIEs.COOKIE_name,
COOKIEs.COOKIE_recipe_url, COOKIEs.COOKIE_sku, COOKIEs.quantity,
COOKIEs.unit_cost FROM COOKIEs""")
connection.execute返回的rp变量是一个ResultProxy
对象(它是DBAPI中cursor对象的封装)。
我们也可以这样写:
from sqlalchemy.sql import select
s = COOKIEs.select()
rp = connection.execute(s)
results = rp.fetchall()
ResultProxy使得查询结果可以通过index,name,or Column object访问列数据。例如:
first_row = results[0]
first_row[1] #游标列索引从1开始,by index
first_row.COOKIE_name # by name
first_row[COOKIEs.c.COOKIE_name] #by Column object.
你也可以迭代ResultProxy,如下:
rp = connection.execute(s)
for record in rp:
print(record.COOKIE_name)
ResultProxy其余可用来获取结果集的方法
first()
fetchone()
fetchall()
scalar():Returns a single value if a query results in a single record with one column.
keys() 获取列名
关于选择ResultProxy上述的方法的建议:
1、使用first()而不是fetchone()来获取单条记录,因为fetchone()调用之后仍然保留着打开的connections共后续使用,如果不小心的话很容易引起问题。
2、使用迭代方式获取所有结果,而不是fetchall(),更加省内存。
3、使用scalar()获取单行单列结果时需要注意,如果返回多于一行,它会抛出异常。
控制返回列的数目
s = select([COOKIEs.c.COOKIE_name, COOKIEs.c.quantity])
rp = connection.execute(s)
print(rp.keys())
result = rp.first()
排序
s = select([COOKIEs.c.COOKIE_name, COOKIEs.c.quantity])
s = s.order_by(COOKIEs.c.quantity)
rp = connection.execute(s)
for COOKIE in rp:
print('{} - {}'.format(COOKIE.quantity, COOKIE.COOKIE_name))
#倒序desc
from sqlalchemy import desc
s = select([COOKIEs.c.COOKIE_name, COOKIEs.c.quantity])
s = s.order_by(desc(COOKIEs.c.quantity))
限制返回结果集的条数
s = select([COOKIEs.c.COOKIE_name, COOKIEs.c.quantity])
s = s.order_by(COOKIEs.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
print([result.COOKIE_name for result in rp])
在sqlalchemy.sql.func模块中
#sum
from sqlalchemy.sql import func
s = select([func.sum(COOKIEs.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())
#count
s = select([func.count(COOKIEs.c.COOKIE_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.count_1) #字段名是自动生成的,
#设置别名
s = select([func.count(COOKIEs.c.COOKIE_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)
过滤
#where
s = select([COOKIEs]).where(COOKIEs.c.COOKIE_name == 'chocolate chip')
rp = connection.execute(s)
record = rp.first()
print(record.items()) #调用row对象的items()方法。
#like
s = select([COOKIEs]).where(COOKIEs.c.COOKIE_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
print(record.COOKIE_name)
可以在where中使用的子句元素
between(cleft, cright)
concat(column_two) Concatenate column with column_two
distinct()
in_([list])
is_(None) Find where the column is None (commonly used for Null checks with None)
contains(string) Find where the column has string in it (case-sensitive)
endswith(string) Find where the column ends with string (case-sensitive)
like(string) Find where the column is like string (case-sensitive)
startswith(string) Find where the column begins with string (case-sensitive)
ilike(string) Find where the column is like string (this is not case-sensitive)
当然还包括一系列的notxxx方法,比如notin_(),唯一的例外是isnot()
操作符
+,-,*,/,%
==,!=,<,>,<=,>=
AND,OR,NOT,由于python关键字的原因,使用and_(),or_(),not_()来代替
+号还可以用于字符串拼接:
s = select([COOKIEs.c.COOKIE_name, 'SKU-' + COOKIEs.c.COOKIE_sku])
for row in connection.execute(s):
print(row)
from sqlalchemy import cast
s = select([COOKIEs.c.COOKIE_name,
cast((COOKIEs.c.quantity * COOKIEs.c.unit_cost),
Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
print('{} - {}'.format(row.COOKIE_name, row.inv_cost))
注意:cast是另外一个函数,允许我们进行类型转换,上述转换是将数字转换为货币形式,和
print(&#8216;{} &#8211; {:.2f}&#8217;.format(row.COOKIE_name, row.inv_cost)).这个行为一致。
from sqlalchemy import and_, or_, not_
s = select([COOKIEs]).where(
and_(
COOKIEs.c.quantity > 23,
COOKIEs.c.unit_cost <0.40
)
)
for row in connection.execute(s):
print(row.COOKIE_name)
from sqlalchemy import and_, or_, not_
s = select([COOKIEs]).where(
or_(
COOKIEs.c.quantity.between(10, 50),
COOKIEs.c.COOKIE_name.contains('chip')
)
)
for row in connection.execute(s):
print(row.COOKIE_name)
from sqlalchemy import update
u = update(COOKIEs).where(COOKIEs.c.COOKIE_name == "chocolate chip")
u = u.values(quantity=(COOKIEs.c.quantity + 120))
result = connection.execute(u)
print(result.rowcount)
s = select([COOKIEs]).where(COOKIEs.c.COOKIE_name == "chocolate chip")
result = connection.execute(s).first()
for key in result.keys():
print('{:>20}: {}'.format(key, result[key]))
from sqlalchemy import delete
u = delete(COOKIEs).where(COOKIEs.c.COOKIE_name == "dark chocolate chip")
result = connection.execute(u)
print(result.rowcount)
s = select([COOKIEs]).where(COOKIEs.c.COOKIE_name == "dark chocolate chip")
result = connection.execute(s).fetchall()
print(len(result))
join(),outerjoin()函数,select_from()函数
columns = [orders.c.order_id, users.c.username, users.c.phone,
COOKIEs.c.COOKIE_name, line_items.c.quantity,
line_items.c.extended_cost]
COOKIEmon_orders = select(columns)
COOKIEmon_orders = COOKIEmon_orders.select_from(orders.join(users).join(
line_items).join(COOKIEs)).where(users.c.username ==
'COOKIEmon')
result = connection.execute(COOKIEmon_orders).fetchall()
for row in result:
print(row)
最终产生的SQL语句如下:
SELECT orders.order_id, users.username, users.phone, COOKIEs.COOKIE_name,
line_items.quantity, line_items.extended_cost FROM users JOIN orders ON
users.user_id = orders.user_id JOIN line_items ON orders.order_id =
line_items.order_id JOIN COOKIEs ON COOKIEs.COOKIE_id = line_items.COOKIE_id
WHERE users.username = :username_1
outerjoin
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
print(row)
表别名函数alias()
>>> manager = employee_table.alias('mgr')
>>> stmt = select([employee_table.c.name],
... and_(employee_table.c.manager_id==manager.c.id,
... manager.c.name=='Fred'))
>>> print(stmt)
SELECT employee.name
FROM employee, employee AS mgr
WHERE employee.manager_id = mgr.id AND mgr.name = ?
columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
print(row)
def get_orders_by_customer(cust_name, shipped=None, details=False):
columns = [orders.c.order_id, users.c.username, users.c.phone]
joins = users.join(orders)
if details:
columns.extend([COOKIEs.c.COOKIE_name, line_items.c.quantity,
line_items.c.extended_cost])
joins = joins.join(line_items).join(COOKIEs)
cust_orders = select(columns)
cust_orders = cust_orders.select_from(joins)
cust_orders = cust_orders.where(users.c.username == cust_name)
if shipped is not None:
cust_orders = cust_orders.where(orders.c.shipped == shipped)
result = connection.execute(cust_orders).fetchall()
return result
返回的还是ResultProxy对象
1、完全采用原始SQL
result = connection.execute("select * from orders").fetchall()
print(result)
2、部分采用原始SQL,text()函数
from sqlalchemy import text
stmt = select([users]).where(text("username='COOKIEmon'"))
print(connection.execute(stmt).fetchall())
SQLALchemy定义了很多异常。我们通过关心:AttributeErrors,IntegrityErrors.等
为了进行相关试验与说明,请先执行下面这些语句
from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
DateTime, ForeignKey, Boolean, create_engine,
CheckConstraint)
metadata = MetaData()
COOKIEs = Table('COOKIEs', metadata,
Column('COOKIE_id', Integer(), primary_key=True),
37
Column('COOKIE_name', String(50), index=True),
Column('COOKIE_recipe_url', String(255)),
Column('COOKIE_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2)),
CheckConstraint('quantity > 0', name='quantity_positive')
)
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(),
default=datetime.now, Onupdate=datetime.now)
)
orders = Table('orders', metadata,
Column('order_id', Integer()),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('COOKIE_id', ForeignKey('COOKIEs.COOKIE_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
cOnnection= engine.connect()
from sqlalchemy import select, insert
ins = insert(users).values(
username="COOKIEmon",
email_address="mon@COOKIE.com",
phOne="111-111-1111",
password="password"
)
result = connection.execute(ins)
s = select([users.c.username])
results = connection.execute(s)
for result in results:
print(result.username)
print(result.password) #此处包AttributeError异常
在违反约束的情况下会出现IntegrityError异常。比如违反唯一性约束等。
s = select([users.c.username])
connection.execute(s).fetchall()
[(u'COOKIEmon',)]
ins = insert(users).values(
username="COOKIEmon",
email_address="damon@COOKIE.com",
phOne="111-111-1111",
password="password"
)
result = connection.execute(ins) #此处报IntegrityError, UNIQUE constraint failed: users.username
#异常处理
try:
result = connection.execute(ins)
except IntegrityError as error:
print(error.orig.message, error.params)
所有的SQLAlchemy异常处理方式都是上面那种思路,通过[SQLAlchemyError](http://docs.sqlal
chemy.org/en/latest/core/exceptions.html)可以获取到的信息由如下:
orig :The DBAPI exception object.
params:The parameter list being used when this exception occurred.
statement :The string SQL statement being invoked when this exception occurred.
from sqlalchemy.exc import IntegrityError
def ship_it(order_id):
s = select([line_items.c.COOKIE_id, line_items.c.quantity])
s = s.where(line_items.c.order_id == order_id)
transaction = connection.begin() #开启事务
COOKIEs_to_ship = connection.execute(s).fetchall()
try:
for COOKIE in COOKIEs_to_ship:
u = update(COOKIEs).where(COOKIEs.c.COOKIE_id == COOKIE.COOKIE_id)
u = u.values(quantity=COOKIEs.c.quantity - COOKIE.quantity)
result = connection.execute(u)
u = update(orders).where(orders.c.order_id == order_id)
u = u.values(shipped=True)
result = connection.execute(u)
print("Shipped order ID: {}".format(order_id))
transaction.commit() #提交事务
except IntegrityError as error:
transaction.rollback() #事务回滚
print(error)