10月08, 2019

sqlalchemy执行原生SQL

用 Binds 操作多个数据库

配置:

SQLALCHEMY_DATABASE_URI = 'postgres://localhost/main'
SQLALCHEMY_BINDS = {
    'users':        'mysqldb://localhost/users',
    'appmeta':      'sqlite:////path/to/appmeta.db'
}

create_all() 和 drop_all() 方法默认作用 于声明的所有 bind ,包括默认的。这个行为可以通过提供 bind 参数来定制。它 可以是单个 bind 名 、'all' 指向所有 binds 或一个 bind 名的列表。默 认的 bind ( SQLALCHEMY_DATABASE_URI )名为 None

封装用于执行原声SQL的工具类

from contextlib import contextmanager

from sqlalchemy import text
from sqlalchemy.orm import sessionmaker

from app.utils.baseModel import db


class DbUtil:

    def get_session(self, bind='admin'):
        engine = db.get_engine(app=db.get_app(), bind=bind)
        Session = sessionmaker(bind=engine)
        return Session()

    @contextmanager
    def auto_commit(self, bind='admin'):

        try:
            self.session = self.get_session(bind=bind)
            yield
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise e
        finally:
            self.session.close()

    """
       用于执行一次查询的数据库查询操作封装
    """
    def Update(self,sql='', params={}):
        session = self.session
        if sql:
            stmt = text(sql)
            if params:
                session.execute(stmt, params)
            else:
                session.execute(stmt)
        else:
            print("SQL为空!")

    def Select(self, sql='', params={}):
        resList = []
        session = self.session
        if sql:
            stmt = text(sql)
            if params:
                for record in session.execute(stmt, params):
                    #print(type(record))
                    rowDict = dict((zip(record.keys(), record)))
                    resList.append(rowDict)
                print(resList)
                return resList
            else:
                for record in session.execute(stmt):
                    #print(type(record))
                    rowDict = dict((zip(record.keys(), record)))
                    resList.append(rowDict)
                print(resList)
                return resList
        else:
            print("SQL为空!")

    def __del__(self):
        self.session.close()

调用工具类执行查询及更新操作

def get_userrole_by_id1():

        try:
            dbUtil = DbUtil()
            with dbUtil.auto_commit(bind='admin'):

                #执行一次查询
                sql = """
                select r.* from admin.user u,admin.user_role ur,admin.role r 
                where u.id=ur.user_id and ur.role_id=r.id and u.id=:user_id
                """
                res = dbUtil.Select(sql=sql,params={"user_id": 12})

                #执行一次更新操作
                sql = """
                update role set name='系统管理员' where id=:role_id
                """
                dbUtil.Update(sql, {"role_id":1})

                #执行一次查询操作
                sql = """
                        select r.* from admin.user u,admin.user_role ur,admin.role r 
                        where u.id=ur.user_id and ur.role_id=r.id and u.id=:user_id
                """
                res = dbUtil.Select(sql=sql, params={"user_id": 12})
        except Exception as e:
            print(e)

        return "OK"

其它配置

#如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句)
SQLALCHEMY_ECHO=True

#如果为True,则连接池将记录信息输出,例如连接失效时以及连接被回收到默认日志处理程序时,默认sys.stdout为输出。如果设置为字符串 "debug",则日志记录将包括池检出和签入。使用标准Python logging模块也可以直接控制日志记录 。
SQLALCHEMY_ECHO_POOL=True

#数据库连接池的大小。默认是引擎默认值
SQLALCHEMY_POOL_SIZE = 20

#设定连接池的连接超时时间。默认是 10 。
SQLALCHEMY_POOL_TIMEOUT= 10

本文链接:http://57km.cc/post/sqlalchemy execute sql.html

-- EOF --

Comments