feature:新增拆书导入续写功能,给当年的ta一个结局。

This commit is contained in:
xiamuceer-j
2026-03-04 16:28:16 +08:00
parent ad19c773f0
commit 536bd198b4
8 changed files with 3859 additions and 15 deletions
+262
View File
@@ -0,0 +1,262 @@
"""拆书导入 API"""
from __future__ import annotations
import asyncio
from typing import AsyncGenerator
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.logger import get_logger
from app.schemas.book_import import (
BookImportApplyRequest,
BookImportApplyResponse,
BookImportPreviewResponse,
BookImportRetryRequest,
BookImportTaskCreateResponse,
BookImportTaskStatusResponse,
)
from app.services.book_import_service import book_import_service
from app.utils.sse_response import SSEResponse, create_sse_response
router = APIRouter(prefix="/book-import", tags=["拆书导入"])
logger = get_logger(__name__)
MAX_TXT_SIZE = 50 * 1024 * 1024 # 50MB
@router.post("/tasks", response_model=BookImportTaskCreateResponse, summary="创建拆书任务(上传TXT")
async def create_book_import_task(
request: Request,
file: UploadFile = File(..., description="TXT 文件"),
project_id: str | None = Form(default=None, description="兼容参数:当前版本固定新建项目,不支持传入"),
create_new_project: bool = Form(default=True, description="兼容参数:当前版本仅支持 true"),
import_mode: str = Form(default="append", description="导入模式:append/overwrite"),
):
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
if not file.filename or not file.filename.lower().endswith(".txt"):
raise HTTPException(status_code=400, detail="仅支持 .txt 文件")
if import_mode not in {"append", "overwrite"}:
raise HTTPException(status_code=400, detail="import_mode 仅支持 append 或 overwrite")
if project_id:
raise HTTPException(status_code=400, detail="当前仅支持新建项目导入,不支持指定 project_id")
if not create_new_project:
raise HTTPException(status_code=400, detail="当前仅支持新建项目导入")
content = await file.read()
if len(content) > MAX_TXT_SIZE:
raise HTTPException(status_code=413, detail="文件大小超过 50MB 限制")
task = await book_import_service.create_task(
user_id=user_id,
filename=file.filename,
file_content=content,
project_id=None,
create_new_project=True,
import_mode=import_mode,
)
return task
@router.get("/tasks/{task_id}", response_model=BookImportTaskStatusResponse, summary="查询拆书任务状态")
async def get_book_import_task_status(task_id: str, request: Request):
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
return await book_import_service.get_task_status(task_id=task_id, user_id=user_id)
@router.get("/tasks/{task_id}/preview", response_model=BookImportPreviewResponse, summary="获取拆书预览")
async def get_book_import_preview(task_id: str, request: Request):
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
return await book_import_service.get_preview(task_id=task_id, user_id=user_id)
@router.post("/tasks/{task_id}/apply", response_model=BookImportApplyResponse, summary="确认并导入")
async def apply_book_import(
task_id: str,
payload: BookImportApplyRequest,
request: Request,
db: AsyncSession = Depends(get_db),
):
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
return await book_import_service.apply_import(
task_id=task_id,
user_id=user_id,
payload=payload,
db=db,
)
@router.delete("/tasks/{task_id}", summary="取消拆书任务")
async def cancel_book_import_task(task_id: str, request: Request):
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
return await book_import_service.cancel_task(task_id=task_id, user_id=user_id)
@router.post("/tasks/{task_id}/apply-stream", summary="确认并导入(SSE流式进度)")
async def apply_book_import_stream(
task_id: str,
payload: BookImportApplyRequest,
request: Request,
db: AsyncSession = Depends(get_db),
):
"""
SSE 流式接口:执行基础导入后,分步生成世界观/职业/角色并实时推送进度。
使用 asyncio.Queue 在服务与 SSE 生成器之间传递进度消息。
"""
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
# 使用 asyncio.Queue 实现实时进度推送
progress_queue: asyncio.Queue[str | None] = asyncio.Queue()
async def _progress_callback(message: str, progress: int, status: str = "processing") -> None:
"""进度回调:放入队列供 SSE 生成器消费"""
sse_msg = SSEResponse.format_sse({
"type": "progress",
"message": message,
"progress": progress,
"status": status,
})
await progress_queue.put(sse_msg)
async def _run_import() -> None:
"""在后台任务中执行导入并通过队列推送进度"""
try:
result = await book_import_service.apply_import_stream(
task_id=task_id,
user_id=user_id,
payload=payload,
db=db,
progress_callback=_progress_callback,
)
# 发送结果
await progress_queue.put(await SSEResponse.send_result({
"success": result.success,
"project_id": result.project_id,
"statistics": result.statistics,
}))
await progress_queue.put(await SSEResponse.send_progress("导入完成!", 100, "success"))
await progress_queue.put(await SSEResponse.send_done())
except HTTPException as exc:
await progress_queue.put(await SSEResponse.send_error(exc.detail, exc.status_code))
except Exception as exc:
logger.error(f"拆书SSE导入失败: {exc}", exc_info=True)
await progress_queue.put(await SSEResponse.send_error(str(exc), 500))
finally:
# 发送终止信号
await progress_queue.put(None)
async def _streaming_generator() -> AsyncGenerator[str, None]:
yield await SSEResponse.send_progress("开始导入拆书数据...", 0, "processing")
# 启动后台导入任务
import_task = asyncio.create_task(_run_import())
try:
while True:
msg = await progress_queue.get()
if msg is None:
break
yield msg
except GeneratorExit:
import_task.cancel()
except Exception as exc:
logger.error(f"SSE生成器异常: {exc}", exc_info=True)
yield await SSEResponse.send_error(str(exc), 500)
return create_sse_response(_streaming_generator())
@router.post("/tasks/{task_id}/retry-stream", summary="重试失败的生成步骤(SSE流式进度)")
async def retry_failed_steps_stream(
task_id: str,
payload: BookImportRetryRequest,
request: Request,
db: AsyncSession = Depends(get_db),
):
"""
SSE 流式接口:仅重试之前导入过程中失败的AI生成步骤(世界观/职业/角色)。
"""
user_id = getattr(request.state, "user_id", None)
if not user_id:
raise HTTPException(status_code=401, detail="未登录")
progress_queue: asyncio.Queue[str | None] = asyncio.Queue()
async def _progress_callback(message: str, progress: int, status: str = "processing") -> None:
sse_msg = SSEResponse.format_sse({
"type": "progress",
"message": message,
"progress": progress,
"status": status,
})
await progress_queue.put(sse_msg)
async def _run_retry() -> None:
try:
result = await book_import_service.retry_failed_steps_stream(
task_id=task_id,
user_id=user_id,
steps_to_retry=payload.steps,
db=db,
progress_callback=_progress_callback,
)
await progress_queue.put(await SSEResponse.send_result(result))
if result.get("still_failed"):
await progress_queue.put(await SSEResponse.send_progress(
f"重试完成,仍有 {len(result['still_failed'])} 个步骤失败",
100,
"warning",
))
else:
await progress_queue.put(await SSEResponse.send_progress("所有步骤重试成功!", 100, "success"))
await progress_queue.put(await SSEResponse.send_done())
except HTTPException as exc:
await progress_queue.put(await SSEResponse.send_error(exc.detail, exc.status_code))
except Exception as exc:
logger.error(f"拆书SSE重试失败: {exc}", exc_info=True)
await progress_queue.put(await SSEResponse.send_error(str(exc), 500))
finally:
await progress_queue.put(None)
async def _streaming_generator() -> AsyncGenerator[str, None]:
yield await SSEResponse.send_progress("开始重试失败的生成步骤...", 0, "processing")
retry_task = asyncio.create_task(_run_retry())
try:
while True:
msg = await progress_queue.get()
if msg is None:
break
yield msg
except GeneratorExit:
retry_task.cancel()
except Exception as exc:
logger.error(f"SSE重试生成器异常: {exc}", exc_info=True)
yield await SSEResponse.send_error(str(exc), 500)
return create_sse_response(_streaming_generator())
+2 -1
View File
@@ -130,7 +130,7 @@ from app.api import (
wizard_stream, relationships, organizations,
auth, users, settings, writing_styles, memories,
mcp_plugins, admin, inspiration, prompt_templates,
changelog, careers, foreshadows, prompt_workshop
changelog, careers, foreshadows, prompt_workshop, book_import
)
app.include_router(auth.router, prefix="/api")
@@ -154,6 +154,7 @@ app.include_router(mcp_plugins.router, prefix="/api") # MCP插件管理API
app.include_router(prompt_templates.router, prefix="/api") # 提示词模板管理API
app.include_router(changelog.router, prefix="/api") # 更新日志API
app.include_router(prompt_workshop.router, prefix="/api") # 提示词工坊API
app.include_router(book_import.router, prefix="/api") # 拆书导入API
static_dir = Path(__file__).parent.parent / "static"
if static_dir.exists():
+92
View File
@@ -0,0 +1,92 @@
"""拆书导入相关的 Pydantic Schema"""
from datetime import datetime
from typing import Any, Literal, Optional
from pydantic import BaseModel, Field
TaskStatus = Literal["pending", "running", "completed", "failed", "cancelled"]
ImportMode = Literal["append", "overwrite"]
ExtractLevel = Literal["basic", "standard", "deep"]
WarningLevel = Literal["info", "warning", "error"]
class BookImportWarning(BaseModel):
"""导入告警信息"""
code: str = Field(..., description="告警编码")
message: str = Field(..., description="告警内容")
level: WarningLevel = Field(default="warning", description="告警等级")
class ProjectSuggestion(BaseModel):
"""项目建议信息(可在预览页修改)"""
title: str = Field(..., min_length=1, max_length=200, description="项目标题")
description: Optional[str] = Field(None, description="项目简介")
theme: Optional[str] = Field(None, description="主题")
genre: Optional[str] = Field(None, description="类型")
narrative_perspective: str = Field(default="第三人称", description="叙事视角")
target_words: int = Field(default=100000, ge=1000, description="目标字数(默认10万字)")
class BookImportChapter(BaseModel):
"""预览章节"""
title: str = Field(..., min_length=1, max_length=200, description="章节标题")
content: str = Field(default="", description="章节正文")
summary: Optional[str] = Field(None, description="章节摘要")
chapter_number: int = Field(..., ge=1, description="章节序号")
outline_title: Optional[str] = Field(None, description="关联大纲标题(可选)")
class BookImportOutline(BaseModel):
"""预览大纲"""
title: str = Field(..., min_length=1, max_length=200, description="大纲标题")
content: Optional[str] = Field(None, description="大纲内容")
order_index: int = Field(..., ge=1, description="排序序号")
structure: Optional[dict[str, Any]] = Field(None, description="结构化大纲(与系统大纲生成结构一致)")
class BookImportTaskCreateResponse(BaseModel):
"""创建任务响应"""
task_id: str
status: TaskStatus
class BookImportTaskStatusResponse(BaseModel):
"""任务状态响应"""
task_id: str
status: TaskStatus
progress: int = Field(..., ge=0, le=100)
message: Optional[str] = None
error: Optional[str] = None
created_at: datetime
updated_at: datetime
class BookImportPreviewResponse(BaseModel):
"""预览数据响应"""
task_id: str
project_suggestion: ProjectSuggestion
chapters: list[BookImportChapter]
outlines: list[BookImportOutline]
warnings: list[BookImportWarning]
class BookImportApplyRequest(BaseModel):
"""确认导入请求(支持前端修订后的数据)"""
project_suggestion: ProjectSuggestion
chapters: list[BookImportChapter]
outlines: list[BookImportOutline] = Field(default_factory=list)
import_mode: ImportMode = Field(default="append", description="导入模式")
class BookImportApplyResponse(BaseModel):
"""确认导入响应"""
success: bool
project_id: str
statistics: dict[str, int]
warnings: list[BookImportWarning] = Field(default_factory=list)
class BookImportRetryRequest(BaseModel):
"""重试失败步骤请求"""
steps: list[str] = Field(..., min_length=1, description="需要重试的步骤名列表,如 world_building / career_system / characters")
File diff suppressed because it is too large Load Diff
+145
View File
@@ -2416,6 +2416,136 @@ class PromptService:
❌ 添加任何元信息或说明
❌ 改变叙事人称或视角
❌ 偏离用户的修改要求
</constraints>"""
# 拆书导入-反向项目提炼提示词
BOOK_IMPORT_REVERSE_PROJECT_SUGGESTION = """<system>
你是资深网文策划编辑,擅长从小说正文中反向提炼项目立项信息。
</system>
<task>
【任务】
基于提供的前3章内容,提炼该小说的核心立项信息,用于创建新项目。
【目标】
在不偏离原文的前提下,输出可直接用于项目初始化的结构化信息。
</task>
<input priority="P0">
【输入信息】
书名:{title}
前3章内容:
{sampled_text}
</input>
<output priority="P0">
【输出格式】
仅输出一个纯JSON对象(不要markdown、不要代码块、不要解释):
{{
"description": "小说简介",
"theme": "核心主题",
"genre": "小说类型",
"narrative_perspective": "第一人称/第三人称/全知视角",
"target_words": 100000
}}
【字段要求】
1) description120-260字,聚焦主角、核心冲突、主线目标与故事张力。
2) theme120-260字,提炼作品想表达的核心命题。
3) genre:2-12字,如都市、玄幻、悬疑、科幻、言情等。
4) narrative_perspective:只能是“第一人称”或“第三人称”或“全知视角”。
5) target_words:整数。按网文体量合理预估;无法判断时返回100000。
</output>
<constraints>
【必须遵守】
✅ 严格基于已给正文内容,不凭空添加关键设定
✅ 保持信息自洽,避免互相矛盾
✅ 输出必须是可解析JSON对象
✅ 小说的genre可以由多个类型组成
【禁止事项】
❌ 输出JSON以外的任何文字
❌ 使用markdown标记或代码块包裹
❌ narrative_perspective输出枚举值之外的内容
❌ target_words输出非整数
</constraints>"""
# 拆书导入-反向生成章节大纲(严格对齐 OUTLINE_CREATE 结构)
BOOK_IMPORT_REVERSE_OUTLINES = """<system>
你是资深网文总编与剧情策划,擅长基于已完成章节反向提炼标准化章节大纲。
</system>
<task>
【任务】
基于给定的章节正文(每批最多5章),为每章反向生成对应大纲结构。
【核心目标】
输出结构必须与系统现有大纲生成结构严格一致(与 OUTLINE_CREATE 字段一致),用于直接入库。
</task>
<project priority="P0">
【项目信息】
书名:{title}
类型:{genre}
主题:{theme}
叙事视角:{narrative_perspective}
</project>
<input priority="P0">
【批次范围】
{start_chapter}章 - 第{end_chapter}章(共{expected_count}章)
【章节内容】
{chapters_text}
</input>
<output priority="P0">
【输出格式】
仅输出纯JSON数组(不要markdown、不要代码块、不要解释)。
数组长度必须严格等于 {expected_count}
每个对象字段必须严格为:
[
{{
"chapter_number": 1,
"title": "章节标题",
"summary": "章节概要(200-600字):主要情节、角色互动、关键事件、冲突与转折",
"scenes": ["场景1描述", "场景2描述"],
"characters": [
{{"name": "角色名1", "type": "character"}},
{{"name": "组织/势力名1", "type": "organization"}}
],
"key_points": ["情节要点1", "情节要点2"],
"emotion": "本章情感基调",
"goal": "本章叙事目标"
}}
]
【字段约束】
- chapter_number:必须与输入章节号一致
- title:必须与输入章节标题一致
- summary:根据本章正文反向提炼,不得臆造未出现关键事件
- scenes2-6条
- characters:可为空;type 仅允许 character 或 organization
- key_points2-6条
- emotion:一句话
- goal:一句话
</output>
<constraints>
【必须遵守】
✅ 严格一章对应一个对象,数量与顺序完全一致
✅ 字段名、字段层级、字段类型严格一致
✅ 仅基于输入正文提炼,不擅自扩展设定
✅ 输出必须可被JSON直接解析
【禁止事项】
❌ 输出JSON之外任何文本
❌ 缺失字段或新增字段
❌ chapter_number/title 与输入不一致
❌ 使用 markdown 或代码块
</constraints>"""
@staticmethod
@@ -2689,6 +2819,21 @@ class PromptService:
"description": "用于生成小说世界观设定,包括时间背景、地理位置、氛围基调和世界规则",
"parameters": ["title", "theme", "genre", "description"]
},
"BOOK_IMPORT_REVERSE_PROJECT_SUGGESTION": {
"name": "拆书导入-反向项目提炼",
"category": "拆书导入",
"description": "基于前3章内容反向提炼简介、主题、类型、叙事视角与目标字数",
"parameters": ["title", "sampled_text"]
},
"BOOK_IMPORT_REVERSE_OUTLINES": {
"name": "拆书导入-反向章节大纲",
"category": "拆书导入",
"description": "基于章节正文反向生成与OUTLINE_CREATE一致结构的大纲(单批次5章)",
"parameters": [
"title", "genre", "theme", "narrative_perspective",
"start_chapter", "end_chapter", "expected_count", "chapters_text"
]
},
"CHARACTERS_BATCH_GENERATION": {
"name": "批量角色生成",
"category": "角色生成",
+171
View File
@@ -0,0 +1,171 @@
"""TXT 解析服务:编码识别、文本清洗与章节切分"""
from __future__ import annotations
import re
from typing import Optional
from app.logger import get_logger
logger = get_logger(__name__)
class TxtParserService:
"""TXT 解析服务(规则优先)"""
STRONG_CHAPTER_PATTERNS = [
re.compile(r"^第[一二三四五六七八九十百千万零〇两\d]+[章节回卷集部篇].*$"),
re.compile(r"^chapter\s*\d+.*$", re.IGNORECASE),
re.compile(r"^chap\.\s*\d+.*$", re.IGNORECASE),
]
def decode_bytes(self, content: bytes) -> tuple[str, str]:
"""
尝试解码 TXT 字节流
Returns:
(text, encoding)
"""
encodings = ["utf-8", "utf-8-sig", "gb18030", "gbk", "big5"]
for enc in encodings:
try:
return content.decode(enc), enc
except UnicodeDecodeError:
continue
# 最后兜底:不抛错,尽量读出内容
logger.warning("TXT 编码自动识别失败,使用 utf-8(ignore) 兜底")
return content.decode("utf-8", errors="ignore"), "utf-8(ignore)"
def clean_text(self, text: str) -> str:
"""基础清洗:换行归一、去除异常空白、压缩多余空行"""
normalized = text.replace("\r\n", "\n").replace("\r", "\n").replace("\ufeff", "")
normalized = normalized.replace("\u3000", " ")
normalized = re.sub(r"[ \t]+\n", "\n", normalized)
normalized = re.sub(r"\n{4,}", "\n\n\n", normalized)
return normalized.strip()
def split_chapters(self, text: str) -> list[dict]:
"""
章节切分(规则优先,失败兜底)
Returns:
[{title, content, chapter_number}]
"""
if not text.strip():
return []
lines = text.split("\n")
heading_indexes: list[int] = []
for idx, line in enumerate(lines):
stripped = line.strip()
if not stripped:
continue
if self._is_strong_heading(stripped) or self._is_weak_heading(lines, idx):
heading_indexes.append(idx)
# 去重并排序
heading_indexes = sorted(set(heading_indexes))
# 如果一个标题都识别不到,走固定窗口兜底
if not heading_indexes:
return self._fallback_split(text)
# 如果第一个标题前有较长正文,作为前言章节保留
chapters: list[dict] = []
chapter_no = 1
first_heading = heading_indexes[0]
if first_heading > 0:
preface = "\n".join(lines[:first_heading]).strip()
if len(preface) >= 200:
chapters.append(
{
"title": "前言",
"content": preface,
"chapter_number": chapter_no,
}
)
chapter_no += 1
for i, start_idx in enumerate(heading_indexes):
end_idx = heading_indexes[i + 1] if i + 1 < len(heading_indexes) else len(lines)
title = lines[start_idx].strip()[:200] or f"{chapter_no}"
body = "\n".join(lines[start_idx + 1 : end_idx]).strip()
# 防止空标题/空正文完全丢失
if not body and i + 1 < len(heading_indexes):
next_line = lines[start_idx + 1].strip() if start_idx + 1 < len(lines) else ""
body = next_line
chapters.append(
{
"title": title,
"content": body,
"chapter_number": chapter_no,
}
)
chapter_no += 1
# 过滤掉明显噪音章节
filtered = [c for c in chapters if c["title"] or c["content"]]
if filtered:
return filtered
return self._fallback_split(text)
def _is_strong_heading(self, line: str) -> bool:
return any(pattern.match(line) for pattern in self.STRONG_CHAPTER_PATTERNS)
def _is_weak_heading(self, lines: list[str], idx: int) -> bool:
"""
弱模式:短行 + 前后空行 + 避免普通句子误判
"""
line = lines[idx].strip()
if not line:
return False
if len(line) > 25:
return False
if re.search(r"[,。!?;:,.!?;:]", line):
return False
prev_blank = idx == 0 or not lines[idx - 1].strip()
next_blank = idx == len(lines) - 1 or not lines[idx + 1].strip()
return prev_blank and next_blank
def _fallback_split(self, text: str, min_window: int = 3000, max_window: int = 5000) -> list[dict]:
"""
固定窗口 + 标点边界切分
"""
chapters: list[dict] = []
n = len(text)
start = 0
chapter_no = 1
boundary_punctuations = "。!?!?\n"
while start < n:
ideal_end = min(start + max_window, n)
if ideal_end >= n:
end = n
else:
search_from = min(start + min_window, n)
segment = text[search_from:ideal_end]
offset = max(segment.rfind(p) for p in boundary_punctuations)
end = search_from + offset + 1 if offset >= 0 else ideal_end
chunk = text[start:end].strip()
if chunk:
chapters.append(
{
"title": f"{chapter_no}",
"content": chunk,
"chapter_number": chapter_no,
}
)
chapter_no += 1
start = end
return chapters
txt_parser_service = TxtParserService()