Files
MuMuAINovel/backend/scripts/migrate_users_to_postgres.py
T

300 lines
9.7 KiB
Python
Raw Normal View History

"""
用户数据迁移脚本 - 从JSON文件迁移到PostgreSQL数据库
使用方法:
python migrate_users_to_postgres.py
python migrate_users_to_postgres.py --db-url postgresql+asyncpg://user:pass@localhost/dbname
"""
import asyncio
import json
import os
import sys
import argparse
from pathlib import Path
from datetime import datetime
# 添加项目根目录到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.config import settings, DATA_DIR
async def create_tables(engine):
"""创建用户相关表"""
from app.database import Base
from app.models.user import User, UserPassword
print("📋 创建数据库表...")
async with engine.begin() as conn:
# 只创建用户相关的表
await conn.run_sync(User.metadata.create_all)
await conn.run_sync(UserPassword.metadata.create_all)
print("✅ 表创建成功")
async def migrate_users(session):
"""迁移用户数据"""
from app.models.user import User as UserModel
users_file = DATA_DIR / "users.json"
if not users_file.exists():
print("️ 用户数据文件不存在,跳过迁移")
return 0
try:
with open(users_file, "r", encoding="utf-8") as f:
users_data = json.load(f)
if not users_data:
print("️ 用户数据为空,跳过迁移")
return 0
migrated_count = 0
for user_id, user_info in users_data.items():
try:
# 检查用户是否已存在
result = await session.execute(
select(UserModel).where(UserModel.user_id == user_id)
)
existing = result.scalar_one_or_none()
if existing:
print(f"️ 用户已存在,跳过: {user_info['username']} ({user_id})")
continue
# 创建用户记录
user = UserModel(
user_id=user_id,
username=user_info["username"],
display_name=user_info["display_name"],
avatar_url=user_info.get("avatar_url"),
trust_level=user_info.get("trust_level", 0),
is_admin=user_info.get("is_admin", False),
linuxdo_id=user_info["linuxdo_id"],
created_at=datetime.fromisoformat(user_info.get("created_at", datetime.now().isoformat())),
last_login=datetime.fromisoformat(user_info.get("last_login", datetime.now().isoformat()))
)
session.add(user)
migrated_count += 1
print(f"✅ 迁移用户: {user_info['username']} ({user_id})")
except Exception as e:
print(f"❌ 迁移用户 {user_id} 失败: {e}")
await session.commit()
print(f"\n✅ 用户数据迁移完成: {migrated_count}/{len(users_data)} 个用户")
return migrated_count
except Exception as e:
print(f"❌ 迁移用户数据失败: {e}")
await session.rollback()
return 0
async def migrate_passwords(session):
"""迁移密码数据"""
from app.models.user import UserPassword
passwords_file = DATA_DIR / "user_passwords.json"
if not passwords_file.exists():
print("️ 密码数据文件不存在,跳过迁移")
return 0
try:
with open(passwords_file, "r", encoding="utf-8") as f:
passwords_data = json.load(f)
if not passwords_data:
print("️ 密码数据为空,跳过迁移")
return 0
migrated_count = 0
for user_id, pwd_info in passwords_data.items():
try:
# 检查密码是否已存在
result = await session.execute(
select(UserPassword).where(UserPassword.user_id == user_id)
)
existing = result.scalar_one_or_none()
if existing:
print(f"️ 密码已存在,跳过: {pwd_info['username']} ({user_id})")
continue
# 创建密码记录
pwd_record = UserPassword(
user_id=user_id,
username=pwd_info["username"],
password_hash=pwd_info["password_hash"],
has_custom_password=pwd_info.get("has_custom_password", False),
created_at=datetime.now(),
updated_at=datetime.now()
)
session.add(pwd_record)
migrated_count += 1
print(f"✅ 迁移密码: {pwd_info['username']} ({user_id})")
except Exception as e:
print(f"❌ 迁移密码 {user_id} 失败: {e}")
await session.commit()
print(f"\n✅ 密码数据迁移完成: {migrated_count}/{len(passwords_data)} 个密码")
return migrated_count
except Exception as e:
print(f"❌ 迁移密码数据失败: {e}")
await session.rollback()
return 0
async def backup_json_files():
"""备份原始JSON文件"""
files_to_backup = ["users.json", "user_passwords.json", "admins.json"]
print("\n📦 备份原始文件...")
for filename in files_to_backup:
source = DATA_DIR / filename
if source.exists():
backup = DATA_DIR / f"{filename}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
import shutil
shutil.copy2(source, backup)
print(f"✅ 备份: {filename} -> {backup.name}")
async def main(db_url=None):
"""主函数
Args:
db_url: 可选的数据库URL,如果不提供则使用配置文件中的
"""
print("=" * 70)
print("用户数据迁移工具 - JSON 到 PostgreSQL")
print("=" * 70)
print()
# 确定使用的数据库URL
target_db_url = db_url if db_url else settings.database_url
# 检查数据库配置
if "postgresql" not in target_db_url:
print("❌ 错误: 未指定 PostgreSQL 数据库")
if not db_url:
print(f" 当前配置: {settings.database_url}")
print(" 请使用 --db-url 参数指定PostgreSQL数据库,或在 .env 中配置 DATABASE_URL")
else:
print(f" 提供的URL: {target_db_url}")
print()
print("示例:")
print(" python migrate_users_to_postgres.py --db-url postgresql+asyncpg://user:pass@localhost/dbname")
return
# 隐藏密码部分显示
display_url = target_db_url
if '@' in display_url:
parts = display_url.split('@')
if ':' in parts[0]:
user_part = parts[0].split(':')[0]
display_url = f"{user_part}:****@{parts[1]}"
print(f"📊 目标数据库: {display_url}")
print()
try:
# 创建数据库引擎
engine = create_async_engine(
target_db_url,
echo=False,
future=True,
pool_pre_ping=True,
)
# 创建表
await create_tables(engine)
print()
# 创建会话
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
# 迁移用户
print("📋 步骤 1/2: 迁移用户数据")
print("-" * 70)
async with async_session() as session:
user_count = await migrate_users(session)
print()
# 迁移密码
print("📋 步骤 2/2: 迁移密码数据")
print("-" * 70)
async with async_session() as session:
pwd_count = await migrate_passwords(session)
print()
# 备份原文件
await backup_json_files()
print()
# 总结
print("=" * 70)
print("迁移完成")
print("=" * 70)
print(f"✅ 用户: {user_count}")
print(f"✅ 密码: {pwd_count}")
print()
print("💡 提示:")
print(" - 原文件已备份(带时间戳)")
print(" - 可以安全删除 users.json 和 user_passwords.json")
print(" - 如需回滚,请从备份文件恢复")
print()
# 关闭引擎
await engine.dispose()
except Exception as e:
print(f"\n❌ 迁移过程出错: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
# 解析命令行参数
parser = argparse.ArgumentParser(
description="迁移用户数据从JSON到PostgreSQL数据库",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 使用 .env 配置的数据库
python migrate_users_to_postgres.py
# 指定数据库URL
python migrate_users_to_postgres.py --db-url postgresql+asyncpg://user:pass@localhost/dbname
# 使用环境变量
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db python migrate_users_to_postgres.py
"""
)
parser.add_argument(
"--db-url",
type=str,
help="PostgreSQL数据库连接URL (格式: postgresql+asyncpg://user:password@host:port/database)",
default=None
)
args = parser.parse_args()
# 运行迁移
asyncio.run(main(db_url=args.db_url))