install:: pip install sqlalchemy import:: import sqlalchemy github:: sqlalchemy/sqlalchemy doc::


FlaskSQLAlchemy

连接

  • MySQL: mysql://username:password@host/databasename

属性

  • primary_key: 主键
  • unique: 唯一
  • index: 索引
  • nullable: 空值
  • default: 默认值,发生 insert 语句时调用
  • onupdated: 发生 update 语句时调用
  • comment: 注释

数据类型

SQLALchemy 类型MySQL 类型Python 类型描述
SmallIntegersmallintint数字,2字节:-32768 到 32767
Integerintint数字,4字节:-2147483648 到 2147483647
BigIntegerbigintint/long数字,8字节:9223372036854775807
Floatfloatfloat浮点数:4字节
Numericdecimaldecimal.Decimal定点数,M + 2字节:Numeric(M, D)
Stringvarcharstr字符串
Timetimedate.time时间
Datedatedatetime.date日期
DateTimedatetimedatetime.datetime日期时间
Booleantinyintbool布尔值
JSONJSONlist列表/JSON
Text长文本

过滤方法

  • filter():使用指定过滤规则,并返回查询对象
  • filter_by():使用指定过滤规则(以关键字表达式形式)
  • order_by():根据指定条件对记录进行排序
  • group_by():指定条件对记录进行分组

查询方法

  • all():返回包含所有查询记录的列表
  • first():返回查询的第一条记录,如未找到,返回 None
  • get(id):传入主键作为参数,如未找到,返回 None
  • count():返回查询结果的数量
  • first_or_404():返回查询结果的第一条记录,如果未找到,返回 404
  • get_or_404(id):传入主键作为参数,如未找到,返回 404
  • paginate():返回第一个 Pagination 对象,可对记录进行分页处理

使用

使用 UUID 作为主键

id = Column(String(36),  primary_key=True, default=lambda: str(uuid.uuid4()))

创建

# 添加数据到会话
db.seesion.add(User(name='immwind')) # User 为表类
# 提交到数据库
db.session.commit()

查询

# User 为对应模型类
 
# 获取 User 第一条记录
User.query.first()
User.query.first().name # 读取 name 的值
 
# User 所有记录
User.query.all() # 返回:[<User 1>, <User 2>]
 
# 指定数量
User.query.limit(42).all()
 
# 获取指定键值
User.query.get(42) # 返回 <User 1>
 
# 过滤
User.query.filter_by(name='immwind').first() # 返回 name 等于 immwind 的第一条记录
User.query.fileter(User.name='immwind').first() # 同上
 
# 获取记录数
User.query.count()
self.session.query(<Table>).count()

更新

# 需先读取数据
user = User.query.get(1)
user.name = 'python' # 更新数据

删除

# 先获取数据
user = User.query.get(1)
db.session.delete(user)

2.0

select

from sqlalchemy import func, select
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
 
class DB():
    """数据库操作"""
 
    def __init__(self):
        self.engine = create_engine(f'mysql+mysqlconnector://root:{urlquote("password")}@0.0.0.0:3306/new_highway', pool_size=100)
        f
        self.session = Session(bind=self.engine, future=True)
 
    def get_table_all(self, table):
        """获取指定数据库所有值"""
        with self.session.begin():
        #     # result = session.query(LineNumberInfo).all()
            stmt = select(LineNumberInfo.line_number)
            results = self.session.execute(stmt).scalars().all()
 
        print(len(results))
        for result in results:
            print(result.to_dict())
  • .scalar(): 获取单个

  • .scalars() 获取所有

  • 以字典形式返回所有数据

def get_all(self, table):
    result = session.execute(User.__table__.select())
    for row in result:
        print(dict(row))

示例

 

时间自动更新

from sqlalchemy import func
 
update = Column(DateTime, default=func.now(), onupdate=func.now())

报错

Authentication plugin ‘caching_sha2_password’ is not supported

pip 安装 mysql-connector-python 解决,而不是 mysql-connector-python

参考