"""JWT bearer-token helpers and FastAPI dependencies for authentication."""

import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

# Key used to sign and verify tokens. Read from the DATACLAW_SECRET_KEY
# environment variable when set; the literal fallback keeps local development
# working but must not be relied on in production.
SECRET_KEY = os.environ.get(
    "DATACLAW_SECRET_KEY", "your-super-secret-key-for-dataclaw"
)
ALGORITHM = "HS256"
# Not referenced by the code in this module; presumably callers pass it to
# create_access_token as ``timedelta(minutes=...)`` -- verify against callers.
ACCESS_TOKEN_EXPIRE_MINUTES = 30 * 24 * 60  # 30 days

pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
security = HTTPBearer()


class CurrentUser(BaseModel):
    """Identity extracted from a verified access token."""

    # Taken from the token's "id" claim.
    id: int
    # Taken from the token's "sub" claim.
    username: str
    # Taken from the token's "is_admin" claim; False when the claim is absent.
    is_admin: bool = False


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module's password context."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not modified.
        expires_delta: Token lifetime. When ``None`` the token expires after
            15 minutes.

    Returns:
        The encoded token.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise silently be replaced by the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)

    to_encode = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> CurrentUser:
    """FastAPI dependency: decode the bearer token into a ``CurrentUser``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, lacks the ``id``
            or ``sub`` claim, or carries claims that do not fit ``CurrentUser``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
    )

    try:
        payload = jwt.decode(
            credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
        )
    except JWTError:
        raise unauthorized from None

    user_id = payload.get("id")
    username = payload.get("sub")
    if user_id is None or username is None:
        raise unauthorized

    try:
        return CurrentUser(
            id=user_id,
            username=username,
            is_admin=bool(payload.get("is_admin", False)),
        )
    except ValidationError:
        # A correctly signed token with malformed claims (e.g. a non-numeric
        # "id") is still an invalid credential, not a server error.
        raise unauthorized from None


def get_admin_user(current_user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
    """FastAPI dependency: return the current user only if they are an admin.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is false.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin permission required",
        )
    return current_user