# FastAPI_first
**Repository Path**: su27sk/FastAPI_first
## Basic Information
- **Project Name**: FastAPI_first
- **Description**: 使用FastAPI开发web的练手demo
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-02-02
- **Last Updated**: 2026-02-09
## Categories & Tags
**Categories**: Uncategorized
**Tags**: 大模型
## README
# FastAPI入门案例
## 1.启动案例
```python
from fastapi import FastAPI

# Create the FastAPI application instance.
app = FastAPI()


# Root endpoint: returns a fixed greeting payload.
@app.get("/")
async def root():
    return {"message": "Hello World666"}
```
## 2.请求和响应
- 路径参数
```python
# Path parameter: ``id`` is taken from the URL and converted to int.
@app.get("/book/{id}")
async def get_book(id: int):
    return {"id": id, "title": f"这是第{id}本书"}
```
- Query请求
```python
# Requirement: list news with pagination.
# skip: number of records to skip; limit: number of records to return.
@app.get("/news/news_list")
async def get_news_list(
    skip: int = Query(0, description="跳过的记录数", lt=100),
    limit: int = Query(10, description="返回的记录数"),
):
    # Echo the pagination parameters back to the caller.
    return {"skip": skip, "limit": limit}
```
- 请求体请求
```python
# Registration payload: username and password.
# Length limits are declared on each field via ``Field``.
class User(BaseModel):
    username: str = Field(default="张三", min_length=2, max_length=10, description="用户名,长度2-10个字")
    password: str = Field(min_length=3, max_length=20, description="密码,长度3-20")
# Request-body endpoint: receives a ``User`` body and returns it unchanged.
# NOTE: the response therefore includes the submitted password; this is
# only suitable for a demo.
@app.post("/register")
async def register(user: User):
    return user
```
- 返回其他格式
```python
# Return an HTML response.
@app.get("/html", response_class=HTMLResponse)
async def get_html():
    # The heading tags belong to the returned markup; without them the
    # literal was split across three lines and was not valid Python.
    return "<h1>这是一级标题</h1>"
# Return a file as the response body.
@app.get("/file")
async def get_file():
    path = "./files/1.jpeg"
    return FileResponse(path)
```
- 自定义返回格式
```python
# Custom response shape: the returned dict is declared to follow the
# ``News`` response model.
# NOTE(review): ``News`` is not defined in this snippet — presumably a
# Pydantic model with id/title/content fields; confirm.
# NOTE(review): the path has no "/" before "{id}" (it matches e.g.
# "/news1"); confirm whether "/news/{id}" was intended.
@app.get("/news{id}", response_model=News)
async def get_news(id: int):
    return {
        "id": id,
        "title": f"这是第{id}本书",
        "content": "这是一本好书",
    }
```
- 定义异常
```python
# Raising an HTTP error: ids outside the known list produce a 404.
@app.get("/news2/{id}")
async def get_new(id: int):
    id_list = [1, 2, 3, 4, 5, 6]
    if id not in id_list:
        raise HTTPException(status_code=404, detail="您查找的新闻不存在")
    return {"id": id}
```
## 3.进阶
- 中间件
```python
# Middleware
@app.middleware("http")
async def middleware1(request, call_next):
    """Print a marker before and after the rest of the request handling."""
    print("中间件1 start")
    # Hand the request on and wait for the response.
    response = await call_next(request)
    print("中间件1 end")
    return response
# Registered after middleware1. Per the FastAPI docs the last middleware
# added is the outermost, so this one starts first and finishes last.
@app.middleware("http")
async def middleware2(request, call_next):
    """Print a marker before and after the rest of the request handling."""
    print("中间件2 start")
    response = await call_next(request)
    print("中间件2 end")
    return response
```
- 依赖注入
```python
# Dependency injection
# 1. Define the dependency.
async def common_parameters(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, le=60),
):
    """Collect the shared pagination query parameters into a dict."""
    return {"skip": skip, "limit": limit}
# Uses the shared pagination dependency and returns its result as-is.
@app.get("/news/news_list2")
async def get_news_list(commons=Depends(common_parameters)):
    return commons
# Reuses the same pagination dependency for a second endpoint.
@app.get("/user/user_list2")
async def get_user_list(commons=Depends(common_parameters)):
    return commons
```
# FastAPI进阶
## 需要的orm插件下载
```bash
pip install "sqlalchemy[asyncio]"
pip install aiomysql
```
## 配置模板
```python
import datetime
from fastapi import FastAPI, Path, Query, HTTPException,Depends
from pydantic import BaseModel,Field
from fastapi.responses import HTMLResponse,FileResponse
from sqlalchemy import DateTime, String, Float
from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession
from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column
from sqlalchemy.sql import func,select
# 1. Create the async engine.
# NOTE(review): the URL embeds the database user and password; presumably
# acceptable only for this local demo — confirm before reuse.
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/FastAPI_first?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,  # log the emitted SQL
    pool_size=10,  # connections kept in the pool
    max_overflow=20,  # extra connections allowed beyond pool_size
)
# 2. Define the classes: a declarative base plus the table-mapped models.
# Base: creation-time and update-time columns.
class Base(DeclarativeBase):
    # The annotations use the ``datetime.datetime`` class; the file does
    # ``import datetime``, so the bare name ``datetime`` is the module and
    # is not a valid Python type for a mapped column.
    # NOTE(review): "craete_time" looks like a typo for "create_time"; it is
    # kept because renaming the attribute would also rename the column.
    craete_time: Mapped[datetime.datetime] = mapped_column(
        DateTime,
        insert_default=func.now(),
        default=func.now(),
        comment="创建时间",
    )
    update_time: Mapped[datetime.datetime] = mapped_column(
        DateTime,
        insert_default=func.now(),
        default=func.now(),
        onupdate=func.now(),
        comment="修改时间",
    )
# Book table: id, title, author, price, publisher.
class Book(Base):
    __tablename__ = "book"

    id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")
    bookname: Mapped[str] = mapped_column(String(255), comment="书名")
    author: Mapped[str] = mapped_column(String(255), comment="作者")
    price: Mapped[float] = mapped_column(Float, comment="价格")
    publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
# 3. Create the tables.
async def create_tables():
    """Run ``Base.metadata.create_all`` through the async engine."""
    # create_all is a synchronous API, so it is executed via run_sync on a
    # connection obtained from the async engine.
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
# Create the tables when the application starts.
@app.on_event("startup")
async def start_event():
    await create_tables()
# Session factory used by the database dependency.
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,  # bind to the database engine
    class_=AsyncSession,  # session class to instantiate
    expire_on_commit=False,  # objects stay usable after commit without a re-query
)
# Dependency that yields a database session.
async def get_database():
    """Yield an ``AsyncSession``; commit afterwards, roll back on error."""
    async with AsyncSessionLocal() as session:
        try:
            yield session
            # Reached only when the consumer finished without raising.
            await session.commit()
        except Exception:
            await session.rollback()  # undo pending changes on any error
            raise
        finally:
            await session.close()  # close the session
```
## 查询操作
```python
# Query examples; the two commented variants show alternative queries.
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    # Variant 1: fetch every row.
    # result = await db.execute(select(Book))  # returns ORM objects
    # book = result.scalars().all()  # all rows
    # return book

    # Variant 2: fetch only the first row.
    # result = await db.execute(select(Book))
    # book = result.scalars().first()
    # return book

    # Active variant: look up one book by primary key (hard-coded to 3).
    book = await db.get(Book, 3)
    return book
```
## 增删改
```python
# Add a book: request body carrying every column of the book table.
class BookBase(BaseModel):
    id: int
    bookname: str
    author: str
    price: float
    publisher: str
# Insert a new book row from the request body and return the payload.
@app.post("/book/add_book")
async def add_book(book: BookBase, db: AsyncSession = Depends(get_database)):
    # Build the ORM row from the validated payload. model_dump() is the
    # public Pydantic v2 API for this (``.dict()`` on v1); reading
    # ``__dict__`` relies on how the model stores its fields internally.
    book_obj = Book(**book.model_dump())
    db.add(book_obj)
    await db.commit()
    return book
# Update a book: request body with the editable columns (no id).
class BookUpdate(BaseModel):
    bookname: str
    author: str
    price: float
    publisher: str
# Overwrite the editable columns of an existing book; 404 if it is missing.
@app.put("/book/update_book/{book_id}")
async def update_book(book_id: int, date: BookUpdate, db: AsyncSession = Depends(get_database)):
    db_book = await db.get(Book, book_id)
    if db_book is None:
        raise HTTPException(
            status_code=404,
            detail="查无此书",
        )
    # Reassign every editable column from the request body.
    db_book.bookname = date.bookname
    db_book.author = date.author
    db_book.price = date.price
    db_book.publisher = date.publisher
    await db.commit()
    return db_book
# Delete a book; 404 if it is missing.
@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)):
    # Load the row first, then delete it.
    db_book = await db.get(Book, book_id)
    if db_book is None:
        raise HTTPException(
            status_code=404,
            detail="查无此书",
        )
    await db.delete(db_book)
    await db.commit()
    return {"msg": "删除图书成功"}
```