用FastAPI的Depends实现细粒度权限控制

张开发
2026/4/4 23:11:47 15 分钟阅读

分享文章

用FastAPI的Depends实现细粒度权限控制
第一部分只有JWT到底缺了啥先别急着写代码咱们捋一捋最常见的翻车现场。想象你的API是一个高档小区。JWT就像门禁卡——刷卡能进小区大门通过认证。但进门之后你能随便进别人家吗能去物业办公室调监控吗显然不行。你需要“房间钥匙”和“工作证”。很多初版代码长这样app.get(/tasks/{task_id}) def get_task(task_id: int, user: User Depends(get_current_user)): task db.query(Task).filter(Task.id task_id).first() return task # 危险没检查这个task是不是该用户的⚠️ 警告这段代码等于让张三拿着身份证就能进李四家拿东西。只要用户伪造或篡改了token里的user_id甚至只是猜到别人的ID数据就裸奔了。 第二部分细粒度权限到底“细”在哪儿咱们做权限通常分三个层级你可以对着看看自己项目到哪一步了①全局认证Authentication你是谁——JWT搞定。②角色权限RBAC你有什么身份——比如“管理员”能删除“普通用户”只能看。③数据权限ABAC/行级你对这个具体的数据有什么权限——比如“只能改自己创建的任务”或“状态为‘草稿’的文章才能编辑”。今天重点聊②和③因为这是最容易被忽略的“隐形漏洞”。️ 第三部分实战用Depends写一个“带脑子的”权限拦截器FastAPI的Depends简直是权限控制的瑞士军刀。它不仅能拿用户信息还能嵌套依赖、提前拦截请求比在路由函数里写一堆if判断优雅一万倍。 第一步给JWT加点“料”别再只存个user_id了生成token的时候把角色(role)和权限标识(permissions)塞进payload里后续查询会省很多事儿。# 登录时生成token def create_access_token(data: dict, expires_delta: timedelta None): to_encode data.copy() # 除了user_id把角色和权限列表放进去 to_encode.update({ role: user.role, perms: user.permissions # 比如 [task:create, task:delete_own] }) return jwt.encode(to_encode, SECRET_KEY, algorithmALGORITHM)⚙️ 第二步封装权限校验依赖核心干货这里咱们写一个“可配置”的权限依赖。它的工作流程是拿到token - 解析用户 - 检查角色 - 检查具体权限。只要不通过直接抛403路由函数根本不会执行。from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from typing import List, Optional oauth2_scheme OAuth2PasswordBearer(tokenUrltoken) # 模拟获取当前用户实际项目中会查DB或解码JWT async def get_current_user(token: str Depends(oauth2_scheme)): try: payload jwt.decode(token, SECRET_KEY, algorithms[ALGORITHM]) user_id: str payload.get(sub) role: str payload.get(role) perms: List payload.get(perms, []) if user_id is None: raise HTTPException(status_code401, detail无效凭证) return {id: user_id, role: role, permissions: perms} except jwt.PyJWTError: raise HTTPException(status_code401, detail无效凭证) # 重磅可组合的权限依赖 class PermissionChecker: def __init__(self, required_role: Optional[str] None, required_perm: Optional[str] None): self.required_role required_role self.required_perm required_perm def __call__(self, user: dict Depends(get_current_user)): # 角色检查RBAC if self.required_role and user.get(role) ! self.required_role: raise HTTPException( status_codestatus.HTTP_403_FORBIDDEN, detailf需要 {self.required_role} 角色 ) # 权限检查细粒度 if self.required_perm and self.required_perm not in user.get(permissions, []): raise HTTPException( status_codestatus.HTTP_403_FORBIDDEN, detailf缺少权限: {self.required_perm} ) return user # 通过检查把用户信息传给路由用 第三步在路由中使用清爽到飞起看看封装后的效果是不是比在函数里写一堆 if 舒服多了# 实例化不同的权限检查器 require_admin PermissionChecker(required_roleadmin) require_task_create PermissionChecker(required_permtask:create) require_task_delete_own PermissionChecker(required_permtask:delete_own) # 只有admin能删任何任务 app.delete(/tasks/{task_id}) async def delete_task_any(task_id: int, admin_user: dict Depends(require_admin)): # 业务逻辑直接删反正已经确认是admin return {msg: 任务已删除} # 用户只能删自己的任务这里只检查了“是否有删除自己任务的权限”具体数据还得再查 app.delete(/tasks/my/{task_id}) async def delete_my_task(task_id: int, user: dict Depends(require_task_delete_own)): task db.query(Task).filter(Task.id task_id).first() if task.owner_id ! user[id]: raise HTTPException(status_code403, detail只能删除自己的任务) db.delete(task) return {msg: 自己的任务已删除} 第四部分进阶玩法——数据级权限ABAC上面代码里还有个瑕疵require_task_delete_own只保证了“用户有删除自己任务的权限”但没保证“这个任务真的是他的”。这就是典型的数据级权限缺失。更优雅的做法是把“数据归属校验”也封装进依赖里。参考fastapi-permissions库的思路可以给每个数据模型定义一个“权限控制列表”ACLfrom fastapi_permissions import Allow, Authenticated class Task: def __acl__(self): return [ (Allow, Authenticated, view), (Allow, fuser:{self.owner_id}, edit), (Allow, role:admin, delete), ]然后在依赖里把当前用户的principals身份列表和资源的ACL进行比对。如果 “user:123” 不在允许列表里直接 403。这样就把权限逻辑和数据模型绑定了维护起来特别清晰。

更多文章