# SQLAlchmey

**目标:**类/对象 --> 转换为 SQL --> pymysql/MySQLdb --> 再在数据库中执行

​ SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

# 组成部分:
  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池,去获取一个链接线程
  • Dialect,选择连接数据库的DB API种类,记录选择pymysql、MySQLdb模块
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言

​ SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:(即Dialect的配置文件的写法)

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
# 基本使用
  • 不常见版本,偏向于底层

    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
     
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
     
    def task(arg):
        conn = engine.raw_connection()
        cursor = conn.cursor()
        cursor.execute(
            "select * from t1"
        )
        result = cursor.fetchall()
        cursor.close()
        conn.close()
     
     #20个同时来,5个5个执行
    for i in range(20):
        t = threading.Thread(target=task, args=(i,))
        t.start()
    

# ORM操作

  • # models.py 用于生成表的类
    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    
    Base = declarative_base()
    
    class Users(Base):
        __tablename__ = 'users' #生成的数据库表名
    	
        id = Column(Integer, primary_key=True) #主键
        name = Column(String(32), index=True, nullable=False) #name字段,创建索引,不能为空
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'),
            # Index('ix_id_name', 'name', 'email'),
        )
    
    def init_db():
        """
        根据类创建数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    	
        #读取所有的表生成表
        Base.metadata.create_all(engine)
    
    def drop_db():
        """
        根据类删除数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    	
        #删除表
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        drop_db()
        init_db()
    
  • # 单表/多表的增删
    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    from sqlalchemy.orm import relationship
    
    Base = declarative_base()
    
    
     # ##################### 单表示例 #########################
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        age = Column(Integer, default=18)
        email = Column(String(32), unique=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
        extra = Column(Text, nullable=True)
    
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'),
            # Index('ix_id_name', 'name', 'extra'),
        )
    
    class Hosts(Base):
        __tablename__ = 'hosts'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True)
        ctime = Column(DateTime, default=datetime.datetime.now)
    
    
     # ##################### 一对多示例 #########################
    class Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        hobby_id = Column(Integer, ForeignKey("hobby.id"))
    
        # 与生成表结构无关,仅用于查询方便
        hobby = relationship("Hobby", backref='pers')
    
    
     # ##################### 多对多示例 #########################
    
    class Server2Group(Base):
        __tablename__ = 'server2group'
        id = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))
    
    
    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便
        servers = relationship('Server', secondary='server2group', backref='groups')
    
    
    class Server(Base):
        __tablename__ = 'server'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
    
    def init_db():
        """
        根据类创建数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    def drop_db():
        """
        根据类删除数据库表
        :return: 
        """
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.drop_all(engine)
    
    
    if __name__ == '__main__':
        drop_db()
        init_db()
    

# ORM操作

  • # 基本使用
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
      
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
      
     # 每次执行数据库操作时,都需要创建一个session,相当于connection,因为本质就是一次数据库url访问,为了保持唯一性
    session = Session()
      
     # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    session.add(obj1)
      
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    

# 创建表

  • # 单表
    import datetime
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    
    Base = declarative_base()
    
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)  #主键字段
        name = Column(String(32), index=True, nullable=False) #添加索引,不能为空
        email = Column(String(32), unique=True)
        ctime = Column(DateTime, default=datetime.datetime.now) #datetime.datetime.now 本能加括号
        extra = Column(Text, nullable=True)
    
        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'), #id和name联合唯一索引
            Index('ix_id_name', 'name', 'email'),
        )
    
  • # 一对多
    class Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        hobby_id = Column(Integer, ForeignKey("hobby.id")) #hobby是__tablename__
    
        # 与生成表结构无关,仅用于查询方便
        hobby = relationship("Hobby", backref='pers')
    
    
    
  • # 多对多
     #不能自动创建第三张表,只能自己创建
    class Server2Group(Base): #第三张表
        __tablename__ = 'server2group'
        id = Column(Integer, primary_key=True, autoincrement=True)
       	girl_id = Column(Integer, ForeignKey('girl.id'))
        boy_id = Column(Integer, ForeignKey('boy.id'))
    
    
    class Girl(Base):
        __tablename__ = 'girl'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便
        res = relationship('Boy', secondary='Boy2Girl', backref='girl')
    
    
    class Boy(Base):
        __tablename__ = 'boy'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(64), unique=True, nullable=False)
    
    

# 操作表

上面的表,通过init_db函数即可创建表,都写在models.py中,那么下面我们将在test.py函数中操作表

  • 简单的增加记录的操作

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
    
    #创建连接池
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
    
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1",email='wang@qq.com')
    session.add(obj1)
    
    # 提交事务
    session.commit()
    # 关闭session,将链接放回连接池
    session.close()
    
  • 基于scoped_session实现线程安全,用户与session(上例类式,并多出一些方法,但是不是通过继承关系,而是重写)

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Users
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
     #该方法返回的一个Session类
    Session = sessionmaker(bind=engine)
    
    """
    # 线程安全,基于本地线程实现每个线程用同一个session
    # 特殊的:scoped_session中有原来方法的Session中的一下方法:
    
    public_methods = (
        '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
        'close', 'commit', 'connection', 'delete', 'execute', 'expire',
        'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
        'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
        'bulk_update_mappings',
        'merge', 'query', 'refresh', 'rollback',
        'scalar'
    )
    """
     #执行scoped_session类的实例化,init函数
    session = scoped_session(Session)
    
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    # 本质执行的do函数:add 
    session.add(obj1)
    
    
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    

    scoped_session 能调用 add 等session类的原因?源码部分

    __all__ = ["scoped_session"]  #只能导入小写的出去 
     #第一次看见小写的类
    class scoped_session(object):
        # session_factory 就是传进来的Session类
        def __init__(self, session_factory, scopefunc=None):
    
            self.session_factory = session_factory
    
            if scopefunc:
                self.registry = ScopedRegistry(session_factory, scopefunc)
            else:
                #将Session类传入,触发ThreadLocalRegistry类,封装了原来的Session类、threading.local类
                self.registry = ThreadLocalRegistry(session_factory)
    ...
                
    def instrument(name):
       	#self是scoped_session类对象
        def do(self, *args, **kwargs):
            #self.registry是我们创建的ThreadLocalRegistry类,加括号,触发类的__call__方法,返回的是原来的Session类的对象,当然有add方法,并将此方法放在了threading.Local中
            return getattr(self.registry(), name)(*args, **kwargs)
        #返回一个函数名:
        #add:do(add)
        return do
    
     #循环所有的Session的公用方法
    for meth in Session.public_methods:
        # scoped_session就是类,使用闭包,传入属性
        setattr(scoped_session, meth, instrument(meth))
    
    
    def makeprop(name):
        def set_(self, attr):
            setattr(self.registry(), name, attr)
    
        def get(self):
            return getattr(self.registry(), name)
    
        return property(get, set_)
    
    
    for prop in (
        "bind",
        "dirty",
        "deleted",
        "new",
        "identity_map",
        "is_active",
        "autoflush",
        "no_autoflush",
        "info",
        "autocommit",
    ):
        setattr(scoped_session, prop, makeprop(prop))
    

    Session.public_methods 是Session类,循环public_methods属性,及那个其设置为自己的属性

     #1、这是Session类的public_methods属性
        public_methods = (
            "__contains__",
            "__iter__",
            "add",
            "add_all",
            "begin",
            "begin_nested",
            "close",
            "commit",
    ...
        )
     
     #2、打开ThreadLocalRegistry类的init函数
        def __init__(self, createfunc):
            #传入的是原来的Session类,这里并没有执行
            self.createfunc = createfunc
            #这就是我们以前看见的线程安全的threading.local(),给每个线程分配空间
            self.registry = threading.local()
            
        def __call__(self):
            try:
                #最初没有值
                return self.registry.value
            except AttributeError:
                #原来的Session类实例化,其中包括了add方法
                val = self.registry.value = self.createfunc()
                return val
       
    

# 增删改查全家桶

  • # 基本操作增删改查
    from sqlalchemy.orm import sessionmaker, scoped_session
    from sqlalchemy import create_engine,text
    import  models
    
    #创建连接池
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()  #type: sqlalchemy
    
    
    # ############# 执行ORM操作 #############
    
    # 添加单条记录
    obj1 = models.Users(name="alex12",email='wang1@qq.com')
    session.add(obj1)
    
    #批量增加
    session.add_all([
        models.Users(name="alex13", email='wang3@qq.com'),
        models.Users(name="alex14", email='wang4@qq.com'),
    ])
    
    ---------------------------------------------------------
    #查所有,查询可以不用commit
    user_list = session.query(models.Users).all()
    print(user_list) # 拿到所有记录对象
    for row in user_list:
        print(row.name)
    
    #过滤查询
    user = session.query(models.Users).filter(models.Users.id>2)
    for row in user:
        print(row.name)
    
    #给字段起别名,可以通过别名查询, 返回的额是一个列表中, 对每个符合要求的字段
    r2 = session.query(models.Users.name.label('xx'), models.Users.age).all()
    print(r2) #[('alex1099', 19), ('wanglixing099', 19)]
    for item in r2:
        print(item.xx)  #alex1099 ...
    
    #filter 参数传的是表达式
    r3 = session.query(models.Users).filter(models.Users.name == "alex1099").all()
    print(r3) #[<models.Users object at 0x00000109B31DBB00>]
    
    #filter_by 传入的关键字参数
    r4 = session.query(models.Users).filter_by(name='alex1099').all()
    r5 = session.query(models.Users).filter_by(name='alex1099').first()
    print(r4,r5)   #[<models.Users object at 0x0000015538196B00>] <models.Users object at 0x0000015538196B00>
    
    #复杂一点的查询语句,通过 (xxx:变量).params() 进行传参, id<2,name= 的记录
    r6 = session.query(models.Users).filter(text("id<:value and name=:name")).params(value=224, name='alex1099').order_by(models.Users.id).all()
    print(r6)  #[<models.Users object at 0x0000021D46C92B38>]
    
    #使用原生自定义sql语句,类似于Djnago的raw函数
    r7 = session.query(models.Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='alex1099').all()
    print(r7) #[<models.Users object at 0x0000018378A22B70>]
    
    -------------------------------------------------------
    #删除
    user = session.query(models.Users).filter(models.Users.id>2).delete()
    print(user)  # 返回删除的条数
    
    ------------------------------------------------------
    #修改
    user = session.query(models.Users).filter(models.Users.id ==2).update({'name':'wanglixing'})
    print(user) # 返回修改的条数
    
    #更改每个字段名,每个字段名后添加99
    session.query(models.Users).filter(models.Users.id > 0).update({models.Users.name: models.Users.name + "099"}, synchronize_session=False)
    #更改字段值,涉及计算,年龄自加1
    session.query(models.Users).filter(models.Users.id > 0).update({"age": models.Users.age + 1}, synchronize_session="evaluate")
    
    
    # 提交事务
    session.commit()
    # 关闭session,将链接放回连接池
    session.close()
    
  • # 常用操作
    from sqlalchemy.orm import sessionmaker, scoped_session
    from sqlalchemy import create_engine,text
    
    import  models
    
    #创建连接池
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
    
    
    # ############# 查询补充 #############
    
    #filter_by 关键字传参
    ret = session.query(models.Users).filter_by(name='alex1099').all()
    print(ret) #[<models.Users object at 0x000001CF270D79E8>]
    
    #filter 过滤条件 ,表示与关系
    ret1 = session.query(models.Users).filter(models.Users.id > 0, models.Users.name == 'alex1099').first()
    print(ret1.id) #1
    
    #filter 加between区域筛选
    ret2 = session.query(models.Users).filter(models.Users.id.between(1, 3), models.Users.name == 'wanglixing099').all()
    print(ret2) #[<models.Users object at 0x000001D982C572B0>]
    
    #filter 加_in 表示子集查询
    ret = session.query(models.Users).filter(models.Users.id.in_([1,3,4])).all()
    print(ret)  #[<models.Users object at 0x0000014FEC597B38>]
    
    #filter 加~取补集
    ret = session.query(models.Users).filter(~models.Users.id.in_([1,3,4])).first()
    print(ret.id) #2
    
    #子查询,查询users表中的id在字段为wang..的id中的人
    ret = session.query(models.Users).filter(models.Users.id.in_(session.query(models.Users.id).filter_by(name='wanglixing099'))).all()
    print(ret)
    
    #类似于Q查询,可以进行条件组合
    from sqlalchemy import and_, or_
    ret = session.query(models.Users).filter(and_(models.Users.id < 3, models.Users.name == 'wanglixing099')).first()
    ret4 = session.query(models.Users).filter(or_(models.Users.id >3, models.Users.name == 'wanglixing099')).all()
    ret5 = session.query(models.Users).filter(
        or_(
            models.Users.id < 2,
            #,也表示and_
            and_(models.Users.name == 'eric', models.Users.id > 3),
            models.Users.extra != ""
        )).first()
    print(ret.name,ret4,ret5.name) #wanglixing099 [<models.Users object at 0x000001E8163D3D68>] alex1099
    
    ------------------------------------------------------
    #通配符,从开头开始匹配
    ret = session.query(models.Users).filter(models.Users.name.like('ale%')).first()
    ret6 = session.query(models.Users).filter(~models.Users.name.like('w%')).first()
    print(ret.name,ret6.name)  #alex1099 alex1099
    
    # 限制,切片
    ret = session.query(models.Users)[0:2]
    ret2 = session.query(models.Users).filter(models.Users.id<2)[0:1]
    print(ret,ret2) #[<models.Users object at 0x00000277F2C15A58>, <models.Users object at 0x00000277F2C15AC8>] [<models.Users object at 0x00000277F2C15A58>]
    
    # 排序,desc降序,asc升序
    ret = session.query(models.Users).order_by(models.Users.name.desc()).all()
    ret7 = session.query(models.Users).order_by(models.Users.name.desc(), models.Users.id.asc()).all()
    print(ret,ret7)
    
    ------------------------------------------------
    # 分组
    from sqlalchemy.sql import func
    
    ret = session.query(models.Users.id, func.count('*').label('id_count')).group_by(models.Users.id).all()
    print(ret)  #[(1, 1), (2, 1)]
    
    # 生成子句,等同于(select user_id ... group_by user_id),通过id分组,生成(id, count),再通过subquery生成子句
    ret = session.query(models.Users.id, func.count('*').label('id_count')).group_by(models.Users.id).subquery()
    print(ret.c.id_count)  #anon_1.id_count
    # 联接子句,注意子句中需要使用c来调用字段内容,筛选出user.id 为ret的id右连接ret,取出筛选出的(name,出现的个数)
    ret2 = session.query(models.Users.name, ret.c.id_count).outerjoin(ret, models.Users.id==ret.c.id).all()
    print(ret2) #[('alex1099', 1), ('wanglixing099', 1)]
    
    
    # 根据Users的备注字段进行分组
    ret10 = session.query(models.Users).group_by(models.Users.extra).all()
    
    
    #聚合函数 根据users表的user分组,然后分别求出每一组id的最大值/求和/最小值
    ret9 = session.query(
        func.max(models.Users.id),
        func.sum(models.Users.id),
        func.min(models.Users.id)).group_by(models.Users.name).all()
    
    # 先以用户名分组,求出每组的最大值/最小值/求和值,最后再过滤出最小值大于2的
    ret8 = session.query(
        func.max(models.Users.id),
        func.sum(models.Users.id),
        func.min(models.Users.id)).group_by(models.Users.name).having(func.min(models.Users.id) == 1).all()
    
    print(ret8,ret9,ret10)  #[(1, Decimal('1'), 1)] [(1, Decimal('1'), 1), (2, Decimal('2'), 2)] [<models.Users object at 0x000001C30E0D7A20>]
    
    -----------------------------------------------
    # 连表,连表条件,如果两张表存在ForeignKey,就以ForeignKey关联; 如果没有ForeignKey就以主键关联,但是可以自定义
    # 查找User表和Hobby表的连表,过滤出id相同的
    ret = session.query(models.Users, models.Hobby).filter(models.Users.id == models.Hobby.id).all()
    
    #如果内连接/外连接没有Foreignkey指定,那么我们可以自己指定,且支持and_条件组合
    ret1 = session.query(models.Person).join(models.Hobby,models.Person.nid==models.Hobby.id).all()
    
    # 内链接inner join, Hobby主动连接Person
    ret1 = session.query(models.Person).join(models.Hobby).all()
    
    # 只有左连接,isouter=True,就是一个外连接,是Person外联Hobby,也就是说Hobby的一方是少的一方,Person是多的一方
    ret2 = session.query(models.Hobby).join(models.Person, isouter=True).all()
    print(ret,ret,ret2) #[(<models.Users object at 0x0000020E0041E048>, <models.Hobby object at 0x0000020E0041E0B8>)] [(<models.Users object at 0x0000020E0041E048>, <models.Hobby object at 0x0000020E0041E0B8>)] [<models.Hobby object at 0x0000020E0041E0B8>]
    
    ----------------------------------------------
    #原生SQL
    
    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()
    print(result)
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    print(cursor.lastrowid)
    
    # 提交事务
    session.commit()
    # 关闭session,将链接放回连接池
    session.close()
    

# 一对多的正向/方向查询操作

  • # 查询/添加
    from sqlalchemy.orm import sessionmaker, scoped_session
    from sqlalchemy import create_engine,text
    
    import  models
    
    #创建连接池
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
    
    # ############# 连表查询 #############
    #添加Hobby记录
    # session.add_all([
    #     models.Hobby(caption='菇凉'),
    #     models.Hobby(caption='篮球')
    # ])
    
    #创建Person记录
    # session.add_all([
    #     models.Person(name='小伍',hobby_id=1),
    #     models.Person(name='小王',hobby_id=2),
    #     models.Person(name='小李',hobby_id=2),
    #     models.Person(name='小刘',hobby_id=2),
    # ])
    
    #查所有的用户
     person_list = session.query(models.Person).all()
     for row in person_list:
         print(row.name,row.hobby_id)  #小伍 1,小王 2 ...
    
    #现在查询对应的爱好的名字?
    #方式一:连表取数据
     person_list = session.query(models.Person.name,models.Hobby.caption.label('爱好名')).join(models.Hobby,isouter=True).all()
     for row in person_list:
         print(row.name,row.爱好名 )  #小伍 Rap,小王 菇凉...
    
    #方式二:通过再表中生成一个类似于related属性的字段,用于写在一的一方,通过它来找关联它的多的对象(.)
    '''
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        hobby_id = Column(Integer, ForeignKey("hobby.id"))
    
        # 与生成表结构无关,仅用于查询方便,关联表的类名,backref='pers',通过backref可以是Hobby反向找到Person
        hobby = relationship("Hobby", backref='pers')
    '''
    # 正向:通过Hobby自定义字段多找一 (本质上是一对多关系Hobby为一和Person为多,多个person对应一个hobby)
    person_list = session.query(models.Person).all()
    for row in person_list:
        # 通过定义的hobby字段可以获取和这条相关的Hobby的对象
        print(row.name,row.hobby.caption)  #小伍 1,小王 2 ...
    
    #反向: 通过backref是一对多, 喜欢菇凉的Person
    obj = session.query(models.Hobby).filter(models.Hobby.id==2).first()
    persons = obj.pers  #pers是backref的值
    print(persons) #[<models.Person object at 0x0000020419A66080>, ...]
    
    -----------------------------------------
    #如何给关联的表添加数据?
    #正向添加:通过Hobby字段,绑定数据,
    #方式一:原始方法, 添加一条数据给Hobby,同时绑定给张九,最后add
    person = models.Person(name='张九', hobby=models.Hobby(caption='妹纸')) #如果hobby已经有了就使用hobby_id赋值,没有的话就使用hobby=models...创建
    session.add(person)
    
    #方式二:通过 backref 反向找到管理的所有多条记录的表,增加两条记录
    hb = models.Hobby(caption='人妖')  #这种情况:当你要添加一条数据,并绑定给新用户,但是你又不知道这条数据的id,或者该id还没有生成
    hb.pers = [models.Person(name='文飞'),  models.Person(name='博雅')]
    session.add(hb)
    
    
    # 提交事务
    session.commit()
    # 关闭session,将链接放回连接池
    session.close()
    

# 多对多的正/反向操作

  • # 查询/新增
    from sqlalchemy.orm import sessionmaker, scoped_session
    from sqlalchemy import create_engine,text
    
    import  models
    
    #创建连接池
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
    
    # ############# 多对多连表查询 #############
    
    #创建多对多的表
    # session.add_all([
    #     models.Server(hostname='c1.com'),
    #     models.Server(hostname='c2.com'),
    #     models.Group(name='A组'),
    #     models.Group(name='B组'),
    # ])
    
    #如何使Server/Group两张表多对多?
    #原始方法:填写第三张表数据
     s2g = models.Server2Group(server_id=1, group_id=1)  #生成第三张表,里面右server_id/group_id,用于记录关系
     session.add(s2g)
    
    #使用relationship正向查询
    '''
    class Group(Base): #写在任意一方即可
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便,secondary是第三张表,Server是管理的类名
        servers = relationship('Server', secondary='server2group', backref='groups')
    '''
    gp = models.Group(name='C组') #获取正向对象
    gp.servers = [models.Server(hostname='c3.com'),models.Server(hostname='c4.com')] #通过servers字段,获取到关联的多表进行查询
    session.add(gp)
    session.commit()
    
    
    #反向:通过backref的方式创建另一张表的额数据
    ser = models.Server(hostname='c6.com')  #sever创建对象
    ser.groups = [models.Group(name='F组'),models.Group(name='G组')]  #group对象创建
    session.add(ser) #第三张表创建,同时创建三张表
    
    
    #正向查询
    v = session.query(models.Group).first()  #找到Group对象的第一个
    print(v.name)
    print(v.servers)  #使用servers自定义字段,可以查询到与之对应的多的一方的数据
    
    #反向查询
    v = session.query(models.Server).first()
    print(v.hostname)
    print(v.groups)  # 通过backref的值反向查询对应的值
    
    # 提交事务
    session.commit()
    # 关闭session,将链接放回连接池
    session.close()
    
  • # 子查询
    from sqlalchemy.orm import sessionmaker, scoped_session
    from sqlalchemy import create_engine,text
    
    import  models
    
    #创建连接池
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
    
    # ############# 关联子查询 #############
    
    # 以前学过的子查询: select * from tb id in [select id from xxx];
    # 问题1:在学生表中 取出每个学生的id、姓名、以及xxx表的最大score的分数?
    # select id, name, (select max(score) in xxx) as maxsocre from tb
    # 问题2:查询每个学生的平均成绩
    #select id, name, (select avg(score) in from 成绩表 where 成绩表.sid = 学生表.id) as maxsocre from 学生表
    
    from sqlalchemy.sql import text, func
    # 关联子查询
    subqry = session.query(func.count(models.Server.id).label("sid")).filter(models.Server.id == models.Group.id).correlate(models.Group).as_scalar()
    result = session.query(models.Group.name, subqry)
    """
    SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
    FROM server 
    WHERE server.id = `group`.id) AS anon_1 
    FROM `group`
    """
    
    # 原生SQL
    """
    # 查询
    cursor = session.execute('select * from users')
    result = cursor.fetchall()
    
    # 添加
    cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    session.commit()
    print(cursor.lastrowid)
    """
    
    # 提交事务
    session.commit()
    # 关闭session,将链接放回连接池
    session.close()