Files
DataClaw/backend/app/core/security.py
T

57 lines
2.1 KiB
Python
Raw Normal View History

2026-03-14 19:20:37 +08:00
from datetime import datetime, timedelta
from typing import Optional
2026-03-16 16:12:35 +08:00
from jose import jwt, JWTError
2026-03-14 19:20:37 +08:00
from passlib.context import CryptContext
2026-03-16 16:12:35 +08:00
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel
2026-03-14 19:20:37 +08:00
SECRET_KEY = "your-super-secret-key-for-dataclaw" # In production, use env variable
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 * 24 * 60 # 30 days
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
2026-03-16 16:12:35 +08:00
security = HTTPBearer()
class CurrentUser(BaseModel):
id: int
username: str
is_admin: bool = False
2026-03-14 19:20:37 +08:00
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
2026-03-16 16:12:35 +08:00
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> CurrentUser:
unauthorized = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise unauthorized
user_id = payload.get("id")
username = payload.get("sub")
is_admin = bool(payload.get("is_admin", False))
if user_id is None or username is None:
raise unauthorized
return CurrentUser(id=user_id, username=username, is_admin=is_admin)
def get_admin_user(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin permission required")
return current_user