189 lines
5.1 KiB
Python
189 lines
5.1 KiB
Python
"""
|
||
用户管理 API
|
||
"""
|
||
from fastapi import APIRouter, HTTPException, Request, Depends
|
||
from pydantic import BaseModel
|
||
from typing import List, Optional
|
||
from app.user_manager import user_manager, User
|
||
from app.user_password import password_manager
|
||
|
||
router = APIRouter(prefix="/users", tags=["用户管理"])
|
||
|
||
|
||
def require_login(request: Request):
|
||
"""依赖:要求用户已登录"""
|
||
if not hasattr(request.state, "user") or not request.state.user:
|
||
raise HTTPException(status_code=401, detail="需要登录")
|
||
return request.state.user
|
||
|
||
|
||
def require_admin(request: Request):
|
||
"""依赖:要求用户为管理员"""
|
||
user = require_login(request)
|
||
if not request.state.is_admin:
|
||
raise HTTPException(status_code=403, detail="需要管理员权限")
|
||
return user
|
||
|
||
|
||
class SetAdminRequest(BaseModel):
|
||
user_id: str
|
||
is_admin: bool
|
||
|
||
|
||
class ResetPasswordRequest(BaseModel):
|
||
user_id: str
|
||
new_password: Optional[str] = None # 如果为空则使用默认密码
|
||
|
||
|
||
@router.get("/current")
|
||
async def get_current_user(user: User = Depends(require_login)):
|
||
"""获取当前登录用户信息"""
|
||
return user.dict()
|
||
|
||
|
||
@router.get("", response_model=List[dict])
|
||
async def list_users(admin_user: User = Depends(require_admin)):
|
||
"""
|
||
获取所有用户列表(仅管理员)
|
||
"""
|
||
users = await user_manager.get_all_users()
|
||
return [user.dict() for user in users]
|
||
|
||
|
||
@router.post("/set-admin")
|
||
async def set_admin(
|
||
data: SetAdminRequest,
|
||
request: Request,
|
||
admin_user: User = Depends(require_admin)
|
||
):
|
||
"""
|
||
设置用户的管理员权限(仅管理员)
|
||
|
||
限制:
|
||
- 不能撤销自己的管理员权限
|
||
- 至少保留一个管理员
|
||
"""
|
||
# 检查是否尝试撤销自己的权限
|
||
if data.user_id == admin_user.user_id and not data.is_admin:
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail="不能撤销自己的管理员权限"
|
||
)
|
||
|
||
# 尝试设置管理员权限
|
||
success = await user_manager.set_admin(data.user_id, data.is_admin)
|
||
|
||
if not success:
|
||
if not data.is_admin:
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail="无法撤销管理员权限,至少需要保留一个管理员"
|
||
)
|
||
else:
|
||
raise HTTPException(
|
||
status_code=404,
|
||
detail="用户不存在"
|
||
)
|
||
|
||
return {
|
||
"message": f"已{'授予' if data.is_admin else '撤销'}管理员权限",
|
||
"user_id": data.user_id,
|
||
"is_admin": data.is_admin
|
||
}
|
||
|
||
|
||
@router.delete("/{user_id}")
|
||
async def delete_user(
|
||
user_id: str,
|
||
admin_user: User = Depends(require_admin)
|
||
):
|
||
"""
|
||
删除用户(仅管理员)
|
||
|
||
限制:
|
||
- 不能删除管理员用户
|
||
"""
|
||
success = await user_manager.delete_user(user_id)
|
||
|
||
if not success:
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail="无法删除该用户(用户不存在或为管理员)"
|
||
)
|
||
|
||
return {
|
||
"message": "用户已删除",
|
||
"user_id": user_id
|
||
}
|
||
|
||
|
||
@router.get("/{user_id}")
|
||
async def get_user(
|
||
user_id: str,
|
||
admin_user: User = Depends(require_admin)
|
||
):
|
||
"""获取指定用户信息(仅管理员)"""
|
||
user = await user_manager.get_user(user_id)
|
||
|
||
if not user:
|
||
raise HTTPException(status_code=404, detail="用户不存在")
|
||
|
||
return user.dict()
|
||
|
||
|
||
@router.post("/reset-password")
|
||
async def reset_user_password(
|
||
data: ResetPasswordRequest,
|
||
admin_user: User = Depends(require_admin)
|
||
):
|
||
"""
|
||
重置用户密码(仅管理员)
|
||
|
||
如果提供了 new_password,则设置为指定密码
|
||
如果未提供 new_password,则重置为默认密码(username@666)
|
||
|
||
限制:
|
||
- 不能重置自己的密码(应该使用修改密码功能)
|
||
"""
|
||
# 检查是否尝试重置自己的密码
|
||
if data.user_id == admin_user.user_id:
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail="不能重置自己的密码,请使用修改密码功能"
|
||
)
|
||
|
||
# 检查目标用户是否存在
|
||
target_user = await user_manager.get_user(data.user_id)
|
||
if not target_user:
|
||
raise HTTPException(
|
||
status_code=404,
|
||
detail="目标用户不存在"
|
||
)
|
||
|
||
# 重置密码
|
||
try:
|
||
actual_password = await password_manager.set_password(
|
||
target_user.user_id,
|
||
target_user.username,
|
||
data.new_password
|
||
)
|
||
|
||
# 如果使用了默认密码,返回密码供管理员告知用户
|
||
message = "密码重置成功"
|
||
response_data = {
|
||
"message": message,
|
||
"user_id": data.user_id,
|
||
"username": target_user.username
|
||
}
|
||
|
||
if not data.new_password:
|
||
response_data["default_password"] = actual_password
|
||
response_data["message"] = f"密码已重置为默认密码: {actual_password}"
|
||
|
||
return response_data
|
||
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=500,
|
||
detail=f"重置密码失败: {str(e)}"
|
||
) |