avatar


7.FastAPI [3/3]

连接数据库

连接MySQL

整体步骤:

  1. 安装数据库驱动
  2. 创建项目并连接SQLAlchemy
  3. 建立SQLAlchemy数据库模型
  4. 建立Pydantic数据模型
  5. 实现数据操作
  6. 实现FastAPI请求函数七个步骤

安装数据库驱动

推荐使用PyMySQL,安装命令:

1
pip install PyMySQL

创建项目并连接SQLAlchemy

创建项目并连接SQLAlchemy

其中__init__.py的作用是将sql_app目录定义为一个Python包,我们可以在这个文件中实现一些初始化工作,以保证程序正常运行。示例代码:

1
2
3
4
5
6
# 初始化代码

# 导入数据库驱动
import pymysql
# 将数据库驱动注册为MySQLdb模式
pymysql.install_as_MySQLdb()

其中database.py的内容如下:

1
2
3
4
5
6
7
8
9
10
11
# 第一步,导入SQLAlchemy组件包
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 第二步,创建数据连接引擎
engine = create_engine("root:Mysql%4013@127.0.0.1:3306/cat")
# 第三步,创建本地会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 第四步,创建数据模型基础类
Base = declarative_base()

创建SQLAlchemy数据库模型

SQLAlchemy通过数据模型基础类的继承实现数据库模型的定义,并通过ROM映射把数据库表结构生成到数据库系统中。

其中,models.py文件用于定义ORM数据模型,主要用于生成数据库表,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 第一步,导入SQLAlchemy组件包
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
# 第二步,从database模块中导入基类Base
from .database import Base


# 声明User模型,继承自Base
class User(Base):
# 指定数据库中的表名
__tablename__ = "user"
# 定义类的属性,对应表中的字段
id = Column(Integer, primary_key=True, index=True)
email = Column(String(50), unique=True, index=True)
hashed_password = Column(String(50))
is_active = Column(Boolean, default=True)
# 定义一对多关系
books = relationship("Book", back_populates="owner")


# 声明Book模型,继承自Base类
class Book(Base):
# 指定数据库中对应的表名
__tablename__ = "book"
# 定义类的属性,对应表中的字段
id = Column(Integer, primary_key=True, index=True)
title = Column(String(50), index=True)
description = Column(String(200), index=True)
owner_id = Column(Integer, ForeignKey("user.id"))
# 定义关联
owner = relationship("User", back_populates="books")

上文代码中,导入所需的SQLAlchemy组件包,然后从文件databases.py中导入Base类,作为数据库模型的基类,最后定义两个数据库模型:UserBook。在数据模型User中,使用relationship函数定义一个多对多关系books,该关系指向另一个数据库模型Book。在数据库模型Book中也使用 relationship函数定义一个多对一关系owner,指向数据库模型User

创建Pydantic数据模型

用Pydantic实现的数据模型主要为了实现数据的读写操作,并提供API接口文档。为了避免将Pydantic模型与 SQLAlchemy模型混淆,这里将Pydantic模型定义写在文件schemas.py中,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 第一步,导入相关的模块
from typing import List, Optional
from pydantic import BaseModel


# 第二步,声明BookBase模型,从BaseModel继承
class BookBase(BaseModel):
# 第三步,声明模型的属性
title: str
description: Optional[str] = None


# 第四步,声明ItemCreate模型,从ItemBase继承
class BookCreate(BookBase):
pass


# 第五步,声明Item模型,从ItemBase继承
class Book(BookBase):
id: int
owner_id: int

# 第六步,配置项中起用orm模式
class Config:
orm_mode = True


# 第七步,同样的方式声明一组User模型
class UserBase(BaseModel):
email: str


class UserCreate(UserBase):
password: str


class User(UserBase):
id: int
is_active: bool
books: List[Book] = []

# 配置项中起用orm模式
class Config:
orm_mode = True

在上文代码中,首先导入相关的模块,然后从BaseModel类继承定义BookBase模型类,在模型中定义所需的字段:titledescription;再从 BookBase模型类继承,分别定义BookCreateBook模型类,在Book模型类内部的Conifg类中增加选项:orm_mode=True

再使用上述相同的方式定义一组新模型类:UserBaseUserCreateUser

以上创建的Pydantic模型,可以通过不同的数据模型读取来自API接口的数据。比如创建Book数据类之前,还不知道字段id的值,所以使用BookCreate模型类接收数据,但是从数据库中读取一个已经存在的Book表数据时,就可以获取到字段id的值,所以使用Book模型。Book模型类和User模型类都定义了内部Config类,其中的配置项orm_mode=True,意思是开启ORM模式,它的作用是让Pydantic模型也可以从任意类型的ORM模型读取数据。当这个配置项默认为False的时候,只能从字典中读取数据,不能从ORM模型读取数据。

实现数据操作

crud.py,示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 第一步,导入会话组件
from sqlalchemy.orm import Session
# 第二步,导入前两步定义的models和schemas模块
from . import models, schemas


# 第三步,读取数据的函数
# 读取单个用户
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()


# 通过email读取单个用户
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()


# 读取带分页的用户列表
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()


# 读取图书列表
def get_books(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Book).offset(skip).limit(limit).all()


# 第四步,创建数据的函数
# 创建一个用户
def create_user(db: Session, user: schemas.UserCreate):
# 模拟生成密码的过程,并没有真正生成加密值
fake_hashed_password = user.password + "notreallyhashed"
# 第一步,根据数据创建数据库模型的实例
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
# 第二步,将实例添加到会话
db.add(db_user)
# 第三步,提交会话
db.commit()
# 第四步,刷新实例,用于获取数据或者生成数据库中的ID
db.refresh(db_user)
return db_user


# 创建用户相关的一本图书
def create_user_book(db: Session, book: schemas.BookCreate, user_id: int):
db_row = models.Book(**book.dict(), owner_id=user_id)
db.add(db_row)
db.commit()
db.refresh(db_row)
return db_row


# 更新图书的标题
def update_book_title(db: Session, book: schemas.Book):
db.query(models.Book).filter(models.Book.id == book.id).update({"title": book.title})
db.commit()
return 1


# 删除图书
def delete_book(db: Session, book: schemas.Book):
res = db.query(models.Book).filter(models.Book.id == book.id).delete()
print(res)
db.commit()
return 1

create_user为例,分为以下几个步骤:

  1. 根据函数传入的数据模型schemas.UsersCreate的实例user中的数据,创建数据库模型的实例db_user
  2. 使用SQLAlchemy的增加数据方法,将数据保存到数据库中。
  3. 使用flush()方法把数据应用到数据库,并提交事务。
  4. 使用refresh()方法,从数据库中取回最新的数据,这一步是为了获取数据库中生成的id字段内容。
  5. 将保存完成的数据使用return关键字返回。

实现FastAPI请求函数

最后一步,使用FastAPI将以上功能整合起来,变成一个Web后端应用系统。
main.py内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from typing import List
import uvicorn
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
# 导入本程序的模块
from sql_app import crud, models, schemas
from sql_app.database import SessionLocal, engine

# 生成数据库中的表
models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# 创建依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()


# 定义路径操作函数,并注册路由路径 创建用户
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)


# 定义路径操作函数,并注册路由路径 获取用户列表
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users


# 定义路径操作函数,并注册路由路径 获取用户信息
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user


# 定义路径操作函数,并注册路由路径 创建用户相关的项目
@app.post("/users/{user_id}/books/", response_model=schemas.Book)
def create_book_for_user(
user_id: int, book: schemas.BookCreate, db: Session = Depends(get_db)
):
return crud.create_user_book(db=db, book=book, user_id=user_id)


# 定义路径操作函数,并注册路由路径 获取项目列表
@app.get("/books/", response_model=List[schemas.Book])
def read_books(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
books = crud.get_books(db, skip=skip, limit=limit)
return books


# 定义路径操作函数,并注册路由路径 修改图书标题
@app.put("/books/")
def update_book_title(book: schemas.Book, db: Session = Depends(get_db)):
return crud.update_book_title(db, book)


# 定义路径操作函数,并注册路由路径 删除图书
@app.delete("/books/")
def delete_book(book: schemas.Book, db: Session = Depends(get_db)):
return crud.delete_book(db, book)


if __name__ == '__main__':
uvicorn.run(app=app)

解释说明:定义路径操作函数时使用了def,没有使用async def。这是因为async def是定义异步函数的语法,但是SQLAlchemy框架的稳定版尚未支持异步数据模型,所以不能以异步的方式调用,只能用def的方式定义路径操作函数。

连接Redis

安装数据库驱动

安装Redis驱动,示例代码:

1
pip install redis

实现Redis中的数据操作

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from typing import Optional
from pydantic import BaseModel
import uvicorn
from fastapi import Depends, FastAPI
from redis import Redis, ConnectionPool
import json

app = FastAPI()


# 定义依赖函数,用于连接redis
def get_rdb():
pool = ConnectionPool(host='127.0.0.1', port=6379, )
rdb = Redis(connection_pool=pool)
try:
yield rdb
finally:
rdb.close()


# 定义数据模型
class Item(BaseModel):
title: str
description: Optional[str] = None


# 注册路由路径
@app.post('/item/', response_model=Item)
# 定义路径操作函数,定义参数item
# 定义依赖项数据库
# 将对象转换成JSON字符串并保存
# 获取集合
async def create_item(item: Item,
rdb: Redis = Depends(get_rdb)
):
obj = rdb.set('item_name', json.dumps(item.dict()))
return item


# 注册路由路径
@app.get('/item/', response_model=Item)
# 定义路径操作函数,定义依赖项数据库
# 获取集合
# 将数据库中的对象转换为dict并返回
async def get_item(rdb: Redis = Depends(get_rdb)):
obj = rdb.get('item_name')
return json.loads(obj)


if __name__ == '__main__':
uvicorn.run(app=app)

安全机制

OAuth2

OAuth2,一个关于令牌授权的开放网络规范,主要特点是在客户端与资源所有者之间,建立一个认证服务器。资源使用者不能直接访问资源服务器,而是登录到认证服务器,认证服务器发放"令牌";然后资源使用者携带"令牌"访问资源服务器,服务器根据"令牌"的权限范围和有效期,向资源使用者开放资源。

OAuth2

具体过程:

  • (A)客户端向从资源所有者请求授权。
  • (B)客户端收到授权许可,资源所有者给客户端颁发授权许可(比如授权码code)
  • (C)客户端与授权服务器进行身份认证并出示授权许可(比如授权码code)请求访问令牌。
  • (D)授权服务器验证客户端身份并验证授权许可,若有效则颁发访问令牌(accept token)。
  • (E)客户端从资源服务器请求受保护资源并出示访问令牌(accept token)进行身份验证。
  • (F)资源服务器验证访问令牌(accept token),若有效则满足该请求。

以上六个流程中,最关键的是(B),也就是资源提供者如何给资源使用者授权。

OAuth2定义了以下4种授权模式:

  1. 授权码模式
    从资源使用者的角度来看,需要先从资源提供者处申请授权码,再根据授权码从认证服务器处申请"令牌",这是最常用的授权模式,各种流行的开放平台都用的这个模式,比如百度开放平台、腾迅开放平台。
  2. 隐藏模式
    适用于纯前端应用,直接在前端请求中传递"令牌"。
  3. 密码模式
    资源使用者直接通过提供用户名和密码的方式申请令牌,一般适用于提供OAuth2认证的自身平台。
  4. 客户端凭证模式
    适用于后端应用服务之间的授权,通过交换凭证(应用ID,应用密钥)的方式,获取认证信息和"令牌",再使用"令牌"访问所需资源。

实现

FastAPI框架的security模块自带身份认证安全模块类OAuth2PasswordBearer。

在下文的代码中,使用参数tokenUrl="login"创建了依赖类OAuth2PasswordBearer的一个实例,然后在路径操作函数read_items中定义了依赖项,这样就把路径操作函数变成受保护的资源。也就是说访问该资源时,必须经过认证。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi import Depends, FastAPI
# 导入安全模块
from fastapi.security import OAuth2PasswordBearer
import uvicorn

app = FastAPI()
# 创建依赖类实例
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")


# 注册路由路径
@app.get("/items/")
# 定义路径操作函数
# 定义依赖项
async def read_items(token: str = Depends(oauth2_scheme)):
return {"token": token}


if __name__ == '__main__':
uvicorn.run(app=app)

注意:因为OAuth2使用表单方式提交数据,所以需要安装第三方库python-multipart

特别的,此时我们访问http://127.0.0.1:8000/docs,会发现有两个主要变化:

  1. 页面右上角多了一个"Authorize"按钮,按钮上有一个"锁"的图标,说明当前的服务已经启用了基于OAuth2的安全机制。
  2. API接口/items/的右侧也多了一把灰色的"锁"图标,说明此接口处于被安全机制保护的状态。

此时,我们请求接口的会,会收到未授权的响应。

Authorize

完整实现

上文介绍了添加OAuth2安全模式的方法,但要完成整个安全流程,还需要以下5个步骤:

  1. 创建数据库应用,并创建用户信息模型。
  2. 增加注册用户的功能,将用户信息存到数据库中。
  3. 根据登录信息,生成"令牌",并返回给前端。
  4. 增加用户登录功能,并验证有效性。
  5. 前端使用"令牌"访问后端服务器,获取当前登录用户数据。

auth

新建一个包auth,PyCharm会自动添加文件__init__.py,这个文件的作用是将auth目录定义为Python模块包。

auth

database.py

auth包下新建database.py

在下文的例子中,使用SQLAlchemy库连接SQLite数据库。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
# 第一步,导入SQLAlchemy库
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base

# 第二步,创建数据连接引擎
engine = create_engine("sqlite:///./data.db", connect_args={"check_same_thread": False})
# 第三步,创建本地会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 第四步,创建数据模型基类
Base = declarative_base()

models.py

添加数据模型,新建models.py

在下文的例子中,定义了一个数据库模型UserInDB,其字段包括id、username、full_name、email、hashed_password等,用于保存用户注册信息。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
from sqlalchemy import Column, String, Integer

from .database import Base


# 定义用户表,继承自Base
class UserInDB(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, index=True)
username = Column('username', String(50))
full_name = Column('full_name', String(50))
email = Column('email', String(100))
hashed_password = Column('hashed_password', String(64))

schemas.py

添加schemas.py

在下文的例子中,定义了两种模型:

  1. 与令牌数据相关的Token
  2. 与用户数据相关的UserBase、User和UserCreate。其中UserBase是用户数据的基类、User用于响应数据模型、UserCreate用于注册用户时的请求数据模型。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from pydantic import BaseModel
from typing import Optional


# 响应模型-令牌
class Token(BaseModel):
access_token: str
token_type: str


# 数据模型基类-用户信息
class UserBase(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None


# 数据模型,创建用户,继承自UserBase
class UserCreate(UserBase):
password: str


# 数据模型,用户,继承自UserBase
class User(UserBase):
class Config:
orm_mode = True

services.py

新建services.py,包含以下函数:

  1. get_user,用参数传入的用户名获取数据库中的相应的用户记录。
  2. create_user,将参数传入的用户数据保存到数据库中。
    使用了函数get_password_hash,该函数定义在utils.py中,作用是使用bcrypt算法计算字符串的哈希值。
  3. authenticate_user,获取参数传入的用户名和密码,验证其有效性。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from sqlalchemy.orm import Session
from . import models
from . import schemas
from .utils import get_password_hash, verify_password
from datetime import datetime, timedelta
from jose import jwt


# 获取单个用户
def get_user(db: Session, username: str):
return db.query(models.UserInDB).filter(models.UserInDB.username == username).first()


# 创建一个用户
def create_user(db: Session, user: schemas.UserCreate):
# 计算密码的哈希值
hashed_password = get_password_hash(user.password)
db_user = models.UserInDB(username=user.username,
hashed_password=hashed_password,
email=user.email,
full_name=user.full_name
)
# 第二步,将实例添加到会话
db.add(db_user)
# 第三步,提交会话
db.commit()
# 第四步,刷新实例,用于获取数据或者生成数据库中的ID
db.refresh(db_user)
return db_user


# 验证用户和密码
def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user


# 使用命令获取SECRET_KEY:
# openssl rand -hex 32
# 密钥
SECRET_KEY = "0bb93eb8c00be764e8dc60b001091987bd50c41f18bd2fee1c6d8239f0b23048"
ALGORITHM = "HS256" # 算法
ACCESS_TOKEN_EXPIRE_MINUTES = 5 # 令牌有效期 5分钟


# 创建令牌,将用户名放进令牌
def create_token(data: dict):
to_encode = data.copy()
expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
expire = datetime.now() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


# 解析令牌,返回用户名
def extract_token(token: str):
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload.get("username")

utils.py

以上代码包含两个函数,verify_password 用于验证密码,get_password_hash 用于获取密码明文的哈希值。代码中用到了 Python 第三方库,因此,需要使用pip3 工具安装第三方库, 安装方式是打开命令行终端,并输入以下命令:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 【示例7.2】 第七章 第7.2节 utils.py
from passlib.context import CryptContext

_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


# 验证密码
def verify_password(plain_password, hashed_password):
return _pwd_context.verify(plain_password, hashed_password)


# 生成密码
def get_password_hash(password):
return _pwd_context.hash(password)

main.py

新建main.py,内容如下:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError
from sqlalchemy.orm import Session
from auth import schemas, services, database

# 创建安全模式-密码模式
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# 创建应用实例
app = FastAPI()


# 创建依赖项
def get_db():
db = database.SessionLocal()
try:
yield db
finally:
db.close()


# 获取当前用户信息的依赖函数
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
invalid_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的用户任据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
username: str = services.extract_token(token)
if username is None:
raise invalid_exception
except JWTError:
raise invalid_exception
user = services.get_user(db, username=username)
if user is None:
raise invalid_exception
return user


# 登录的请求接口
@app.post("/login", response_model=schemas.Token)
async def login(
# 依赖项,登录表单
form: OAuth2PasswordRequestForm = Depends(),
# 依赖项,数据库会话
db: Session = Depends(get_db)
):
# 验证用户有效性
user = services.authenticate_user(db, form.username, form.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码无效",
headers={"WWW-Authenticate": "Bearer"},
)
# 发放令牌
access_token = services.create_token(data={"username": user.username})
# 返回令牌
return {"access_token": access_token, "token_type": "bearer"}


# 创建新用的接口
# 创建用户
@app.post("/user/create/", response_model=schemas.User)
async def create_user(user: schemas.UserCreate,
db: Session = Depends(get_db)):
db_user = services.get_user(db, user.username)
# 判断用户名是否存在
if db_user:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="用户名已存在",
# headers={"WWW-Authenticate": "Bearer"},
)
# 在数据库中创建用户
return services.create_user(db, user)


# 获取用户当前信息,安全模式
@app.get("/user/", response_model=schemas.User)
async def read_current_user(current_user: schemas.User = Depends(get_current_user)):
return current_user


# 获取其他信息, 安全模式
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
return {"item_id": "cool", }


if __name__ == '__main__':
# 生成数据库中的表
database.Base.metadata.create_all(bind=database.engine)
uvicorn.run(app=app)

创建用户的请求,示例代码:

1
2
3
4
5
6
7
8
9
10
curl -X 'POST' \
'http://127.0.0.1:8000/user/create/' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"username": "kaka",
"email": "i@m.kakawanyifan.com",
"full_name": "kakawanyifan",
"password": "123456"
}'

运行结果:

1
2
3
4
5
{
"username": "kaka",
"email": "i@m.kakawanyifan.com",
"full_name": "kakawanyifan"
}

登录的请求,示例代码:

1
2
3
4
5
curl -X 'POST' \
'http://127.0.0.1:8000/login' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=&username=kaka&password=123456&scope=&client_id=&client_secret='

运行结果:

1
2
3
4
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6Imtha2EiLCJleHAiOjE3MDcxNjc3NjR9.PtQw4NaC-odokRZ7_NY8rQKdT36H3APSvBzHlwbz0Jg",
"token_type": "bearer"
}

获取用户信息的请求,示例代码:

1
2
3
4
curl -X 'GET' \
'http://127.0.0.1:8000/user/' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6Imtha2EiLCJleHAiOjE3MDcxNjc5NTd9.drPNmkXPfOAjD6NnAIRHuUKOUxVU6sObarPJ0KCUxD4'

运行结果:

1
2
3
4
5
{
"username": "kaka",
"email": "i@m.kakawanyifan.com",
"full_name": "kakawanyifan"
}

问题解决

如果我们在启动应用时,收到报错如下:

1
2
3
4
5
6
7
Traceback (most recent call last):
File "C:\Dev\f\main.py", line 4, in <module>
from jose import JWTError
File "C:\Dev\f\venv\lib\site-packages\jose.py", line 546
print decrypt(deserialize_compact(jwt), {'k':key},
^
SyntaxError: invalid syntax

原因是缺失包python-jose,pip install,安装一下。

应用事件监听

FastAPI提供了on_event装饰器,用于监听应用级别的事件。

在下文的例子中,在操作函数上方使用了装饰器@app.on_event("startup"),监听启动事件;在操作函数上方使用装饰器@app.on_event("shutdown"),监听停止事件。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from fastapi import FastAPI
import uvicorn

app = FastAPI()


def send_msg_manager(action):
print(f'通知管理员,XX主机的XX程序于XX时间{action}了')


@app.on_event("startup")
async def startup_event():
# raise Exception('ss')
send_msg_manager('启动')


@app.on_event("startup")
async def startup_event():
send_msg_manager('启动2')


@app.on_event("shutdown")
async def shutdown_event():
raise Exception(1)
send_msg_manager('关闭2')


@app.on_event("shutdown")
async def shutdown_event():
send_msg_manager('关闭')


@app.get("/")
async def read_items(item_id: str):
return "hello"


if __name__ == '__main__':
uvicorn.run(app)

运行结果:

1
2
3
4
5
6
7
C:\Dev\f\venv\Scripts\python.exe C:\Dev\f\main.py 
通知管理员,XX主机的XX程序于XX时间启动了
通知管理员,XX主机的XX程序于XX时间启动2了
INFO: Started server process [2688]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

FastAPI中还支持对于同一个事件,定义多个监听函数,例如上文在多个操作函数的上方使用装饰器@app.on_event("startup")

架构管理

管理子应用

我们可能会有多个子系统或子应用,负责不同的业务。比如会员子系统负责管理会员的信息,商品子系统用来管理商品的种类和数量信息,销售子系统用来管理商品的销售情况等,子系统之间以接口的形式相互通信。

FastAPI提供了一种方式,可以用一个主应用管理各个子应用,这个过程称为"挂载",通过app的mount()方法来实现。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from fastapi import FastAPI
import uvicorn

# 定义主应用
app = FastAPI()


@app.get("/app")
def read_main():
return {"message": "Hello"}


# 定义第一个子应用
catapp = FastAPI()


# 在第一个子应用中定义路由
@catapp.get("/hello")
def read_sub():
return {"message": "喵"}


# 在路径/cat下挂载子应用
app.mount("/cat", catapp)

# 定义第二个子应用
dogapp = FastAPI()


# 在第二个子应用中定义路由
@dogapp.get("/hello")
def read_sub():
return {"message": "汪"}


# 在路径/dog下挂载子应用
app.mount("/dog", dogapp)

if __name__ == '__main__':
uvicorn.run(app=app)

管理Flask应用

通过app.mount(),还可以管理Flask应用。

在下文的例子中,导入了Flask相关组件,并建了一段简单的Flask应用,然后使用中间件WSGIMiddleware,将Flask应用挂载到FastAPI的主应用的路径/flask上。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.wsgi import WSGIMiddleware

##### 开始 Flask App #####

from flask import Flask, escape, request

flaskapp = Flask(__name__)


@flaskapp.route("/")
def flask_main():
name = request.args.get("name", "Flask")
return f"Hello, {escape(name)} !"

##### 结束 Flask App #####


# 定义主应用
app = FastAPI()


# 定义路由
@app.get("/app")
def read_main():
return {"message": "Hello"}


# 挂载外部应用
app.mount("/flask", WSGIMiddleware(flaskapp))

if __name__ == '__main__':
uvicorn.run(app=app)

路由类

在上文的例子中,将路由和对应的路径操作函数写在了主文件中。但是,在真实的应用程序中,会有几十个甚至上百个路由,如果写在同一个代码文件中,会使应用程序的主文件变得十分复杂。
FastAPI提供了路由类APIRouter,用于解决这个问题。

路由类APIRouter和应用类FastAPI有相同的特性和相似的用法。

在下文的例子中,首先导入了路由类APIRouter,然后定义路由类APIRouter的实例,并指定了参数值,如下:

  1. prefix,路径前缀。
  2. tags,API文档中显示的标签名
  3. dependencies,依赖项列表
  4. responses,自定义响应。

然后在路径操作函数上,使用路由类的实例router替代的装饰器方法定义了路由/hello

这与使用FastAPI的实例app定义路由的方法是相同的,也可以用同样的方式定义路径参数、请求参数、请求体,以及使用依赖项、响应体等。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi import APIRouter

# 定义路由类的实例
router = APIRouter(
# 路由的路径前缀
prefix="/child",
# API文档中显示的名称
tags=["child"],
# 给当前路由类实例指定依赖项
dependencies=[],
# 自定义响应
responses={404: {"detail": "未找到项目"}},
)


# 使用router实例的装饰器定义路由
@router.get("/hello")
# 定义路径操作函数
async def hello(name: str):
return {"message": f"hello {name}"}

需要注意的是,上述代码并不能单独执行,因为必须使用应用类FastAPI的实例才能启动Web服务。

以上代码的使用方式是使用FastAPI的方法include_router,将路由类实例引用到应用中。示例代码:

1
2
3
4
5
6
7
8
9
from fastapi import FastAPI
# 导人路由所在的包
import code_demo

app = FastAPT()

# 定义应用实例
# 在应用中引用路由实例
app.include_router(code_demo.router)

这样一来,当FastAPI接收到以/child开头的请求地址后,会匹配到此路由实例,再匹配路由实例中定义的路径,找到路径操作函数。
通过这种方式,可以将主程序中的路由定义部分拆分成模块,将不同功能路径操作函数定义在相应的路由类实例中。

测试

TestClient

TestClient是FastAPI中的一套测试工具,基于requests库进行网络通信,支持Python中标准的pytest测试框架。

在使用TesiClient之前,首先要安装requestspytest

常规测试

在下文的例子中,比之前的FastAPI示例多了一些操作,如下:

  1. 导入TestClient库
  2. 创建TestClient库的实例,参数为FastAPI的实例app,表示本实例的测试目标是app。
  3. 定义了一个以test_开头的测试函数test_index。
    根据pytest的约定,测试函数必须以test_开头。
  4. 在测试函数test_index的实现代码中,使用client对象发起请求,将请求的返回值写到response对象,请求的路径是在应用中使用装饰器定义的路由地址
    test_index函数是没有用async定义,说明这是一个同步函数,这是因为TestClient仅支持使用同步函数进行测试。
  5. 使用了assert(断言)关键字验证response对象的属性。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from fastapi import FastAPI
from fastapi.testclient import TestClient
import uvicorn

app = FastAPI()

# 创建TestClient实例
client = TestClient(app)


# 注册路由路径
@app.get("/")
# 定义路径操作函数
async def index():
# 返回一个对象
return {"name": "kaka"}


# 定义测试函数
def test_index():
# 使用TestClient的实例发起请求,接收返回数据
response = client.get("/")
# 断言: 状态码
assert response.status_code == 200
# 断言:返回对象
assert response.json() == {"name": "kaka"}


if __name__ == '__main__':
uvicorn.run(app)

在PyCharm的底部工具栏找到Terminal并点击,打开命令行控制台,输人如下命令进行代码测试。示例代码:

1
pytest main.py

运行结果:

1
2
3
4
5
6
7
8
=================== test session starts ===================
platform win32 -- Python 3.6.8, pytest-7.0.1, pluggy-1.0.0
rootdir: C:\Dev\f
plugins: anyio-3.6.2
collected 1 item

main.py . [100%]
==================== 1 passed in 0.40s ====================

显示本次测试结果为1 passed in 0.40s。意味着通过1个测试,用时0.40秒。

分离测试代码

功能代码是为了完成功能,测试代码是为了验证功能代码。在实际项目中,需要把功能代码和测试代码分离成不同的文件,以便于管理。

功能代码文件,main.py,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from fastapi import FastAPI
import uvicorn

app = FastAPI()


# 定义路由
@app.get("/")
# 定义路径操作函数
async def index():
# 返回一个对象
return {"name": "kaka"}


if __name__ == '__main__':
uvicorn.run(app)

测试代码,main_test.py,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi.testclient import TestClient
import main

# 创建TestClient实例
client = TestClient(main.app)


# 定义测试方法
def test_index():
# 使用TestClient的实例发起请求,接收返回数据
response = client.get("/")
# 断言: 状态码
assert response.status_code == 200
# 断言:返回对象
assert response.json() == {"name": "kaka"}

执行命令pytest main_test.py,进行测试。

应用事件测试

在FastAPI通过@app.on_event("startup"),可以在应用启动前执行一些操作,这些操作也可以用TestClient进行测试。

在下文的例子中,定义了模拟数据data,在路径操作函数中通过路径参数name,获取data中的值,并返回获取到的数据。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from fastapi import FastAPI
import uvicorn

app = FastAPI()
# 应用中的模拟数据
data = {
"cat": "猫",
"dog": "狗"
}


# 注册路由法就很难,定义路径参数
@app.get("/{name}")
# 定义路径操作函数
async def index(name: str):
# 返回数据
return {"name": data[name]}


if __name__ == '__main__':
uvicorn.run(app)

在同级目录下创建文件main_test.py,定义应用启动事件和相应的事件函数,改变data的默认值。
在测试函数test_index()中,通过with TestClient(main.app)as client语句,创建了TestClient的实例,发起请求,将请求的返回值写到response对象,然后使用了assert,验证response对象的属性。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi.testclient import TestClient
import main


@main.app.on_event("startup")
async def startup_event():
main.data["cat"] = "小猫"
main.data["dog"] = "小狗"


# 定义测试方法
def test_index():
# 创建TestClient实例
with TestClient(main.app) as client:
# 使用TestClient的实例发起请求,接收返回数据
response = client.get("/cat")
# 断言: 状态码
assert response.status_code == 200
# 断言:返回对象
assert response.json() == {"name": "小猫"}

依赖项测试

在FastAPI程序中,可以使用依赖注人的方式调用其他模块或集成第三方服务。

在测试期间可能需要不同的依赖项,比如在程序中通过依赖的方式调用第三方短信平台发送短信,会产生费用,但在测试期间不需要每次都真正发送短信,只要确认此步骤已经执行即可。这时就要通过覆盖的方式,将发送短信的依赖项替换为消息输出的依赖项。

FastAPI提供了设置覆盖依赖性的属性dependency_overides,其类型是一个字典,字典的键是原来的依赖项,字典的值是替换后的依赖项。

在下文的例子中,定义了一个依赖注入函数,用于调用短信平台,发送短信,在路径操作函数中指定了该依赖注入函数为依赖项。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import FastAPI, Depends
import uvicorn

app = FastAPI()


# 定义依赖函数
async def sms_sender(text: str):
# 发送短信的代码
print(f'调用短信平台发送短信,内容为:{text}')
# 返回值
return f'成功发送内容:{text}'


# 注册路由路径,定义路径参数
@app.get("/sendsms")
# 定义路径操作函数
async def sendsms(sms=Depends(sms_sender)):
return {'data': sms}


if __name__ == '__main__':
uvicorn.run(app)

在同级目录下,新建main_test.py。定义一个依赖注入函数override_sms_sender,其作用是打印内容,不真正发送短信。然后使用app.dependency_overrides方法,将应用原有的依赖项sms_sender替換成override_sms_sender。通过这种方式,可以在测试期间,替换原有的依赖项,不真正发送短信,仅打印内容,而不影响程序中原有的功能。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from fastapi.testclient import TestClient
import main

# 创建TestClient实例
client = TestClient(main.app)


# 定义依赖函数,仅输出消息,不发送短信
async def override_sms_sender(text: str):
print(f'需发送短信内容为:{text},仅记录,未发送')
return f'成功发送内容:{text}'


main.app.dependency_overrides[main.sms_sender] = override_sms_sender


# 定义测试方法
def test_sendsms():
# 使用TestClient的实例发起请求,接收返回数据
response = client.get("/sendsms?text=验证码")
# 断言: 状态码
assert response.status_code == 200

在PyCharm的命令行控制台中,输入命令,进行测试。示例代码:

1
pytest main_test.py -s

运行结果:

1
2
3
4
5
6
7
8
==================== test session starts ====================
platform win32 -- Python 3.6.8, pytest-7.0.1, pluggy-1.0.0
rootdir: C:\Dev\f
plugins: anyio-3.6.2
collected 1 item

main_test.py 需发送短信内容为:验证码,仅记录,未发送
===================== 1 passed in 0.39s =====================

解释说明:命令中的参数-s,是为了输出控制台消息。

文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/10907
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板