Files
MuMuAINovel/backend/app/user_password.py
T

178 lines
5.8 KiB
Python
Raw Normal View History

"""
用户密码管理模块 - 使用数据库存储
"""
import asyncio
import hashlib
from typing import Optional
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from app.config import settings
class UserPasswordManager:
"""用户密码管理器 - 使用数据库存储(PostgreSQL共享库)"""
def __init__(self):
"""初始化密码管理器"""
pass
async def _get_session(self) -> AsyncSession:
"""获取数据库会话 - 使用共享的PostgreSQL引擎"""
from app.database import get_engine
# 使用共享的PostgreSQL引擎(user_id使用特殊标识)
engine = await get_engine("_global_users_")
session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
return session_maker()
def _hash_password(self, password: str) -> str:
"""密码哈希"""
return hashlib.sha256(password.encode()).hexdigest()
async def set_password(self, user_id: str, username: str, password: Optional[str] = None) -> str:
"""
设置用户密码
Args:
user_id: 用户ID
username: 用户名
password: 密码,如果为None则使用默认密码(username+@666
Returns:
实际使用的密码(明文,仅用于首次设置时返回给用户)
"""
from app.models.user import UserPassword as UserPasswordModel
# 如果没有提供密码,使用默认密码
actual_password = password if password else f"{username}@666"
async with await self._get_session() as session:
# 查询密码记录是否存在
result = await session.execute(
select(UserPasswordModel).where(UserPasswordModel.user_id == user_id)
)
pwd_record = result.scalar_one_or_none()
if pwd_record:
# 更新现有密码
pwd_record.username = username
pwd_record.password_hash = self._hash_password(actual_password)
pwd_record.has_custom_password = password is not None
pwd_record.updated_at = datetime.now()
else:
# 创建新密码记录
pwd_record = UserPasswordModel(
user_id=user_id,
username=username,
password_hash=self._hash_password(actual_password),
has_custom_password=password is not None,
created_at=datetime.now(),
updated_at=datetime.now()
)
session.add(pwd_record)
await session.commit()
return actual_password
async def verify_password(self, user_id: str, password: str) -> bool:
"""
验证用户密码
Args:
user_id: 用户ID
password: 待验证的密码
Returns:
是否验证通过
"""
from app.models.user import UserPassword as UserPasswordModel
async with await self._get_session() as session:
result = await session.execute(
select(UserPasswordModel).where(UserPasswordModel.user_id == user_id)
)
pwd_record = result.scalar_one_or_none()
if not pwd_record:
return False
password_hash = self._hash_password(password)
return pwd_record.password_hash == password_hash
async def has_password(self, user_id: str) -> bool:
"""
检查用户是否已设置密码
Args:
user_id: 用户ID
Returns:
是否已设置密码
"""
from app.models.user import UserPassword as UserPasswordModel
async with await self._get_session() as session:
result = await session.execute(
select(UserPasswordModel).where(UserPasswordModel.user_id == user_id)
)
pwd_record = result.scalar_one_or_none()
return pwd_record is not None
async def has_custom_password(self, user_id: str) -> bool:
"""
检查用户是否设置了自定义密码(非默认密码)
Args:
user_id: 用户ID
Returns:
是否使用自定义密码
"""
from app.models.user import UserPassword as UserPasswordModel
async with await self._get_session() as session:
result = await session.execute(
select(UserPasswordModel).where(UserPasswordModel.user_id == user_id)
)
pwd_record = result.scalar_one_or_none()
if not pwd_record:
return False
return pwd_record.has_custom_password
async def get_username(self, user_id: str) -> Optional[str]:
"""
获取用户名
Args:
user_id: 用户ID
Returns:
用户名,如果不存在返回None
"""
from app.models.user import UserPassword as UserPasswordModel
async with await self._get_session() as session:
result = await session.execute(
select(UserPasswordModel).where(UserPasswordModel.user_id == user_id)
)
pwd_record = result.scalar_one_or_none()
if not pwd_record:
return None
return pwd_record.username
# 全局密码管理器实例
password_manager = UserPasswordManager()