feat: 后台任务系统 + JSON容错解析 + SSE心跳保活 + 多项Bug修复

新功能:
- 大纲/章节生成改为服务端后台任务,支持断线续传
- 后台任务队列排队执行,按用户排队(同用户串行不同用户并发)
- 章节管理页面添加后台任务列表弹窗和进度面板
- 章节状态添加 pending(待处理)选项
- 集成json5容错解析器 + 上下文感知JSON修复
- SSE流式生成添加心跳保活,防止连接超时
- SSEPostClient添加credentials:include修复network error
- 每章最大伏笔数从2调整为5
- 添加大纲读区伏笔的功能

Bug修复:
- 修复AI生成JSON中未转义引号/中文标点/多对象属性值未合并
- 修复JSON非法转义字符清洗和中文引号处理
- 修复MCP插件TimeoutError/连接失败上下文清理
- MCP插件后台注册添加重试机制
- 续写模式添加缺失的mcp_references参数
- 修复Alembic迁移链分叉
- 使用torch CPU版本加速Docker构建
This commit is contained in:
未来
2026-04-29 08:31:07 +08:00
parent 1f80a58994
commit 2bd8b61e91
20 changed files with 2873 additions and 151 deletions
+445
View File
@@ -27,6 +27,7 @@ from app.models.analysis_task import AnalysisTask
from app.models.memory import PlotAnalysis, StoryMemory
from app.models.batch_generation_task import BatchGenerationTask
from app.models.regeneration_task import RegenerationTask
from app.models.background_task import BackgroundTask
from app.schemas.chapter import (
ChapterCreate,
ChapterUpdate,
@@ -1815,6 +1816,450 @@ async def generate_chapter_content_stream(
return create_sse_response(event_generator())
@router.post("/{chapter_id}/generate-background", summary="AI创作章节内容(后台任务)")
async def generate_chapter_content_background(
chapter_id: str,
request: Request,
generate_request: ChapterGenerateRequest = ChapterGenerateRequest(),
db: AsyncSession = Depends(get_db)
):
"""
创建后台任务来生成章节内容。
任务创建后立即返回task_id,前端通过 GET /api/tasks/{task_id} 轮询进度。
关闭浏览器不影响生成,生成完成后内容自动保存到数据库。
"""
user_id = getattr(request.state, 'user_id', None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
# 验证章节存在
result = await db.execute(
select(Chapter).where(Chapter.id == chapter_id)
)
chapter = result.scalar_one_or_none()
if not chapter:
raise HTTPException(status_code=404, detail="章节不存在")
# 验证项目权限
project = await verify_project_access(chapter.project_id, user_id, db)
# 检查前置条件
can_generate, error_msg, _ = await check_prerequisites(db, chapter)
if not can_generate:
raise HTTPException(status_code=400, detail=error_msg)
# 创建后台任务
from app.services.background_task_service import background_task_service, TaskProgressTracker
task = await background_task_service.create_task(
user_id=user_id,
project_id=chapter.project_id,
task_type="chapter_generate",
task_input={
"chapter_id": chapter_id,
"style_id": generate_request.style_id,
"target_word_count": generate_request.target_word_count or 3000,
"enable_mcp": generate_request.enable_mcp,
"model": generate_request.model,
"narrative_perspective": generate_request.narrative_perspective,
},
db=db
)
# 后台执行的函数
async def _run_chapter_generation(task_id: str, bg_user_id: str):
from app.database import get_engine
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession as BgAsyncSession
engine = await get_engine(bg_user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=BgAsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as bg_db:
tracker = TaskProgressTracker(task_id, bg_user_id, "章节")
try:
await tracker.start()
# 获取AI服务
from app.api.settings import get_user_ai_service_from_db
bg_ai_service = await get_user_ai_service_from_db(bg_user_id, bg_db)
await _run_chapter_generation_bg(
task_input={
"chapter_id": chapter_id,
"style_id": generate_request.style_id,
"target_word_count": generate_request.target_word_count or 3000,
"enable_mcp": generate_request.enable_mcp,
"model": generate_request.model,
"narrative_perspective": generate_request.narrative_perspective,
},
db=bg_db,
ai_service=bg_ai_service,
tracker=tracker,
user_id=bg_user_id,
task_id=task_id,
)
except Exception as e:
logger.error(f"❌ 后台章节生成失败: {e}", exc_info=True)
await tracker.error(str(e))
await background_task_service.spawn_background_task(
task.id, user_id, _run_chapter_generation
)
return {
"task_id": task.id,
"task_type": "chapter_generate",
"status": "pending",
"message": "任务已创建,请通过 GET /api/tasks/{task_id} 查询进度"
}
async def _run_chapter_generation_bg(
task_input: dict,
db: AsyncSession,
ai_service: AIService,
tracker,
user_id: str,
task_id: str,
):
"""后台执行章节生成(不使用SSE,直接生成并保存)"""
from app.services.chapter_context_service import (
OneToManyContextBuilder,
OneToOneContextBuilder
)
chapter_id = task_input["chapter_id"]
style_id = task_input.get("style_id")
target_word_count = task_input.get("target_word_count", 3000)
custom_model = task_input.get("model")
temp_narrative_perspective = task_input.get("narrative_perspective")
write_lock = await get_db_write_lock(user_id)
# === 加载阶段 ===
await tracker.loading("加载章节信息...", 0.2)
chapter_result = await db.execute(
select(Chapter).where(Chapter.id == chapter_id)
)
current_chapter = chapter_result.scalar_one_or_none()
if not current_chapter:
await tracker.error("章节不存在")
return
await tracker.loading("加载项目信息...", 0.4)
project_result = await db.execute(
select(Project).where(Project.id == current_chapter.project_id)
)
project = project_result.scalar_one_or_none()
if not project:
await tracker.error("项目不存在")
return
outline_mode = project.outline_mode if project else 'one-to-many'
# 获取大纲
if current_chapter.outline_id:
outline_result = await db.execute(
select(Outline).where(Outline.id == current_chapter.outline_id)
)
else:
outline_result = await db.execute(
select(Outline)
.where(Outline.project_id == current_chapter.project_id)
.where(Outline.order_index == current_chapter.chapter_number)
)
outline = outline_result.scalar_one_or_none()
# 获取写作风格
style_content = ""
if style_id:
style_result = await db.execute(
select(WritingStyle).where(WritingStyle.id == style_id)
)
style = style_result.scalar_one_or_none()
if style and (style.user_id is None or style.user_id == user_id):
style_content = style.prompt_content or ""
# === 构建上下文 ===
if outline_mode == 'one-to-one':
context_builder = OneToOneContextBuilder(
memory_service=memory_service,
foreshadow_service=foreshadow_service
)
chapter_context = await context_builder.build(
chapter=current_chapter,
project=project,
outline=outline,
user_id=user_id,
db=db,
target_word_count=target_word_count
)
else:
context_builder = OneToManyContextBuilder(
memory_service=memory_service,
foreshadow_service=foreshadow_service
)
chapter_context = await context_builder.build(
chapter=current_chapter,
project=project,
outline=outline,
user_id=user_id,
db=db,
style_content=style_content,
target_word_count=target_word_count,
temp_narrative_perspective=temp_narrative_perspective
)
await tracker.loading("上下文构建完成", 0.8)
# 确定叙事人称
chapter_perspective = (
temp_narrative_perspective or
project.narrative_perspective or
'第三人称'
)
# === 准备提示词 ===
if outline_mode == 'one-to-one':
if chapter_context.continuation_point:
template = await PromptService.get_template("CHAPTER_GENERATION_ONE_TO_ONE_NEXT", user_id, db)
base_prompt = PromptService.format_prompt(
template,
project_title=project.title,
chapter_number=current_chapter.chapter_number,
chapter_title=current_chapter.title,
chapter_outline=chapter_context.chapter_outline,
target_word_count=target_word_count,
genre=project.genre or '未设定',
narrative_perspective=chapter_perspective,
previous_chapter_content=chapter_context.continuation_point,
previous_chapter_summary=chapter_context.previous_chapter_summary or '(无上一章摘要)',
characters_info=chapter_context.chapter_characters or '暂无角色信息',
chapter_careers=chapter_context.chapter_careers or '暂无职业信息',
foreshadow_reminders=chapter_context.foreshadow_reminders or '暂无需要关注的伏笔',
relevant_memories=chapter_context.relevant_memories or '暂无相关记忆'
)
else:
template = await PromptService.get_template("CHAPTER_GENERATION_ONE_TO_ONE", user_id, db)
base_prompt = PromptService.format_prompt(
template,
project_title=project.title,
chapter_number=current_chapter.chapter_number,
chapter_title=current_chapter.title,
chapter_outline=chapter_context.chapter_outline,
target_word_count=target_word_count,
genre=project.genre or '未设定',
narrative_perspective=chapter_perspective,
characters_info=chapter_context.chapter_characters or '暂无角色信息',
chapter_careers=chapter_context.chapter_careers or '暂无职业信息',
foreshadow_reminders=chapter_context.foreshadow_reminders or '暂无需要关注的伏笔',
relevant_memories=chapter_context.relevant_memories or '暂无相关记忆'
)
else:
if chapter_context.continuation_point:
previous_summary = chapter_context.previous_chapter_summary or "(无上一章摘要,请根据锚点续写)"
template = await PromptService.get_template("CHAPTER_GENERATION_ONE_TO_MANY_NEXT", user_id, db)
base_prompt = PromptService.format_prompt(
template,
project_title=project.title,
chapter_number=current_chapter.chapter_number,
chapter_title=current_chapter.title,
chapter_outline=chapter_context.chapter_outline,
target_word_count=target_word_count,
continuation_point=chapter_context.continuation_point,
genre=project.genre or '未设定',
narrative_perspective=chapter_perspective,
characters_info=chapter_context.chapter_characters or '暂无角色信息',
chapter_careers=chapter_context.chapter_careers or '暂无职业信息',
foreshadow_reminders=chapter_context.foreshadow_reminders or '暂无需要关注的伏笔',
previous_chapter_summary=previous_summary,
recent_chapters_context=chapter_context.recent_chapters_context or '',
relevant_memories=chapter_context.relevant_memories or ''
)
else:
template = await PromptService.get_template("CHAPTER_GENERATION_ONE_TO_MANY", user_id, db)
base_prompt = PromptService.format_prompt(
template,
project_title=project.title,
chapter_number=current_chapter.chapter_number,
chapter_title=current_chapter.title,
chapter_outline=chapter_context.chapter_outline,
target_word_count=target_word_count,
genre=project.genre or '未设定',
narrative_perspective=chapter_perspective,
characters_info=chapter_context.chapter_characters or '暂无角色信息',
chapter_careers=chapter_context.chapter_careers or '暂无职业信息',
foreshadow_reminders=chapter_context.foreshadow_reminders or '暂无需要关注的伏笔',
relevant_memories=chapter_context.relevant_memories or '暂无相关记忆'
)
# 应用写作风格
if style_content:
prompt = WritingStyleManager.apply_style_to_prompt(base_prompt, style_content)
else:
prompt = base_prompt
# === 准备阶段 ===
await tracker.preparing("准备AI提示词...")
system_prompt_with_style = None
if style_content:
system_prompt_with_style = f"""【🎨 写作风格要求 - 最高优先级】
{style_content}
⚠️ 请严格遵循上述写作风格要求进行创作,这是最重要的指令!
确保在整个章节创作过程中始终保持风格的一致性。"""
calculated_max_tokens = int(target_word_count * 3)
calculated_max_tokens = max(2000, min(calculated_max_tokens, 16000))
generate_kwargs = {
"prompt": prompt,
"system_prompt": system_prompt_with_style,
"tool_choice": "required",
"max_tokens": calculated_max_tokens
}
if custom_model:
generate_kwargs["model"] = custom_model
# === 生成阶段 ===
full_content = ""
chunk_count = 0
await tracker.generating(
current_chars=0,
estimated_total=target_word_count
)
async for chunk in ai_service.generate_text_stream(**generate_kwargs):
# 检查是否被取消
if chunk_count % 10 == 0 and await tracker.check_cancelled():
logger.info(f"🚫 后台章节生成被取消: {chapter_id}")
return
full_content += chunk
chunk_count += 1
# 每10个chunk更新一次进度
if chunk_count % 10 == 0:
await tracker.generating(
current_chars=len(full_content),
estimated_total=target_word_count,
message=f'正在创作中... 已生成 {len(full_content)}'
)
await asyncio.sleep(0)
# === 保存阶段 ===
await tracker.saving("正在保存章节...", 0.3)
async with write_lock:
# 重新获取章节(确保最新状态)
chapter_result = await db.execute(
select(Chapter).where(Chapter.id == chapter_id)
)
current_chapter = chapter_result.scalar_one_or_none()
if not current_chapter:
await tracker.error("保存时章节不存在")
return
old_word_count = current_chapter.word_count or 0
current_chapter.content = full_content
new_word_count = len(full_content)
current_chapter.word_count = new_word_count
current_chapter.status = "completed"
# 更新项目字数
project_result = await db.execute(
select(Project).where(Project.id == current_chapter.project_id)
)
project_obj = project_result.scalar_one_or_none()
if project_obj:
project_obj.current_words = (project_obj.current_words or 0) - old_word_count + new_word_count
# 记录生成历史
history = GenerationHistory(
project_id=current_chapter.project_id,
chapter_id=current_chapter.id,
prompt=f"创作章节: 第{current_chapter.chapter_number}{current_chapter.title}",
generated_content=full_content[:500] if len(full_content) > 500 else full_content,
model="default"
)
db.add(history)
await db.commit()
logger.info(f"✅ 后台创作章节 {chapter_id} 完成,共 {new_word_count}")
# 🔮 自动标记伏笔
try:
plant_result = await foreshadow_service.auto_plant_pending_foreshadows(
db=db,
project_id=current_chapter.project_id,
chapter_id=chapter_id,
chapter_number=current_chapter.chapter_number,
chapter_content=full_content
)
if plant_result.get('planted_count', 0) > 0:
logger.info(f"🔮 自动标记伏笔已埋入: {plant_result['planted_count']}")
except Exception as plant_error:
logger.warning(f"⚠️ 自动标记伏笔埋入失败: {str(plant_error)}")
# 创建分析任务
analysis_task = AnalysisTask(
chapter_id=chapter_id,
user_id=user_id,
project_id=current_chapter.project_id,
status='pending',
progress=0
)
db.add(analysis_task)
await db.commit()
await db.refresh(analysis_task)
logger.info(f"📋 后台生成:已创建分析任务: {analysis_task.id}")
await asyncio.sleep(0.05)
# 启动后台分析
asyncio.create_task(
analyze_chapter_background(
chapter_id=chapter_id,
user_id=user_id,
project_id=current_chapter.project_id,
task_id=analysis_task.id,
ai_service=ai_service
)
)
# === 完成 ===
await tracker.complete(f"创作完成!共 {new_word_count}")
# 更新任务结果
from app.services.background_task_service import background_task_service
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession as BgAsyncSession
from app.database import get_engine as bg_get_engine
try:
engine = await bg_get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=BgAsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as result_db:
from sqlalchemy import update as sql_update
await result_db.execute(
sql_update(BackgroundTask)
.where(BackgroundTask.id == task_id)
.values(task_result={
"chapter_id": chapter_id,
"word_count": new_word_count,
"analysis_task_id": analysis_task.id
})
)
await result_db.commit()
except Exception as e:
logger.warning(f"⚠️ 更新任务结果失败: {e}")
def _build_analysis_task_status_payload(
chapter_id: str,
task: Optional[AnalysisTask],
+517
View File
@@ -1717,6 +1717,523 @@ async def continue_outline_generator(
yield await tracker.error(f"续写失败: {str(e)}")
@router.post("/generate", summary="AI生成/续写大纲(后台任务)")
async def generate_outline_task(
data: Dict[str, Any],
request: Request,
db: AsyncSession = Depends(get_db),
user_ai_service: AIService = Depends(get_user_ai_service)
):
"""
使用后台任务生成或续写小说大纲(不怕断连,关闭浏览器也继续运行)
返回task_id,前端通过 GET /api/tasks/{task_id} 轮询进度
支持模式:
- auto/new/continue(同 generate-stream
"""
from app.services.background_task_service import background_task_service, TaskProgressTracker
from app.database import get_engine
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession as NewAsyncSession
user_id = getattr(request.state, 'user_id', None)
project = await verify_project_access(data.get("project_id"), user_id, db)
# 判断模式
mode = data.get("mode", "auto")
existing_result = await db.execute(
select(Outline)
.where(Outline.project_id == data.get("project_id"))
.order_by(Outline.order_index)
)
existing_outlines = existing_result.scalars().all()
if mode == "auto":
mode = "continue" if existing_outlines else "new"
data["user_id"] = user_id
data["mode"] = mode
if mode == "continue" and not existing_outlines:
raise HTTPException(status_code=400, detail="续写模式需要已有大纲")
# 创建后台任务
task_type = "outline_new" if mode == "new" else "outline_continue"
task = await background_task_service.create_task(
user_id=user_id,
project_id=data.get("project_id"),
task_type=task_type,
task_input=data,
db=db
)
# 后台执行的函数
async def _run_outline_generation(task_id: str, user_id: str):
engine = await get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=NewAsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as bg_db:
tracker = TaskProgressTracker(task_id, user_id, "大纲")
try:
await tracker.start()
# 获取AI服务(需要在后台创建新实例)
from app.api.settings import get_user_ai_service_from_db
bg_ai_service = await get_user_ai_service_from_db(user_id, bg_db)
if mode == "new":
await _run_new_outline_bg(data, bg_db, bg_ai_service, tracker)
else:
await _run_continue_outline_bg(data, bg_db, bg_ai_service, tracker, user_id)
except Exception as e:
logger.error(f"❌ 后台大纲生成失败: {e}", exc_info=True)
await tracker.error(str(e))
await background_task_service.spawn_background_task(
task.id, user_id, _run_outline_generation
)
return {
"task_id": task.id,
"task_type": task_type,
"status": "pending",
"message": "任务已创建,请通过 GET /api/tasks/{task_id} 查询进度"
}
async def _run_new_outline_bg(
data: Dict[str, Any],
db: AsyncSession,
user_ai_service: AIService,
tracker
):
"""后台执行全新大纲生成"""
from app.database import get_engine
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession as BgAsyncSession
project_id = data.get("project_id")
chapter_count = int(data.get("chapter_count", 10))
user_id_for_mcp = data.get("user_id")
await tracker.loading("加载项目信息...", 0.3)
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
await tracker.error("项目不存在")
return
await tracker.loading(f"准备生成{chapter_count}章大纲...", 0.6)
characters_result = await db.execute(select(Character).where(Character.project_id == project_id))
characters = characters_result.scalars().all()
characters_info = _build_characters_info(characters)
if user_id_for_mcp:
user_ai_service.user_id = user_id_for_mcp
user_ai_service.db_session = db
await tracker.preparing("准备AI提示词...")
template = await PromptService.get_template("OUTLINE_CREATE", user_id_for_mcp, db)
prompt = PromptService.format_prompt(
template,
title=project.title,
theme=data.get("theme") or project.theme or "未设定",
genre=data.get("genre") or project.genre or "通用",
chapter_count=chapter_count,
narrative_perspective=data.get("narrative_perspective") or "第三人称",
time_period=project.world_time_period or "未设定",
location=project.world_location or "未设定",
atmosphere=project.world_atmosphere or "未设定",
rules=project.world_rules or "未设定",
characters_info=characters_info or "暂无角色信息",
requirements=data.get("requirements") or "",
mcp_references=""
)
model_param = data.get("model")
provider_param = data.get("provider")
estimated_total = chapter_count * 1000
accumulated_text = ""
chunk_count = 0
await tracker.generating(current_chars=0, estimated_total=estimated_total)
async for chunk in user_ai_service.generate_text_stream(
prompt=prompt, provider=provider_param, model=model_param
):
chunk_count += 1
accumulated_text += chunk
if chunk_count % 10 == 0:
if await tracker.check_cancelled():
await tracker.error("任务已取消")
return
await tracker.generating(
current_chars=len(accumulated_text),
estimated_total=estimated_total
)
await tracker.parsing("解析大纲数据...")
ai_content = accumulated_text
# 解析响应(带重试)
max_retries = 2
retry_count = 0
outline_data = None
while retry_count <= max_retries:
try:
outline_data = _parse_ai_response(ai_content, raise_on_error=True)
break
except JSONParseError:
retry_count += 1
if retry_count > max_retries:
outline_data = _parse_ai_response(ai_content, raise_on_error=False)
break
await tracker.retry(retry_count, max_retries, "JSON解析失败")
tracker.reset_generating_progress()
accumulated_text = ""
retry_prompt = prompt + "\n\n【重要提醒】请确保返回完整的JSON数组。"
async for chunk in user_ai_service.generate_text_stream(
prompt=retry_prompt, provider=provider_param, model=model_param
):
accumulated_text += chunk
ai_content = accumulated_text
# ✅ P0修复:先保存新数据,再删除旧数据
await tracker.saving("保存新大纲到数据库...", 0.2)
outlines = await _save_outlines(project_id, outline_data, db, start_index=1)
await db.commit() # 先提交新数据!
logger.info(f"✅ 新大纲已保存: {len(outlines)}")
# 新数据安全后,再清理旧数据
await tracker.saving("清理旧数据...", 0.6)
try:
from sqlalchemy import delete as sql_delete
# 获取旧大纲(不包括刚保存的)
new_outline_ids = [o.id for o in outlines]
old_outlines_result = await db.execute(
select(Outline).where(
Outline.project_id == project_id,
~Outline.id.in_(new_outline_ids)
)
)
old_outlines = old_outlines_result.scalars().all()
if old_outlines:
old_outline_ids = [o.id for o in old_outlines]
# 清理旧章节
old_chapters_result = await db.execute(
select(Chapter).where(
Chapter.project_id == project_id,
~Chapter.id.in_([ch.id for ch in await db.execute(
select(Chapter).where(Chapter.outline_id.in_(new_outline_ids) if new_outline_ids else False)
).scalars().all()] if new_outline_ids else [])
)
)
# 简化:删除不属于新大纲的旧章节
# 先获取新大纲对应的章节(one-to-one模式下通过chapter_number匹配)
new_order_indexes = [o.order_index for o in outlines]
if project.outline_mode == 'one-to-one':
old_chapters_result = await db.execute(
select(Chapter).where(
Chapter.project_id == project_id,
~Chapter.chapter_number.in_(new_order_indexes)
)
)
else:
old_chapters_result = await db.execute(
select(Chapter).where(
Chapter.project_id == project_id,
Chapter.outline_id.in_(old_outline_ids)
)
)
old_chapters = old_chapters_result.scalars().all()
deleted_word_count = sum(ch.word_count or 0 for ch in old_chapters)
# 清理伏笔和记忆
for ch in old_chapters:
try:
await memory_service.delete_chapter_memories(
user_id=user_id_for_mcp, project_id=project_id, chapter_id=ch.id
)
except Exception:
pass
try:
await foreshadow_service.delete_chapter_foreshadows(
db=db, project_id=project_id, chapter_id=ch.id, only_analysis_source=True
)
except Exception:
pass
# 删除旧章节
if project.outline_mode == 'one-to-one':
await db.execute(
sql_delete(Chapter).where(
Chapter.project_id == project_id,
~Chapter.chapter_number.in_(new_order_indexes)
)
)
else:
await db.execute(
sql_delete(Chapter).where(Chapter.outline_id.in_(old_outline_ids))
)
if deleted_word_count > 0:
project.current_words = max(0, project.current_words - deleted_word_count)
# 清理伏笔
try:
await foreshadow_service.clear_project_foreshadows_for_reset(db, project_id)
except Exception:
pass
# 清理分析
try:
from app.models.memory import PlotAnalysis
await db.execute(sql_delete(PlotAnalysis).where(PlotAnalysis.project_id == project_id))
except Exception:
pass
# 删除旧大纲
await db.execute(
sql_delete(Outline).where(Outline.id.in_(old_outline_ids))
)
await db.commit()
logger.info(f"✅ 旧数据清理完成: 删除 {len(old_outlines)} 个旧大纲, {len(old_chapters)} 个旧章节")
except Exception as e:
logger.error(f"❌ 清理旧数据失败(新数据已安全保存): {e}")
# 新数据已保存,旧数据清理失败不影响
# 角色校验
await tracker.saving("🎭 校验角色信息...", 0.7)
try:
await _check_and_create_missing_characters_from_outlines(
outline_data=outline_data, project_id=project_id, db=db,
user_ai_service=user_ai_service, user_id=data.get("user_id"),
enable_mcp=data.get("enable_mcp", True), tracker=tracker
)
except Exception:
pass
# 组织校验
try:
await _check_and_create_missing_organizations_from_outlines(
outline_data=outline_data, project_id=project_id, db=db,
user_ai_service=user_ai_service, user_id=data.get("user_id"),
enable_mcp=data.get("enable_mcp", True), tracker=tracker
)
except Exception:
pass
# 保存结果到任务记录
result_data = {
"message": f"成功生成{len(outlines)}章大纲",
"total_chapters": len(outlines),
"outline_ids": [o.id for o in outlines]
}
# 更新任务结果
from app.models.background_task import BackgroundTask
task_result = await db.execute(select(BackgroundTask).where(BackgroundTask.id == tracker.task_id))
bg_task = task_result.scalar_one_or_none()
if bg_task:
bg_task.task_result = result_data
await db.commit()
await tracker.complete(f"成功生成{len(outlines)}章大纲")
logger.info(f"✅ 后台大纲生成完成: {len(outlines)}")
async def _run_continue_outline_bg(
data: Dict[str, Any],
db: AsyncSession,
user_ai_service: AIService,
tracker,
user_id: str
):
"""后台执行大纲续写"""
project_id = data.get("project_id")
total_chapters = int(data.get("chapter_count", 5))
await tracker.loading("加载项目信息...", 0.2)
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
await tracker.error("项目不存在")
return
existing_result = await db.execute(
select(Outline).where(Outline.project_id == project_id).order_by(Outline.order_index)
)
existing_outlines = existing_result.scalars().all()
if not existing_outlines:
await tracker.error("续写模式需要已有大纲")
return
last_chapter_number = existing_outlines[-1].order_index
characters_result = await db.execute(select(Character).where(Character.project_id == project_id))
characters = characters_result.scalars().all()
batch_size = 5
total_batches = (total_chapters + batch_size - 1) // batch_size
all_new_outlines = []
current_start_chapter = last_chapter_number + 1
stage_instructions = {
"development": "继续展开情节,深化角色关系",
"climax": "进入故事高潮,矛盾激化",
"ending": "解决主要冲突,给出结局"
}
stage_instruction = stage_instructions.get(data.get("plot_stage", "development"), "")
for batch_num in range(total_batches):
if await tracker.check_cancelled():
await tracker.error("任务已取消")
return
remaining = total_chapters - len(all_new_outlines)
current_batch_size = min(batch_size, remaining)
tracker.reset_generating_progress()
await tracker.generating(
message=f"📝 第{batch_num + 1}/{total_batches}批: 生成第{current_start_chapter}-{current_start_chapter + current_batch_size - 1}"
)
latest_result = await db.execute(
select(Outline).where(Outline.project_id == project_id).order_by(Outline.order_index)
)
latest_outlines = latest_result.scalars().all()
context = await _build_outline_continue_context(
project=project, latest_outlines=latest_outlines, characters=characters,
chapter_count=current_batch_size,
plot_stage=data.get("plot_stage", "development"),
story_direction=data.get("story_direction", "自然延续"),
requirements=data.get("requirements", ""), db=db
)
user_ai_service.user_id = user_id
user_ai_service.db_session = db
# 获取伏笔提醒
foreshadow_reminders_text = "暂无需要关注的伏笔"
try:
foreshadow_context = await foreshadow_service.build_chapter_context(
db=db, project_id=project_id, chapter_number=current_start_chapter,
include_pending=False, include_overdue=True, lookahead=10
)
if foreshadow_context and foreshadow_context.get("context_text"):
foreshadow_reminders_text = foreshadow_context["context_text"]
except Exception:
pass
template = await PromptService.get_template("OUTLINE_CONTINUE", user_id, db)
prompt = PromptService.format_prompt(
template,
title=project.title, theme=project.theme or "未设定",
genre=project.genre or "通用",
narrative_perspective=project.narrative_perspective or "第三人称",
time_period=project.world_time_period or "未设定",
location=project.world_location or "未设定",
atmosphere=project.world_atmosphere or "未设定",
rules=project.world_rules or "未设定",
recent_outlines=context['recent_outlines'],
characters_info=context['characters_info'],
foreshadow_reminders=foreshadow_reminders_text,
chapter_count=current_batch_size,
start_chapter=current_start_chapter,
end_chapter=current_start_chapter + current_batch_size - 1,
current_chapter_count=len(latest_outlines),
plot_stage_instruction=stage_instruction,
story_direction=data.get("story_direction", "自然延续"),
requirements=data.get("requirements", ""),
mcp_references=""
)
accumulated_text = ""
chunk_count = 0
estimated_chars = current_batch_size * 1000
async for chunk in user_ai_service.generate_text_stream(
prompt=prompt, provider=data.get("provider"), model=data.get("model")
):
chunk_count += 1
accumulated_text += chunk
if chunk_count % 10 == 0:
await tracker.generating(
current_chars=len(accumulated_text), estimated_total=estimated_chars,
message=f"📝 第{batch_num + 1}/{total_batches}批生成中..."
)
await tracker.parsing(f"解析第{batch_num + 1}批数据...")
# 解析
max_retries = 2
retry_count = 0
outline_data = None
while retry_count <= max_retries:
try:
outline_data = _parse_ai_response(accumulated_text, raise_on_error=True)
break
except JSONParseError:
retry_count += 1
if retry_count > max_retries:
outline_data = _parse_ai_response(accumulated_text, raise_on_error=False)
break
await tracker.retry(retry_count, max_retries, "JSON解析失败")
tracker.reset_generating_progress()
accumulated_text = ""
retry_prompt = prompt + "\n\n【重要提醒】请确保返回完整的JSON数组。"
async for chunk in user_ai_service.generate_text_stream(
prompt=retry_prompt, provider=data.get("provider"), model=data.get("model")
):
accumulated_text += chunk
# 保存当前批次
await tracker.saving(f"保存第{batch_num + 1}批大纲...", 0.5)
batch_outlines = await _save_outlines(
project_id, outline_data, db, start_index=current_start_chapter
)
await db.commit()
all_new_outlines.extend(batch_outlines)
current_start_chapter += current_batch_size
# 角色校验
try:
await _check_and_create_missing_characters_from_outlines(
outline_data=outline_data, project_id=project_id, db=db,
user_ai_service=user_ai_service, user_id=user_id,
enable_mcp=data.get("enable_mcp", True), tracker=tracker
)
await db.commit()
except Exception:
pass
# 保存结果
result_data = {
"message": f"成功续写{len(all_new_outlines)}章大纲",
"total_chapters": len(all_new_outlines),
"outline_ids": [o.id for o in all_new_outlines]
}
from app.models.background_task import BackgroundTask
task_result = await db.execute(select(BackgroundTask).where(BackgroundTask.id == tracker.task_id))
bg_task = task_result.scalar_one_or_none()
if bg_task:
bg_task.task_result = result_data
await db.commit()
await tracker.complete(f"成功续写{len(all_new_outlines)}章大纲")
logger.info(f"✅ 后台大纲续写完成: {len(all_new_outlines)}")
@router.post("/generate-stream", summary="AI生成/续写大纲(SSE流式)")
async def generate_outline_stream(
data: Dict[str, Any],
+38
View File
@@ -160,6 +160,44 @@ async def get_user_ai_service(
)
async def get_user_ai_service_from_db(user_id: str, db: AsyncSession) -> AIService:
"""
从数据库直接创建用户AI服务实例(用于后台任务,不依赖FastAPI的Depends
"""
from app.models.mcp_plugin import MCPPlugin
result = await db.execute(
select(Settings).where(Settings.user_id == user_id)
)
settings = result.scalar_one_or_none()
if not settings:
env_defaults = read_env_defaults()
settings = Settings(user_id=user_id, **env_defaults)
db.add(settings)
await db.commit()
await db.refresh(settings)
mcp_result = await db.execute(
select(MCPPlugin).where(MCPPlugin.user_id == user_id)
)
mcp_plugins = mcp_result.scalars().all()
enable_mcp = any(plugin.enabled for plugin in mcp_plugins) if mcp_plugins else False
return create_user_ai_service_with_mcp(
api_provider=settings.api_provider,
api_key=settings.api_key,
api_base_url=settings.api_base_url or "",
model_name=settings.llm_model,
temperature=settings.temperature,
max_tokens=settings.max_tokens,
user_id=user_id,
db_session=db,
system_prompt=settings.system_prompt,
enable_mcp=enable_mcp,
)
@router.get("", response_model=SettingsResponse)
async def get_settings(
user: User = Depends(require_login),
+122
View File
@@ -0,0 +1,122 @@
"""后台任务API - 查询状态、取消任务"""
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from app.database import get_db
from app.models.background_task import BackgroundTask
from app.services.background_task_service import background_task_service
from app.logger import get_logger
router = APIRouter(prefix="/tasks", tags=["后台任务"])
logger = get_logger(__name__)
@router.get("/{task_id}", summary="获取任务状态")
async def get_task_status(
task_id: str,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""获取后台任务的状态和进度"""
user_id = getattr(request.state, 'user_id', None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
task = await background_task_service.get_task(task_id, user_id, db)
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
return {
"id": task.id,
"task_type": task.task_type,
"project_id": task.project_id,
"status": task.status,
"progress": task.progress,
"status_message": task.status_message,
"progress_details": task.progress_details,
"error_message": task.error_message,
"task_result": task.task_result,
"retry_count": task.retry_count,
"cancel_requested": task.cancel_requested,
"created_at": task.created_at.isoformat() if task.created_at else None,
"started_at": task.started_at.isoformat() if task.started_at else None,
"completed_at": task.completed_at.isoformat() if task.completed_at else None,
"updated_at": task.updated_at.isoformat() if task.updated_at else None,
}
@router.get("", summary="获取任务列表")
async def get_tasks(
project_id: str,
request: Request,
task_type: Optional[str] = None,
limit: int = 20,
db: AsyncSession = Depends(get_db)
):
"""获取项目的后台任务列表"""
user_id = getattr(request.state, 'user_id', None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
tasks = await background_task_service.get_project_tasks(
project_id, user_id, db, task_type=task_type, limit=limit
)
return {
"items": [
{
"id": t.id,
"task_type": t.task_type,
"status": t.status,
"progress": t.progress,
"status_message": t.status_message,
"progress_details": t.progress_details,
"error_message": t.error_message,
"created_at": t.created_at.isoformat() if t.created_at else None,
"completed_at": t.completed_at.isoformat() if t.completed_at else None,
}
for t in tasks
]
}
@router.post("/{task_id}/cancel", summary="取消任务")
async def cancel_task(
task_id: str,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""请求取消后台任务"""
user_id = getattr(request.state, 'user_id', None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
success = await background_task_service.cancel_task(task_id, user_id, db)
if not success:
raise HTTPException(status_code=400, detail="无法取消任务(不存在或已完成)")
return {"message": "任务已取消", "task_id": task_id}
@router.delete("/{task_id}", summary="删除任务记录")
async def delete_task(
task_id: str,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""删除已完成/失败的任务记录"""
user_id = getattr(request.state, 'user_id', None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
task = await background_task_service.get_task(task_id, user_id, db)
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
if task.status in ("pending", "running"):
raise HTTPException(status_code=400, detail="无法删除进行中的任务,请先取消")
await db.delete(task)
await db.commit()
return {"message": "任务记录已删除"}