feature:新增章节分析一键分析功能,支持拆书后一键批量分析。

This commit is contained in:
xiamuceer-j
2026-03-04 16:27:49 +08:00
parent ec5398d60a
commit ad19c773f0
3 changed files with 363 additions and 23 deletions
+256 -23
View File
@@ -32,6 +32,11 @@ from app.schemas.chapter import (
ChapterUpdate,
ChapterResponse,
ChapterListResponse,
AnalysisTaskStatusResponse,
BatchAnalysisStatusRequest,
BatchAnalysisStatusResponse,
BatchAnalyzeUnanalyzedRequest,
BatchAnalyzeUnanalyzedResponse,
ChapterGenerateRequest,
BatchGenerateRequest,
BatchGenerateResponse,
@@ -1810,7 +1815,41 @@ async def generate_chapter_content_stream(
return create_sse_response(event_generator())
@router.get("/{chapter_id}/analysis/status", summary="查询章节分析任务状态")
def _build_analysis_task_status_payload(
chapter_id: str,
task: Optional[AnalysisTask],
auto_recovered: bool = False
) -> dict:
"""统一构建分析任务状态响应"""
if not task:
return {
"has_task": False,
"chapter_id": chapter_id,
"status": "none",
"progress": 0,
"error_message": None,
"auto_recovered": False,
"task_id": None,
"created_at": None,
"started_at": None,
"completed_at": None
}
return {
"has_task": True,
"task_id": task.id,
"chapter_id": task.chapter_id,
"status": task.status,
"progress": task.progress,
"error_message": task.error_message,
"auto_recovered": auto_recovered,
"created_at": task.created_at.isoformat() if task.created_at else None,
"started_at": task.started_at.isoformat() if task.started_at else None,
"completed_at": task.completed_at.isoformat() if task.completed_at else None
}
@router.get("/{chapter_id}/analysis/status", summary="查询章节分析任务状态", response_model=AnalysisTaskStatusResponse)
async def get_analysis_task_status(
chapter_id: str,
request: Request,
@@ -1861,18 +1900,7 @@ async def get_analysis_task_status(
if not task:
# 返回无任务状态,而不是抛出404错误
return {
"has_task": False,
"chapter_id": chapter_id,
"status": "none",
"progress": 0,
"error_message": None,
"auto_recovered": False,
"task_id": None,
"created_at": None,
"started_at": None,
"completed_at": None
}
return _build_analysis_task_status_payload(chapter_id, None)
auto_recovered = False
current_time = datetime.now()
@@ -1909,17 +1937,222 @@ async def get_analysis_task_status(
await db.refresh(task)
logger.warning(f"🔄 自动恢复未启动的任务: {task.id}, 章节: {chapter_id}")
return _build_analysis_task_status_payload(chapter_id, task, auto_recovered)
@router.post("/project/{project_id}/analysis/statuses", summary="批量查询章节分析任务状态", response_model=BatchAnalysisStatusResponse)
async def get_project_analysis_task_statuses(
project_id: str,
payload: BatchAnalysisStatusRequest,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""批量查询项目章节分析状态,避免前端逐章节请求造成请求风暴"""
# 验证用户权限
user_id = getattr(request.state, 'user_id', None)
await verify_project_access(project_id, user_id, db)
# 先取项目章节列表
chapter_query = select(Chapter.id).where(Chapter.project_id == project_id)
if payload.chapter_ids and len(payload.chapter_ids) > 0:
chapter_query = chapter_query.where(Chapter.id.in_(payload.chapter_ids))
chapter_result = await db.execute(chapter_query)
chapter_ids = [row[0] for row in chapter_result.all()]
if not chapter_ids:
return {
"project_id": project_id,
"total": 0,
"items": {}
}
# 批量查询这些章节对应的所有分析任务,随后在内存中取最新一条
tasks_result = await db.execute(
select(AnalysisTask)
.where(AnalysisTask.chapter_id.in_(chapter_ids))
.order_by(AnalysisTask.chapter_id, AnalysisTask.created_at.desc())
)
all_tasks = tasks_result.scalars().all()
latest_task_map: dict[str, AnalysisTask] = {}
for task in all_tasks:
if task.chapter_id not in latest_task_map:
latest_task_map[task.chapter_id] = task
items: dict[str, dict] = {}
for chapter_id in chapter_ids:
task = latest_task_map.get(chapter_id)
items[chapter_id] = _build_analysis_task_status_payload(chapter_id, task)
return {
"has_task": True,
"task_id": task.id,
"chapter_id": task.chapter_id,
"status": task.status,
"progress": task.progress,
"error_message": task.error_message,
"auto_recovered": auto_recovered,
"created_at": task.created_at.isoformat() if task.created_at else None,
"started_at": task.started_at.isoformat() if task.started_at else None,
"completed_at": task.completed_at.isoformat() if task.completed_at else None
"project_id": project_id,
"total": len(chapter_ids),
"items": items
}
async def _run_batch_analysis_in_sequence(
tasks_queue: list[dict[str, int | str]],
user_id: str,
project_id: str,
ai_service: AIService
) -> None:
"""按章节顺序逐个执行分析任务。"""
for index, task_item in enumerate(tasks_queue, start=1):
chapter_id = str(task_item["chapter_id"])
chapter_number = int(task_item["chapter_number"])
task_id = str(task_item["task_id"])
logger.info(f"🔁 一键分析顺序执行中 [{index}/{len(tasks_queue)}]:第{chapter_number}")
try:
success = await analyze_chapter_background(
chapter_id=chapter_id,
user_id=user_id,
project_id=project_id,
task_id=task_id,
ai_service=ai_service
)
if not success:
logger.warning(f"⚠️ 一键顺序分析返回失败: chapter_id={chapter_id}, task_id={task_id}")
except Exception as e:
# analyze_chapter_background 内部已处理任务失败状态,这里仅保护顺序队列不中断
logger.error(
f"❌ 一键顺序分析异常(已继续后续章节) chapter_id={chapter_id}, task_id={task_id}: {str(e)}",
exc_info=True
)
@router.post(
"/project/{project_id}/analysis/analyze-unanalyzed",
summary="一键按章节顺序分析未分析章节",
response_model=BatchAnalyzeUnanalyzedResponse
)
async def batch_analyze_unanalyzed_chapters(
project_id: str,
payload: BatchAnalyzeUnanalyzedRequest,
request: Request,
db: AsyncSession = Depends(get_db),
user_ai_service: AIService = Depends(get_user_ai_service)
):
"""自动识别项目中未完成分析的章节,并按章节顺序逐个启动分析。"""
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
# 验证项目权限
await verify_project_access(project_id, user_id, db)
# 查询目标章节(可选限制 chapter_ids
chapter_query = select(Chapter).where(Chapter.project_id == project_id).order_by(Chapter.chapter_number)
if payload.chapter_ids and len(payload.chapter_ids) > 0:
chapter_query = chapter_query.where(Chapter.id.in_(payload.chapter_ids))
chapter_result = await db.execute(chapter_query)
chapters = chapter_result.scalars().all()
if not chapters:
return {
"project_id": project_id,
"total_candidates": 0,
"total_started": 0,
"total_skipped_no_content": 0,
"total_skipped_running": 0,
"total_already_completed": 0,
"started_tasks": {}
}
chapter_ids = [chapter.id for chapter in chapters]
# 查询每个章节最新分析任务
tasks_result = await db.execute(
select(AnalysisTask)
.where(AnalysisTask.chapter_id.in_(chapter_ids))
.order_by(AnalysisTask.chapter_id, AnalysisTask.created_at.desc())
)
all_tasks = tasks_result.scalars().all()
latest_task_map: dict[str, AnalysisTask] = {}
for task in all_tasks:
if task.chapter_id not in latest_task_map:
latest_task_map[task.chapter_id] = task
total_candidates = 0
total_skipped_no_content = 0
total_skipped_running = 0
total_already_completed = 0
started_tasks: dict[str, dict] = {}
tasks_to_start: list[tuple[Chapter, AnalysisTask]] = []
for chapter in chapters:
# 无内容章节直接跳过
if not chapter.content or chapter.content.strip() == "":
total_skipped_no_content += 1
continue
total_candidates += 1
latest_task = latest_task_map.get(chapter.id)
# 已在队列/分析中,跳过
if latest_task and latest_task.status in ("pending", "running"):
total_skipped_running += 1
continue
# 已分析完成,跳过
if latest_task and latest_task.status == "completed":
total_already_completed += 1
continue
# 无任务/失败/未知状态,重新发起分析
analysis_task = AnalysisTask(
chapter_id=chapter.id,
user_id=user_id,
project_id=project_id,
status='pending',
progress=0
)
db.add(analysis_task)
tasks_to_start.append((chapter, analysis_task))
if tasks_to_start:
try:
await db.flush()
for chapter, analysis_task in tasks_to_start:
started_tasks[chapter.id] = _build_analysis_task_status_payload(chapter.id, analysis_task)
await db.commit()
except Exception as e:
await db.rollback()
logger.error(f"❌ 一键分析创建任务失败: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"一键分析创建任务失败: {str(e)}")
# 提交后立即按章节顺序调度后台分析(逐章执行)
tasks_queue = [
{
"chapter_id": chapter.id,
"chapter_number": chapter.chapter_number,
"task_id": analysis_task.id
}
for chapter, analysis_task in tasks_to_start
]
asyncio.create_task(
_run_batch_analysis_in_sequence(
tasks_queue=tasks_queue,
user_id=user_id,
project_id=project_id,
ai_service=user_ai_service
)
)
return {
"project_id": project_id,
"total_candidates": total_candidates,
"total_started": len(tasks_to_start),
"total_skipped_no_content": total_skipped_no_content,
"total_skipped_running": total_skipped_running,
"total_already_completed": total_already_completed,
"started_tasks": started_tasks
}
+42
View File
@@ -67,6 +67,48 @@ class ChapterListResponse(BaseModel):
items: list[ChapterResponse]
class AnalysisTaskStatusResponse(BaseModel):
"""单章节分析任务状态响应"""
has_task: bool
task_id: Optional[str] = None
chapter_id: str
status: str
progress: int = 0
error_message: Optional[str] = None
auto_recovered: bool = False
created_at: Optional[str] = None
started_at: Optional[str] = None
completed_at: Optional[str] = None
class BatchAnalysisStatusRequest(BaseModel):
"""批量查询分析状态请求"""
chapter_ids: Optional[List[str]] = Field(None, description="待查询章节ID列表;为空时查询项目下全部章节")
class BatchAnalysisStatusResponse(BaseModel):
"""批量查询分析状态响应"""
project_id: str
total: int
items: Dict[str, AnalysisTaskStatusResponse]
class BatchAnalyzeUnanalyzedRequest(BaseModel):
"""一键分析未分析章节请求"""
chapter_ids: Optional[List[str]] = Field(None, description="可选:限定待分析章节ID列表;为空则自动识别项目内全部未分析章节")
class BatchAnalyzeUnanalyzedResponse(BaseModel):
"""一键分析未分析章节响应"""
project_id: str
total_candidates: int = Field(0, description="候选章节总数(有内容章节)")
total_started: int = Field(0, description="本次已启动分析任务数")
total_skipped_no_content: int = Field(0, description="跳过:无内容章节数")
total_skipped_running: int = Field(0, description="跳过:已在分析中的章节数")
total_already_completed: int = Field(0, description="跳过:已完成分析章节数")
started_tasks: Dict[str, AnalysisTaskStatusResponse] = Field(default_factory=dict, description="本次启动的分析任务状态映射")
class ChapterGenerateRequest(BaseModel):
"""AI生成章节内容的请求模型"""
style_id: Optional[int] = Field(None, description="写作风格ID,不提供则不使用任何风格")
+65
View File
@@ -50,6 +50,14 @@ import type {
PresetUpdateRequest,
PresetListResponse,
ChapterPlanItem,
BookImportTask,
BookImportPreview,
BookImportApplyPayload,
BookImportResult,
BookImportRetryResult,
BatchAnalysisStatusResponse,
BatchAnalyzeUnanalyzedRequest,
BatchAnalyzeUnanalyzedResponse,
} from '../types';
interface MCPPluginSimpleCreate {
@@ -364,6 +372,53 @@ export const projectApi = {
},
};
export const bookImportApi = {
createTask: (params: {
file: File;
}) => {
const formData = new FormData();
formData.append('file', params.file);
return api.post<unknown, { task_id: string; status: BookImportTask['status'] }>(
'/book-import/tasks',
formData,
{ headers: { 'Content-Type': 'multipart/form-data' } }
);
},
getTaskStatus: (taskId: string) =>
api.get<unknown, BookImportTask>(`/book-import/tasks/${taskId}`),
getPreview: (taskId: string) =>
api.get<unknown, BookImportPreview>(`/book-import/tasks/${taskId}/preview`),
applyImport: (taskId: string, payload: BookImportApplyPayload) =>
api.post<unknown, BookImportResult>(`/book-import/tasks/${taskId}/apply`, payload),
applyImportStream: (
taskId: string,
payload: BookImportApplyPayload,
options?: SSEClientOptions,
) => ssePost<BookImportResult>(
`/api/book-import/tasks/${taskId}/apply-stream`,
payload,
options,
),
retryFailedStepsStream: (
taskId: string,
steps: string[],
options?: SSEClientOptions,
) => ssePost<BookImportRetryResult>(
`/api/book-import/tasks/${taskId}/retry-stream`,
{ steps },
options,
),
cancelTask: (taskId: string) =>
api.delete<unknown, { success: boolean; message: string }>(`/book-import/tasks/${taskId}`),
};
export const outlineApi = {
getOutlines: (projectId: string) =>
api.get<unknown, { total: number; items: Outline[] }>(`/outlines/project/${projectId}`).then(res => res.items),
@@ -571,6 +626,16 @@ export const chapterApi = {
checkCanGenerate: (chapterId: string) =>
api.get<unknown, import('../types').ChapterCanGenerateResponse>(`/chapters/${chapterId}/can-generate`),
getBatchAnalysisStatuses: (projectId: string, chapterIds?: string[]) =>
api.post<unknown, BatchAnalysisStatusResponse>(`/chapters/project/${projectId}/analysis/statuses`, {
chapter_ids: chapterIds && chapterIds.length > 0 ? chapterIds : undefined,
}),
batchAnalyzeUnanalyzed: (projectId: string, data?: BatchAnalyzeUnanalyzedRequest) =>
api.post<unknown, BatchAnalyzeUnanalyzedResponse>(`/chapters/project/${projectId}/analysis/analyze-unanalyzed`, {
chapter_ids: data?.chapter_ids && data.chapter_ids.length > 0 ? data.chapter_ids : undefined,
}),
// 章节重新生成相关
getRegenerationTasks: (chapterId: string, limit?: number) =>
api.get<unknown, {