Files

398 lines
15 KiB
Python
Raw Permalink Normal View History

"""职业更新服务 - 根据章节分析自动更新角色职业信息"""
from typing import Dict, Any, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.character import Character
from app.models.career import Career, CharacterCareer
from app.logger import get_logger
logger = get_logger(__name__)
class CareerUpdateService:
"""职业更新服务 - 根据章节分析结果自动更新角色职业"""
@staticmethod
async def update_careers_from_analysis(
db: AsyncSession,
project_id: str,
character_states: List[Dict[str, Any]],
chapter_id: str,
chapter_number: int
) -> Dict[str, Any]:
"""
根据章节分析结果更新角色职业
Args:
db: 数据库会话
project_id: 项目ID
character_states: 角色状态变化列表(来自PlotAnalysis
chapter_id: 章节ID
chapter_number: 章节编号
Returns:
更新结果字典,包含更新数量和变更日志
"""
if not character_states:
logger.info("📋 角色状态列表为空,跳过职业更新")
return {"updated_count": 0, "changes": []}
updated_count = 0
changes_log = []
logger.info(f"🔍 开始分析第{chapter_number}章的角色职业变化...")
for char_state in character_states:
char_name = char_state.get('character_name')
career_changes = char_state.get('career_changes', {})
# 如果没有职业变化信息,跳过
if not career_changes or not isinstance(career_changes, dict):
continue
# 检查是否有实质性的职业变化
main_stage_change = career_changes.get('main_career_stage_change', 0)
sub_career_changes = career_changes.get('sub_career_changes', [])
new_careers = career_changes.get('new_careers', [])
if main_stage_change == 0 and not sub_career_changes and not new_careers:
continue
logger.info(f" 👤 检测到角色 [{char_name}] 有职业变化")
# 1. 查询角色
char_result = await db.execute(
select(Character).where(
Character.name == char_name,
Character.project_id == project_id
)
)
character = char_result.scalar_one_or_none()
if not character:
logger.warning(f" ⚠️ 角色不存在: {char_name},跳过")
continue
# 2. 更新主职业阶段
if main_stage_change != 0 and character.main_career_id:
success = await CareerUpdateService._update_main_career_stage(
db=db,
character=character,
stage_change=main_stage_change,
chapter_number=chapter_number,
career_changes=career_changes,
changes_log=changes_log
)
if success:
updated_count += 1
# 3. 更新副职业(如果有)
if sub_career_changes and isinstance(sub_career_changes, list):
for sub_change in sub_career_changes:
success = await CareerUpdateService._update_sub_career_stage(
db=db,
character=character,
project_id=project_id,
sub_change=sub_change,
chapter_number=chapter_number,
changes_log=changes_log
)
if success:
updated_count += 1
# 4. 添加新职业(如果有)
if new_careers and isinstance(new_careers, list):
for new_career_name in new_careers:
success = await CareerUpdateService._add_new_career(
db=db,
character=character,
project_id=project_id,
career_name=new_career_name,
chapter_number=chapter_number,
changes_log=changes_log
)
if success:
updated_count += 1
# 提交所有更改
if updated_count > 0:
await db.commit()
logger.info(f"✅ 职业更新完成: 共更新了 {updated_count} 个角色的职业信息")
else:
logger.info("📋 本章没有角色职业变化")
return {
"updated_count": updated_count,
"changes": changes_log
}
@staticmethod
async def _update_main_career_stage(
db: AsyncSession,
character: Character,
stage_change: int,
chapter_number: int,
career_changes: Dict[str, Any],
changes_log: List[Dict[str, Any]]
) -> bool:
"""更新主职业阶段"""
try:
# 查询主职业关联
char_career_result = await db.execute(
select(CharacterCareer).where(
CharacterCareer.character_id == character.id,
CharacterCareer.career_type == 'main'
)
)
char_career = char_career_result.scalar_one_or_none()
if not char_career:
logger.warning(f" ⚠️ {character.name} 没有主职业关联记录")
return False
# 查询职业信息
career_result = await db.execute(
select(Career).where(Career.id == char_career.career_id)
)
career = career_result.scalar_one_or_none()
if not career:
logger.warning(f" ⚠️ 职业ID {char_career.career_id} 不存在")
return False
# 计算新阶段(不超过最大阶段,不低于1)
old_stage = char_career.current_stage
new_stage = min(max(1, old_stage + stage_change), career.max_stage)
# 如果没有实际变化,跳过
if new_stage == old_stage:
logger.info(f" 📊 {character.name}{career.name} 已达到边界,无法变更")
return False
# 更新CharacterCareer表
char_career.current_stage = new_stage
# 同步更新Character表的冗余字段
character.main_career_stage = new_stage
# 记录变更日志
change_desc = f"{'晋升' if stage_change > 0 else '降级'}"
breakthrough_desc = career_changes.get('career_breakthrough', '')
changes_log.append({
'character': character.name,
'career': career.name,
'career_type': 'main',
'old_stage': old_stage,
'new_stage': new_stage,
'change': stage_change,
'chapter': chapter_number,
'description': breakthrough_desc
})
logger.info(
f"{character.name} 的主职业 [{career.name}] "
f"{old_stage}阶 → {new_stage}阶 ({change_desc})"
)
if breakthrough_desc:
logger.info(f" 突破描述: {breakthrough_desc[:50]}...")
return True
except Exception as e:
logger.error(f" ❌ 更新主职业失败: {str(e)}")
return False
@staticmethod
async def _update_sub_career_stage(
db: AsyncSession,
character: Character,
project_id: str,
sub_change: Dict[str, Any],
chapter_number: int,
changes_log: List[Dict[str, Any]]
) -> bool:
"""更新副职业阶段"""
try:
career_name = sub_change.get('career_name')
stage_change = sub_change.get('stage_change', 0)
if not career_name or stage_change == 0:
return False
# 1. 查询职业(通过名称)
career_result = await db.execute(
select(Career).where(
Career.name == career_name,
Career.project_id == project_id,
Career.type == 'sub'
)
)
career = career_result.scalar_one_or_none()
if not career:
logger.warning(f" ⚠️ 副职业 [{career_name}] 不存在")
return False
# 2. 查询角色-职业关联
char_career_result = await db.execute(
select(CharacterCareer).where(
CharacterCareer.character_id == character.id,
CharacterCareer.career_id == career.id,
CharacterCareer.career_type == 'sub'
)
)
char_career = char_career_result.scalar_one_or_none()
if not char_career:
logger.warning(f" ⚠️ {character.name} 没有 [{career_name}] 副职业")
return False
# 3. 计算新阶段
old_stage = char_career.current_stage
new_stage = min(max(1, old_stage + stage_change), career.max_stage)
if new_stage == old_stage:
return False
# 4. 更新阶段
char_career.current_stage = new_stage
# 5. 同步更新Character表的sub_careers JSON字段
import json
sub_careers = json.loads(character.sub_careers) if character.sub_careers else []
for sc in sub_careers:
if sc.get('career_id') == career.id:
sc['stage'] = new_stage
break
character.sub_careers = json.dumps(sub_careers, ensure_ascii=False)
# 6. 记录变更
changes_log.append({
'character': character.name,
'career': career.name,
'career_type': 'sub',
'old_stage': old_stage,
'new_stage': new_stage,
'change': stage_change,
'chapter': chapter_number
})
logger.info(
f"{character.name} 的副职业 [{career.name}] "
f"{old_stage}阶 → {new_stage}"
)
return True
except Exception as e:
logger.error(f" ❌ 更新副职业失败: {str(e)}")
return False
@staticmethod
async def _add_new_career(
db: AsyncSession,
character: Character,
project_id: str,
career_name: str,
chapter_number: int,
changes_log: List[Dict[str, Any]]
) -> bool:
"""为角色添加新职业"""
try:
# 1. 查询职业
career_result = await db.execute(
select(Career).where(
Career.name == career_name,
Career.project_id == project_id
)
)
career = career_result.scalar_one_or_none()
if not career:
logger.warning(f" ⚠️ 职业 [{career_name}] 不存在,无法添加")
return False
# 2. 检查是否已存在
existing_result = await db.execute(
select(CharacterCareer).where(
CharacterCareer.character_id == character.id,
CharacterCareer.career_id == career.id
)
)
if existing_result.scalar_one_or_none():
logger.info(f" 📋 {character.name} 已拥有 [{career_name}],跳过")
return False
# 3. 根据职业类型添加
if career.type == 'main':
# 检查是否已有主职业
if character.main_career_id:
logger.warning(f" ⚠️ {character.name} 已有主职业,无法添加新主职业")
return False
# 添加主职业
import uuid
new_char_career = CharacterCareer(
id=str(uuid.uuid4()),
character_id=character.id,
career_id=career.id,
career_type='main',
current_stage=1
)
db.add(new_char_career)
# 更新Character表
character.main_career_id = career.id
character.main_career_stage = 1
logger.info(f"{character.name} 获得新主职业 [{career_name}]")
else: # sub职业
# 检查副职业数量(最多2个)
sub_count_result = await db.execute(
select(CharacterCareer).where(
CharacterCareer.character_id == character.id,
CharacterCareer.career_type == 'sub'
)
)
if len(sub_count_result.scalars().all()) >= 2:
logger.warning(f" ⚠️ {character.name} 的副职业已达上限(2个)")
return False
# 添加副职业
import uuid
new_char_career = CharacterCareer(
id=str(uuid.uuid4()),
character_id=character.id,
career_id=career.id,
career_type='sub',
current_stage=1
)
db.add(new_char_career)
# 更新Character表的sub_careers JSON
import json
sub_careers = json.loads(character.sub_careers) if character.sub_careers else []
sub_careers.append({
'career_id': career.id,
'stage': 1
})
character.sub_careers = json.dumps(sub_careers, ensure_ascii=False)
logger.info(f"{character.name} 获得新副职业 [{career_name}]")
# 记录变更
changes_log.append({
'character': character.name,
'career': career.name,
'career_type': career.type,
'action': 'new',
'chapter': chapter_number
})
return True
except Exception as e:
logger.error(f" ❌ 添加新职业失败: {str(e)}")
return False