# SQLAlchmey
**目标:**类/对象 --> 转换为 SQL --> pymysql/MySQLdb --> 再在数据库中执行
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
# 组成部分:
- Engine,框架的引擎
- Connection Pooling ,数据库连接池,去获取一个链接线程
- Dialect,选择连接数据库的DB API种类,记录选择pymysql、MySQLdb模块
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:(即Dialect的配置文件的写法)
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
# 基本使用
不常见版本,偏向于底层
import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def task(arg): conn = engine.raw_connection() cursor = conn.cursor() cursor.execute( "select * from t1" ) result = cursor.fetchall() cursor.close() conn.close() #20个同时来,5个5个执行 for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()
# ORM操作
# models.py 用于生成表的类
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' #生成的数据库表名 id = Column(Integer, primary_key=True) #主键 name = Column(String(32), index=True, nullable=False) #name字段,创建索引,不能为空 __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), # Index('ix_id_name', 'name', 'email'), ) def init_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) #读取所有的表生成表 Base.metadata.create_all(engine) def drop_db(): """ 根据类删除数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) #删除表 Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
# 单表/多表的增删
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() # ##################### 单表示例 ######################### class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True) age = Column(Integer, default=18) email = Column(String(32), unique=True) ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text, nullable=True) __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), # Index('ix_id_name', 'name', 'extra'), ) class Hosts(Base): __tablename__ = 'hosts' id = Column(Integer, primary_key=True) name = Column(String(32), index=True) ctime = Column(DateTime, default=datetime.datetime.now) # ##################### 一对多示例 ######################### class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) # 与生成表结构无关,仅用于查询方便 hobby = relationship("Hobby", backref='pers') # ##################### 多对多示例 ######################### class Server2Group(Base): __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便 servers = relationship('Server', secondary='server2group', backref='groups') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): """ 根据类删除数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
# ORM操作
# 基本使用
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session,相当于connection,因为本质就是一次数据库url访问,为了保持唯一性 session = Session() # ############# 执行ORM操作 ############# obj1 = Users(name="alex1") session.add(obj1) # 提交事务 session.commit() # 关闭session session.close()
# 创建表
# 单表
import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) #主键字段 name = Column(String(32), index=True, nullable=False) #添加索引,不能为空 email = Column(String(32), unique=True) ctime = Column(DateTime, default=datetime.datetime.now) #datetime.datetime.now 本能加括号 extra = Column(Text, nullable=True) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), #id和name联合唯一索引 Index('ix_id_name', 'name', 'email'), )
# 一对多
class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) #hobby是__tablename__ # 与生成表结构无关,仅用于查询方便 hobby = relationship("Hobby", backref='pers')
# 多对多
#不能自动创建第三张表,只能自己创建 class Server2Group(Base): #第三张表 __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便 res = relationship('Boy', secondary='Boy2Girl', backref='girl') class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False)
# 操作表
上面的表,通过init_db函数即可创建表,都写在models.py中,那么下面我们将在test.py函数中操作表
简单的增加记录的操作
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users #创建连接池 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() # ############# 执行ORM操作 ############# obj1 = Users(name="alex1",email='wang@qq.com') session.add(obj1) # 提交事务 session.commit() # 关闭session,将链接放回连接池 session.close()
基于scoped_session实现线程安全,用户与session(上例类式,并多出一些方法,但是不是通过继承关系,而是重写)
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) #该方法返回的一个Session类 Session = sessionmaker(bind=engine) """ # 线程安全,基于本地线程实现每个线程用同一个session # 特殊的:scoped_session中有原来方法的Session中的一下方法: public_methods = ( '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested', 'close', 'commit', 'connection', 'delete', 'execute', 'expire', 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings', 'bulk_update_mappings', 'merge', 'query', 'refresh', 'rollback', 'scalar' ) """ #执行scoped_session类的实例化,init函数 session = scoped_session(Session) # ############# 执行ORM操作 ############# obj1 = Users(name="alex1") # 本质执行的do函数:add session.add(obj1) # 提交事务 session.commit() # 关闭session session.close()
scoped_session 能调用 add 等session类的原因?源码部分
__all__ = ["scoped_session"] #只能导入小写的出去 #第一次看见小写的类 class scoped_session(object): # session_factory 就是传进来的Session类 def __init__(self, session_factory, scopefunc=None): self.session_factory = session_factory if scopefunc: self.registry = ScopedRegistry(session_factory, scopefunc) else: #将Session类传入,触发ThreadLocalRegistry类,封装了原来的Session类、threading.local类 self.registry = ThreadLocalRegistry(session_factory) ... def instrument(name): #self是scoped_session类对象 def do(self, *args, **kwargs): #self.registry是我们创建的ThreadLocalRegistry类,加括号,触发类的__call__方法,返回的是原来的Session类的对象,当然有add方法,并将此方法放在了threading.Local中 return getattr(self.registry(), name)(*args, **kwargs) #返回一个函数名: #add:do(add) return do #循环所有的Session的公用方法 for meth in Session.public_methods: # scoped_session就是类,使用闭包,传入属性 setattr(scoped_session, meth, instrument(meth)) def makeprop(name): def set_(self, attr): setattr(self.registry(), name, attr) def get(self): return getattr(self.registry(), name) return property(get, set_) for prop in ( "bind", "dirty", "deleted", "new", "identity_map", "is_active", "autoflush", "no_autoflush", "info", "autocommit", ): setattr(scoped_session, prop, makeprop(prop))
Session.public_methods 是Session类,循环public_methods属性,及那个其设置为自己的属性
#1、这是Session类的public_methods属性 public_methods = ( "__contains__", "__iter__", "add", "add_all", "begin", "begin_nested", "close", "commit", ... ) #2、打开ThreadLocalRegistry类的init函数 def __init__(self, createfunc): #传入的是原来的Session类,这里并没有执行 self.createfunc = createfunc #这就是我们以前看见的线程安全的threading.local(),给每个线程分配空间 self.registry = threading.local() def __call__(self): try: #最初没有值 return self.registry.value except AttributeError: #原来的Session类实例化,其中包括了add方法 val = self.registry.value = self.createfunc() return val
# 增删改查全家桶
# 基本操作增删改查
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy import create_engine,text import models #创建连接池 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() #type: sqlalchemy # ############# 执行ORM操作 ############# # 添加单条记录 obj1 = models.Users(name="alex12",email='wang1@qq.com') session.add(obj1) #批量增加 session.add_all([ models.Users(name="alex13", email='wang3@qq.com'), models.Users(name="alex14", email='wang4@qq.com'), ]) --------------------------------------------------------- #查所有,查询可以不用commit user_list = session.query(models.Users).all() print(user_list) # 拿到所有记录对象 for row in user_list: print(row.name) #过滤查询 user = session.query(models.Users).filter(models.Users.id>2) for row in user: print(row.name) #给字段起别名,可以通过别名查询, 返回的额是一个列表中, 对每个符合要求的字段 r2 = session.query(models.Users.name.label('xx'), models.Users.age).all() print(r2) #[('alex1099', 19), ('wanglixing099', 19)] for item in r2: print(item.xx) #alex1099 ... #filter 参数传的是表达式 r3 = session.query(models.Users).filter(models.Users.name == "alex1099").all() print(r3) #[<models.Users object at 0x00000109B31DBB00>] #filter_by 传入的关键字参数 r4 = session.query(models.Users).filter_by(name='alex1099').all() r5 = session.query(models.Users).filter_by(name='alex1099').first() print(r4,r5) #[<models.Users object at 0x0000015538196B00>] <models.Users object at 0x0000015538196B00> #复杂一点的查询语句,通过 (xxx:变量).params() 进行传参, id<2,name= 的记录 r6 = session.query(models.Users).filter(text("id<:value and name=:name")).params(value=224, name='alex1099').order_by(models.Users.id).all() print(r6) #[<models.Users object at 0x0000021D46C92B38>] #使用原生自定义sql语句,类似于Djnago的raw函数 r7 = session.query(models.Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='alex1099').all() print(r7) #[<models.Users object at 0x0000018378A22B70>] ------------------------------------------------------- #删除 user = session.query(models.Users).filter(models.Users.id>2).delete() print(user) # 返回删除的条数 ------------------------------------------------------ #修改 user = session.query(models.Users).filter(models.Users.id ==2).update({'name':'wanglixing'}) print(user) # 返回修改的条数 #更改每个字段名,每个字段名后添加99 session.query(models.Users).filter(models.Users.id > 0).update({models.Users.name: models.Users.name + "099"}, synchronize_session=False) #更改字段值,涉及计算,年龄自加1 session.query(models.Users).filter(models.Users.id > 0).update({"age": models.Users.age + 1}, synchronize_session="evaluate") # 提交事务 session.commit() # 关闭session,将链接放回连接池 session.close()
# 常用操作
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy import create_engine,text import models #创建连接池 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() # ############# 查询补充 ############# #filter_by 关键字传参 ret = session.query(models.Users).filter_by(name='alex1099').all() print(ret) #[<models.Users object at 0x000001CF270D79E8>] #filter 过滤条件 ,表示与关系 ret1 = session.query(models.Users).filter(models.Users.id > 0, models.Users.name == 'alex1099').first() print(ret1.id) #1 #filter 加between区域筛选 ret2 = session.query(models.Users).filter(models.Users.id.between(1, 3), models.Users.name == 'wanglixing099').all() print(ret2) #[<models.Users object at 0x000001D982C572B0>] #filter 加_in 表示子集查询 ret = session.query(models.Users).filter(models.Users.id.in_([1,3,4])).all() print(ret) #[<models.Users object at 0x0000014FEC597B38>] #filter 加~取补集 ret = session.query(models.Users).filter(~models.Users.id.in_([1,3,4])).first() print(ret.id) #2 #子查询,查询users表中的id在字段为wang..的id中的人 ret = session.query(models.Users).filter(models.Users.id.in_(session.query(models.Users.id).filter_by(name='wanglixing099'))).all() print(ret) #类似于Q查询,可以进行条件组合 from sqlalchemy import and_, or_ ret = session.query(models.Users).filter(and_(models.Users.id < 3, models.Users.name == 'wanglixing099')).first() ret4 = session.query(models.Users).filter(or_(models.Users.id >3, models.Users.name == 'wanglixing099')).all() ret5 = session.query(models.Users).filter( or_( models.Users.id < 2, #,也表示and_ and_(models.Users.name == 'eric', models.Users.id > 3), models.Users.extra != "" )).first() print(ret.name,ret4,ret5.name) #wanglixing099 [<models.Users object at 0x000001E8163D3D68>] alex1099 ------------------------------------------------------ #通配符,从开头开始匹配 ret = session.query(models.Users).filter(models.Users.name.like('ale%')).first() ret6 = session.query(models.Users).filter(~models.Users.name.like('w%')).first() print(ret.name,ret6.name) #alex1099 alex1099 # 限制,切片 ret = session.query(models.Users)[0:2] ret2 = session.query(models.Users).filter(models.Users.id<2)[0:1] print(ret,ret2) #[<models.Users object at 0x00000277F2C15A58>, <models.Users object at 0x00000277F2C15AC8>] [<models.Users object at 0x00000277F2C15A58>] # 排序,desc降序,asc升序 ret = session.query(models.Users).order_by(models.Users.name.desc()).all() ret7 = session.query(models.Users).order_by(models.Users.name.desc(), models.Users.id.asc()).all() print(ret,ret7) ------------------------------------------------ # 分组 from sqlalchemy.sql import func ret = session.query(models.Users.id, func.count('*').label('id_count')).group_by(models.Users.id).all() print(ret) #[(1, 1), (2, 1)] # 生成子句,等同于(select user_id ... group_by user_id),通过id分组,生成(id, count),再通过subquery生成子句 ret = session.query(models.Users.id, func.count('*').label('id_count')).group_by(models.Users.id).subquery() print(ret.c.id_count) #anon_1.id_count # 联接子句,注意子句中需要使用c来调用字段内容,筛选出user.id 为ret的id右连接ret,取出筛选出的(name,出现的个数) ret2 = session.query(models.Users.name, ret.c.id_count).outerjoin(ret, models.Users.id==ret.c.id).all() print(ret2) #[('alex1099', 1), ('wanglixing099', 1)] # 根据Users的备注字段进行分组 ret10 = session.query(models.Users).group_by(models.Users.extra).all() #聚合函数 根据users表的user分组,然后分别求出每一组id的最大值/求和/最小值 ret9 = session.query( func.max(models.Users.id), func.sum(models.Users.id), func.min(models.Users.id)).group_by(models.Users.name).all() # 先以用户名分组,求出每组的最大值/最小值/求和值,最后再过滤出最小值大于2的 ret8 = session.query( func.max(models.Users.id), func.sum(models.Users.id), func.min(models.Users.id)).group_by(models.Users.name).having(func.min(models.Users.id) == 1).all() print(ret8,ret9,ret10) #[(1, Decimal('1'), 1)] [(1, Decimal('1'), 1), (2, Decimal('2'), 2)] [<models.Users object at 0x000001C30E0D7A20>] ----------------------------------------------- # 连表,连表条件,如果两张表存在ForeignKey,就以ForeignKey关联; 如果没有ForeignKey就以主键关联,但是可以自定义 # 查找User表和Hobby表的连表,过滤出id相同的 ret = session.query(models.Users, models.Hobby).filter(models.Users.id == models.Hobby.id).all() #如果内连接/外连接没有Foreignkey指定,那么我们可以自己指定,且支持and_条件组合 ret1 = session.query(models.Person).join(models.Hobby,models.Person.nid==models.Hobby.id).all() # 内链接inner join, Hobby主动连接Person ret1 = session.query(models.Person).join(models.Hobby).all() # 只有左连接,isouter=True,就是一个外连接,是Person外联Hobby,也就是说Hobby的一方是少的一方,Person是多的一方 ret2 = session.query(models.Hobby).join(models.Person, isouter=True).all() print(ret,ret,ret2) #[(<models.Users object at 0x0000020E0041E048>, <models.Hobby object at 0x0000020E0041E0B8>)] [(<models.Users object at 0x0000020E0041E048>, <models.Hobby object at 0x0000020E0041E0B8>)] [<models.Hobby object at 0x0000020E0041E0B8>] ---------------------------------------------- #原生SQL # 查询 cursor = session.execute('select * from users') result = cursor.fetchall() print(result) # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) print(cursor.lastrowid) # 提交事务 session.commit() # 关闭session,将链接放回连接池 session.close()
# 一对多的正向/方向查询操作
# 查询/添加
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy import create_engine,text import models #创建连接池 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() # ############# 连表查询 ############# #添加Hobby记录 # session.add_all([ # models.Hobby(caption='菇凉'), # models.Hobby(caption='篮球') # ]) #创建Person记录 # session.add_all([ # models.Person(name='小伍',hobby_id=1), # models.Person(name='小王',hobby_id=2), # models.Person(name='小李',hobby_id=2), # models.Person(name='小刘',hobby_id=2), # ]) #查所有的用户 person_list = session.query(models.Person).all() for row in person_list: print(row.name,row.hobby_id) #小伍 1,小王 2 ... #现在查询对应的爱好的名字? #方式一:连表取数据 person_list = session.query(models.Person.name,models.Hobby.caption.label('爱好名')).join(models.Hobby,isouter=True).all() for row in person_list: print(row.name,row.爱好名 ) #小伍 Rap,小王 菇凉... #方式二:通过再表中生成一个类似于related属性的字段,用于写在一的一方,通过它来找关联它的多的对象(.) ''' class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) # 与生成表结构无关,仅用于查询方便,关联表的类名,backref='pers',通过backref可以是Hobby反向找到Person hobby = relationship("Hobby", backref='pers') ''' # 正向:通过Hobby自定义字段多找一 (本质上是一对多关系Hobby为一和Person为多,多个person对应一个hobby) person_list = session.query(models.Person).all() for row in person_list: # 通过定义的hobby字段可以获取和这条相关的Hobby的对象 print(row.name,row.hobby.caption) #小伍 1,小王 2 ... #反向: 通过backref是一对多, 喜欢菇凉的Person obj = session.query(models.Hobby).filter(models.Hobby.id==2).first() persons = obj.pers #pers是backref的值 print(persons) #[<models.Person object at 0x0000020419A66080>, ...] ----------------------------------------- #如何给关联的表添加数据? #正向添加:通过Hobby字段,绑定数据, #方式一:原始方法, 添加一条数据给Hobby,同时绑定给张九,最后add person = models.Person(name='张九', hobby=models.Hobby(caption='妹纸')) #如果hobby已经有了就使用hobby_id赋值,没有的话就使用hobby=models...创建 session.add(person) #方式二:通过 backref 反向找到管理的所有多条记录的表,增加两条记录 hb = models.Hobby(caption='人妖') #这种情况:当你要添加一条数据,并绑定给新用户,但是你又不知道这条数据的id,或者该id还没有生成 hb.pers = [models.Person(name='文飞'), models.Person(name='博雅')] session.add(hb) # 提交事务 session.commit() # 关闭session,将链接放回连接池 session.close()
# 多对多的正/反向操作
# 查询/新增
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy import create_engine,text import models #创建连接池 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() # ############# 多对多连表查询 ############# #创建多对多的表 # session.add_all([ # models.Server(hostname='c1.com'), # models.Server(hostname='c2.com'), # models.Group(name='A组'), # models.Group(name='B组'), # ]) #如何使Server/Group两张表多对多? #原始方法:填写第三张表数据 s2g = models.Server2Group(server_id=1, group_id=1) #生成第三张表,里面右server_id/group_id,用于记录关系 session.add(s2g) #使用relationship正向查询 ''' class Group(Base): #写在任意一方即可 __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便,secondary是第三张表,Server是管理的类名 servers = relationship('Server', secondary='server2group', backref='groups') ''' gp = models.Group(name='C组') #获取正向对象 gp.servers = [models.Server(hostname='c3.com'),models.Server(hostname='c4.com')] #通过servers字段,获取到关联的多表进行查询 session.add(gp) session.commit() #反向:通过backref的方式创建另一张表的额数据 ser = models.Server(hostname='c6.com') #sever创建对象 ser.groups = [models.Group(name='F组'),models.Group(name='G组')] #group对象创建 session.add(ser) #第三张表创建,同时创建三张表 #正向查询 v = session.query(models.Group).first() #找到Group对象的第一个 print(v.name) print(v.servers) #使用servers自定义字段,可以查询到与之对应的多的一方的数据 #反向查询 v = session.query(models.Server).first() print(v.hostname) print(v.groups) # 通过backref的值反向查询对应的值 # 提交事务 session.commit() # 关闭session,将链接放回连接池 session.close()
# 子查询
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy import create_engine,text import models #创建连接池 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() # ############# 关联子查询 ############# # 以前学过的子查询: select * from tb id in [select id from xxx]; # 问题1:在学生表中 取出每个学生的id、姓名、以及xxx表的最大score的分数? # select id, name, (select max(score) in xxx) as maxsocre from tb # 问题2:查询每个学生的平均成绩 #select id, name, (select avg(score) in from 成绩表 where 成绩表.sid = 学生表.id) as maxsocre from 学生表 from sqlalchemy.sql import text, func # 关联子查询 subqry = session.query(func.count(models.Server.id).label("sid")).filter(models.Server.id == models.Group.id).correlate(models.Group).as_scalar() result = session.query(models.Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ # 原生SQL """ # 查询 cursor = session.execute('select * from users') result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid) """ # 提交事务 session.commit() # 关闭session,将链接放回连接池 session.close()