update: 更新1-N模式展开章节/批量展开作为后台任务

This commit is contained in:
xiamuceer
2026-04-30 10:56:54 +08:00
parent 88ee65f068
commit d384a08a59
4 changed files with 393 additions and 525 deletions
+355
View File
@@ -2451,6 +2451,284 @@ async def expand_outline_generator(
yield await tracker.error(f"展开失败: {str(e)}")
async def _save_background_task_result(db: AsyncSession, task_id: str, result_data: Dict[str, Any]) -> None:
"""保存后台任务结果到 background_tasks.task_result。"""
from app.models.background_task import BackgroundTask
task_result = await db.execute(select(BackgroundTask).where(BackgroundTask.id == task_id))
task = task_result.scalar_one_or_none()
if task:
task.task_result = result_data
await db.commit()
async def _run_outline_expansion_background(
task_id: str,
user_id: str,
outline_id: str,
data: Dict[str, Any]
):
"""后台执行单个大纲展开并可直接创建章节。"""
from app.database import get_engine
from app.api.settings import get_user_ai_service_from_db
from app.services.background_task_service import TaskProgressTracker
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession as BgAsyncSession
engine = await get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=BgAsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as bg_db:
tracker = TaskProgressTracker(task_id, user_id, "大纲展开")
try:
await tracker.start("开始大纲展开任务...")
target_chapter_count = int(data.get("target_chapter_count", 3))
expansion_strategy = data.get("expansion_strategy", "balanced")
enable_scene_analysis = data.get("enable_scene_analysis", True)
auto_create_chapters = data.get("auto_create_chapters", True)
batch_size = int(data.get("batch_size", 5))
await tracker.loading("加载大纲信息...", 0.3)
outline_result = await bg_db.execute(select(Outline).where(Outline.id == outline_id))
outline = outline_result.scalar_one_or_none()
if not outline:
raise ValueError("大纲不存在")
await tracker.loading("加载项目信息...", 0.7)
project_result = await bg_db.execute(select(Project).where(Project.id == outline.project_id))
project = project_result.scalar_one_or_none()
if not project:
raise ValueError("项目不存在")
if await tracker.check_cancelled():
return
await tracker.preparing(f"准备展开《{outline.title}》为 {target_chapter_count} 章...")
bg_ai_service = await get_user_ai_service_from_db(user_id, bg_db)
expansion_service = PlotExpansionService(bg_ai_service)
await tracker.generating(
current_chars=0,
estimated_total=target_chapter_count * 500,
message=f"AI分析大纲《{outline.title}》,生成章节规划..."
)
chapter_plans = await expansion_service.analyze_outline_for_chapters(
outline=outline,
project=project,
db=bg_db,
target_chapter_count=target_chapter_count,
expansion_strategy=expansion_strategy,
enable_scene_analysis=enable_scene_analysis,
provider=data.get("provider"),
model=data.get("model"),
batch_size=batch_size,
progress_callback=None
)
if await tracker.check_cancelled():
return
if not chapter_plans:
raise ValueError("AI分析失败,未能生成章节规划")
await tracker.parsing(f"规划生成完成,共 {len(chapter_plans)} 个章节")
created_chapters = None
if auto_create_chapters:
await tracker.saving("创建章节记录...", 0.3)
created_chapters = await expansion_service.create_chapters_from_plans(
outline_id=outline_id,
chapter_plans=chapter_plans,
project_id=outline.project_id,
db=bg_db,
start_chapter_number=None
)
await tracker.saving(f"成功创建 {len(created_chapters)} 个章节记录", 0.8)
result_data = {
"outline_id": outline_id,
"outline_title": outline.title,
"target_chapter_count": target_chapter_count,
"actual_chapter_count": len(chapter_plans),
"expansion_strategy": expansion_strategy,
"chapter_plans": chapter_plans,
"created_chapters": [
{
"id": ch.id,
"chapter_number": ch.chapter_number,
"title": ch.title,
"summary": ch.summary,
"outline_id": ch.outline_id,
"sub_index": ch.sub_index,
"status": ch.status
}
for ch in created_chapters
] if created_chapters else None
}
await _save_background_task_result(bg_db, task_id, result_data)
await tracker.complete(f"{outline.title}》展开完成")
except Exception as e:
logger.error(f"后台大纲展开失败: {str(e)}", exc_info=True)
try:
if bg_db.in_transaction():
await bg_db.rollback()
except Exception:
pass
await tracker.error(str(e))
async def _run_batch_outline_expansion_background(
task_id: str,
user_id: str,
data: Dict[str, Any]
):
"""后台执行批量大纲展开并可直接创建章节。"""
from app.database import get_engine
from app.api.settings import get_user_ai_service_from_db
from app.services.background_task_service import TaskProgressTracker
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession as BgAsyncSession
engine = await get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=BgAsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as bg_db:
tracker = TaskProgressTracker(task_id, user_id, "批量大纲展开")
try:
await tracker.start("开始批量大纲展开任务...")
project_id = data.get("project_id")
chapters_per_outline = int(data.get("chapters_per_outline", 3))
expansion_strategy = data.get("expansion_strategy", "balanced")
auto_create_chapters = data.get("auto_create_chapters", True)
outline_ids = data.get("outline_ids")
await tracker.loading("加载项目信息...", 0.4)
project_result = await bg_db.execute(select(Project).where(Project.id == project_id))
project = project_result.scalar_one_or_none()
if not project:
raise ValueError("项目不存在")
await tracker.loading("获取大纲列表...", 0.8)
if outline_ids:
outlines_result = await bg_db.execute(
select(Outline)
.where(Outline.project_id == project_id, Outline.id.in_(outline_ids))
.order_by(Outline.order_index)
)
else:
outlines_result = await bg_db.execute(
select(Outline)
.where(Outline.project_id == project_id)
.order_by(Outline.order_index)
)
outlines = outlines_result.scalars().all()
if not outlines:
raise ValueError("没有找到要展开的大纲")
total_outlines = len(outlines)
await tracker.preparing(f"共找到 {total_outlines} 个大纲,准备批量展开...")
bg_ai_service = await get_user_ai_service_from_db(user_id, bg_db)
expansion_service = PlotExpansionService(bg_ai_service)
expansion_results = []
skipped_outlines = []
total_chapters_created = 0
for idx, outline in enumerate(outlines):
if await tracker.check_cancelled():
return
await tracker.generating(
current_chars=idx * chapters_per_outline * 500,
estimated_total=total_outlines * chapters_per_outline * 500,
message=f"处理第 {idx + 1}/{total_outlines} 个大纲:《{outline.title}"
)
existing_chapters_result = await bg_db.execute(
select(Chapter).where(Chapter.outline_id == outline.id).limit(1)
)
existing_chapter = existing_chapters_result.scalar_one_or_none()
if existing_chapter:
skipped_outlines.append({
"outline_id": outline.id,
"outline_title": outline.title,
"reason": "已展开"
})
await tracker.warning(f"{outline.title}》已展开过,已跳过")
continue
chapter_plans = await expansion_service.analyze_outline_for_chapters(
outline=outline,
project=project,
db=bg_db,
target_chapter_count=chapters_per_outline,
expansion_strategy=expansion_strategy,
enable_scene_analysis=data.get("enable_scene_analysis", True),
provider=data.get("provider"),
model=data.get("model")
)
created_chapters = None
if auto_create_chapters:
created_chapters = await expansion_service.create_chapters_from_plans(
outline_id=outline.id,
chapter_plans=chapter_plans,
project_id=outline.project_id,
db=bg_db,
start_chapter_number=None
)
total_chapters_created += len(created_chapters)
expansion_results.append({
"outline_id": outline.id,
"outline_title": outline.title,
"target_chapter_count": chapters_per_outline,
"actual_chapter_count": len(chapter_plans),
"expansion_strategy": expansion_strategy,
"chapter_plans": chapter_plans,
"created_chapters": [
{
"id": ch.id,
"chapter_number": ch.chapter_number,
"title": ch.title,
"summary": ch.summary,
"outline_id": ch.outline_id,
"sub_index": ch.sub_index,
"status": ch.status
}
for ch in created_chapters
] if created_chapters else None
})
await tracker.generating(
current_chars=(idx + 1) * chapters_per_outline * 500,
estimated_total=total_outlines * chapters_per_outline * 500,
message=f"{outline.title}》展开完成 ({len(chapter_plans)} 章)"
)
await tracker.parsing("整理批量展开结果...")
result_data = {
"project_id": project_id,
"total_outlines_expanded": len(expansion_results),
"total_chapters_created": total_chapters_created,
"skipped_count": len(skipped_outlines),
"skipped_outlines": skipped_outlines,
"expansion_results": expansion_results
}
await _save_background_task_result(bg_db, task_id, result_data)
await tracker.complete(f"批量展开完成,共创建 {total_chapters_created} 个章节")
except Exception as e:
logger.error(f"后台批量大纲展开失败: {str(e)}", exc_info=True)
try:
if bg_db.in_transaction():
await bg_db.rollback()
except Exception:
pass
await tracker.error(str(e))
@router.post("/{outline_id}/create-single-chapter", summary="一对一创建章节(传统模式)")
async def create_single_chapter_from_outline(
outline_id: str,
@@ -2549,6 +2827,48 @@ async def create_single_chapter_from_outline(
raise HTTPException(status_code=500, detail=f"创建章节失败: {str(e)}")
@router.post("/{outline_id}/expand-background", summary="后台展开单个大纲为多章")
async def expand_outline_to_chapters_background(
outline_id: str,
data: Dict[str, Any],
request: Request,
db: AsyncSession = Depends(get_db)
):
"""创建后台任务展开单个大纲,任务完成后可在右下角后台任务面板查看结果。"""
result = await db.execute(select(Outline).where(Outline.id == outline_id))
outline = result.scalar_one_or_none()
if not outline:
raise HTTPException(status_code=404, detail="大纲不存在")
user_id = getattr(request.state, 'user_id', None)
await verify_project_access(outline.project_id, user_id, db)
from app.services.background_task_service import background_task_service
task_input = dict(data or {})
task_input["outline_id"] = outline_id
task_input.setdefault("auto_create_chapters", True)
task = await background_task_service.create_task(
user_id=user_id,
project_id=outline.project_id,
task_type="outline_expand",
task_input=task_input,
db=db
)
await background_task_service.spawn_background_task(
task.id, user_id, _run_outline_expansion_background, outline_id, task_input
)
return {
"task_id": task.id,
"task_type": "outline_expand",
"status": "pending",
"message": "大纲展开任务已创建,请通过后台任务面板查看进度"
}
@router.post("/{outline_id}/expand-stream", summary="展开单个大纲为多章(SSE流式)")
async def expand_outline_to_chapters_stream(
outline_id: str,
@@ -2901,6 +3221,41 @@ async def batch_expand_outlines_generator(
yield await SSEResponse.send_error(f"批量展开失败: {str(e)}")
@router.post("/batch-expand-background", summary="后台批量展开大纲为多章")
async def batch_expand_outlines_background(
data: Dict[str, Any],
request: Request,
db: AsyncSession = Depends(get_db)
):
"""创建后台任务批量展开大纲,任务完成后可在右下角后台任务面板查看结果。"""
user_id = getattr(request.state, 'user_id', None)
project = await verify_project_access(data.get("project_id"), user_id, db)
from app.services.background_task_service import background_task_service
task_input = dict(data or {})
task_input.setdefault("auto_create_chapters", True)
task = await background_task_service.create_task(
user_id=user_id,
project_id=project.id,
task_type="outline_batch_expand",
task_input=task_input,
db=db
)
await background_task_service.spawn_background_task(
task.id, user_id, _run_batch_outline_expansion_background, task_input
)
return {
"task_id": task.id,
"task_type": "outline_batch_expand",
"status": "pending",
"message": "批量大纲展开任务已创建,请通过后台任务面板查看进度"
}
@router.post("/batch-expand-stream", summary="批量展开大纲为多章(SSE流式)")
async def batch_expand_outlines_stream(
data: Dict[str, Any],