# FastAPI_first **Repository Path**: su27sk/FastAPI_first ## Basic Information - **Project Name**: FastAPI_first - **Description**: 使用FastAPI开发web的练手demo - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-02 - **Last Updated**: 2026-02-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: 大模型 ## README # FastAPI入门案例 ## 1.启动案例 ```angular2html from fastapi import FastAPI # 创建FastAPI实例 app = FastAPI() @app.get("/") async def root(): return {"message": "Hello World666"} ``` ## 2.请求和相应 - 路径参数 ```angular2html @app.get("/book/{id}") async def get_book(id: int): return {"id":id,"title":f"这是第{id}本书"} ``` - Query请求 ```angular2html # 需求 查询新闻 -> 分页:skip:跳过的记录数,limit:返回的记录数 @app.get("/news/news_list") async def get_news_list(skip: int = Query(0,description="跳过的记录数",lt=100), limit:int = Query(10,description="返回的记录数")): return {"skip":skip,"limit":limit} ``` - 请求体请求 ```angular2html # 注册:用户名和密码 class User(BaseModel): username:str = Field(default="张三",min_length=2,max_length=10,description="用户名,长度2-10个字") password:str = Field(min_length=3,max_length=20,description="密码,长度3-20") @app.post("/register") async def register(user: User): return user ``` - 返回其他格式 ```angular2html # 返回html格式 @app.get("/html",response_class=HTMLResponse) async def get_html(): return "

这是一级标题

" # 返回文件数据格式 @app.get("/file") async def get_file(): path = "./files/1.jpeg" return FileResponse(path) ``` - 自定义返回格式 ```angular2html # 自定义返回格式 @app.get("/news{id}",response_model=News) async def get_news(id:int): return { "id":id, "title":f"这是第{id}本书", "content":"这是一本好书" } ``` - 定义异常 ```angular2html # 异常 @app.get("/news2/{id}") async def get_new(id:int): id_list = [1,2,3,4,5,6] if id not in id_list: raise HTTPException(status_code=404,detail="您查找的新闻不存在") return {"id":id} ``` ## 3.进阶 - 中间件 ```angular2html # 中间件 @app.middleware("http") async def middleware1(request,call_next): print("中间件1 start") response = await call_next(request) print("中间件1 end") return response @app.middleware("http") async def middleware2(request,call_next): print("中间件2 start") response = await call_next(request) print("中间件2 end") return response ``` - 依赖注入 ```angular2html # 依赖注入 # 1.定义依赖项 async def common_parameters( skip: int = Query(0,ge=0), limit: int = Query(10,le=60) ): return {"skip":skip,"limit":limit} @app.get("/news/news_list2") async def get_news_list(commons = Depends(common_parameters)): return commons @app.get("/user/user_list2") async def get_user_list(commons = Depends(common_parameters)): return commons ``` # FastAPI进阶 ## 需要的orm插件下载 ```angular2html pip install sqlalchemy[asyncio] pip install aiomysql ``` ## 配置模板 ```angular2html import datetime from fastapi import FastAPI, Path, Query, HTTPException,Depends from pydantic import BaseModel,Field from fastapi.responses import HTMLResponse,FileResponse from sqlalchemy import DateTime, String, Float from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column from sqlalchemy.sql import func,select # 1.创建异步引擎 ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/FastAPI_first?charset=utf8" async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, pool_size=10, max_overflow=20, ) # 2. 创建类:基类和表对应的模型类 # 基类:创建时间 + 更新时间 class Base(DeclarativeBase): craete_time:Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now(),comment="创建时间") update_time:Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now(),onupdate=func.now(),comment="修改时间") # 书籍表:id,书名,作者,价格,出版社 class Book(Base): __tablename__ = "book" id:Mapped[int]=mapped_column(primary_key=True,comment="书籍id") bookname:Mapped[str]=mapped_column(String(255),comment="书名") author:Mapped[str]=mapped_column(String(255),comment="作者") price:Mapped[float]=mapped_column(Float,comment="价格") publisher:Mapped[str]=mapped_column(String(255),comment="出版社") # 3.建表 async def create_tables(): # 获取异步引擎创建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) @app.on_event("startup") async def start_event(): await create_tables() AsyncSessionLocal = async_sessionmaker( bind=async_engine, #绑定数据库引擎 class_=AsyncSession, # 指定会话类型 expire_on_commit=False # 提交会话以后不过期,不会重新查询数据库 ) # 依赖项 async def get_database(): async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() # 有异常回滚 raise finally: await session.close() # 关闭会话 ``` ## 查询操作 ```angular2html @app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询 # result = await db.execute(select(Book)) # 返沪一个orm对象 # book = result.scalars().all() # 查询所有 # return book # result = await db.execute(select(Book)) # book = result.scalars().first() # return book book = await db.get(Book, 3) return book ``` ## 增删改 ```angular2html # 新增图书 class BookBase(BaseModel): id:int bookname:str author:str price:float publisher:str @app.post("/book/add_book") async def add_book(book:BookBase,db:AsyncSession = Depends(get_database)): book_obj = Book(**book.__dict__) db.add(book_obj) await db.commit() return book # 修改图书 class BookUpdate(BaseModel): bookname: str author: str price: float publisher: str @app.put("/book/update_book/{book_id}") async def update_book(book_id:int,date:BookUpdate,db:AsyncSession=Depends(get_database)): db_book = await db.get(Book,book_id) if db_book is None: raise HTTPException( status_code=404, detail="查无此书" ) # 重新赋值 db_book.bookname = date.bookname db_book.author = date.author db_book.price = date.price db_book.publisher = date.publisher await db.commit() return db_book # 删除图书 @app.delete("/book/delete_book/{book_id}") async def delete_book(book_id:int,db:AsyncSession=Depends(get_database)): # 先查再删 db_book = await db.get(Book,book_id) if db_book is None: raise HTTPException( status_code=404, detail="查无此书" ) await db.delete(db_book) await db.commit() return {"msg":"删除图书成功"} ```