连接数据库
连接MySQL
整体步骤:
安装数据库驱动
创建项目并连接SQLAlchemy
建立SQLAlchemy数据库模型
建立Pydantic数据模型
实现数据操作
实现FastAPI请求函数七个步骤
安装数据库驱动
推荐使用PyMySQL
,安装命令:
创建项目并连接SQLAlchemy
其中__init__.py
的作用是将sql_app
目录定义为一个Python包,我们可以在这个文件中实现一些初始化工作,以保证程序正常运行。示例代码:
1 2 3 4 5 6 import pymysqlpymysql.install_as_MySQLdb()
其中database.py
的内容如下:
1 2 3 4 5 6 7 8 9 10 11 from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmakerengine = create_engine("root:Mysql%4013@127.0.0.1:3306/cat" ) SessionLocal = sessionmaker(autocommit=False , autoflush=False , bind=engine) Base = declarative_base()
创建SQLAlchemy数据库模型
SQLAlchemy通过数据模型基础类的继承实现数据库模型的定义,并通过ROM映射把数据库表结构生成到数据库系统中。
其中,models.py
文件用于定义ORM数据模型,主要用于生成数据库表,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from sqlalchemy import Boolean, Column, ForeignKey, Integer, Stringfrom sqlalchemy.orm import relationshipfrom .database import Baseclass User (Base) : __tablename__ = "user" id = Column(Integer, primary_key=True , index=True ) email = Column(String(50 ), unique=True , index=True ) hashed_password = Column(String(50 )) is_active = Column(Boolean, default=True ) books = relationship("Book" , back_populates="owner" ) class Book (Base) : __tablename__ = "book" id = Column(Integer, primary_key=True , index=True ) title = Column(String(50 ), index=True ) description = Column(String(200 ), index=True ) owner_id = Column(Integer, ForeignKey("user.id" )) owner = relationship("User" , back_populates="books" )
上文代码中,导入所需的SQLAlchemy
组件包,然后从文件databases.py
中导入Base类,作为数据库模型的基类,最后定义两个数据库模型:User
、Book
。在数据模型User
中,使用relationship
函数定义一个多对多关系books
,该关系指向另一个数据库模型Book
。在数据库模型Book
中也使用 relationship
函数定义一个多对一关系owner
,指向数据库模型User
。
创建Pydantic数据模型
用Pydantic实现的数据模型主要为了实现数据的读写操作,并提供API接口文档。为了避免将Pydantic模型与 SQLAlchemy模型混淆,这里将Pydantic模型定义写在文件schemas.py
中,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from typing import List, Optionalfrom pydantic import BaseModelclass BookBase (BaseModel) : title: str description: Optional[str] = None class BookCreate (BookBase) : pass class Book (BookBase) : id: int owner_id: int class Config : orm_mode = True class UserBase (BaseModel) : email: str class UserCreate (UserBase) : password: str class User (UserBase) : id: int is_active: bool books: List[Book] = [] class Config : orm_mode = True
在上文代码中,首先导入相关的模块,然后从BaseModel
类继承定义BookBase
模型类,在模型中定义所需的字段:title
和description
;再从 BookBase
模型类继承,分别定义BookCreate
和Book
模型类,在Book
模型类内部的Conifg
类中增加选项:orm_mode=True
。
再使用上述相同的方式定义一组新模型类:UserBase
、UserCreate
、User
。
以上创建的Pydantic模型,可以通过不同的数据模型读取来自API接口的数据。比如创建Book数据类之前,还不知道字段id的值,所以使用BookCreate模型类接收数据,但是从数据库中读取一个已经存在的Book表数据时,就可以获取到字段id的值,所以使用Book模型。Book模型类和User模型类都定义了内部Config类,其中的配置项orm_mode=True
,意思是开启ORM模式,它的作用是让Pydantic模型也可以从任意类型的ORM模型读取数据。当这个配置项默认为False的时候,只能从字典中读取数据,不能从ORM模型读取数据。
实现数据操作
crud.py
,示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 from sqlalchemy.orm import Sessionfrom . import models, schemasdef get_user (db: Session, user_id: int) : return db.query(models.User).filter(models.User.id == user_id).first() def get_user_by_email (db: Session, email: str) : return db.query(models.User).filter(models.User.email == email).first() def get_users (db: Session, skip: int = 0 , limit: int = 100 ) : return db.query(models.User).offset(skip).limit(limit).all() def get_books (db: Session, skip: int = 0 , limit: int = 100 ) : return db.query(models.Book).offset(skip).limit(limit).all() def create_user (db: Session, user: schemas.UserCreate) : fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user def create_user_book (db: Session, book: schemas.BookCreate, user_id: int) : db_row = models.Book(**book.dict(), owner_id=user_id) db.add(db_row) db.commit() db.refresh(db_row) return db_row def update_book_title (db: Session, book: schemas.Book) : db.query(models.Book).filter(models.Book.id == book.id).update({"title" : book.title}) db.commit() return 1 def delete_book (db: Session, book: schemas.Book) : res = db.query(models.Book).filter(models.Book.id == book.id).delete() print(res) db.commit() return 1
以create_user
为例,分为以下几个步骤:
根据函数传入的数据模型schemas.UsersCreate
的实例user
中的数据,创建数据库模型的实例db_user
。
使用SQLAlchemy的增加数据方法,将数据保存到数据库中。
使用flush()
方法把数据应用到数据库,并提交事务。
使用refresh()
方法,从数据库中取回最新的数据,这一步是为了获取数据库中生成的id字段内容。
将保存完成的数据使用return
关键字返回。
实现FastAPI请求函数
最后一步,使用FastAPI将以上功能整合起来,变成一个Web后端应用系统。
main.py
内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 from typing import Listimport uvicornfrom fastapi import Depends, FastAPI, HTTPExceptionfrom sqlalchemy.orm import Sessionfrom sql_app import crud, models, schemasfrom sql_app.database import SessionLocal, enginemodels.Base.metadata.create_all(bind=engine) app = FastAPI() def get_db () : db = SessionLocal() try : yield db finally : db.close() @app.post("/users/", response_model=schemas.User) def create_user (user: schemas.UserCreate, db: Session = Depends(get_db) ) : db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400 , detail="Email already registered" ) return crud.create_user(db=db, user=user) @app.get("/users/", response_model=List[schemas.User]) def read_users (skip: int = 0 , limit: int = 100 , db: Session = Depends(get_db) ) : users = crud.get_users(db, skip=skip, limit=limit) return users @app.get("/users/{user_id}", response_model=schemas.User) def read_user (user_id: int, db: Session = Depends(get_db) ) : db_user = crud.get_user(db, user_id=user_id) if db_user is None : raise HTTPException(status_code=404 , detail="User not found" ) return db_user @app.post("/users/{user_id}/books/", response_model=schemas.Book) def create_book_for_user ( user_id: int, book: schemas.BookCreate, db: Session = Depends(get_db) ) : return crud.create_user_book(db=db, book=book, user_id=user_id) @app.get("/books/", response_model=List[schemas.Book]) def read_books (skip: int = 0 , limit: int = 100 , db: Session = Depends(get_db) ) : books = crud.get_books(db, skip=skip, limit=limit) return books @app.put("/books/") def update_book_title (book: schemas.Book, db: Session = Depends(get_db) ) : return crud.update_book_title(db, book) @app.delete("/books/") def delete_book (book: schemas.Book, db: Session = Depends(get_db) ) : return crud.delete_book(db, book) if __name__ == '__main__' : uvicorn.run(app=app)
解释说明:定义路径操作函数时使用了def
,没有使用async def
。这是因为async def
是定义异步函数的语法,但是SQLAlchemy框架的稳定版尚未支持异步数据模型,所以不能以异步的方式调用,只能用def
的方式定义路径操作函数。
连接Redis
安装数据库驱动
安装Redis驱动,示例代码:
实现Redis中的数据操作
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 from typing import Optionalfrom pydantic import BaseModelimport uvicornfrom fastapi import Depends, FastAPIfrom redis import Redis, ConnectionPoolimport jsonapp = FastAPI() def get_rdb () : pool = ConnectionPool(host='127.0.0.1' , port=6379 , ) rdb = Redis(connection_pool=pool) try : yield rdb finally : rdb.close() class Item (BaseModel) : title: str description: Optional[str] = None @app.post('/item/', response_model=Item) async def create_item (item: Item, rdb: Redis = Depends(get_rdb) ) : obj = rdb.set('item_name' , json.dumps(item.dict())) return item @app.get('/item/', response_model=Item) async def get_item (rdb: Redis = Depends(get_rdb) ) : obj = rdb.get('item_name' ) return json.loads(obj) if __name__ == '__main__' : uvicorn.run(app=app)
安全机制
OAuth2
OAuth2,一个关于令牌授权的开放网络规范,主要特点是在客户端与资源所有者之间,建立一个认证服务器。资源使用者不能直接访问资源服务器,而是登录到认证服务器,认证服务器发放"令牌";然后资源使用者携带"令牌"访问资源服务器,服务器根据"令牌"的权限范围和有效期,向资源使用者开放资源。
具体过程:
(A)客户端向从资源所有者请求授权。
(B)客户端收到授权许可,资源所有者给客户端颁发授权许可(比如授权码code)
(C)客户端与授权服务器进行身份认证并出示授权许可(比如授权码code)请求访问令牌。
(D)授权服务器验证客户端身份并验证授权许可,若有效则颁发访问令牌(accept token)。
(E)客户端从资源服务器请求受保护资源并出示访问令牌(accept token)进行身份验证。
(F)资源服务器验证访问令牌(accept token),若有效则满足该请求。
以上六个流程中,最关键的是(B),也就是资源提供者如何给资源使用者授权。
OAuth2定义了以下4种授权模式:
授权码模式
从资源使用者的角度来看,需要先从资源提供者处申请授权码,再根据授权码从认证服务器处申请"令牌",这是最常用的授权模式,各种流行的开放平台都用的这个模式,比如百度开放平台、腾迅开放平台。
隐藏模式
适用于纯前端应用,直接在前端请求中传递"令牌"。
密码模式
资源使用者直接通过提供用户名和密码的方式申请令牌,一般适用于提供OAuth2认证的自身平台。
客户端凭证模式
适用于后端应用服务之间的授权,通过交换凭证(应用ID,应用密钥)的方式,获取认证信息和"令牌",再使用"令牌"访问所需资源。
实现
FastAPI框架的security模块自带身份认证安全模块类OAuth2PasswordBearer。
在下文的代码中,使用参数tokenUrl="login"
创建了依赖类OAuth2PasswordBearer的一个实例,然后在路径操作函数read_items
中定义了依赖项,这样就把路径操作函数变成受保护的资源。也就是说访问该资源时,必须经过认证。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi import Depends, FastAPIfrom fastapi.security import OAuth2PasswordBearerimport uvicornapp = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login" ) @app.get("/items/") async def read_items (token: str = Depends(oauth2_scheme) ) : return {"token" : token} if __name__ == '__main__' : uvicorn.run(app=app)
注意:因为OAuth2使用表单方式提交数据,所以需要安装第三方库python-multipart
。
特别的,此时我们访问http://127.0.0.1:8000/docs
,会发现有两个主要变化:
页面右上角多了一个"Authorize"按钮,按钮上有一个"锁"的图标,说明当前的服务已经启用了基于OAuth2的安全机制。
API接口/items/
的右侧也多了一把灰色的"锁"图标,说明此接口处于被安全机制保护的状态。
此时,我们请求接口的会,会收到未授权的响应。
完整实现
上文介绍了添加OAuth2安全模式的方法,但要完成整个安全流程,还需要以下5个步骤:
创建数据库应用,并创建用户信息模型。
增加注册用户的功能,将用户信息存到数据库中。
根据登录信息,生成"令牌",并返回给前端。
增加用户登录功能,并验证有效性。
前端使用"令牌"访问后端服务器,获取当前登录用户数据。
auth
新建一个包auth
,PyCharm会自动添加文件__init__.py
,这个文件的作用是将auth
目录定义为Python模块包。
在auth
包下新建database.py
。
在下文的例子中,使用SQLAlchemy库连接SQLite数据库。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 from sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.orm import declarative_baseengine = create_engine("sqlite:///./data.db" , connect_args={"check_same_thread" : False }) SessionLocal = sessionmaker(autocommit=False , autoflush=False , bind=engine) Base = declarative_base()
添加数据模型,新建models.py
。
在下文的例子中,定义了一个数据库模型UserInDB,其字段包括id、username、full_name、email、hashed_password等,用于保存用户注册信息。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 from sqlalchemy import Column, String, Integerfrom .database import Baseclass UserInDB (Base) : __tablename__ = "user" id = Column(Integer, primary_key=True , index=True ) username = Column('username' , String(50 )) full_name = Column('full_name' , String(50 )) email = Column('email' , String(100 )) hashed_password = Column('hashed_password' , String(64 ))
添加schemas.py
。
在下文的例子中,定义了两种模型:
与令牌数据相关的Token
与用户数据相关的UserBase、User和UserCreate。其中UserBase是用户数据的基类、User用于响应数据模型、UserCreate用于注册用户时的请求数据模型。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from pydantic import BaseModelfrom typing import Optionalclass Token (BaseModel) : access_token: str token_type: str class UserBase (BaseModel) : username: str email: Optional[str] = None full_name: Optional[str] = None class UserCreate (UserBase) : password: str class User (UserBase) : class Config : orm_mode = True
新建services.py
,包含以下函数:
get_user
,用参数传入的用户名获取数据库中的相应的用户记录。
create_user
,将参数传入的用户数据保存到数据库中。
使用了函数get_password_hash
,该函数定义在utils.py
中,作用是使用bcrypt算法计算字符串的哈希值。
authenticate_user
,获取参数传入的用户名和密码,验证其有效性。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 from sqlalchemy.orm import Sessionfrom . import modelsfrom . import schemasfrom .utils import get_password_hash, verify_passwordfrom datetime import datetime, timedeltafrom jose import jwtdef get_user (db: Session, username: str) : return db.query(models.UserInDB).filter(models.UserInDB.username == username).first() def create_user (db: Session, user: schemas.UserCreate) : hashed_password = get_password_hash(user.password) db_user = models.UserInDB(username=user.username, hashed_password=hashed_password, email=user.email, full_name=user.full_name ) db.add(db_user) db.commit() db.refresh(db_user) return db_user def authenticate_user (db, username: str, password: str) : user = get_user(db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user SECRET_KEY = "0bb93eb8c00be764e8dc60b001091987bd50c41f18bd2fee1c6d8239f0b23048" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 5 def create_token (data: dict) : to_encode = data.copy() expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) expire = datetime.now() + expires_delta to_encode.update({"exp" : expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def extract_token (token: str) : payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload.get("username" )
以上代码包含两个函数,verify_password 用于验证密码,get_password_hash 用于获取密码明文的哈希值。代码中用到了 Python 第三方库,因此,需要使用pip3 工具安装第三方库, 安装方式是打开命令行终端,并输入以下命令:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from passlib.context import CryptContext_pwd_context = CryptContext(schemes=["bcrypt" ], deprecated="auto" ) def verify_password (plain_password, hashed_password) : return _pwd_context.verify(plain_password, hashed_password) def get_password_hash (password) : return _pwd_context.hash(password)
新建main.py
,内容如下:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 import uvicornfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTErrorfrom sqlalchemy.orm import Sessionfrom auth import schemas, services, databaseoauth2_scheme = OAuth2PasswordBearer(tokenUrl="login" ) app = FastAPI() def get_db () : db = database.SessionLocal() try : yield db finally : db.close() async def get_current_user (token: str = Depends(oauth2_scheme) , db: Session = Depends(get_db) ) : invalid_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的用户任据" , headers={"WWW-Authenticate" : "Bearer" }, ) try : username: str = services.extract_token(token) if username is None : raise invalid_exception except JWTError: raise invalid_exception user = services.get_user(db, username=username) if user is None : raise invalid_exception return user @app.post("/login", response_model=schemas.Token) async def login ( form: OAuth2PasswordRequestForm = Depends() , db: Session = Depends(get_db) ) : user = services.authenticate_user(db, form.username, form.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码无效" , headers={"WWW-Authenticate" : "Bearer" }, ) access_token = services.create_token(data={"username" : user.username}) return {"access_token" : access_token, "token_type" : "bearer" } @app.post("/user/create/", response_model=schemas.User) async def create_user (user: schemas.UserCreate, db: Session = Depends(get_db) ) : db_user = services.get_user(db, user.username) if db_user: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="用户名已存在" , ) return services.create_user(db, user) @app.get("/user/", response_model=schemas.User) async def read_current_user (current_user: schemas.User = Depends(get_current_user) ) : return current_user @app.get("/items/") async def read_items (token: str = Depends(oauth2_scheme) ) : return {"item_id" : "cool" , } if __name__ == '__main__' : database.Base.metadata.create_all(bind=database.engine) uvicorn.run(app=app)
创建用户的请求,示例代码:
1 2 3 4 5 6 7 8 9 10 curl -X 'POST' \ 'http://127.0.0.1:8000/user/create/' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "username": "kaka", "email": "i@m.kakawanyifan.com", "full_name": "kakawanyifan", "password": "123456" }'
运行结果:
1 2 3 4 5 { "username": "kaka", "email": "i@m.kakawanyifan.com", "full_name": "kakawanyifan" }
登录的请求,示例代码:
1 2 3 4 5 curl -X 'POST' \ 'http://127.0.0.1:8000/login' \ -H 'accept: application/json' \ -H 'Content-Type: application/x-www-form-urlencoded' \ -d 'grant_type=&username=kaka&password=123456&scope=&client_id=&client_secret='
运行结果:
1 2 3 4 { "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6Imtha2EiLCJleHAiOjE3MDcxNjc3NjR9.PtQw4NaC-odokRZ7_NY8rQKdT36H3APSvBzHlwbz0Jg", "token_type": "bearer" }
获取用户信息的请求,示例代码:
1 2 3 4 curl -X 'GET' \ 'http://127.0.0.1:8000/user/' \ -H 'accept: application/json' \ -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6Imtha2EiLCJleHAiOjE3MDcxNjc5NTd9.drPNmkXPfOAjD6NnAIRHuUKOUxVU6sObarPJ0KCUxD4'
运行结果:
1 2 3 4 5 { "username": "kaka", "email": "i@m.kakawanyifan.com", "full_name": "kakawanyifan" }
问题解决
如果我们在启动应用时,收到报错如下:
1 2 3 4 5 6 7 Traceback (most recent call last): File "C:\Dev\f\main.py", line 4, in <module> from jose import JWTError File "C:\Dev\f\venv\lib\site-packages\jose.py", line 546 print decrypt(deserialize_compact(jwt), {'k':key}, ^ SyntaxError: invalid syntax
原因是缺失包python-jose
,pip install,安装一下。
应用事件监听
FastAPI提供了on_event
装饰器,用于监听应用级别的事件。
在下文的例子中,在操作函数上方使用了装饰器@app.on_event("startup")
,监听启动事件;在操作函数上方使用装饰器@app.on_event("shutdown")
,监听停止事件。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from fastapi import FastAPIimport uvicornapp = FastAPI() def send_msg_manager (action) : print(f'通知管理员,XX主机的XX程序于XX时间{action} 了' ) @app.on_event("startup") async def startup_event () : send_msg_manager('启动' ) @app.on_event("startup") async def startup_event () : send_msg_manager('启动2' ) @app.on_event("shutdown") async def shutdown_event () : raise Exception(1 ) send_msg_manager('关闭2' ) @app.on_event("shutdown") async def shutdown_event () : send_msg_manager('关闭' ) @app.get("/") async def read_items (item_id: str) : return "hello" if __name__ == '__main__' : uvicorn.run(app)
运行结果:
1 2 3 4 5 6 7 C:\Dev\f\venv\Scripts\python.exe C:\Dev\f\main.py 通知管理员,XX主机的XX程序于XX时间启动了 通知管理员,XX主机的XX程序于XX时间启动2了 INFO: Started server process [2688] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
FastAPI中还支持对于同一个事件,定义多个监听函数,例如上文在多个操作函数的上方使用装饰器@app.on_event("startup")
。
架构管理
管理子应用
我们可能会有多个子系统或子应用,负责不同的业务。比如会员子系统负责管理会员的信息,商品子系统用来管理商品的种类和数量信息,销售子系统用来管理商品的销售情况等,子系统之间以接口的形式相互通信。
FastAPI提供了一种方式,可以用一个主应用管理各个子应用,这个过程称为"挂载",通过app的mount()
方法来实现。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from fastapi import FastAPIimport uvicornapp = FastAPI() @app.get("/app") def read_main () : return {"message" : "Hello" } catapp = FastAPI() @catapp.get("/hello") def read_sub () : return {"message" : "喵" } app.mount("/cat" , catapp) dogapp = FastAPI() @dogapp.get("/hello") def read_sub () : return {"message" : "汪" } app.mount("/dog" , dogapp) if __name__ == '__main__' : uvicorn.run(app=app)
管理Flask应用
通过app.mount()
,还可以管理Flask应用。
在下文的例子中,导入了Flask相关组件,并建了一段简单的Flask应用,然后使用中间件WSGIMiddleware,将Flask应用挂载到FastAPI的主应用的路径/flask
上。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import uvicornfrom fastapi import FastAPIfrom fastapi.middleware.wsgi import WSGIMiddlewarefrom flask import Flask, escape, requestflaskapp = Flask(__name__) @flaskapp.route("/") def flask_main () : name = request.args.get("name" , "Flask" ) return f"Hello, {escape(name)} !" app = FastAPI() @app.get("/app") def read_main () : return {"message" : "Hello" } app.mount("/flask" , WSGIMiddleware(flaskapp)) if __name__ == '__main__' : uvicorn.run(app=app)
路由类
在上文的例子中,将路由和对应的路径操作函数写在了主文件中。但是,在真实的应用程序中,会有几十个甚至上百个路由,如果写在同一个代码文件中,会使应用程序的主文件变得十分复杂。
FastAPI提供了路由类APIRouter
,用于解决这个问题。
路由类APIRouter
和应用类FastAPI有相同的特性和相似的用法。
在下文的例子中,首先导入了路由类APIRouter,然后定义路由类APIRouter的实例,并指定了参数值,如下:
prefix,路径前缀。
tags,API文档中显示的标签名
dependencies,依赖项列表
responses,自定义响应。
然后在路径操作函数上,使用路由类的实例router替代的装饰器方法定义了路由/hello
。
这与使用FastAPI的实例app定义路由的方法是相同的,也可以用同样的方式定义路径参数、请求参数、请求体,以及使用依赖项、响应体等。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi import APIRouterrouter = APIRouter( prefix="/child" , tags=["child" ], dependencies=[], responses={404 : {"detail" : "未找到项目" }}, ) @router.get("/hello") async def hello (name: str) : return {"message" : f"hello {name} " }
需要注意的是,上述代码并不能单独执行,因为必须使用应用类FastAPI的实例才能启动Web服务。
以上代码的使用方式是使用FastAPI的方法include_router
,将路由类实例引用到应用中。示例代码:
1 2 3 4 5 6 7 8 9 from fastapi import FastAPIimport code_demoapp = FastAPT() app.include_router(code_demo.router)
这样一来,当FastAPI接收到以/child
开头的请求地址后,会匹配到此路由实例,再匹配路由实例中定义的路径,找到路径操作函数。
通过这种方式,可以将主程序中的路由定义部分拆分成模块,将不同功能路径操作函数定义在相应的路由类实例中。
测试
TestClient
TestClient是FastAPI中的一套测试工具,基于requests库进行网络通信,支持Python中标准的pytest测试框架。
在使用TesiClient之前,首先要安装requests
和pytest
。
常规测试
在下文的例子中,比之前的FastAPI示例多了一些操作,如下:
导入TestClient库
创建TestClient库的实例,参数为FastAPI的实例app,表示本实例的测试目标是app。
定义了一个以test_开头的测试函数test_index。
根据pytest的约定,测试函数必须以test_开头。
在测试函数test_index的实现代码中,使用client对象发起请求,将请求的返回值写到response对象,请求的路径是在应用中使用装饰器定义的路由地址
test_index函数是没有用async定义,说明这是一个同步函数,这是因为TestClient仅支持使用同步函数进行测试。
使用了assert(断言)关键字验证response对象的属性。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from fastapi import FastAPIfrom fastapi.testclient import TestClientimport uvicornapp = FastAPI() client = TestClient(app) @app.get("/") async def index () : return {"name" : "kaka" } def test_index () : response = client.get("/" ) assert response.status_code == 200 assert response.json() == {"name" : "kaka" } if __name__ == '__main__' : uvicorn.run(app)
在PyCharm的底部工具栏找到Terminal并点击,打开命令行控制台,输人如下命令进行代码测试。示例代码:
运行结果:
1 2 3 4 5 6 7 8 =================== test session starts =================== platform win32 -- Python 3.6.8, pytest-7.0.1, pluggy-1.0.0 rootdir: C:\Dev\f plugins: anyio-3.6.2 collected 1 item main.py . [100%] ==================== 1 passed in 0.40s ====================
显示本次测试结果为1 passed in 0.40s
。意味着通过1个测试,用时0.40秒。
分离测试代码
功能代码是为了完成功能,测试代码是为了验证功能代码。在实际项目中,需要把功能代码和测试代码分离成不同的文件,以便于管理。
功能代码文件,main.py ,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from fastapi import FastAPIimport uvicornapp = FastAPI() @app.get("/") async def index () : return {"name" : "kaka" } if __name__ == '__main__' : uvicorn.run(app)
测试代码,main_test.py,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from fastapi.testclient import TestClientimport mainclient = TestClient(main.app) def test_index () : response = client.get("/" ) assert response.status_code == 200 assert response.json() == {"name" : "kaka" }
执行命令pytest main_test.py
,进行测试。
应用事件测试
在FastAPI通过@app.on_event("startup")
,可以在应用启动前执行一些操作,这些操作也可以用TestClient进行测试。
在下文的例子中,定义了模拟数据data,在路径操作函数中通过路径参数name,获取data中的值,并返回获取到的数据。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from fastapi import FastAPIimport uvicornapp = FastAPI() data = { "cat" : "猫" , "dog" : "狗" } @app.get("/{name}") async def index (name: str) : return {"name" : data[name]} if __name__ == '__main__' : uvicorn.run(app)
在同级目录下创建文件main_test.py
,定义应用启动事件和相应的事件函数,改变data的默认值。
在测试函数test_index()
中,通过with TestClient(main.app)as client
语句,创建了TestClient的实例,发起请求,将请求的返回值写到response对象,然后使用了assert,验证response对象的属性。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi.testclient import TestClientimport main@main.app.on_event("startup") async def startup_event () : main.data["cat" ] = "小猫" main.data["dog" ] = "小狗" def test_index () : with TestClient(main.app) as client: response = client.get("/cat" ) assert response.status_code == 200 assert response.json() == {"name" : "小猫" }
依赖项测试
在FastAPI程序中,可以使用依赖注人的方式调用其他模块或集成第三方服务。
在测试期间可能需要不同的依赖项,比如在程序中通过依赖的方式调用第三方短信平台发送短信,会产生费用,但在测试期间不需要每次都真正发送短信,只要确认此步骤已经执行即可。这时就要通过覆盖的方式,将发送短信的依赖项替换为消息输出的依赖项。
FastAPI提供了设置覆盖依赖性的属性dependency_overides
,其类型是一个字典,字典的键是原来的依赖项,字典的值是替换后的依赖项。
在下文的例子中,定义了一个依赖注入函数,用于调用短信平台,发送短信,在路径操作函数中指定了该依赖注入函数为依赖项。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from fastapi import FastAPI, Dependsimport uvicornapp = FastAPI() async def sms_sender (text: str) : print(f'调用短信平台发送短信,内容为:{text} ' ) return f'成功发送内容:{text} ' @app.get("/sendsms") async def sendsms (sms=Depends(sms_sender) ) : return {'data' : sms} if __name__ == '__main__' : uvicorn.run(app)
在同级目录下,新建main_test.py
。定义一个依赖注入函数override_sms_sender
,其作用是打印内容,不真正发送短信。然后使用app.dependency_overrides
方法,将应用原有的依赖项sms_sender
替換成override_sms_sender
。通过这种方式,可以在测试期间,替换原有的依赖项,不真正发送短信,仅打印内容,而不影响程序中原有的功能。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from fastapi.testclient import TestClientimport mainclient = TestClient(main.app) async def override_sms_sender (text: str) : print(f'需发送短信内容为:{text} ,仅记录,未发送' ) return f'成功发送内容:{text} ' main.app.dependency_overrides[main.sms_sender] = override_sms_sender def test_sendsms () : response = client.get("/sendsms?text=验证码" ) assert response.status_code == 200
在PyCharm的命令行控制台中,输入命令,进行测试。示例代码:
运行结果:
1 2 3 4 5 6 7 8 ==================== test session starts ==================== platform win32 -- Python 3.6.8, pytest-7.0.1, pluggy-1.0.0 rootdir: C:\Dev\f plugins: anyio-3.6.2 collected 1 item main_test.py 需发送短信内容为:验证码,仅记录,未发送 ===================== 1 passed in 0.39s =====================
解释说明:命令中的参数-s
,是为了输出控制台消息。