update: 1.新增职业管理模块和角色职业关联 2.章节分析自动更新角色职业状态 3.优化章节生成的角色信息构建 4.批量生成强制开启同步分析 5.章节内容批量生成增加系统提示

This commit is contained in:
xiamuceer
2025-12-22 19:53:31 +08:00
parent 6886d903fe
commit b2dec41464
25 changed files with 4635 additions and 89 deletions
+341 -39
View File
@@ -14,6 +14,7 @@ from app.models.chapter import Chapter
from app.models.project import Project
from app.models.outline import Outline
from app.models.character import Character
from app.models.career import Career, CharacterCareer
from app.models.generation_history import GenerationHistory
from app.models.writing_style import WritingStyle
from app.models.analysis_task import AnalysisTask
@@ -665,6 +666,114 @@ async def build_smart_chapter_context(
return context_parts
async def build_characters_info_with_careers(
db: AsyncSession,
project_id: str,
characters: list[Character],
filter_character_names: Optional[list[str]] = None
) -> str:
"""
构建包含职业信息的角色上下文
Args:
db: 数据库会话
project_id: 项目ID
characters: 角色列表
filter_character_names: 可选,筛选特定角色名称列表(用于1-1模式的structure.characters或1-n模式的expansion_plan.character_focus
Returns:
格式化的角色信息字符串,包含职业信息
"""
if not characters:
return '暂无角色信息'
# 如果提供了筛选名单,只保留匹配的角色
if filter_character_names:
filtered_characters = [c for c in characters if c.name in filter_character_names]
if not filtered_characters:
logger.warning(f"筛选后无匹配角色,使用全部角色。筛选名单: {filter_character_names}")
filtered_characters = characters
else:
logger.info(f"根据筛选名单保留 {len(filtered_characters)}/{len(characters)} 个角色: {[c.name for c in filtered_characters]}")
characters = filtered_characters
# 获取所有职业信息(一次性查询,提高效率)
careers_result = await db.execute(
select(Career).where(Career.project_id == project_id)
)
careers_map = {c.id: c for c in careers_result.scalars().all()}
# 获取所有角色的职业关联(一次性查询)
character_ids = [c.id for c in characters]
if not character_ids:
return '暂无角色信息'
character_careers_result = await db.execute(
select(CharacterCareer).where(CharacterCareer.character_id.in_(character_ids))
)
character_careers = character_careers_result.scalars().all()
# 构建角色ID到职业信息的映射
char_career_map = {}
for cc in character_careers:
if cc.character_id not in char_career_map:
char_career_map[cc.character_id] = {'main': None, 'sub': []}
career = careers_map.get(cc.career_id)
if not career:
continue
career_info = {
'name': career.name,
'stage': cc.current_stage,
'max_stage': career.max_stage,
'stage_progress': cc.stage_progress
}
if cc.career_type == 'main':
char_career_map[cc.character_id]['main'] = career_info
else:
char_career_map[cc.character_id]['sub'].append(career_info)
# 构建角色信息字符串
characters_info_parts = []
for c in characters:
# 基本信息
entity_type = '组织' if c.is_organization else '角色'
base_info = f"- {c.name}({entity_type}, {c.role_type})"
# 职业信息
career_info_str = ""
if c.id in char_career_map:
career_data = char_career_map[c.id]
# 主职业
if career_data['main']:
main = career_data['main']
stage_desc = f"{main['stage']}/{main['max_stage']}"
career_info_str += f" | 主职业: {main['name']}({stage_desc})"
# 副职业
if career_data['sub']:
sub_list = []
for sub in career_data['sub']:
stage_desc = f"{sub['stage']}/{sub['max_stage']}"
sub_list.append(f"{sub['name']}({stage_desc})")
career_info_str += f" | 副职业: {', '.join(sub_list)}"
# 性格描述
personality_str = ""
if c.personality:
personality_preview = c.personality[:100] if len(c.personality) > 100 else c.personality
personality_str = f": {personality_preview}"
# 组合完整信息
full_info = base_info + career_info_str + personality_str
characters_info_parts.append(full_info)
return "\n".join(characters_info_parts)
@router.get("/{chapter_id}/can-generate", summary="检查章节是否可以生成")
async def check_can_generate(
chapter_id: str,
@@ -716,7 +825,7 @@ async def analyze_chapter_background(
project_id: str,
task_id: str,
ai_service: AIService
):
) -> bool:
"""
后台异步分析章节(支持并发,使用锁保护数据库写入)
@@ -726,6 +835,9 @@ async def analyze_chapter_background(
project_id: 项目ID
task_id: 任务ID
ai_service: AI服务实例
Returns:
bool: True表示分析成功,False表示分析失败
"""
db_session = None
write_lock = await get_db_write_lock(user_id)
@@ -942,6 +1054,37 @@ async def analyze_chapter_background(
)
logger.info(f"✅ 添加{added_count}条记忆到向量库")
# 💼 更新角色职业(根据分析结果)
if analysis_result.get('character_states'):
try:
from app.services.career_update_service import CareerUpdateService
logger.info(f"💼 开始根据分析结果更新角色职业...")
career_update_result = await CareerUpdateService.update_careers_from_analysis(
db=db_session,
project_id=project_id,
character_states=analysis_result.get('character_states', []),
chapter_id=chapter_id,
chapter_number=chapter.chapter_number
)
if career_update_result['updated_count'] > 0:
logger.info(
f"✅ 更新了 {career_update_result['updated_count']} 个角色的职业信息: "
f"{', '.join(career_update_result['updated_characters'])}"
)
if career_update_result['changes']:
for change in career_update_result['changes']:
logger.info(f" - {change}")
else:
logger.info("️ 本章节无角色职业变化")
except Exception as career_error:
# 职业更新失败不应影响整个分析流程
logger.error(f"⚠️ 更新角色职业失败: {str(career_error)}", exc_info=True)
else:
logger.debug("📋 分析结果中无角色状态信息,跳过职业更新")
# 最终更新任务状态(写操作,需要锁)- 增加重试机制
update_success = False
for retry in range(3):
@@ -965,6 +1108,9 @@ async def analyze_chapter_background(
if not update_success:
logger.warning(f"⚠️ 章节分析完成但状态更新失败: {chapter_id}")
# 返回成功状态
return True
except Exception as e:
logger.error(f"❌ 后台分析异常: {str(e)}", exc_info=True)
# 确保任务状态被更新为failed(写操作,需要锁)
@@ -995,6 +1141,10 @@ async def analyze_chapter_background(
await asyncio.sleep(0.1) # 短暂等待后重试
else:
logger.error(f"❌ 任务状态更新失败,已达到最大重试次数: {task_id}")
# 返回失败状态
return False
finally:
if db_session:
await db_session.close()
@@ -1108,15 +1258,41 @@ async def generate_chapter_content_stream(
for o in all_outlines
])
# 获取角色信息
# 获取角色信息(包含职业信息)
characters_result = await db_session.execute(
select(Character).where(Character.project_id == current_chapter.project_id)
)
characters = characters_result.scalars().all()
characters_info = "\n".join([
f"- {c.name}({'组织' if c.is_organization else '角色'}, {c.role_type}): {c.personality[:100] if c.personality else ''}"
for c in characters
])
# 📝 根据大纲模式智能筛选相关角色
filter_character_names = None
if outline_mode == 'one-to-one':
# 1-1模式:从outline.structure中提取characters字段
if outline and outline.structure:
try:
structure = json.loads(outline.structure)
filter_character_names = structure.get('characters', [])
if filter_character_names:
logger.info(f"📋 1-1模式:从structure提取角色列表 {filter_character_names}")
except json.JSONDecodeError:
logger.warning(f"⚠️ outline.structure解析失败,使用全部角色")
else:
# 1-n模式:从chapter.expansion_plan中提取character_focus字段
if current_chapter.expansion_plan:
try:
plan = json.loads(current_chapter.expansion_plan)
filter_character_names = plan.get('character_focus', [])
if filter_character_names:
logger.info(f"📋 1-n模式:从expansion_plan提取角色焦点 {filter_character_names}")
except json.JSONDecodeError:
logger.warning(f"⚠️ expansion_plan解析失败,使用全部角色")
characters_info = await build_characters_info_with_careers(
db=db_session,
project_id=current_chapter.project_id,
characters=characters,
filter_character_names=filter_character_names
)
# 获取写作风格
style_content = ""
@@ -2325,28 +2501,83 @@ async def execute_batch_generation_in_order(
if task.enable_analysis:
logger.info(f"🔍 开始同步分析章节: 第{chapter.chapter_number}")
async with write_lock:
analysis_task = AnalysisTask(
chapter_id=chapter_id,
user_id=user_id,
project_id=task.project_id,
status='pending',
progress=0
)
db_session.add(analysis_task)
await db_session.commit()
await db_session.refresh(analysis_task)
# 分析重试机制(最多3次)
analysis_retry_count = 0
analysis_success = False
last_analysis_error = None
# 同步执行分析(等待完成)
await analyze_chapter_background(
chapter_id=chapter_id,
user_id=user_id,
project_id=task.project_id,
task_id=analysis_task.id,
ai_service=ai_service
)
logger.info(f"✅ 章节分析完成: 第{chapter.chapter_number}")
while analysis_retry_count < 3 and not analysis_success:
try:
if analysis_retry_count > 0:
logger.info(f"🔄 重试分析章节 (第{analysis_retry_count}次): 第{chapter.chapter_number}")
async with write_lock:
analysis_task = AnalysisTask(
chapter_id=chapter_id,
user_id=user_id,
project_id=task.project_id,
status='pending',
progress=0
)
db_session.add(analysis_task)
await db_session.commit()
await db_session.refresh(analysis_task)
# 同步执行分析,直接使用返回值判断成功/失败
analysis_result = await analyze_chapter_background(
chapter_id=chapter_id,
user_id=user_id,
project_id=task.project_id,
task_id=analysis_task.id,
ai_service=ai_service
)
# 直接根据返回值判断
if not analysis_result:
last_analysis_error = "分析函数返回失败"
logger.error(f"❌ 章节分析失败: 第{chapter.chapter_number}")
raise Exception(f"章节分析失败")
# 分析成功
analysis_success = True
logger.info(f"✅ 章节分析成功: 第{chapter.chapter_number}")
except Exception as analysis_error:
last_analysis_error = str(analysis_error)
analysis_retry_count += 1
if analysis_retry_count < 3:
# 还有重试机会,等待后重试
wait_time = min(2 ** analysis_retry_count, 10)
logger.warning(f"⏳ 分析失败,等待 {wait_time} 秒后重试...")
await asyncio.sleep(wait_time)
else:
# 达到最大重试次数,必须终止整个批量任务
logger.error(f"❌ 章节分析失败,已达最大重试次数(3次): 第{chapter.chapter_number}")
# 记录失败信息
failed_info = {
'chapter_id': chapter_id,
'chapter_number': chapter.chapter_number,
'title': chapter.title,
'error': f"分析失败(重试3次): {last_analysis_error}",
'retry_count': 3
}
async with write_lock:
if task.failed_chapters is None:
task.failed_chapters = []
task.failed_chapters.append(failed_info)
# 标记任务失败并终止
task.status = 'failed'
task.error_message = f"{chapter.chapter_number}章分析失败(重试3次): {last_analysis_error}"[:500]
task.completed_at = datetime.now()
task.current_retry_count = 0
await db_session.commit()
logger.error(f"🛑 批量生成中断: 第{chapter.chapter_number}章分析失败")
return # 立即终止整个批量生成任务
# 标记成功
chapter_success = True
@@ -2361,7 +2592,8 @@ async def execute_batch_generation_in_order(
except Exception as e:
last_error = str(e)
logger.error(f"❌ 章节生成失败: {chapter.chapter_number if chapter else '?'}, 错误: {last_error}")
error_msg = f"{chapter.chapter_number if chapter else '?'}出错: {last_error}"
logger.error(f"{error_msg}")
retry_count += 1
@@ -2394,7 +2626,13 @@ async def execute_batch_generation_in_order(
task.current_retry_count = 0
await db_session.commit()
logger.error(f"🛑 批量生成终止于第{chapter.chapter_number}")
# ⚠️ 如果启用了同步分析,任何错误都应该中断任务
# 因为章节生成或分析失败会影响后续章节的职业更新和剧情连贯性
if task.enable_analysis:
logger.error(f"🛑 批量生成中断: 因启用同步分析,任何错误都会中断任务以确保职业信息和剧情连贯性")
else:
logger.error(f"🛑 批量生成终止于第{chapter.chapter_number}")
return
# 全部完成
@@ -2469,15 +2707,41 @@ async def generate_single_chapter_for_batch(
for o in all_outlines
])
# 获取角色信息
# 获取角色信息(包含职业信息)
characters_result = await db_session.execute(
select(Character).where(Character.project_id == chapter.project_id)
)
characters = characters_result.scalars().all()
characters_info = "\n".join([
f"- {c.name}({'组织' if c.is_organization else '角色'}, {c.role_type}): {c.personality[:100] if c.personality else ''}"
for c in characters
])
# 📝 根据大纲模式智能筛选相关角色(批量生成)
filter_character_names = None
if outline_mode == 'one-to-one':
# 1-1模式:从outline.structure中提取characters字段
if outline and outline.structure:
try:
structure = json.loads(outline.structure)
filter_character_names = structure.get('characters', [])
if filter_character_names:
logger.info(f"📋 批量生成 - 1-1模式:从structure提取角色列表 {filter_character_names}")
except json.JSONDecodeError:
logger.warning(f"⚠️ 批量生成 - outline.structure解析失败,使用全部角色")
else:
# 1-n模式:从chapter.expansion_plan中提取character_focus字段
if chapter.expansion_plan:
try:
plan = json.loads(chapter.expansion_plan)
filter_character_names = plan.get('character_focus', [])
if filter_character_names:
logger.info(f"📋 批量生成 - 1-n模式:从expansion_plan提取角色焦点 {filter_character_names}")
except json.JSONDecodeError:
logger.warning(f"⚠️ 批量生成 - expansion_plan解析失败,使用全部角色")
characters_info = await build_characters_info_with_careers(
db=db_session,
project_id=chapter.project_id,
characters=characters,
filter_character_names=filter_character_names
)
# 获取写作风格
style_content = ""
@@ -2721,12 +2985,53 @@ async def regenerate_chapter_stream(
)
project = project_result.scalar_one_or_none()
# 获取角色信息
# 获取角色信息(包含职业信息)
characters_result = await temp_db.execute(
select(Character).where(Character.project_id == chapter.project_id)
)
characters = characters_result.scalars().all()
# 📝 根据大纲模式智能筛选相关角色(重新生成)
outline_mode_result = await temp_db.execute(
select(Project.outline_mode).where(Project.id == chapter.project_id)
)
outline_mode = outline_mode_result.scalar_one_or_none() or 'one-to-many'
filter_character_names = None
if outline_mode == 'one-to-one':
# 1-1模式:从outline.structure中提取characters字段
outline_result_temp = await temp_db.execute(
select(Outline.structure)
.where(Outline.project_id == chapter.project_id)
.where(Outline.order_index == chapter.chapter_number)
)
outline_structure = outline_result_temp.scalar_one_or_none()
if outline_structure:
try:
structure = json.loads(outline_structure)
filter_character_names = structure.get('characters', [])
if filter_character_names:
logger.info(f"📋 重新生成 - 1-1模式:从structure提取角色列表 {filter_character_names}")
except json.JSONDecodeError:
logger.warning(f"⚠️ 重新生成 - outline.structure解析失败,使用全部角色")
else:
# 1-n模式:从chapter.expansion_plan中提取character_focus字段
if chapter.expansion_plan:
try:
plan = json.loads(chapter.expansion_plan)
filter_character_names = plan.get('character_focus', [])
if filter_character_names:
logger.info(f"📋 重新生成 - 1-n模式:从expansion_plan提取角色焦点 {filter_character_names}")
except json.JSONDecodeError:
logger.warning(f"⚠️ 重新生成 - expansion_plan解析失败,使用全部角色")
characters_info_with_careers = await build_characters_info_with_careers(
db=temp_db,
project_id=chapter.project_id,
characters=characters,
filter_character_names=filter_character_names
)
# 获取章节大纲
outline_result = await temp_db.execute(
select(Outline)
@@ -2779,10 +3084,7 @@ async def regenerate_chapter_stream(
'time_period': project.world_time_period if project else '未设定',
'location': project.world_location if project else '未设定',
'atmosphere': project.world_atmosphere if project else '未设定',
'characters_info': "\n".join([
f"- {c.name}({'组织' if c.is_organization else '角色'}, {c.role_type}): {c.personality[:100] if c.personality else ''}"
for c in characters
]) if characters else '暂无角色信息',
'characters_info': characters_info_with_careers,
'chapter_outline': outline.content if outline else chapter.summary or '暂无大纲',
'previous_context': '' # 可以后续扩展添加前置章节上下文
}