"""章节管理API""" from fastapi import APIRouter, Depends, HTTPException, Request, Query, BackgroundTasks from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from sqlalchemy.orm import selectinload import json import asyncio from typing import Optional from datetime import datetime from asyncio import Queue, Lock from app.database import get_db from app.api.common import verify_project_access from app.services.chapter_context_service import ChapterContextBuilder, FocusedMemoryRetriever from app.models.chapter import Chapter from app.models.project import Project from app.models.outline import Outline from app.models.character import Character from app.models.career import Career, CharacterCareer from app.models.generation_history import GenerationHistory from app.models.writing_style import WritingStyle from app.models.analysis_task import AnalysisTask from app.models.memory import PlotAnalysis, StoryMemory from app.models.batch_generation_task import BatchGenerationTask from app.models.regeneration_task import RegenerationTask from app.schemas.chapter import ( ChapterCreate, ChapterUpdate, ChapterResponse, ChapterListResponse, ChapterGenerateRequest, BatchGenerateRequest, BatchGenerateResponse, BatchGenerateStatusResponse, ExpansionPlanUpdate, PartialRegenerateRequest ) from app.schemas.regeneration import ( ChapterRegenerateRequest, RegenerationTaskResponse, RegenerationTaskStatus ) from app.services.ai_service import AIService from app.services.prompt_service import prompt_service, PromptService, WritingStyleManager from app.services.plot_analyzer import PlotAnalyzer from app.services.memory_service import memory_service from app.services.foreshadow_service import foreshadow_service from app.services.chapter_regenerator import ChapterRegenerator from app.logger import get_logger from app.api.settings import get_user_ai_service from app.utils.sse_response import SSEResponse, create_sse_response router = APIRouter(prefix="/chapters", tags=["章节管理"]) logger = get_logger(__name__) # 全局数据库写入锁(每个用户一个锁,用于保护SQLite写入操作) db_write_locks: dict[str, Lock] = {} async def get_db_write_lock(user_id: str) -> Lock: """获取或创建用户的数据库写入锁""" if user_id not in db_write_locks: db_write_locks[user_id] = Lock() logger.debug(f"🔒 为用户 {user_id} 创建数据库写入锁") return db_write_locks[user_id] @router.post("", response_model=ChapterResponse, summary="创建章节") async def create_chapter( chapter: ChapterCreate, request: Request, db: AsyncSession = Depends(get_db) ): """创建新的章节""" # 验证用户权限和项目是否存在 user_id = getattr(request.state, 'user_id', None) project = await verify_project_access(chapter.project_id, user_id, db) # 计算字数(处理content可能为None的情况) word_count = len(chapter.content) if chapter.content else 0 db_chapter = Chapter( **chapter.model_dump(), word_count=word_count ) db.add(db_chapter) # 更新项目的当前字数 project.current_words = project.current_words + word_count await db.commit() await db.refresh(db_chapter) return db_chapter @router.get("/project/{project_id}", response_model=ChapterListResponse, summary="获取项目的所有章节") async def get_project_chapters( project_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """获取指定项目的所有章节(带大纲信息)""" # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(project_id, user_id, db) # 获取总数 count_result = await db.execute( select(func.count(Chapter.id)).where(Chapter.project_id == project_id) ) total = count_result.scalar_one() # 获取章节列表,同时加载关联的大纲信息 result = await db.execute( select(Chapter) .where(Chapter.project_id == project_id) .order_by(Chapter.chapter_number) ) chapters = result.scalars().all() # 获取所有大纲信息(用于填充outline_title) outline_ids = [ch.outline_id for ch in chapters if ch.outline_id] outlines_map = {} if outline_ids: outlines_result = await db.execute( select(Outline).where(Outline.id.in_(outline_ids)) ) outlines_map = {o.id: o for o in outlines_result.scalars().all()} # 为所有章节添加大纲信息(统一处理) chapters_with_outline = [] for chapter in chapters: chapter_dict = { "id": chapter.id, "project_id": chapter.project_id, "chapter_number": chapter.chapter_number, "title": chapter.title, "content": chapter.content, "summary": chapter.summary, "word_count": chapter.word_count, "status": chapter.status, "outline_id": chapter.outline_id, "sub_index": chapter.sub_index, "expansion_plan": chapter.expansion_plan, "created_at": chapter.created_at, "updated_at": chapter.updated_at, } # 添加大纲信息 if chapter.outline_id and chapter.outline_id in outlines_map: outline = outlines_map[chapter.outline_id] chapter_dict["outline_title"] = outline.title chapter_dict["outline_order"] = outline.order_index else: chapter_dict["outline_title"] = None chapter_dict["outline_order"] = None chapters_with_outline.append(chapter_dict) return ChapterListResponse(total=total, items=chapters_with_outline) @router.get("/{chapter_id}", response_model=ChapterResponse, summary="获取章节详情") async def get_chapter( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """根据ID获取章节详情""" result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter.project_id, user_id, db) return chapter @router.get("/{chapter_id}/navigation", summary="获取章节导航信息") async def get_chapter_navigation( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """ 获取章节的导航信息(上一章/下一章) 用于章节阅读器的翻页功能 """ # 获取当前章节 result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) current_chapter = result.scalar_one_or_none() if not current_chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(current_chapter.project_id, user_id, db) # 获取上一章 prev_result = await db.execute( select(Chapter) .where(Chapter.project_id == current_chapter.project_id) .where(Chapter.chapter_number < current_chapter.chapter_number) .order_by(Chapter.chapter_number.desc()) .limit(1) ) prev_chapter = prev_result.scalar_one_or_none() # 获取下一章 next_result = await db.execute( select(Chapter) .where(Chapter.project_id == current_chapter.project_id) .where(Chapter.chapter_number > current_chapter.chapter_number) .order_by(Chapter.chapter_number.asc()) .limit(1) ) next_chapter = next_result.scalar_one_or_none() return { "current": { "id": current_chapter.id, "chapter_number": current_chapter.chapter_number, "title": current_chapter.title }, "previous": { "id": prev_chapter.id, "chapter_number": prev_chapter.chapter_number, "title": prev_chapter.title } if prev_chapter else None, "next": { "id": next_chapter.id, "chapter_number": next_chapter.chapter_number, "title": next_chapter.title } if next_chapter else None } @router.put("/{chapter_id}", response_model=ChapterResponse, summary="更新章节") async def update_chapter( chapter_id: str, chapter_update: ChapterUpdate, request: Request, db: AsyncSession = Depends(get_db) ): """更新章节信息""" result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter.project_id, user_id, db) # 记录旧字数 old_word_count = chapter.word_count or 0 # 更新字段 update_data = chapter_update.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(chapter, field, value) # 如果内容更新了,重新计算字数(包括清空内容的情况) if "content" in update_data: new_word_count = len(chapter.content) if chapter.content else 0 chapter.word_count = new_word_count # 更新项目字数 result = await db.execute( select(Project).where(Project.id == chapter.project_id) ) project = result.scalar_one_or_none() if project: project.current_words = project.current_words - old_word_count + new_word_count # 如果内容被清空,清理相关数据 if not chapter.content or chapter.content.strip() == "": chapter.status = "draft" # 清理分析任务 analysis_tasks_result = await db.execute( select(AnalysisTask).where(AnalysisTask.chapter_id == chapter_id) ) analysis_tasks = analysis_tasks_result.scalars().all() for task in analysis_tasks: await db.delete(task) # 清理分析结果 plot_analysis_result = await db.execute( select(PlotAnalysis).where(PlotAnalysis.chapter_id == chapter_id) ) plot_analyses = plot_analysis_result.scalars().all() for analysis in plot_analyses: await db.delete(analysis) # 清理故事记忆(关系数据库) story_memories_result = await db.execute( select(StoryMemory).where(StoryMemory.chapter_id == chapter_id) ) story_memories = story_memories_result.scalars().all() for memory in story_memories: await db.delete(memory) # 清理向量数据库中的记忆数据 try: await memory_service.delete_chapter_memories( user_id=user_id, project_id=chapter.project_id, chapter_id=chapter_id ) logger.info(f"✅ 已清理章节 {chapter_id[:8]} 的向量记忆数据") except Exception as e: logger.warning(f"⚠️ 清理向量记忆数据失败: {str(e)}") # 🔮 清理章节相关的分析伏笔数据 try: foreshadow_result = await foreshadow_service.delete_chapter_foreshadows( db=db, project_id=chapter.project_id, chapter_id=chapter_id, only_analysis_source=True # 只删除分析来源的伏笔,保留手动创建的 ) if foreshadow_result['deleted_count'] > 0: logger.info(f"🔮 已清理章节 {chapter_id[:8]} 的 {foreshadow_result['deleted_count']} 个伏笔数据") except Exception as e: logger.warning(f"⚠️ 清理伏笔数据失败: {str(e)}") logger.info(f"🗑️ 章节 {chapter_id[:8]} 内容已清空,已清理分析、记忆和伏笔数据") await db.commit() await db.refresh(chapter) chapter_dict = { "id": chapter.id, "project_id": chapter.project_id, "chapter_number": chapter.chapter_number, "title": chapter.title, "content": chapter.content, "summary": chapter.summary, "word_count": chapter.word_count, "status": chapter.status, "outline_id": chapter.outline_id, "sub_index": chapter.sub_index, "expansion_plan": chapter.expansion_plan, "created_at": chapter.created_at, "updated_at": chapter.updated_at, "outline_title": None, "outline_order": None } # 如果章节关联了大纲,查询大纲信息 if chapter.outline_id: outline_result = await db.execute( select(Outline).where(Outline.id == chapter.outline_id) ) outline = outline_result.scalar_one_or_none() if outline: chapter_dict["outline_title"] = outline.title chapter_dict["outline_order"] = outline.order_index return chapter_dict @router.delete("/{chapter_id}", summary="删除章节") async def delete_chapter( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """删除章节""" result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter.project_id, user_id, db) # 更新项目字数 result = await db.execute( select(Project).where(Project.id == chapter.project_id) ) project = result.scalar_one_or_none() if project: # 处理 word_count 和 current_words 可能为 None 的情况 chapter_word_count = chapter.word_count or 0 project.current_words = max(0, (project.current_words or 0) - chapter_word_count) # 🗑️ 清理向量数据库中的记忆数据 try: await memory_service.delete_chapter_memories( user_id=user_id, project_id=chapter.project_id, chapter_id=chapter_id ) logger.info(f"✅ 已清理章节 {chapter_id[:8]} 的向量记忆数据") except Exception as e: logger.warning(f"⚠️ 清理向量记忆数据失败: {str(e)}") # 不阻断删除流程,继续执行 # 🔮 清理与该章节相关的伏笔数据(仅分析来源的伏笔) try: foreshadow_result = await foreshadow_service.delete_chapter_foreshadows( db=db, project_id=chapter.project_id, chapter_id=chapter_id, only_analysis_source=True # 只删除分析来源的伏笔,保留手动创建的 ) if foreshadow_result['deleted_count'] > 0: logger.info(f"🔮 已清理章节 {chapter_id[:8]} 的 {foreshadow_result['deleted_count']} 个伏笔数据") except Exception as e: logger.warning(f"⚠️ 清理伏笔数据失败: {str(e)}") # 不阻断删除流程,继续执行 # 删除章节(关系数据库中的记忆会被级联删除) await db.delete(chapter) await db.commit() return {"message": "章节删除成功"} async def check_prerequisites(db: AsyncSession, chapter: Chapter) -> tuple[bool, str, list[Chapter]]: """ 检查章节前置条件 Args: db: 数据库会话 chapter: 当前章节 Returns: (可否生成, 错误信息, 前置章节列表) """ # 如果是第一章,无需检查前置 if chapter.chapter_number == 1: return True, "", [] # 查询所有前置章节(序号小于当前章节的) result = await db.execute( select(Chapter) .where(Chapter.project_id == chapter.project_id) .where(Chapter.chapter_number < chapter.chapter_number) .order_by(Chapter.chapter_number) ) previous_chapters = result.scalars().all() # 检查是否所有前置章节都有内容 incomplete_chapters = [ ch for ch in previous_chapters if not ch.content or ch.content.strip() == "" ] if incomplete_chapters: missing_numbers = [str(ch.chapter_number) for ch in incomplete_chapters] error_msg = f"需要先完成前置章节:第 {', '.join(missing_numbers)} 章" return False, error_msg, previous_chapters return True, "", previous_chapters async def build_smart_chapter_context( db: AsyncSession, project_id: str, current_chapter_number: int, user_id: str ) -> dict: """ 智能构建章节生成上下文(支持海量章节场景) 策略: 1. 故事骨架:每50章采样1章(标题+摘要) 2. 相关历史:通过chapter_summary记忆语义检索15个最相关章节 3. 近期概要:最近30章的简要摘要(200字/章) 4. 最近完整:最近3章的完整内容 Args: db: 数据库会话 project_id: 项目ID current_chapter_number: 当前章节序号 user_id: 用户ID Returns: 包含各部分上下文的字典 """ context_parts = { 'story_skeleton': '', # 故事骨架 'relevant_history': '', # 相关历史章节 'recent_summary': '', # 近期概要 'recent_full': '', # 最近完整内容 'stats': {} # 统计信息 } try: # 1. 获取所有已完成的前置章节(只取ID和序号) all_chapters_result = await db.execute( select(Chapter.id, Chapter.chapter_number, Chapter.title) .where(Chapter.project_id == project_id) .where(Chapter.chapter_number < current_chapter_number) .where(Chapter.content != None) .where(Chapter.content != "") .order_by(Chapter.chapter_number) ) all_chapters_info = all_chapters_result.all() total_previous = len(all_chapters_info) if total_previous == 0: logger.info("📚 这是第一章,无需构建前置上下文") return context_parts logger.info(f"📚 开始构建智能上下文:共{total_previous}章前置内容") # 2. 构建故事骨架(每50章采样) skeleton_chapters = [] if total_previous > 50: sample_interval = 50 skeleton_indices = list(range(0, total_previous, sample_interval)) for idx in skeleton_indices: chapter_info = all_chapters_info[idx] # 获取章节摘要(优先从chapter_summary记忆获取) summary_result = await db.execute( select(StoryMemory.content) .where(StoryMemory.project_id == project_id) .where(StoryMemory.chapter_id == chapter_info.id) .where(StoryMemory.memory_type == 'chapter_summary') .limit(1) ) summary_row = summary_result.scalar_one_or_none() summary = summary_row if summary_row else "(无摘要)" skeleton_chapters.append({ 'number': chapter_info.chapter_number, 'title': chapter_info.title, 'summary': summary }) context_parts['story_skeleton'] = "【故事骨架】\n" + "\n".join([ f"第{ch['number']}章《{ch['title']}》:{ch['summary']}" for ch in skeleton_chapters ]) logger.info(f" ✅ 故事骨架:采样{len(skeleton_chapters)}章(每50章1个)") # 3. 语义检索相关历史章节(使用chapter_summary记忆) # 获取当前章节的大纲作为查询 current_outline_result = await db.execute( select(Outline.content) .where(Outline.project_id == project_id) .where(Outline.order_index == current_chapter_number) ) current_outline = current_outline_result.scalar_one_or_none() if current_outline and total_previous > 3: # 使用记忆服务进行语义检索 relevant_memories = await memory_service.search_memories( user_id=user_id, project_id=project_id, query=current_outline, memory_types=['chapter_summary'], limit=15, # 检索15个最相关的章节 min_importance=0.0 # 不过滤重要性,依赖语义相关度 ) if relevant_memories: relevant_chapters_text = [] for mem in relevant_memories: # 获取章节信息 chapter_result = await db.execute( select(Chapter.chapter_number, Chapter.title) .where(Chapter.id == mem['metadata'].get('chapter_id')) ) chapter_info = chapter_result.first() if chapter_info: relevant_chapters_text.append( f"第{chapter_info.chapter_number}章《{chapter_info.title}》:{mem['content']} " f"(相关度:{mem['similarity']:.2f})" ) context_parts['relevant_history'] = "【相关历史章节】\n" + "\n".join(relevant_chapters_text) logger.info(f" ✅ 相关历史:语义检索到{len(relevant_chapters_text)}章") # 4. 近期概要(最近30章,每章200字摘要) recent_summary_count = min(30, total_previous) recent_for_summary = all_chapters_info[-recent_summary_count:] if total_previous > 3 else [] if recent_for_summary and len(recent_for_summary) > 3: # 至少要有3章才做摘要 recent_summaries = [] for chapter_info in recent_for_summary[:-3]: # 排除最后3章(它们会完整展示) # 优先获取chapter_summary记忆 summary_result = await db.execute( select(StoryMemory.content) .where(StoryMemory.project_id == project_id) .where(StoryMemory.chapter_id == chapter_info.id) .where(StoryMemory.memory_type == 'chapter_summary') .limit(1) ) summary = summary_result.scalar_one_or_none() if summary: recent_summaries.append( f"第{chapter_info.chapter_number}章《{chapter_info.title}》:{summary}" ) if recent_summaries: context_parts['recent_summary'] = "【近期章节概要】\n" + "\n".join(recent_summaries) logger.info(f" ✅ 近期概要:{len(recent_summaries)}章摘要") # 5. 最近完整内容(最近3章) recent_full_count = min(3, total_previous) recent_full_chapters = all_chapters_info[-recent_full_count:] # 获取完整内容 recent_full_texts = [] for chapter_info in recent_full_chapters: chapter_result = await db.execute( select(Chapter.content) .where(Chapter.id == chapter_info.id) ) content = chapter_result.scalar_one_or_none() if content: recent_full_texts.append( f"=== 第{chapter_info.chapter_number}章:{chapter_info.title} ===\n{content}" ) context_parts['recent_full'] = "【最近章节完整内容】\n" + "\n\n".join(recent_full_texts) logger.info(f" ✅ 最近完整:{len(recent_full_texts)}章全文") # 6. 统计信息 context_parts['stats'] = { 'total_previous': total_previous, 'skeleton_samples': len(skeleton_chapters), 'relevant_history': len(relevant_memories) if current_outline and total_previous > 3 else 0, 'recent_summaries': len(recent_summaries) if recent_for_summary and len(recent_for_summary) > 3 else 0, 'recent_full': len(recent_full_texts) } # 计算总长度 total_length = sum([ len(context_parts['story_skeleton']), len(context_parts['relevant_history']), len(context_parts['recent_summary']), len(context_parts['recent_full']) ]) context_parts['stats']['total_length'] = total_length logger.info(f"📊 智能上下文构建完成:总长度 {total_length} 字符") except Exception as e: logger.error(f"❌ 构建智能上下文失败: {str(e)}", exc_info=True) return context_parts async def build_characters_info_with_careers( db: AsyncSession, project_id: str, characters: list[Character], filter_character_names: Optional[list[str]] = None ) -> str: """ 构建包含职业信息的角色上下文 Args: db: 数据库会话 project_id: 项目ID characters: 角色列表 filter_character_names: 可选,筛选特定角色名称列表(用于1-1模式的structure.characters或1-n模式的expansion_plan.character_focus) Returns: 格式化的角色信息字符串,包含职业信息 """ if not characters: return '暂无角色信息' # 如果提供了筛选名单,只保留匹配的角色 if filter_character_names: filtered_characters = [c for c in characters if c.name in filter_character_names] if not filtered_characters: logger.warning(f"筛选后无匹配角色,使用全部角色。筛选名单: {filter_character_names}") filtered_characters = characters else: logger.info(f"根据筛选名单保留 {len(filtered_characters)}/{len(characters)} 个角色: {[c.name for c in filtered_characters]}") characters = filtered_characters # 获取所有职业信息(一次性查询,提高效率) careers_result = await db.execute( select(Career).where(Career.project_id == project_id) ) careers_map = {c.id: c for c in careers_result.scalars().all()} # 获取所有角色的职业关联(一次性查询) character_ids = [c.id for c in characters] if not character_ids: return '暂无角色信息' character_careers_result = await db.execute( select(CharacterCareer).where(CharacterCareer.character_id.in_(character_ids)) ) character_careers = character_careers_result.scalars().all() # 构建角色ID到职业信息的映射 char_career_map = {} for cc in character_careers: if cc.character_id not in char_career_map: char_career_map[cc.character_id] = {'main': None, 'sub': []} career = careers_map.get(cc.career_id) if not career: continue career_info = { 'name': career.name, 'stage': cc.current_stage, 'max_stage': career.max_stage, 'stage_progress': cc.stage_progress } if cc.career_type == 'main': char_career_map[cc.character_id]['main'] = career_info else: char_career_map[cc.character_id]['sub'].append(career_info) # 构建角色信息字符串 characters_info_parts = [] for c in characters: # 基本信息 entity_type = '组织' if c.is_organization else '角色' base_info = f"- {c.name}({entity_type}, {c.role_type})" # 职业信息 career_info_str = "" if c.id in char_career_map: career_data = char_career_map[c.id] # 主职业 if career_data['main']: main = career_data['main'] stage_desc = f"{main['stage']}/{main['max_stage']}阶" career_info_str += f" | 主职业: {main['name']}({stage_desc})" # 副职业 if career_data['sub']: sub_list = [] for sub in career_data['sub']: stage_desc = f"{sub['stage']}/{sub['max_stage']}阶" sub_list.append(f"{sub['name']}({stage_desc})") career_info_str += f" | 副职业: {', '.join(sub_list)}" # 性格描述 personality_str = "" if c.personality: personality_preview = c.personality[:100] if len(c.personality) > 100 else c.personality personality_str = f": {personality_preview}" # 组合完整信息 full_info = base_info + career_info_str + personality_str characters_info_parts.append(full_info) return "\n".join(characters_info_parts) @router.get("/{chapter_id}/can-generate", summary="检查章节是否可以生成") async def check_can_generate( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """ 检查章节是否满足生成条件 返回可生成状态和前置章节信息 """ # 获取章节 result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter.project_id, user_id, db) # 检查前置条件 can_generate, error_msg, previous_chapters = await check_prerequisites(db, chapter) # 构建前置章节信息 previous_info = [ { "id": ch.id, "chapter_number": ch.chapter_number, "title": ch.title, "has_content": bool(ch.content and ch.content.strip()), "word_count": ch.word_count or 0 } for ch in previous_chapters ] return { "can_generate": can_generate, "reason": error_msg if not can_generate else "", "previous_chapters": previous_info, "chapter_number": chapter.chapter_number } async def analyze_chapter_background( chapter_id: str, user_id: str, project_id: str, task_id: str, ai_service: AIService ) -> bool: """ 后台异步分析章节(支持并发,使用锁保护数据库写入) Args: chapter_id: 章节ID user_id: 用户ID project_id: 项目ID task_id: 任务ID ai_service: AI服务实例 Returns: bool: True表示分析成功,False表示分析失败 """ db_session = None write_lock = await get_db_write_lock(user_id) try: logger.info(f"🔍 开始分析章节: {chapter_id}, 任务ID: {task_id}") # 创建独立数据库会话 from app.database import get_engine from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession engine = await get_engine(user_id) AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) db_session = AsyncSessionLocal() # 1. 获取任务(读操作) task_result = await db_session.execute( select(AnalysisTask).where(AnalysisTask.id == task_id) ) task = task_result.scalar_one_or_none() if not task: logger.error(f"❌ 任务不存在: {task_id}") return False # 更新任务状态(写操作,需要锁) async with write_lock: task.status = 'running' task.started_at = datetime.now() task.progress = 10 await db_session.commit() # 2. 获取章节信息(读操作) chapter_result = await db_session.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter or not chapter.content: async with write_lock: task.status = 'failed' task.error_message = '章节不存在或内容为空' task.completed_at = datetime.now() await db_session.commit() logger.error(f"❌ 章节不存在或内容为空: {chapter_id}") return False async with write_lock: task.progress = 20 await db_session.commit() # 获取已埋入的伏笔列表(用于回收匹配,传入当前章节号以启用智能标记) existing_foreshadows = await foreshadow_service.get_planted_foreshadows_for_analysis( db=db_session, project_id=project_id, current_chapter_number=chapter.chapter_number # 传入当前章节号以启用智能标记 ) logger.info(f"📋 后台分析 - 已获取{len(existing_foreshadows)}个已埋入伏笔用于匹配(含智能回收标记)") # 定义重试回调函数,用于在重试时更新任务状态 async def on_retry_callback(attempt: int, max_retries: int, wait_time: int, error_reason: str): """重试时更新任务状态,让前端能感知到重试进度""" try: async with write_lock: # 重新获取任务(确保获取最新状态) task_result_retry = await db_session.execute( select(AnalysisTask).where(AnalysisTask.id == task_id) ) task_retry = task_result_retry.scalar_one_or_none() if task_retry: # 更新任务状态,保持 running 但更新 started_at 以重置超时计时器 task_retry.status = 'running' task_retry.started_at = datetime.now() # 重置开始时间,防止超时检测误判 task_retry.progress = 25 + attempt * 5 # 根据重试次数更新进度 task_retry.error_message = f"正在重试({attempt}/{max_retries}):{error_reason[:100]}" await db_session.commit() logger.info(f"🔄 分析任务重试状态已更新: 尝试 {attempt}/{max_retries}, 等待 {wait_time}s, 原因: {error_reason[:50]}...") except Exception as callback_error: logger.warning(f"⚠️ 更新重试状态失败: {callback_error}") # 3. 使用PlotAnalyzer分析章节(传入已有伏笔列表和重试回调) analyzer = PlotAnalyzer(ai_service) analysis_result = await analyzer.analyze_chapter( chapter_number=chapter.chapter_number, title=chapter.title, content=chapter.content, word_count=chapter.word_count or len(chapter.content), existing_foreshadows=existing_foreshadows, on_retry=on_retry_callback ) if not analysis_result: async with write_lock: task.status = 'failed' task.error_message = 'AI分析失败,请检查日志' task.completed_at = datetime.now() await db_session.commit() logger.error(f"❌ AI分析失败: {chapter_id}") return False async with write_lock: task.progress = 60 await db_session.commit() # 4. 保存分析结果到数据库(写操作,需要锁) async with write_lock: existing_analysis_result = await db_session.execute( select(PlotAnalysis).where(PlotAnalysis.chapter_id == chapter_id) ) existing_analysis = existing_analysis_result.scalar_one_or_none() if existing_analysis: # 更新现有记录 logger.info(f" 更新现有分析记录: {existing_analysis.id}") existing_analysis.plot_stage = analysis_result.get('plot_stage', '发展') existing_analysis.conflict_level = analysis_result.get('conflict', {}).get('level', 0) existing_analysis.conflict_types = analysis_result.get('conflict', {}).get('types', []) existing_analysis.emotional_tone = analysis_result.get('emotional_arc', {}).get('primary_emotion', '') existing_analysis.emotional_intensity = analysis_result.get('emotional_arc', {}).get('intensity', 0) / 10.0 existing_analysis.hooks = analysis_result.get('hooks', []) existing_analysis.hooks_count = len(analysis_result.get('hooks', [])) existing_analysis.foreshadows = analysis_result.get('foreshadows', []) existing_analysis.foreshadows_planted = sum(1 for f in analysis_result.get('foreshadows', []) if f.get('type') == 'planted') existing_analysis.foreshadows_resolved = sum(1 for f in analysis_result.get('foreshadows', []) if f.get('type') == 'resolved') existing_analysis.plot_points = analysis_result.get('plot_points', []) existing_analysis.plot_points_count = len(analysis_result.get('plot_points', [])) existing_analysis.character_states = analysis_result.get('character_states', []) existing_analysis.scenes = analysis_result.get('scenes', []) existing_analysis.pacing = analysis_result.get('pacing', 'moderate') existing_analysis.overall_quality_score = analysis_result.get('scores', {}).get('overall', 0) existing_analysis.pacing_score = analysis_result.get('scores', {}).get('pacing', 0) existing_analysis.engagement_score = analysis_result.get('scores', {}).get('engagement', 0) existing_analysis.coherence_score = analysis_result.get('scores', {}).get('coherence', 0) existing_analysis.analysis_report = analyzer.generate_analysis_summary(analysis_result) existing_analysis.suggestions = analysis_result.get('suggestions', []) existing_analysis.dialogue_ratio = analysis_result.get('dialogue_ratio', 0) existing_analysis.description_ratio = analysis_result.get('description_ratio', 0) else: # 创建新记录 logger.info(f" 创建新的分析记录") plot_analysis = PlotAnalysis( chapter_id=chapter_id, project_id=project_id, plot_stage=analysis_result.get('plot_stage', '发展'), conflict_level=analysis_result.get('conflict', {}).get('level', 0), conflict_types=analysis_result.get('conflict', {}).get('types', []), emotional_tone=analysis_result.get('emotional_arc', {}).get('primary_emotion', ''), emotional_intensity=analysis_result.get('emotional_arc', {}).get('intensity', 0) / 10.0, hooks=analysis_result.get('hooks', []), hooks_count=len(analysis_result.get('hooks', [])), foreshadows=analysis_result.get('foreshadows', []), foreshadows_planted=sum(1 for f in analysis_result.get('foreshadows', []) if f.get('type') == 'planted'), foreshadows_resolved=sum(1 for f in analysis_result.get('foreshadows', []) if f.get('type') == 'resolved'), plot_points=analysis_result.get('plot_points', []), plot_points_count=len(analysis_result.get('plot_points', [])), character_states=analysis_result.get('character_states', []), scenes=analysis_result.get('scenes', []), pacing=analysis_result.get('pacing', 'moderate'), overall_quality_score=analysis_result.get('scores', {}).get('overall', 0), pacing_score=analysis_result.get('scores', {}).get('pacing', 0), engagement_score=analysis_result.get('scores', {}).get('engagement', 0), coherence_score=analysis_result.get('scores', {}).get('coherence', 0), analysis_report=analyzer.generate_analysis_summary(analysis_result), suggestions=analysis_result.get('suggestions', []), dialogue_ratio=analysis_result.get('dialogue_ratio', 0), description_ratio=analysis_result.get('description_ratio', 0) ) db_session.add(plot_analysis) await db_session.commit() task.progress = 80 await db_session.commit() # 5. 清理旧的分析伏笔(重新分析时需要先清理) try: async with write_lock: clean_result = await foreshadow_service.clean_chapter_analysis_foreshadows( db=db_session, project_id=project_id, chapter_id=chapter_id ) if clean_result['cleaned_count'] > 0: logger.info(f"🧹 重新分析前清理了 {clean_result['cleaned_count']} 个旧伏笔") except Exception as clean_error: logger.warning(f"⚠️ 清理旧伏笔失败(继续分析): {str(clean_error)}") # 6. 提取记忆并保存到向量数据库(传入章节内容用于计算位置) memories = analyzer.extract_memories_from_analysis( analysis=analysis_result, chapter_id=chapter_id, chapter_number=chapter.chapter_number, chapter_content=chapter.content or "", chapter_title=chapter.title or "" ) # 先删除该章节的旧记忆(写操作,需要锁) async with write_lock: old_memories_result = await db_session.execute( select(StoryMemory).where(StoryMemory.chapter_id == chapter_id) ) old_memories = old_memories_result.scalars().all() for old_mem in old_memories: await db_session.delete(old_mem) await db_session.commit() logger.info(f" 删除旧记忆: {len(old_memories)}条") # 准备批量添加的记忆数据(不需要锁) memory_records = [] for mem in memories: memory_id = f"{chapter_id}_{mem['type']}_{len(memory_records)}" memory_records.append({ 'id': memory_id, 'content': mem['content'], 'type': mem['type'], 'metadata': mem['metadata'] }) # 保存到关系数据库(写操作,需要锁) async with write_lock: for mem in memories: memory_id = memory_records[memories.index(mem)]['id'] text_position = mem['metadata'].get('text_position', -1) text_length = mem['metadata'].get('text_length', 0) story_memory = StoryMemory( id=memory_id, project_id=project_id, chapter_id=chapter_id, memory_type=mem['type'], content=mem['content'], title=mem['title'], importance_score=mem['metadata'].get('importance_score', 0.5), tags=mem['metadata'].get('tags', []), is_foreshadow=mem['metadata'].get('is_foreshadow', 0), story_timeline=chapter.chapter_number, chapter_position=text_position, text_length=text_length, related_characters=mem['metadata'].get('related_characters', []), related_locations=mem['metadata'].get('related_locations', []) ) db_session.add(story_memory) if text_position >= 0: logger.debug(f" 保存记忆 {memory_id}: position={text_position}, length={text_length}") await db_session.commit() # 批量添加到向量数据库 if memory_records: added_count = await memory_service.batch_add_memories( user_id=user_id, project_id=project_id, memories=memory_records ) logger.info(f"✅ 添加{added_count}条记忆到向量库") # 💼 更新角色职业(根据分析结果) if analysis_result.get('character_states'): try: from app.services.career_update_service import CareerUpdateService logger.info(f"💼 开始根据分析结果更新角色职业...") career_update_result = await CareerUpdateService.update_careers_from_analysis( db=db_session, project_id=project_id, character_states=analysis_result.get('character_states', []), chapter_id=chapter_id, chapter_number=chapter.chapter_number ) if career_update_result['updated_count'] > 0: logger.info( f"✅ 更新了 {career_update_result['updated_count']} 个角色的职业信息" ) if career_update_result['changes']: for change in career_update_result['changes']: logger.info(f" - {change}") else: logger.info("ℹ️ 本章节无角色职业变化") except Exception as career_error: # 职业更新失败不应影响整个分析流程 logger.error(f"⚠️ 更新角色职业失败: {str(career_error)}", exc_info=True) else: logger.debug("📋 分析结果中无角色状态信息,跳过职业更新") # 🔮 自动更新伏笔状态(根据分析结果) if analysis_result.get('foreshadows'): try: logger.info(f"🔮 开始根据分析结果自动更新伏笔状态...") async with write_lock: foreshadow_stats = await foreshadow_service.auto_update_from_analysis( db=db_session, project_id=project_id, chapter_id=chapter_id, chapter_number=chapter.chapter_number, analysis_foreshadows=analysis_result.get('foreshadows', []) ) if foreshadow_stats['planted_count'] > 0 or foreshadow_stats['resolved_count'] > 0: logger.info( f"✅ 伏笔自动更新: 埋入{foreshadow_stats['planted_count']}个, " f"回收{foreshadow_stats['resolved_count']}个" ) else: logger.info("ℹ️ 本章节无新的伏笔状态变化") except Exception as foreshadow_error: # 伏笔更新失败不应影响整个分析流程 logger.error(f"⚠️ 自动更新伏笔失败: {str(foreshadow_error)}", exc_info=True) else: logger.debug("📋 分析结果中无伏笔信息,跳过伏笔自动更新") # 最终更新任务状态(写操作,需要锁)- 增加重试机制 update_success = False for retry in range(3): try: async with write_lock: task.progress = 100 task.status = 'completed' task.completed_at = datetime.now() await db_session.commit() update_success = True logger.info(f"✅ 章节分析完成: {chapter_id}, 提取{len(memories)}条记忆") break except Exception as commit_error: logger.error(f"❌ 提交任务完成状态失败(重试{retry+1}/3): {str(commit_error)}") if retry < 2: await asyncio.sleep(0.1) else: logger.error(f"❌ 无法更新任务为completed状态: {task_id}") # 即使失败也不抛出异常,因为分析本身已经完成 if not update_success: logger.warning(f"⚠️ 章节分析完成但状态更新失败: {chapter_id}") # 返回成功状态 return True except Exception as e: logger.error(f"❌ 后台分析异常: {str(e)}", exc_info=True) # 确保任务状态被更新为failed(写操作,需要锁) if db_session: # 多次重试更新任务状态 for retry in range(3): try: async with write_lock: # 重新获取任务(可能是旧会话导致的问题) task_result = await db_session.execute( select(AnalysisTask).where(AnalysisTask.id == task_id) ) task = task_result.scalar_one_or_none() if task: task.status = 'failed' task.error_message = str(e)[:500] task.completed_at = datetime.now() task.progress = 0 await db_session.commit() logger.info(f"✅ 任务状态已更新为failed: {task_id} (重试{retry+1}次)") break else: logger.error(f"❌ 无法找到任务进行状态更新: {task_id}") break except Exception as update_error: logger.error(f"❌ 更新任务状态失败(重试{retry+1}/3): {str(update_error)}") if retry < 2: await asyncio.sleep(0.1) # 短暂等待后重试 else: logger.error(f"❌ 任务状态更新失败,已达到最大重试次数: {task_id}") # 返回失败状态 return False finally: if db_session: await db_session.close() @router.post("/{chapter_id}/generate-stream", summary="AI创作章节内容(流式)") async def generate_chapter_content_stream( chapter_id: str, request: Request, background_tasks: BackgroundTasks, generate_request: ChapterGenerateRequest = ChapterGenerateRequest(), user_ai_service: AIService = Depends(get_user_ai_service) ): """ 根据大纲、前置章节内容和项目信息AI创作章节完整内容(流式返回) 要求:必须按顺序生成,确保前置章节都已完成 请求体参数: - style_id: 可选,指定使用的写作风格ID。不提供则不使用任何风格 - target_word_count: 可选,目标字数,默认3000字,范围500-10000字 - enable_mcp: 可选,是否启用MCP工具增强,默认True 注意:此函数不使用依赖注入的db,而是在生成器内部创建独立的数据库会话 以避免流式响应期间的连接泄漏问题 """ style_id = generate_request.style_id target_word_count = generate_request.target_word_count or 3000 custom_model = generate_request.model if hasattr(generate_request, 'model') else None temp_narrative_perspective = generate_request.narrative_perspective if hasattr(generate_request, 'narrative_perspective') else None # 预先验证章节存在性(使用临时会话) async for temp_db in get_db(request): try: result = await temp_db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 检查前置条件 can_generate, error_msg, previous_chapters = await check_prerequisites(temp_db, chapter) if not can_generate: raise HTTPException(status_code=400, detail=error_msg) # 保存前置章节数据供生成器使用 previous_chapters_data = [ { 'id': ch.id, 'chapter_number': ch.chapter_number, 'title': ch.title, 'content': ch.content } for ch in previous_chapters ] finally: await temp_db.close() break async def event_generator(): # 在生成器内部创建独立的数据库会话 db_session = None db_committed = False # 获取当前用户ID(在生成器外部就需要) current_user_id = getattr(request.state, "user_id", "system") # 初始化标准进度追踪器 from app.utils.sse_response import WizardProgressTracker tracker = WizardProgressTracker("章节") try: yield await tracker.start() # 创建新的数据库会话 async for db_session in get_db(request): # === 加载阶段 === yield await tracker.loading("加载章节信息...", 0.2) # 重新获取章节信息 chapter_result = await db_session.execute( select(Chapter).where(Chapter.id == chapter_id) ) current_chapter = chapter_result.scalar_one_or_none() if not current_chapter: yield await tracker.error("章节不存在", 404) return yield await tracker.loading("加载项目信息...", 0.4) # 获取项目信息 project_result = await db_session.execute( select(Project).where(Project.id == current_chapter.project_id) ) project = project_result.scalar_one_or_none() if not project: yield await tracker.error("项目不存在", 404) return # 获取项目的大纲模式 outline_mode = project.outline_mode if project else 'one-to-many' logger.info(f"📋 项目大纲模式: {outline_mode}") # 获取对应的大纲 outline_result = await db_session.execute( select(Outline) .where(Outline.project_id == current_chapter.project_id) .where(Outline.order_index == current_chapter.chapter_number) .execution_options(populate_existing=True) ) outline = outline_result.scalar_one_or_none() # 获取所有大纲用于上下文 all_outlines_result = await db_session.execute( select(Outline) .where(Outline.project_id == current_chapter.project_id) .order_by(Outline.order_index) .execution_options(populate_existing=True) ) all_outlines = all_outlines_result.scalars().all() outlines_context = "\n".join([ f"第{o.order_index}章 {o.title}: {o.content[:100]}..." for o in all_outlines ]) # 获取角色信息(包含职业信息) characters_result = await db_session.execute( select(Character).where(Character.project_id == current_chapter.project_id) ) characters = characters_result.scalars().all() # 📝 根据大纲模式智能筛选相关角色 filter_character_names = None if outline_mode == 'one-to-one': # 1-1模式:从outline.structure中提取characters字段 if outline and outline.structure: try: structure = json.loads(outline.structure) filter_character_names = structure.get('characters', []) if filter_character_names: logger.info(f"📋 1-1模式:从structure提取角色列表 {filter_character_names}") except json.JSONDecodeError: logger.warning(f"⚠️ outline.structure解析失败,使用全部角色") else: # 1-n模式:从chapter.expansion_plan中提取character_focus字段 if current_chapter.expansion_plan: try: plan = json.loads(current_chapter.expansion_plan) filter_character_names = plan.get('character_focus', []) if filter_character_names: logger.info(f"📋 1-n模式:从expansion_plan提取角色焦点 {filter_character_names}") except json.JSONDecodeError: logger.warning(f"⚠️ expansion_plan解析失败,使用全部角色") characters_info = await build_characters_info_with_careers( db=db_session, project_id=current_chapter.project_id, characters=characters, filter_character_names=filter_character_names ) # 获取写作风格 style_content = "" if style_id: # 使用指定的风格 style_result = await db_session.execute( select(WritingStyle).where(WritingStyle.id == style_id) ) style = style_result.scalar_one_or_none() if style: # 验证风格是否可用:全局预设风格(user_id为NULL)或者当前用户的自定义风格 if style.user_id is None or style.user_id == current_user_id: style_content = style.prompt_content or "" style_type = "全局预设" if style.user_id is None else "用户自定义" logger.info(f"使用指定风格: {style.name} ({style_type})") else: logger.warning(f"风格 {style_id} 不属于当前项目,无法使用") else: logger.warning(f"未找到风格 {style_id}") else: logger.info("未指定写作风格,使用原始提示词") # 🚀 使用新的优化上下文构建器(含伏笔服务) logger.info(f"🔧 使用优化的章节上下文构建器(V2 + 伏笔提醒)") context_builder = ChapterContextBuilder(foreshadow_service=foreshadow_service) chapter_context = await context_builder.build( chapter=current_chapter, project=project, outline=outline, user_id=current_user_id, db=db_session ) # 日志输出统计信息 logger.info(f"📊 优化上下文统计:") logger.info(f" - 章节序号: {current_chapter.chapter_number}") logger.info(f" - 衔接锚点长度: {len(chapter_context.continuation_point or '')} 字符") logger.info(f" - 相关记忆: {chapter_context.context_stats.get('memory_count', 0)} 条") logger.info(f" - 总上下文长度: {chapter_context.context_stats.get('total_length', 0)} 字符") yield await tracker.loading("上下文构建完成", 0.8) # 🎭 确定使用的叙事人称(临时指定 > 项目默认 > 系统默认) chapter_perspective = ( temp_narrative_perspective or project.narrative_perspective or '第三人称' ) logger.info(f"📝 使用叙事人称: {chapter_perspective}") # 📋 根据大纲模式构建差异化的章节大纲上下文 chapter_outline_content = "" if outline_mode == 'one-to-one': # 一对一模式:使用大纲的 content chapter_outline_content = outline.content if outline else current_chapter.summary or '暂无大纲' logger.info(f"✏️ 一对一模式:使用大纲内容作为章节指导") else: # 一对多模式:优先使用 expansion_plan 的详细规划 if current_chapter.expansion_plan: try: plan = json.loads(current_chapter.expansion_plan) chapter_outline_content = f"""【本章详细规划】 剧情摘要:{plan.get('plot_summary', '无')} 关键事件: {chr(10).join(f'- {event}' for event in plan.get('key_events', []))} 角色焦点:{', '.join(plan.get('character_focus', []))} 情感基调:{plan.get('emotional_tone', '未设定')} 叙事目标:{plan.get('narrative_goal', '未设定')} 冲突类型:{plan.get('conflict_type', '未设定')}""" # 可选:附加章节 summary if current_chapter.summary and current_chapter.summary.strip(): chapter_outline_content += f"\n\n【章节补充说明】\n{current_chapter.summary}" # 可选:附加大纲的背景信息(限制长度,避免喧宾夺主) if outline: outline_bg = outline.content if len(outline_bg) > 200: outline_bg = outline_bg[:200] + "..." chapter_outline_content += f"\n\n【大纲节点背景】\n{outline_bg}" logger.info(f"✏️ 一对多模式:使用expansion_plan详细规划({len(chapter_outline_content)}字符)") except json.JSONDecodeError as e: logger.warning(f"⚠️ expansion_plan解析失败: {e},回退到大纲内容") chapter_outline_content = outline.content if outline else current_chapter.summary or '暂无大纲' else: # 没有expansion_plan,使用大纲内容 chapter_outline_content = outline.content if outline else current_chapter.summary or '暂无大纲' logger.warning(f"⚠️ 一对多模式但无expansion_plan,使用大纲内容") # 🚀 使用 V2 优化模板构建提示词 if chapter_context.continuation_point: # 有前置内容,使用 WITH_CONTEXT 模板 # 尝试从context中提取上一章摘要 previous_summary = "(无上一章摘要,请根据锚点续写)" if chapter_context.context_stats.get('recent_summaries', 0) > 0: # 简单的提取逻辑,实际可能需要更精确的解析 # 但在这里,context_stats并没有直接存储内容。 # 我们利用ChapterContext对象中可能存在的summary信息,或者直接从recent_summary文本中截取最后一段 if hasattr(chapter_context, 'recent_summary') and chapter_context.recent_summary: lines = chapter_context.recent_summary.strip().split('\n') if lines: previous_summary = lines[-1] template = await PromptService.get_template("CHAPTER_GENERATION_V2_WITH_CONTEXT", current_user_id, db_session) base_prompt = PromptService.format_prompt( template, # P0 核心参数 project_title=project.title, chapter_number=current_chapter.chapter_number, chapter_title=current_chapter.title, chapter_outline=chapter_outline_content, target_word_count=target_word_count, continuation_point=chapter_context.continuation_point, # P1 重要参数 genre=project.genre or '未设定', narrative_perspective=chapter_perspective, characters_info=characters_info or '暂无角色信息', foreshadow_reminders=chapter_context.foreshadow_reminders or '暂无需要关注的伏笔', previous_chapter_summary=previous_summary, # P2 参考参数(动态裁剪后的) story_skeleton=chapter_context.story_skeleton or '', relevant_memories=chapter_context.relevant_memories or '' ) else: # 第一章,使用无前置内容模板 template = await PromptService.get_template("CHAPTER_GENERATION_V2", current_user_id, db_session) base_prompt = PromptService.format_prompt( template, # P0 核心参数 project_title=project.title, chapter_number=current_chapter.chapter_number, chapter_title=current_chapter.title, chapter_outline=chapter_outline_content, target_word_count=target_word_count, # P1 重要参数 genre=project.genre or '未设定', narrative_perspective=chapter_perspective, characters_info=characters_info or '暂无角色信息' ) # 应用写作风格 if style_content: prompt = WritingStyleManager.apply_style_to_prompt(base_prompt, style_content) else: prompt = base_prompt # === 准备阶段 === yield await tracker.preparing("准备AI提示词...") logger.info(f"开始AI流式创作章节 {chapter_id}") # 🎨 方案一:将写作风格注入到系统提示词(最高优先级) system_prompt_with_style = None if style_content: system_prompt_with_style = f"""【🎨 写作风格要求 - 最高优先级】 {style_content} ⚠️ 请严格遵循上述写作风格要求进行创作,这是最重要的指令! 确保在整个章节创作过程中始终保持风格的一致性。""" logger.info(f"✅ 已将写作风格注入系统提示词({len(style_content)}字符)") # 🔢 计算 max_tokens 限制 # 中文字符约 1.5-2 个 token,使用 2.5 倍系数确保有足够空间完成段落 # 同时设置上限防止过长,下限确保基本可用 calculated_max_tokens = int(target_word_count * 3) calculated_max_tokens = max(2000, min(calculated_max_tokens, 16000)) # 限制在 2000-16000 之间 logger.info(f"📊 目标字数: {target_word_count}, 计算 max_tokens: {calculated_max_tokens}") # 准备生成参数 generate_kwargs = { "prompt": prompt, "system_prompt": system_prompt_with_style, "tool_choice": "required", "max_tokens": calculated_max_tokens # 添加 max_tokens 限制 } if custom_model: logger.info(f" 使用自定义模型: {custom_model}") generate_kwargs["model"] = custom_model # 注意:这里使用用户配置的AI服务,模型参数会覆盖默认模型 # 如果需要切换provider,需要在前端传递provider参数 # === 生成阶段 === full_content = "" chunk_count = 0 yield await tracker.generating( current_chars=0, estimated_total=target_word_count ) async for chunk in user_ai_service.generate_text_stream(**generate_kwargs): full_content += chunk chunk_count += 1 # 发送内容块 yield await tracker.generating_chunk(chunk) # 每5个chunk发送一次进度更新 if chunk_count % 5 == 0: yield await tracker.generating( current_chars=len(full_content), estimated_total=target_word_count, message=f'正在创作中... 已生成 {len(full_content)} 字' ) # 每20个chunk发送心跳 if chunk_count % 20 == 0: yield await tracker.heartbeat() await asyncio.sleep(0) # 让出控制权 # === 保存阶段 === yield await tracker.saving("正在保存章节...", 0.3) # 更新章节内容到数据库 old_word_count = current_chapter.word_count or 0 current_chapter.content = full_content new_word_count = len(full_content) current_chapter.word_count = new_word_count current_chapter.status = "completed" # 更新项目字数 project.current_words = project.current_words - old_word_count + new_word_count # 记录生成历史 history = GenerationHistory( project_id=current_chapter.project_id, chapter_id=current_chapter.id, prompt=f"创作章节: 第{current_chapter.chapter_number}章 {current_chapter.title}", generated_content=full_content[:500] if len(full_content) > 500 else full_content, model="default" ) db_session.add(history) await db_session.commit() db_committed = True await db_session.refresh(current_chapter) logger.info(f"成功创作章节 {chapter_id},共 {new_word_count} 字") # 🔮 章节生成后自动标记计划在本章埋入的伏笔 try: plant_result = await foreshadow_service.auto_plant_pending_foreshadows( db=db_session, project_id=project.id, chapter_id=chapter_id, chapter_number=current_chapter.chapter_number, chapter_content=full_content ) if plant_result.get('planted_count', 0) > 0: logger.info(f"🔮 自动标记伏笔已埋入: {plant_result['planted_count']}个") except Exception as plant_error: logger.warning(f"⚠️ 自动标记伏笔埋入失败: {str(plant_error)}") # 创建分析任务 analysis_task = AnalysisTask( chapter_id=chapter_id, user_id=current_user_id, project_id=project.id, status='pending', progress=0 ) db_session.add(analysis_task) await db_session.commit() await db_session.refresh(analysis_task) task_id = analysis_task.id logger.info(f"📋 已创建分析任务: {task_id}") # 短暂延迟确保SQLite WAL完成写入 await asyncio.sleep(0.05) # 直接启动后台分析(并发执行) background_tasks.add_task( analyze_chapter_background, chapter_id=chapter_id, user_id=current_user_id, project_id=project.id, task_id=task_id, ai_service=user_ai_service ) yield await tracker.saving("章节保存完成", 0.8) # === 完成阶段 === yield await tracker.complete("创作完成!") # 发送结果数据 yield await tracker.result({ 'word_count': new_word_count, 'analysis_task_id': task_id }) # 发送分析开始事件(使用自定义事件) yield await SSEResponse.send_event( event='analysis_started', data={ 'task_id': task_id, 'message': '章节分析已开始' } ) # 发送完成信号 yield await tracker.done() break # 退出async for db_session循环 except GeneratorExit: # SSE连接断开 logger.warning("章节生成器被提前关闭(SSE断开)") if db_session and not db_committed: try: if db_session.in_transaction(): await db_session.rollback() logger.info("章节生成事务已回滚(GeneratorExit)") except Exception as e: logger.error(f"GeneratorExit回滚失败: {str(e)}") except Exception as e: logger.error(f"流式创作章节失败: {str(e)}") if db_session and not db_committed: try: if db_session.in_transaction(): await db_session.rollback() logger.info("章节生成事务已回滚(异常)") except Exception as rollback_error: logger.error(f"回滚失败: {str(rollback_error)}") yield await tracker.error(str(e)) finally: # 确保数据库会话被正确关闭 if db_session: try: # 最后检查:确保没有未提交的事务 if not db_committed and db_session.in_transaction(): await db_session.rollback() logger.warning("在finally中发现未提交事务,已回滚") await db_session.close() logger.info("数据库会话已关闭") except Exception as close_error: logger.error(f"关闭数据库会话失败: {str(close_error)}") # 强制关闭 try: await db_session.close() except: pass return create_sse_response(event_generator()) @router.get("/{chapter_id}/analysis/status", summary="查询章节分析任务状态") async def get_analysis_task_status( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """ 查询指定章节的最新分析任务状态 自动恢复机制: - 如果任务状态为running且超过1分钟未更新,自动标记为failed - 如果任务状态为pending且超过2分钟未启动,自动标记为failed 返回: - has_task: 是否存在分析任务 - task_id: 任务ID(如果存在) - status: pending/running/completed/failed/none(如果不存在则为none) - progress: 0-100 - error_message: 错误信息(如果失败) - auto_recovered: 是否被自动恢复 - created_at: 创建时间 - completed_at: 完成时间 注意:当章节不存在或无权访问时返回404,当没有分析任务时返回has_task=false """ from datetime import timedelta # 先获取章节以验证存在性和权限 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter.project_id, user_id, db) # 获取该章节最新的分析任务 result = await db.execute( select(AnalysisTask) .where(AnalysisTask.chapter_id == chapter_id) .order_by(AnalysisTask.created_at.desc()) .limit(1) ) task = result.scalar_one_or_none() if not task: # 返回无任务状态,而不是抛出404错误 return { "has_task": False, "chapter_id": chapter_id, "status": "none", "progress": 0, "error_message": None, "auto_recovered": False, "task_id": None, "created_at": None, "started_at": None, "completed_at": None } auto_recovered = False current_time = datetime.now() # 自动恢复卡住的任务 # 注意:后端分析有3次重试机制,每次重试会重置 started_at # 所以超时时间需要足够长以支持完整的重试周期(约5分钟) if task.status == 'running': # 检查是否正在重试(error_message 包含"重试"信息) is_retrying = task.error_message and '重试' in task.error_message # 如果正在重试,给予更长的超时时间(5分钟),否则3分钟 timeout_minutes = 5 if is_retrying else 3 # 如果任务在running状态超过超时时间,标记为失败 if task.started_at and (current_time - task.started_at) > timedelta(minutes=timeout_minutes): task.status = 'failed' task.error_message = f'任务超时(超过{timeout_minutes}分钟未完成,已自动恢复)' task.completed_at = current_time task.progress = 0 auto_recovered = True await db.commit() await db.refresh(task) logger.warning(f"🔄 自动恢复卡住的任务: {task.id}, 章节: {chapter_id}") elif task.status == 'pending': # 如果任务在pending状态超过3分钟仍未开始,标记为失败 if task.created_at and (current_time - task.created_at) > timedelta(minutes=3): task.status = 'failed' task.error_message = '任务启动超时(超过3分钟未启动,已自动恢复)' task.completed_at = current_time task.progress = 0 auto_recovered = True await db.commit() await db.refresh(task) logger.warning(f"🔄 自动恢复未启动的任务: {task.id}, 章节: {chapter_id}") return { "has_task": True, "task_id": task.id, "chapter_id": task.chapter_id, "status": task.status, "progress": task.progress, "error_message": task.error_message, "auto_recovered": auto_recovered, "created_at": task.created_at.isoformat() if task.created_at else None, "started_at": task.started_at.isoformat() if task.started_at else None, "completed_at": task.completed_at.isoformat() if task.completed_at else None } @router.get("/{chapter_id}/analysis", summary="获取章节分析结果") async def get_chapter_analysis( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """ 获取章节的完整分析结果 返回: - analysis_data: 完整的分析数据(JSON) - summary: 分析摘要文本 - memories: 提取的记忆列表 - created_at: 分析时间 """ # 先获取章节以验证权限 chapter_result_check = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter_check = chapter_result_check.scalar_one_or_none() if chapter_check: # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter_check.project_id, user_id, db) # 获取分析结果 analysis_result = await db.execute( select(PlotAnalysis) .where(PlotAnalysis.chapter_id == chapter_id) .order_by(PlotAnalysis.created_at.desc()) .limit(1) ) analysis = analysis_result.scalar_one_or_none() if not analysis: raise HTTPException(status_code=404, detail="该章节暂无分析结果") # 获取相关记忆 memories_result = await db.execute( select(StoryMemory) .where(StoryMemory.chapter_id == chapter_id) .order_by(StoryMemory.importance_score.desc()) ) memories = memories_result.scalars().all() return { "chapter_id": chapter_id, "analysis": analysis.to_dict(), # 使用to_dict()方法 "memories": [ { "id": mem.id, "type": mem.memory_type, "title": mem.title, "content": mem.content, "importance": mem.importance_score, "tags": mem.tags, "is_foreshadow": mem.is_foreshadow, "position": mem.chapter_position, "related_characters": mem.related_characters } for mem in memories ], "created_at": analysis.created_at.isoformat() if analysis.created_at else None } @router.get("/{chapter_id}/annotations", summary="获取章节标注数据") async def get_chapter_annotations( chapter_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """ 获取章节的标注数据(用于前端展示标注) 返回格式化的标注列表,包含精确位置信息 适用于章节内容的可视化标注展示 """ # 验证用户权限 user_id = getattr(request.state, 'user_id', None) # 获取章节 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证项目访问权限 await verify_project_access(chapter.project_id, user_id, db) # 获取分析结果 analysis_result = await db.execute( select(PlotAnalysis) .where(PlotAnalysis.chapter_id == chapter_id) .order_by(PlotAnalysis.created_at.desc()) .limit(1) ) analysis = analysis_result.scalar_one_or_none() # 获取记忆 memories_result = await db.execute( select(StoryMemory) .where(StoryMemory.chapter_id == chapter_id) .order_by(StoryMemory.importance_score.desc()) ) memories = memories_result.scalars().all() # 构建标注数据 annotations = [] for mem in memories: # 优先从数据库读取位置信息 position = mem.chapter_position if mem.chapter_position is not None else -1 length = mem.text_length if hasattr(mem, 'text_length') and mem.text_length is not None else 0 metadata_extra = {} # 如果数据库中没有位置信息,尝试从分析数据中重新计算 if position == -1 and analysis and chapter.content: # 根据记忆类型从分析数据中查找对应项 if mem.memory_type == 'hook' and analysis.hooks: for hook in analysis.hooks: # 通过标题或内容匹配 if mem.title and hook.get('type') in mem.title: keyword = hook.get('keyword', '') if keyword: pos = chapter.content.find(keyword) if pos != -1: position = pos length = len(keyword) metadata_extra["strength"] = hook.get('strength', 5) metadata_extra["position_desc"] = hook.get('position', '') break elif mem.memory_type == 'foreshadow' and analysis.foreshadows: for foreshadow in analysis.foreshadows: if foreshadow.get('content') in mem.content: keyword = foreshadow.get('keyword', '') if keyword: pos = chapter.content.find(keyword) if pos != -1: position = pos length = len(keyword) metadata_extra["foreshadow_type"] = foreshadow.get('type', 'planted') metadata_extra["strength"] = foreshadow.get('strength', 5) break elif mem.memory_type == 'plot_point' and analysis.plot_points: for plot_point in analysis.plot_points: if plot_point.get('content') in mem.content: keyword = plot_point.get('keyword', '') if keyword: pos = chapter.content.find(keyword) if pos != -1: position = pos length = len(keyword) break else: # 如果数据库有位置,也从分析数据中提取额外的元数据 if analysis: if mem.memory_type == 'hook' and analysis.hooks: for hook in analysis.hooks: if mem.title and hook.get('type') in mem.title: metadata_extra["strength"] = hook.get('strength', 5) metadata_extra["position_desc"] = hook.get('position', '') break elif mem.memory_type == 'foreshadow' and analysis.foreshadows: for foreshadow in analysis.foreshadows: if foreshadow.get('content') in mem.content: metadata_extra["foreshadow_type"] = foreshadow.get('type', 'planted') metadata_extra["strength"] = foreshadow.get('strength', 5) break annotation = { "id": mem.id, "type": mem.memory_type, "title": mem.title, "content": mem.content, "importance": mem.importance_score or 0.5, "position": position, "length": length, "tags": mem.tags or [], "metadata": { "is_foreshadow": mem.is_foreshadow, "related_characters": mem.related_characters or [], "related_locations": mem.related_locations or [], **metadata_extra } } annotations.append(annotation) return { "chapter_id": chapter_id, "chapter_number": chapter.chapter_number, "title": chapter.title, "word_count": chapter.word_count or 0, "annotations": annotations, "has_analysis": analysis is not None, "summary": { "total_annotations": len(annotations), "hooks": len([a for a in annotations if a["type"] == "hook"]), "foreshadows": len([a for a in annotations if a["type"] == "foreshadow"]), "plot_points": len([a for a in annotations if a["type"] == "plot_point"]), "character_events": len([a for a in annotations if a["type"] == "character_event"]) } } @router.post("/{chapter_id}/analyze", summary="手动触发章节分析") async def trigger_chapter_analysis( chapter_id: str, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user_ai_service: AIService = Depends(get_user_ai_service) ): """ 手动触发章节分析(用于重新分析或分析旧章节) """ # 从请求中获取用户ID user_id = getattr(request.state, "user_id", None) if not user_id: raise HTTPException(status_code=401, detail="未登录") # 验证章节存在 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") if not chapter.content or chapter.content.strip() == "": raise HTTPException(status_code=400, detail="章节内容为空,无法分析") # 获取项目信息 project_result = await db.execute( select(Project).where(Project.id == chapter.project_id) ) project = project_result.scalar_one_or_none() if not project: raise HTTPException(status_code=404, detail="项目不存在") # 创建分析任务 analysis_task = AnalysisTask( chapter_id=chapter_id, user_id=user_id, project_id=project.id, status='pending', progress=0 ) db.add(analysis_task) await db.commit() task_id = analysis_task.id logger.info(f"📋 创建分析任务: {task_id}, 章节: {chapter_id}") # 刷新数据库会话,确保其他会话可以看到新任务 await db.refresh(analysis_task) # 短暂延迟确保SQLite WAL完成写入(让其他会话可见) await asyncio.sleep(3) # 直接启动后台分析(并发执行) background_tasks.add_task( analyze_chapter_background, chapter_id=chapter_id, user_id=user_id, project_id=project.id, task_id=task_id, ai_service=user_ai_service ) return { "task_id": task_id, "chapter_id": chapter_id, "status": "pending", "message": "分析任务已创建并开始执行" } def calculate_estimated_time( chapter_count: int, target_word_count: int, enable_analysis: bool ) -> int: """ 计算预估耗时(分钟) 基准: - 生成3000字约需2分钟 - 分析约需1分钟 """ generation_time_per_chapter = (target_word_count / 3000) * 2 analysis_time_per_chapter = 1 if enable_analysis else 0 total_time = chapter_count * (generation_time_per_chapter + analysis_time_per_chapter) return max(1, int(total_time)) @router.post("/project/{project_id}/batch-generate", response_model=BatchGenerateResponse, summary="批量顺序生成章节内容") async def batch_generate_chapters_in_order( project_id: str, batch_request: BatchGenerateRequest, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user_ai_service: AIService = Depends(get_user_ai_service) ): """ 从指定章节开始,按顺序批量生成指定数量的章节 特性: 1. 严格按章节序号顺序生成(不可跳过) 2. 自动检测起始章节是否可生成 3. 可选同步分析(影响耗时和质量) 4. 失败后终止,不继续后续章节 """ user_id = getattr(request.state, "user_id", None) if not user_id: raise HTTPException(status_code=401, detail="未登录") # 验证项目存在和用户权限 project = await verify_project_access(project_id, user_id, db) # 获取项目的所有章节,按序号排序 result = await db.execute( select(Chapter) .where(Chapter.project_id == project_id) .order_by(Chapter.chapter_number) ) all_chapters = result.scalars().all() if not all_chapters: raise HTTPException(status_code=404, detail="项目没有章节") # 计算要生成的章节范围 start_number = batch_request.start_chapter_number end_number = start_number + batch_request.count - 1 # 筛选出要生成的章节 chapters_to_generate = [ ch for ch in all_chapters if start_number <= ch.chapter_number <= end_number ] if not chapters_to_generate: raise HTTPException(status_code=404, detail="指定范围内没有章节") # 验证起始章节的前置条件 first_chapter = chapters_to_generate[0] can_generate, error_msg, _ = await check_prerequisites(db, first_chapter) if not can_generate: raise HTTPException(status_code=400, detail=f"起始章节无法生成:{error_msg}") # 创建批量生成任务 batch_task = BatchGenerationTask( project_id=project_id, user_id=user_id, start_chapter_number=start_number, chapter_count=len(chapters_to_generate), chapter_ids=[ch.id for ch in chapters_to_generate], style_id=batch_request.style_id, target_word_count=batch_request.target_word_count, enable_analysis=batch_request.enable_analysis, max_retries=batch_request.max_retries, status='pending', total_chapters=len(chapters_to_generate), completed_chapters=0, failed_chapters=[], current_retry_count=0 ) db.add(batch_task) await db.commit() await db.refresh(batch_task) batch_id = batch_task.id # 计算预估耗时 estimated_time = calculate_estimated_time( chapter_count=len(chapters_to_generate), target_word_count=batch_request.target_word_count, enable_analysis=batch_request.enable_analysis ) logger.info(f"📦 创建批量生成任务: {batch_id}, 章节: 第{start_number}-{end_number}章, 预估耗时: {estimated_time}分钟") # 启动后台批量生成任务,传递model参数 background_tasks.add_task( execute_batch_generation_in_order, batch_id=batch_id, user_id=user_id, ai_service=user_ai_service, custom_model=batch_request.model ) return BatchGenerateResponse( batch_id=batch_id, message=f"批量生成任务已创建,将生成 {len(chapters_to_generate)} 个章节", chapters_to_generate=[ { "id": ch.id, "chapter_number": ch.chapter_number, "title": ch.title } for ch in chapters_to_generate ], estimated_time_minutes=estimated_time ) @router.get("/batch-generate/{batch_id}/status", response_model=BatchGenerateStatusResponse, summary="查询批量生成任务状态") async def get_batch_generation_status( batch_id: str, db: AsyncSession = Depends(get_db) ): """查询批量生成任务的状态和进度""" result = await db.execute( select(BatchGenerationTask).where(BatchGenerationTask.id == batch_id) ) task = result.scalar_one_or_none() if not task: raise HTTPException(status_code=404, detail="批量生成任务不存在") return BatchGenerateStatusResponse( batch_id=task.id, status=task.status, total=task.total_chapters, completed=task.completed_chapters, current_chapter_id=task.current_chapter_id, current_chapter_number=task.current_chapter_number, current_retry_count=task.current_retry_count, max_retries=task.max_retries, failed_chapters=task.failed_chapters or [], created_at=task.created_at.isoformat() if task.created_at else None, started_at=task.started_at.isoformat() if task.started_at else None, completed_at=task.completed_at.isoformat() if task.completed_at else None, error_message=task.error_message ) @router.get("/project/{project_id}/batch-generate/active", summary="获取项目当前运行中的批量生成任务") async def get_active_batch_generation( project_id: str, request: Request, db: AsyncSession = Depends(get_db) ): """ 获取项目当前运行中的批量生成任务 用于页面刷新后恢复任务状态 """ # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(project_id, user_id, db) result = await db.execute( select(BatchGenerationTask) .where(BatchGenerationTask.project_id == project_id) .where(BatchGenerationTask.status.in_(['pending', 'running'])) .order_by(BatchGenerationTask.created_at.desc()) .limit(1) ) task = result.scalar_one_or_none() if not task: return { "has_active_task": False, "task": None } return { "has_active_task": True, "task": { "batch_id": task.id, "status": task.status, "total": task.total_chapters, "completed": task.completed_chapters, "current_chapter_id": task.current_chapter_id, "current_chapter_number": task.current_chapter_number, "created_at": task.created_at.isoformat() if task.created_at else None, "started_at": task.started_at.isoformat() if task.started_at else None } } @router.post("/batch-generate/{batch_id}/cancel", summary="取消批量生成任务") async def cancel_batch_generation( batch_id: str, db: AsyncSession = Depends(get_db) ): """取消正在进行的批量生成任务""" result = await db.execute( select(BatchGenerationTask).where(BatchGenerationTask.id == batch_id) ) task = result.scalar_one_or_none() if not task: raise HTTPException(status_code=404, detail="批量生成任务不存在") if task.status in ['completed', 'failed', 'cancelled']: raise HTTPException(status_code=400, detail=f"任务已处于 {task.status} 状态,无法取消") task.status = 'cancelled' task.completed_at = datetime.now() await db.commit() logger.info(f"🛑 批量生成任务已取消: {batch_id}") return { "message": "批量生成任务已取消", "batch_id": batch_id, "completed_chapters": task.completed_chapters, "total_chapters": task.total_chapters } async def execute_batch_generation_in_order( batch_id: str, user_id: str, ai_service: AIService, custom_model: Optional[str] = None ): """ 按顺序执行批量生成任务(后台任务) - 严格按章节序号顺序 - 任一章节失败则终止后续生成 - 可选同步分析 """ db_session = None write_lock = await get_db_write_lock(user_id) try: logger.info(f"📦 开始执行顺序批量生成任务: {batch_id}") # 创建独立数据库会话 from app.database import get_engine from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession engine = await get_engine(user_id) AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) db_session = AsyncSessionLocal() # 获取任务 task_result = await db_session.execute( select(BatchGenerationTask).where(BatchGenerationTask.id == batch_id) ) task = task_result.scalar_one_or_none() if not task: logger.error(f"❌ 批量生成任务不存在: {batch_id}") return # 更新任务状态为运行中 async with write_lock: task.status = 'running' task.started_at = datetime.now() await db_session.commit() # 维护上一章的摘要,用于传递给下一章(防重复上下文) last_generated_summary = None # 按顺序生成每个章节 for idx, chapter_id in enumerate(task.chapter_ids, 1): # 检查任务是否被取消 await db_session.refresh(task) if task.status == 'cancelled': logger.info(f"🛑 批量生成任务已被取消: {batch_id}") return # 更新当前章节 async with write_lock: task.current_chapter_id = chapter_id task.current_retry_count = 0 # 重置重试计数 await db_session.commit() # 重试循环 retry_count = 0 chapter_success = False chapter = None last_error = None while retry_count <= task.max_retries and not chapter_success: try: # 获取章节信息 chapter_result = await db_session.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise Exception(f"章节 {chapter_id} 不存在") # 更新当前章节序号和重试次数 async with write_lock: task.current_chapter_number = chapter.chapter_number task.current_retry_count = retry_count await db_session.commit() if retry_count > 0: logger.info(f"🔄 [{idx}/{task.total_chapters}] 重试生成章节 (第{retry_count}次): 第{chapter.chapter_number}章 《{chapter.title}》") else: logger.info(f"📝 [{idx}/{task.total_chapters}] 开始生成章节: 第{chapter.chapter_number}章 《{chapter.title}》") # 检查前置条件(每次都检查,确保顺序性) can_generate, error_msg, _ = await check_prerequisites(db_session, chapter) if not can_generate: raise Exception(f"前置条件不满足: {error_msg}") # 生成章节内容(复用现有流式生成逻辑的核心部分),传递model参数 # 并获取生成后的摘要(如果生成函数支持返回) generated_summary = await generate_single_chapter_for_batch( db_session=db_session, chapter=chapter, user_id=user_id, style_id=task.style_id, target_word_count=task.target_word_count, ai_service=ai_service, write_lock=write_lock, custom_model=custom_model, previous_summary_context=last_generated_summary ) # 更新上一章摘要,供下一章使用 if generated_summary: last_generated_summary = f"第{chapter.chapter_number}章《{chapter.title}》:{generated_summary}" logger.info(f"📝 已更新上一章摘要上下文: {last_generated_summary[:50]}...") logger.info(f"✅ 章节生成完成: 第{chapter.chapter_number}章") # 如果启用同步分析 if task.enable_analysis: logger.info(f"🔍 开始同步分析章节: 第{chapter.chapter_number}章") # 分析重试机制(最多3次) analysis_retry_count = 0 analysis_success = False last_analysis_error = None while analysis_retry_count < 3 and not analysis_success: try: if analysis_retry_count > 0: logger.info(f"🔄 重试分析章节 (第{analysis_retry_count}次): 第{chapter.chapter_number}章") async with write_lock: analysis_task = AnalysisTask( chapter_id=chapter_id, user_id=user_id, project_id=task.project_id, status='pending', progress=0 ) db_session.add(analysis_task) await db_session.commit() await db_session.refresh(analysis_task) # 同步执行分析,直接使用返回值判断成功/失败 analysis_result = await analyze_chapter_background( chapter_id=chapter_id, user_id=user_id, project_id=task.project_id, task_id=analysis_task.id, ai_service=ai_service ) # 直接根据返回值判断 if not analysis_result: last_analysis_error = "分析函数返回失败" logger.error(f"❌ 章节分析失败: 第{chapter.chapter_number}章") raise Exception(f"章节分析失败") # 分析成功 analysis_success = True logger.info(f"✅ 章节分析成功: 第{chapter.chapter_number}章") except Exception as analysis_error: last_analysis_error = str(analysis_error) analysis_retry_count += 1 if analysis_retry_count < 3: # 还有重试机会,等待后重试 wait_time = min(2 ** analysis_retry_count, 10) logger.warning(f"⏳ 分析失败,等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) else: # 达到最大重试次数,必须终止整个批量任务 logger.error(f"❌ 章节分析失败,已达最大重试次数(3次): 第{chapter.chapter_number}章") # 记录失败信息 failed_info = { 'chapter_id': chapter_id, 'chapter_number': chapter.chapter_number, 'title': chapter.title, 'error': f"分析失败(重试3次): {last_analysis_error}", 'retry_count': 3 } async with write_lock: if task.failed_chapters is None: task.failed_chapters = [] task.failed_chapters.append(failed_info) # 标记任务失败并终止 task.status = 'failed' task.error_message = f"第{chapter.chapter_number}章分析失败(重试3次): {last_analysis_error}"[:500] task.completed_at = datetime.now() task.current_retry_count = 0 await db_session.commit() logger.error(f"🛑 批量生成中断: 第{chapter.chapter_number}章分析失败") return # 立即终止整个批量生成任务 # 标记成功 chapter_success = True # 更新完成数 async with write_lock: task.completed_chapters += 1 task.current_retry_count = 0 # 重置重试计数 await db_session.commit() logger.info(f"✅ 进度: {task.completed_chapters}/{task.total_chapters}") except Exception as e: last_error = str(e) error_msg = f"第{chapter.chapter_number if chapter else '?'}章出错: {last_error}" logger.error(f"❌ {error_msg}") retry_count += 1 # 如果还有重试机会,等待一小段时间后重试 if retry_count <= task.max_retries: wait_time = min(2 ** retry_count, 10) # 指数退避,最多等待10秒 logger.info(f"⏳ 等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) else: # 达到最大重试次数,记录失败信息 logger.error(f"❌ 章节生成失败,已达最大重试次数({task.max_retries}): 第{chapter.chapter_number if chapter else '?'}章") failed_info = { 'chapter_id': chapter_id, 'chapter_number': chapter.chapter_number if chapter else -1, 'title': chapter.title if chapter else '未知', 'error': last_error, 'retry_count': retry_count - 1 } async with write_lock: if task.failed_chapters is None: task.failed_chapters = [] task.failed_chapters.append(failed_info) # 标记任务失败并终止 task.status = 'failed' task.error_message = f"第{chapter.chapter_number}章生成失败(重试{retry_count-1}次): {last_error}"[:500] task.completed_at = datetime.now() task.current_retry_count = 0 await db_session.commit() # ⚠️ 如果启用了同步分析,任何错误都应该中断任务 # 因为章节生成或分析失败会影响后续章节的职业更新和剧情连贯性 if task.enable_analysis: logger.error(f"🛑 批量生成中断: 因启用同步分析,任何错误都会中断任务以确保职业信息和剧情连贯性") else: logger.error(f"🛑 批量生成终止于第{chapter.chapter_number}章") return # 全部完成 async with write_lock: task.status = 'completed' task.completed_at = datetime.now() task.current_chapter_id = None task.current_chapter_number = None await db_session.commit() logger.info(f"✅ 批量生成任务全部完成: {batch_id}, 成功生成 {task.completed_chapters} 章") except Exception as e: logger.error(f"❌ 批量生成任务异常: {str(e)}", exc_info=True) if db_session and task: try: async with write_lock: task.status = 'failed' task.error_message = str(e)[:500] task.completed_at = datetime.now() await db_session.commit() except Exception as commit_error: logger.error(f"❌ 更新任务失败状态失败: {str(commit_error)}") finally: if db_session: await db_session.close() async def generate_single_chapter_for_batch( db_session: AsyncSession, chapter: Chapter, user_id: str, style_id: Optional[int], target_word_count: int, ai_service: AIService, write_lock: Lock, custom_model: Optional[str] = None, previous_summary_context: Optional[str] = None ) -> Optional[str]: """ 为批量生成执行单个章节的生成(非流式) 复用现有生成逻辑的核心部分 Returns: 生成章节的摘要(前200字) """ # 获取项目信息 project_result = await db_session.execute( select(Project).where(Project.id == chapter.project_id) ) project = project_result.scalar_one_or_none() if not project: raise Exception("项目不存在") # 获取项目的大纲模式 outline_mode = project.outline_mode if project else 'one-to-many' logger.info(f"📋 批量生成 - 项目大纲模式: {outline_mode}") # 获取对应的大纲 outline_result = await db_session.execute( select(Outline) .where(Outline.project_id == chapter.project_id) .where(Outline.order_index == chapter.chapter_number) ) outline = outline_result.scalar_one_or_none() # 获取所有大纲用于上下文 all_outlines_result = await db_session.execute( select(Outline) .where(Outline.project_id == chapter.project_id) .order_by(Outline.order_index) ) all_outlines = all_outlines_result.scalars().all() outlines_context = "\n".join([ f"第{o.order_index}章 {o.title}: {o.content[:100]}..." for o in all_outlines ]) # 获取角色信息(包含职业信息) characters_result = await db_session.execute( select(Character).where(Character.project_id == chapter.project_id) ) characters = characters_result.scalars().all() # 📝 根据大纲模式智能筛选相关角色(批量生成) filter_character_names = None if outline_mode == 'one-to-one': # 1-1模式:从outline.structure中提取characters字段 if outline and outline.structure: try: structure = json.loads(outline.structure) filter_character_names = structure.get('characters', []) if filter_character_names: logger.info(f"📋 批量生成 - 1-1模式:从structure提取角色列表 {filter_character_names}") except json.JSONDecodeError: logger.warning(f"⚠️ 批量生成 - outline.structure解析失败,使用全部角色") else: # 1-n模式:从chapter.expansion_plan中提取character_focus字段 if chapter.expansion_plan: try: plan = json.loads(chapter.expansion_plan) filter_character_names = plan.get('character_focus', []) if filter_character_names: logger.info(f"📋 批量生成 - 1-n模式:从expansion_plan提取角色焦点 {filter_character_names}") except json.JSONDecodeError: logger.warning(f"⚠️ 批量生成 - expansion_plan解析失败,使用全部角色") characters_info = await build_characters_info_with_careers( db=db_session, project_id=chapter.project_id, characters=characters, filter_character_names=filter_character_names ) # 获取写作风格 style_content = "" if style_id: style_result = await db_session.execute( select(WritingStyle).where(WritingStyle.id == style_id) ) style = style_result.scalar_one_or_none() if style: if style.user_id is None or style.user_id == user_id: style_content = style.prompt_content or "" # 🚀 使用新的优化上下文构建器(含伏笔服务) logger.info(f"🔧 批量生成 - 使用优化的章节上下文构建器(V2 + 伏笔提醒)") context_builder = ChapterContextBuilder(foreshadow_service=foreshadow_service) chapter_context = await context_builder.build( chapter=chapter, project=project, outline=outline, user_id=user_id, db=db_session ) # 日志输出统计信息 logger.info(f"📊 批量生成 - 优化上下文统计:") logger.info(f" - 章节序号: {chapter.chapter_number}") logger.info(f" - 衔接锚点长度: {len(chapter_context.continuation_point or '')} 字符") logger.info(f" - 相关记忆: {chapter_context.context_stats.get('memory_count', 0)} 条") logger.info(f" - 总上下文长度: {chapter_context.context_stats.get('total_length', 0)} 字符") # 📋 根据大纲模式构建差异化的章节大纲上下文 chapter_outline_content = "" if outline_mode == 'one-to-one': # 一对一模式:使用大纲的 content chapter_outline_content = outline.content if outline else chapter.summary or '暂无大纲' logger.info(f"✏️ 批量生成 - 一对一模式:使用大纲内容") else: # 一对多模式:优先使用 expansion_plan 的详细规划 if chapter.expansion_plan: try: plan = json.loads(chapter.expansion_plan) chapter_outline_content = f"""【本章详细规划】 剧情摘要:{plan.get('plot_summary', '无')} 关键事件: {chr(10).join(f'- {event}' for event in plan.get('key_events', []))} 角色焦点:{', '.join(plan.get('character_focus', []))} 情感基调:{plan.get('emotional_tone', '未设定')} 叙事目标:{plan.get('narrative_goal', '未设定')} 冲突类型:{plan.get('conflict_type', '未设定')}""" # 可选:附加章节 summary if chapter.summary and chapter.summary.strip(): chapter_outline_content += f"\n\n【章节补充说明】\n{chapter.summary}" # 可选:附加大纲的背景信息(限制长度) if outline: outline_bg = outline.content if len(outline_bg) > 200: outline_bg = outline_bg[:200] + "..." chapter_outline_content += f"\n\n【大纲节点背景】\n{outline_bg}" logger.info(f"✏️ 批量生成 - 一对多模式:使用expansion_plan详细规划") except json.JSONDecodeError as e: logger.warning(f"⚠️ expansion_plan解析失败: {e},回退到大纲内容") chapter_outline_content = outline.content if outline else chapter.summary or '暂无大纲' else: # 没有expansion_plan,使用大纲内容 chapter_outline_content = outline.content if outline else chapter.summary or '暂无大纲' logger.warning(f"⚠️ 批量生成 - 一对多模式但无expansion_plan,使用大纲内容") # 🚀 使用 V2 优化模板构建提示词(批量生成) if chapter_context.continuation_point: # 有前置内容,使用 WITH_CONTEXT 模板 # 确定上一章摘要:优先使用传入的 previous_summary_context(批量生成的上一章), # 否则尝试从 chapter_context 中获取 final_prev_summary = "(无上一章摘要,请根据锚点续写)" if previous_summary_context: final_prev_summary = previous_summary_context elif hasattr(chapter_context, 'recent_summary') and chapter_context.recent_summary: lines = chapter_context.recent_summary.strip().split('\n') if lines: final_prev_summary = lines[-1] template = await PromptService.get_template("CHAPTER_GENERATION_V2_WITH_CONTEXT", user_id, db_session) base_prompt = PromptService.format_prompt( template, # P0 核心参数 project_title=project.title, chapter_number=chapter.chapter_number, chapter_title=chapter.title, chapter_outline=chapter_outline_content, target_word_count=target_word_count, continuation_point=chapter_context.continuation_point, # P1 重要参数 genre=project.genre or '未设定', narrative_perspective=project.narrative_perspective or '第三人称', characters_info=characters_info or '暂无角色信息', foreshadow_reminders=chapter_context.foreshadow_reminders or '暂无需要关注的伏笔', previous_chapter_summary=final_prev_summary, # P2 参考参数(动态裁剪后的) story_skeleton=chapter_context.story_skeleton or '', relevant_memories=chapter_context.relevant_memories or '' ) else: # 第一章,使用无前置内容模板 template = await PromptService.get_template("CHAPTER_GENERATION_V2", user_id, db_session) base_prompt = PromptService.format_prompt( template, # P0 核心参数 project_title=project.title, chapter_number=chapter.chapter_number, chapter_title=chapter.title, chapter_outline=chapter_outline_content, target_word_count=target_word_count, # P1 重要参数 genre=project.genre or '未设定', narrative_perspective=project.narrative_perspective or '第三人称', characters_info=characters_info or '暂无角色信息' ) # 应用写作风格 if style_content: prompt = WritingStyleManager.apply_style_to_prompt(base_prompt, style_content) else: prompt = base_prompt # 🎨 方案一:将写作风格注入到系统提示词(批量生成) system_prompt_with_style = None if style_content: system_prompt_with_style = f"""【🎨 写作风格要求 - 最高优先级】 {style_content} ⚠️ 请严格遵循上述写作风格要求进行创作,这是最重要的指令! 确保在整个章节创作过程中始终保持风格的一致性。""" logger.info(f"✅ 批量生成 - 已将写作风格注入系统提示词({len(style_content)}字符)") # 🔢 计算 max_tokens 限制(批量生成) # 中文字符约 1.5-2 个 token,使用 2.5 倍系数确保有足够空间完成段落 # 同时设置上限防止过长,下限确保基本可用 calculated_max_tokens = int(target_word_count * 3) calculated_max_tokens = max(2000, min(calculated_max_tokens, 16000)) # 限制在 2000-16000 之间 logger.info(f"📊 批量生成 - 目标字数: {target_word_count}, 计算 max_tokens: {calculated_max_tokens}") # 非流式生成内容 full_content = "" # 准备生成参数 generate_kwargs = { "prompt": prompt, "system_prompt": system_prompt_with_style, "tool_choice": "required", "max_tokens": calculated_max_tokens # 添加 max_tokens 限制 } # 如果传入了自定义模型,使用指定的模型 if custom_model: generate_kwargs["model"] = custom_model logger.info(f" 批量生成使用自定义模型: {custom_model}") # 批量生成中的流式生成(非SSE,不需要修改进度显示) async for chunk in ai_service.generate_text_stream(**generate_kwargs): full_content += chunk # 更新章节内容到数据库(使用锁保护) async with write_lock: old_word_count = chapter.word_count or 0 chapter.content = full_content new_word_count = len(full_content) chapter.word_count = new_word_count chapter.status = "completed" # 更新项目字数 project.current_words = project.current_words - old_word_count + new_word_count # 记录生成历史 history = GenerationHistory( project_id=chapter.project_id, chapter_id=chapter.id, prompt=f"批量生成: 第{chapter.chapter_number}章 {chapter.title}", generated_content=full_content[:500] if len(full_content) > 500 else full_content, model="default" ) db_session.add(history) await db_session.commit() await db_session.refresh(chapter) logger.info(f"✅ 单章节生成完成: 第{chapter.chapter_number}章,共 {new_word_count} 字") # 生成简短摘要返回 summary_preview = full_content[:300].replace('\n', ' ') if full_content else "" # 🔮 批量生成后自动标记计划在本章埋入的伏笔 try: async with write_lock: plant_result = await foreshadow_service.auto_plant_pending_foreshadows( db=db_session, project_id=chapter.project_id, chapter_id=chapter.id, chapter_number=chapter.chapter_number, chapter_content=full_content ) if plant_result.get('planted_count', 0) > 0: logger.info(f"🔮 批量生成 - 自动标记伏笔已埋入: {plant_result['planted_count']}个") except Exception as plant_error: logger.warning(f"⚠️ 批量生成 - 自动标记伏笔埋入失败: {str(plant_error)}") return summary_preview # ==================== 章节重新生成相关API ==================== @router.post("/{chapter_id}/regenerate-stream", summary="流式重新生成章节内容") async def regenerate_chapter_stream( chapter_id: str, request: Request, regenerate_request: ChapterRegenerateRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), user_ai_service: AIService = Depends(get_user_ai_service) ): """ 根据分析建议或自定义指令重新生成章节内容(流式返回) 工作流程: 1. 验证章节和分析结果 2. 创建重新生成任务 3. 构建修改指令 4. 流式生成新内容 5. 保存为版本历史 6. 可选自动应用 """ user_id = getattr(request.state, 'user_id', None) if not user_id: raise HTTPException(status_code=401, detail="未登录") # 验证章节存在 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") if not chapter.content or chapter.content.strip() == "": raise HTTPException(status_code=400, detail="章节内容为空,无法重新生成") # 验证用户权限 await verify_project_access(chapter.project_id, user_id, db) # 获取分析结果(如果使用分析建议) analysis = None if regenerate_request.modification_source in ['analysis_suggestions', 'mixed']: analysis_result = await db.execute( select(PlotAnalysis) .where(PlotAnalysis.chapter_id == chapter_id) .order_by(PlotAnalysis.created_at.desc()) .limit(1) ) analysis = analysis_result.scalar_one_or_none() if not analysis: raise HTTPException(status_code=404, detail="该章节暂无分析结果") # 预先获取项目上下文数据和写作风格 async for temp_db in get_db(request): try: # 获取项目信息 project_result = await temp_db.execute( select(Project).where(Project.id == chapter.project_id) ) project = project_result.scalar_one_or_none() # 获取角色信息(包含职业信息) characters_result = await temp_db.execute( select(Character).where(Character.project_id == chapter.project_id) ) characters = characters_result.scalars().all() # 📝 根据大纲模式智能筛选相关角色(重新生成) outline_mode_result = await temp_db.execute( select(Project.outline_mode).where(Project.id == chapter.project_id) ) outline_mode = outline_mode_result.scalar_one_or_none() or 'one-to-many' filter_character_names = None if outline_mode == 'one-to-one': # 1-1模式:从outline.structure中提取characters字段 outline_result_temp = await temp_db.execute( select(Outline.structure) .where(Outline.project_id == chapter.project_id) .where(Outline.order_index == chapter.chapter_number) ) outline_structure = outline_result_temp.scalar_one_or_none() if outline_structure: try: structure = json.loads(outline_structure) filter_character_names = structure.get('characters', []) if filter_character_names: logger.info(f"📋 重新生成 - 1-1模式:从structure提取角色列表 {filter_character_names}") except json.JSONDecodeError: logger.warning(f"⚠️ 重新生成 - outline.structure解析失败,使用全部角色") else: # 1-n模式:从chapter.expansion_plan中提取character_focus字段 if chapter.expansion_plan: try: plan = json.loads(chapter.expansion_plan) filter_character_names = plan.get('character_focus', []) if filter_character_names: logger.info(f"📋 重新生成 - 1-n模式:从expansion_plan提取角色焦点 {filter_character_names}") except json.JSONDecodeError: logger.warning(f"⚠️ 重新生成 - expansion_plan解析失败,使用全部角色") characters_info_with_careers = await build_characters_info_with_careers( db=temp_db, project_id=chapter.project_id, characters=characters, filter_character_names=filter_character_names ) # 获取章节大纲 outline_result = await temp_db.execute( select(Outline) .where(Outline.project_id == chapter.project_id) .where(Outline.order_index == chapter.chapter_number) ) outline = outline_result.scalar_one_or_none() # 获取写作风格 style_content = "" style_id = regenerate_request.style_id # 如果没有指定风格,尝试使用项目的默认风格 if not style_id: from app.models.project_default_style import ProjectDefaultStyle default_style_result = await temp_db.execute( select(ProjectDefaultStyle.style_id) .where(ProjectDefaultStyle.project_id == chapter.project_id) ) default_style_id = default_style_result.scalar_one_or_none() if default_style_id: style_id = default_style_id logger.info(f"📝 使用项目默认写作风格: {style_id}") # 获取风格内容 if style_id: style_result = await temp_db.execute( select(WritingStyle).where(WritingStyle.id == style_id) ) style = style_result.scalar_one_or_none() if style: # 验证风格是否可用:全局预设风格(user_id为NULL)或者当前用户的自定义风格 if style.user_id is None or style.user_id == user_id: style_content = style.prompt_content or "" style_type = "全局预设" if style.user_id is None else "用户自定义" logger.info(f"✅ 使用写作风格: {style.name} ({style_type})") else: logger.warning(f"⚠️ 风格 {style_id} 不属于当前项目,跳过") else: logger.warning(f"⚠️ 未找到风格 {style_id}") else: logger.info("ℹ️ 未指定写作风格,使用默认提示词") # 构建项目上下文 project_context = { 'project_title': project.title if project else '未知', 'genre': project.genre if project else '未设定', 'theme': project.theme if project else '未设定', 'narrative_perspective': project.narrative_perspective if project else '第三人称', 'time_period': project.world_time_period if project else '未设定', 'location': project.world_location if project else '未设定', 'atmosphere': project.world_atmosphere if project else '未设定', 'characters_info': characters_info_with_careers, 'chapter_outline': outline.content if outline else chapter.summary or '暂无大纲', 'previous_context': '' # 可以后续扩展添加前置章节上下文 } finally: await temp_db.close() break async def event_generator(): """流式生成事件生成器""" db_session = None db_committed = False # 初始化标准进度追踪器 from app.utils.sse_response import WizardProgressTracker tracker = WizardProgressTracker("章节重新生成") try: yield await tracker.start() # 创建独立数据库会话 async for db_session in get_db(request): yield await tracker.loading("加载章节信息...", 0.5) # 创建重新生成任务 regen_task = RegenerationTask( chapter_id=chapter_id, analysis_id=analysis.id if analysis else None, user_id=user_id, project_id=chapter.project_id, modification_instructions="", # 稍后填充 original_suggestions=analysis.suggestions if analysis else None, selected_suggestion_indices=regenerate_request.selected_suggestion_indices, custom_instructions=regenerate_request.custom_instructions, style_id=regenerate_request.style_id, target_word_count=regenerate_request.target_word_count, focus_areas=regenerate_request.focus_areas, preserve_elements=regenerate_request.preserve_elements.model_dump() if regenerate_request.preserve_elements else None, status='running', original_content=chapter.content, original_word_count=chapter.word_count or len(chapter.content), version_note=regenerate_request.version_note, started_at=datetime.now() ) db_session.add(regen_task) await db_session.commit() await db_session.refresh(regen_task) task_id = regen_task.id logger.info(f"📝 创建重新生成任务: {task_id}") yield await tracker.preparing("准备重新生成...") yield await SSEResponse.send_event( event='task_created', data={'task_id': task_id} ) # 初始化重新生成器 regenerator = ChapterRegenerator(user_ai_service) # === 生成阶段 === full_content = "" estimated_total = regenerate_request.target_word_count or len(chapter.content) yield await tracker.generating( current_chars=0, estimated_total=estimated_total ) async for event in regenerator.regenerate_with_feedback( chapter=chapter, analysis=analysis, regenerate_request=regenerate_request, project_context=project_context, style_content=style_content, user_id=user_id, db=db_session ): # 处理不同类型的事件 if event['type'] == 'chunk': # 内容块 chunk = event['content'] full_content += chunk yield await tracker.generating_chunk(chunk) # 定期更新进度 if len(full_content) % 500 == 0: yield await tracker.generating( current_chars=len(full_content), estimated_total=estimated_total, message=f'重新生成中... 已生成 {len(full_content)} 字' ) elif event['type'] == 'progress': # 进度更新 - 映射到对应阶段 progress = event.get('progress', 0) message = event.get('message', '') if progress < 20: yield await tracker.preparing(message) elif progress < 85: yield await tracker.generating( current_chars=len(full_content), estimated_total=estimated_total, message=message ) else: yield await tracker.parsing(message) await asyncio.sleep(0) # === 保存阶段 === yield await tracker.saving("保存重新生成的内容...", 0.5) # 更新任务状态 regen_task.status = 'completed' regen_task.regenerated_content = full_content regen_task.regenerated_word_count = len(full_content) regen_task.completed_at = datetime.now() # 计算差异统计 diff_stats = regenerator.calculate_content_diff(chapter.content, full_content) await db_session.commit() db_committed = True yield await tracker.saving("保存完成", 0.9) # === 完成阶段 === yield await tracker.complete("重新生成完成!") # 发送结果数据 yield await tracker.result({ 'task_id': task_id, 'word_count': len(full_content), 'version_number': regen_task.version_number, 'auto_applied': regenerate_request.auto_apply, 'diff_stats': diff_stats }) # 发送完成信号 yield await tracker.done() logger.info(f"✅ 章节重新生成完成: {chapter_id}, 任务: {task_id}") break except Exception as e: logger.error(f"❌ 重新生成失败: {str(e)}", exc_info=True) # 更新任务状态为失败 if db_session and not db_committed: try: task_result = await db_session.execute( select(RegenerationTask).where(RegenerationTask.chapter_id == chapter_id) .order_by(RegenerationTask.created_at.desc()).limit(1) ) task = task_result.scalar_one_or_none() if task: task.status = 'failed' task.error_message = str(e)[:500] task.completed_at = datetime.now() await db_session.commit() except Exception as update_error: logger.error(f"更新任务失败状态失败: {str(update_error)}") yield await tracker.error(str(e)) finally: if db_session: try: if not db_committed and db_session.in_transaction(): await db_session.rollback() await db_session.close() except Exception as close_error: logger.error(f"关闭数据库会话失败: {str(close_error)}") return create_sse_response(event_generator()) @router.get("/{chapter_id}/regeneration/tasks", summary="获取章节的重新生成任务列表") async def get_regeneration_tasks( chapter_id: str, request: Request, limit: int = Query(10, ge=1, le=50), db: AsyncSession = Depends(get_db) ): """获取指定章节的重新生成任务历史""" user_id = getattr(request.state, 'user_id', None) # 验证章节存在和权限 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") await verify_project_access(chapter.project_id, user_id, db) # 获取任务列表 result = await db.execute( select(RegenerationTask) .where(RegenerationTask.chapter_id == chapter_id) .order_by(RegenerationTask.created_at.desc()) .limit(limit) ) tasks = result.scalars().all() return { "chapter_id": chapter_id, "total": len(tasks), "tasks": [ { "task_id": task.id, "status": task.status, "version_number": task.version_number, "version_note": task.version_note, "original_word_count": task.original_word_count, "regenerated_word_count": task.regenerated_word_count, "created_at": task.created_at.isoformat() if task.created_at else None, "completed_at": task.completed_at.isoformat() if task.completed_at else None } for task in tasks ] } @router.put("/{chapter_id}/expansion-plan", response_model=dict, summary="更新章节规划信息") async def update_chapter_expansion_plan( chapter_id: str, expansion_plan: ExpansionPlanUpdate, request: Request, db: AsyncSession = Depends(get_db) ): """ 更新章节的展开规划信息和情节概要 Args: chapter_id: 章节ID expansion_plan: 规划信息更新数据(包含summary和expansion_plan字段) Returns: 更新后的章节规划信息 """ # 获取章节 result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 user_id = getattr(request.state, 'user_id', None) await verify_project_access(chapter.project_id, user_id, db) # 准备更新数据(排除None值) plan_data = expansion_plan.model_dump(exclude_unset=True, exclude_none=True) # 分离summary和expansion_plan数据 summary_value = plan_data.pop('summary', None) # 更新summary字段(如果提供) if summary_value is not None: chapter.summary = summary_value logger.info(f"更新章节概要: {chapter_id}") # 更新expansion_plan字段(如果有其他字段) if plan_data: if chapter.expansion_plan: try: existing_plan = json.loads(chapter.expansion_plan) # 合并更新 existing_plan.update(plan_data) chapter.expansion_plan = json.dumps(existing_plan, ensure_ascii=False) except json.JSONDecodeError: logger.warning(f"章节 {chapter_id} 的expansion_plan格式错误,将覆盖") chapter.expansion_plan = json.dumps(plan_data, ensure_ascii=False) else: chapter.expansion_plan = json.dumps(plan_data, ensure_ascii=False) await db.commit() await db.refresh(chapter) logger.info(f"章节规划更新成功: {chapter_id}") # 返回更新后的规划数据 updated_plan = json.loads(chapter.expansion_plan) if chapter.expansion_plan else None return { "id": chapter.id, "summary": chapter.summary, "expansion_plan": updated_plan, "message": "规划信息更新成功" } # ==================== 局部重写相关API ==================== @router.post("/{chapter_id}/partial-regenerate-stream", summary="流式局部重写选中内容") async def partial_regenerate_stream( chapter_id: str, request: Request, partial_request: PartialRegenerateRequest, db: AsyncSession = Depends(get_db), user_ai_service: AIService = Depends(get_user_ai_service) ): """ 对章节中选中的部分内容进行流式重写 工作流程: 1. 验证章节和选中内容的有效性 2. 截取上下文(前后文) 3. 根据用户要求构建提示词 4. 流式生成重写内容 5. 返回重写结果(不自动保存,由前端决定是否应用) """ user_id = getattr(request.state, 'user_id', None) if not user_id: raise HTTPException(status_code=401, detail="未登录") # 验证章节存在 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") if not chapter.content or chapter.content.strip() == "": raise HTTPException(status_code=400, detail="章节内容为空") # 验证用户权限 await verify_project_access(chapter.project_id, user_id, db) # 验证位置参数 content_length = len(chapter.content) if partial_request.start_position >= content_length: raise HTTPException(status_code=400, detail="起始位置超出内容范围") if partial_request.end_position > content_length: raise HTTPException(status_code=400, detail="结束位置超出内容范围") if partial_request.start_position >= partial_request.end_position: raise HTTPException(status_code=400, detail="起始位置必须小于结束位置") # 验证选中的文本是否匹配 actual_selected = chapter.content[partial_request.start_position:partial_request.end_position] if actual_selected != partial_request.selected_text: # 位置可能有偏差,尝试在附近查找 search_start = max(0, partial_request.start_position - 50) search_end = min(content_length, partial_request.end_position + 50) search_area = chapter.content[search_start:search_end] if partial_request.selected_text in search_area: # 找到了,更新位置 offset = search_area.find(partial_request.selected_text) partial_request.start_position = search_start + offset partial_request.end_position = partial_request.start_position + len(partial_request.selected_text) logger.info(f"⚠️ 选中文本位置校正: {partial_request.start_position}-{partial_request.end_position}") else: raise HTTPException( status_code=400, detail="选中的文本与章节内容不匹配,请刷新页面后重试" ) # 预先获取项目信息和写作风格 project_result = await db.execute( select(Project).where(Project.id == chapter.project_id) ) project = project_result.scalar_one_or_none() # 获取写作风格 style_content = "" style_id = partial_request.style_id # 如果没有指定风格,尝试使用项目的默认风格 if not style_id: from app.models.project_default_style import ProjectDefaultStyle default_style_result = await db.execute( select(ProjectDefaultStyle.style_id) .where(ProjectDefaultStyle.project_id == chapter.project_id) ) default_style_id = default_style_result.scalar_one_or_none() if default_style_id: style_id = default_style_id logger.info(f"📝 局部重写 - 使用项目默认写作风格: {style_id}") # 获取风格内容 if style_id: style_result = await db.execute( select(WritingStyle).where(WritingStyle.id == style_id) ) style = style_result.scalar_one_or_none() if style: if style.user_id is None or style.user_id == user_id: style_content = style.prompt_content or "" style_type = "全局预设" if style.user_id is None else "用户自定义" logger.info(f"✅ 局部重写 - 使用写作风格: {style.name} ({style_type})") else: logger.warning(f"⚠️ 风格 {style_id} 不属于当前用户,跳过") async def event_generator(): """流式生成事件生成器""" from app.utils.sse_response import WizardProgressTracker tracker = WizardProgressTracker("局部重写") try: yield await tracker.start() yield await tracker.loading("准备重写上下文...", 0.3) # 截取上下文 context_chars = partial_request.context_chars start_pos = partial_request.start_position end_pos = partial_request.end_position # 前文:从start_pos往前截取context_chars个字符 context_before_start = max(0, start_pos - context_chars) context_before = chapter.content[context_before_start:start_pos] # 后文:从end_pos往后截取context_chars个字符 context_after_end = min(content_length, end_pos + context_chars) context_after = chapter.content[end_pos:context_after_end] # 原文 original_text = partial_request.selected_text original_word_count = len(original_text) logger.info(f"📝 局部重写 - 原文: {original_word_count}字, 前文: {len(context_before)}字, 后文: {len(context_after)}字") yield await tracker.loading("构建提示词...", 0.5) # 构建字数要求 length_requirement = "" if partial_request.length_mode == "similar": min_words = int(original_word_count * 0.8) max_words = int(original_word_count * 1.2) length_requirement = f"保持与原文相近的字数(约{original_word_count}字,允许{min_words}-{max_words}字浮动)" elif partial_request.length_mode == "expand": min_words = int(original_word_count * 1.2) max_words = int(original_word_count * 2.0) length_requirement = f"适当扩展内容(目标{min_words}-{max_words}字)" elif partial_request.length_mode == "condense": min_words = int(original_word_count * 0.5) max_words = int(original_word_count * 0.8) length_requirement = f"精简压缩内容(目标{min_words}-{max_words}字)" elif partial_request.length_mode == "custom" and partial_request.target_word_count: length_requirement = f"目标字数:约{partial_request.target_word_count}字(允许±20%浮动)" else: length_requirement = f"保持与原文相近的字数(约{original_word_count}字)" # 获取提示词模板 template = await PromptService.get_template("PARTIAL_REGENERATE", user_id, db) if not template: template = PromptService.PARTIAL_REGENERATE # 构建提示词 prompt = PromptService.format_prompt( template, context_before=context_before if context_before else "(这是章节开头)", original_word_count=original_word_count, selected_text=original_text, context_after=context_after if context_after else "(这是章节结尾)", user_instructions=partial_request.user_instructions, length_requirement=length_requirement, style_content=style_content if style_content else "保持与原文一致的叙事风格" ) yield await tracker.preparing("开始生成...") # 计算 max_tokens if partial_request.length_mode == "expand": target_words = int(original_word_count * 2.0) elif partial_request.length_mode == "custom" and partial_request.target_word_count: target_words = partial_request.target_word_count else: target_words = int(original_word_count * 1.5) calculated_max_tokens = max(500, min(int(target_words * 3), 8000)) # 流式生成 full_content = "" chunk_count = 0 yield await tracker.generating( current_chars=0, estimated_total=target_words ) async for chunk in user_ai_service.generate_text_stream( prompt=prompt, max_tokens=calculated_max_tokens ): full_content += chunk chunk_count += 1 # 发送内容块 yield await tracker.generating_chunk(chunk) # 每5个chunk发送一次进度更新 if chunk_count % 5 == 0: yield await tracker.generating( current_chars=len(full_content), estimated_total=target_words, message=f'正在重写中... 已生成 {len(full_content)} 字' ) await asyncio.sleep(0) # 清理输出(移除可能的前后缀) full_content = full_content.strip() # 移除常见的AI输出前缀 prefixes_to_remove = [ "重写后:", "重写后:", "改写后:", "改写后:", "以下是重写后的内容:", "以下是重写后的内容:", "重写内容:", "重写内容:" ] for prefix in prefixes_to_remove: if full_content.startswith(prefix): full_content = full_content[len(prefix):].strip() break # 移除首尾可能的引号 if (full_content.startswith('"') and full_content.endswith('"')) or \ (full_content.startswith("'") and full_content.endswith("'")): full_content = full_content[1:-1] if (full_content.startswith('「') and full_content.endswith('」')) or \ (full_content.startswith('『') and full_content.endswith('』')): full_content = full_content[1:-1] new_word_count = len(full_content) logger.info(f"✅ 局部重写完成: 原文{original_word_count}字 -> 新文{new_word_count}字") # 完成 yield await tracker.complete("重写完成!") # 发送结果数据 yield await tracker.result({ 'new_text': full_content, 'word_count': new_word_count, 'original_word_count': original_word_count, 'start_position': partial_request.start_position, 'end_position': partial_request.end_position }) yield await tracker.done() except Exception as e: logger.error(f"❌ 局部重写失败: {str(e)}", exc_info=True) yield await tracker.error(str(e)) return create_sse_response(event_generator()) @router.post("/{chapter_id}/apply-partial-regenerate", summary="应用局部重写结果") async def apply_partial_regenerate( chapter_id: str, request: Request, apply_request: dict, db: AsyncSession = Depends(get_db) ): """ 将局部重写的结果应用到章节内容中 请求体: - new_text: 重写后的新内容 - start_position: 原文起始位置 - end_position: 原文结束位置 """ user_id = getattr(request.state, 'user_id', None) if not user_id: raise HTTPException(status_code=401, detail="未登录") # 验证章节存在 chapter_result = await db.execute( select(Chapter).where(Chapter.id == chapter_id) ) chapter = chapter_result.scalar_one_or_none() if not chapter: raise HTTPException(status_code=404, detail="章节不存在") # 验证用户权限 await verify_project_access(chapter.project_id, user_id, db) # 获取参数 new_text = apply_request.get('new_text', '') start_position = apply_request.get('start_position', 0) end_position = apply_request.get('end_position', 0) if not new_text: raise HTTPException(status_code=400, detail="新内容不能为空") # 验证位置有效性 content_length = len(chapter.content) if start_position < 0 or end_position > content_length or start_position >= end_position: raise HTTPException(status_code=400, detail="位置参数无效") # 构建新内容 old_word_count = chapter.word_count or 0 new_content = chapter.content[:start_position] + new_text + chapter.content[end_position:] new_word_count = len(new_content) # 更新章节 chapter.content = new_content chapter.word_count = new_word_count # 更新项目字数 project_result = await db.execute( select(Project).where(Project.id == chapter.project_id) ) project = project_result.scalar_one_or_none() if project: project.current_words = project.current_words - old_word_count + new_word_count await db.commit() await db.refresh(chapter) logger.info(f"✅ 局部重写已应用: 章节{chapter_id}, {old_word_count}字 -> {new_word_count}字") return { "success": True, "chapter_id": chapter_id, "word_count": new_word_count, "old_word_count": old_word_count, "message": "局部重写已应用" }