1786 lines
72 KiB
Python
1786 lines
72 KiB
Python
"""导入导出服务"""
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, or_
|
|
from app.models.project import Project
|
|
from app.models.chapter import Chapter
|
|
from app.models.character import Character
|
|
from app.models.outline import Outline
|
|
from app.models.relationship import CharacterRelationship, Organization, OrganizationMember
|
|
from app.models.writing_style import WritingStyle
|
|
from app.models.generation_history import GenerationHistory
|
|
from app.models.career import Career, CharacterCareer
|
|
from app.models.memory import StoryMemory, PlotAnalysis
|
|
from app.models.analysis_task import AnalysisTask
|
|
from app.models.project_default_style import ProjectDefaultStyle
|
|
from app.schemas.import_export import (
|
|
ProjectExportData,
|
|
ChapterExportData,
|
|
CharacterExportData,
|
|
OutlineExportData,
|
|
RelationshipExportData,
|
|
OrganizationExportData,
|
|
OrganizationMemberExportData,
|
|
WritingStyleExportData,
|
|
GenerationHistoryExportData,
|
|
CareerExportData,
|
|
CharacterCareerExportData,
|
|
StoryMemoryExportData,
|
|
PlotAnalysisExportData,
|
|
ProjectDefaultStyleExportData,
|
|
ImportValidationResult,
|
|
ImportResult
|
|
)
|
|
from app.logger import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class ImportExportService:
|
|
"""导入导出服务类"""
|
|
|
|
SUPPORTED_VERSIONS = ["1.0.0", "1.1.0"] # 支持的版本列表
|
|
CURRENT_VERSION = "1.1.0" # 当前导出版本
|
|
|
|
@staticmethod
|
|
async def export_project(
|
|
project_id: str,
|
|
db: AsyncSession,
|
|
include_generation_history: bool = False,
|
|
include_writing_styles: bool = True,
|
|
include_careers: bool = True,
|
|
include_memories: bool = False,
|
|
include_plot_analysis: bool = False
|
|
) -> ProjectExportData:
|
|
"""
|
|
导出项目完整数据
|
|
|
|
Args:
|
|
project_id: 项目ID
|
|
db: 数据库会话
|
|
include_generation_history: 是否包含生成历史
|
|
include_writing_styles: 是否包含写作风格
|
|
include_careers: 是否包含职业系统
|
|
include_memories: 是否包含故事记忆
|
|
include_plot_analysis: 是否包含剧情分析
|
|
|
|
Returns:
|
|
ProjectExportData: 导出的项目数据
|
|
"""
|
|
logger.info(f"开始导出项目: {project_id}")
|
|
|
|
# 获取项目基本信息
|
|
result = await db.execute(select(Project).where(Project.id == project_id))
|
|
project = result.scalar_one_or_none()
|
|
if not project:
|
|
raise ValueError(f"项目不存在: {project_id}")
|
|
|
|
# 项目基本信息
|
|
project_data = {
|
|
"title": project.title,
|
|
"description": project.description,
|
|
"theme": project.theme,
|
|
"genre": project.genre,
|
|
"target_words": project.target_words,
|
|
"current_words": project.current_words,
|
|
"status": project.status,
|
|
"world_time_period": project.world_time_period,
|
|
"world_location": project.world_location,
|
|
"world_atmosphere": project.world_atmosphere,
|
|
"world_rules": project.world_rules,
|
|
"chapter_count": project.chapter_count,
|
|
"narrative_perspective": project.narrative_perspective,
|
|
"character_count": project.character_count,
|
|
"outline_mode": project.outline_mode,
|
|
"user_id": project.user_id,
|
|
"created_at": project.created_at.isoformat() if project.created_at else None,
|
|
}
|
|
|
|
# 导出章节
|
|
chapters = await ImportExportService._export_chapters(project_id, db)
|
|
logger.info(f"导出章节数: {len(chapters)}")
|
|
|
|
# 导出角色
|
|
characters = await ImportExportService._export_characters(project_id, db)
|
|
logger.info(f"导出角色数: {len(characters)}")
|
|
|
|
# 导出大纲
|
|
outlines = await ImportExportService._export_outlines(project_id, db)
|
|
logger.info(f"导出大纲数: {len(outlines)}")
|
|
|
|
# 导出关系
|
|
relationships = await ImportExportService._export_relationships(project_id, db)
|
|
logger.info(f"导出关系数: {len(relationships)}")
|
|
|
|
# 导出组织详情
|
|
organizations = await ImportExportService._export_organizations(project_id, db)
|
|
logger.info(f"导出组织数: {len(organizations)}")
|
|
|
|
# 导出组织成员
|
|
org_members = await ImportExportService._export_organization_members(project_id, db)
|
|
logger.info(f"导出组织成员数: {len(org_members)}")
|
|
|
|
# 导出写作风格(可选)
|
|
writing_styles = []
|
|
if include_writing_styles:
|
|
writing_styles = await ImportExportService._export_writing_styles(project_id, db)
|
|
logger.info(f"导出写作风格数: {len(writing_styles)}")
|
|
|
|
# 导出生成历史(可选)
|
|
generation_history = []
|
|
if include_generation_history:
|
|
generation_history = await ImportExportService._export_generation_history(project_id, db)
|
|
logger.info(f"导出生成历史数: {len(generation_history)}")
|
|
|
|
# 导出职业系统(可选)
|
|
careers = []
|
|
character_careers = []
|
|
if include_careers:
|
|
careers = await ImportExportService._export_careers(project_id, db)
|
|
logger.info(f"导出职业数: {len(careers)}")
|
|
character_careers = await ImportExportService._export_character_careers(project_id, db)
|
|
logger.info(f"导出角色职业关联数: {len(character_careers)}")
|
|
|
|
# 导出故事记忆(可选)
|
|
story_memories = []
|
|
if include_memories:
|
|
story_memories = await ImportExportService._export_story_memories(project_id, db)
|
|
logger.info(f"导出故事记忆数: {len(story_memories)}")
|
|
|
|
# 导出剧情分析(可选)
|
|
plot_analysis = []
|
|
if include_plot_analysis:
|
|
plot_analysis = await ImportExportService._export_plot_analysis(project_id, db)
|
|
logger.info(f"导出剧情分析数: {len(plot_analysis)}")
|
|
|
|
# 导出项目默认风格
|
|
project_default_style = await ImportExportService._export_project_default_style(project_id, db)
|
|
if project_default_style:
|
|
logger.info(f"导出项目默认风格: {project_default_style.style_name}")
|
|
|
|
export_data = ProjectExportData(
|
|
version=ImportExportService.CURRENT_VERSION,
|
|
export_time=datetime.utcnow().isoformat(),
|
|
project=project_data,
|
|
chapters=chapters,
|
|
characters=characters,
|
|
outlines=outlines,
|
|
relationships=relationships,
|
|
organizations=organizations,
|
|
organization_members=org_members,
|
|
writing_styles=writing_styles,
|
|
generation_history=generation_history,
|
|
careers=careers,
|
|
character_careers=character_careers,
|
|
story_memories=story_memories,
|
|
plot_analysis=plot_analysis,
|
|
project_default_style=project_default_style
|
|
)
|
|
|
|
logger.info(f"项目导出完成: {project_id}")
|
|
return export_data
|
|
|
|
@staticmethod
|
|
async def _export_chapters(project_id: str, db: AsyncSession) -> List[ChapterExportData]:
|
|
"""导出章节"""
|
|
result = await db.execute(
|
|
select(Chapter)
|
|
.where(Chapter.project_id == project_id)
|
|
.order_by(Chapter.chapter_number)
|
|
)
|
|
chapters = result.scalars().all()
|
|
|
|
# 构建大纲ID到标题的映射
|
|
outline_mapping = {}
|
|
if chapters:
|
|
outline_ids = [ch.outline_id for ch in chapters if ch.outline_id]
|
|
if outline_ids:
|
|
outline_result = await db.execute(
|
|
select(Outline).where(Outline.id.in_(outline_ids))
|
|
)
|
|
outlines = outline_result.scalars().all()
|
|
outline_mapping = {ol.id: ol.title for ol in outlines}
|
|
|
|
exported_chapters = []
|
|
for ch in chapters:
|
|
# 解析expansion_plan JSON
|
|
expansion_plan = None
|
|
if ch.expansion_plan:
|
|
try:
|
|
expansion_plan = json.loads(ch.expansion_plan) if isinstance(ch.expansion_plan, str) else ch.expansion_plan
|
|
except:
|
|
expansion_plan = None
|
|
|
|
exported_chapters.append(ChapterExportData(
|
|
title=ch.title,
|
|
content=ch.content,
|
|
summary=ch.summary,
|
|
chapter_number=ch.chapter_number,
|
|
word_count=ch.word_count or 0,
|
|
status=ch.status,
|
|
created_at=ch.created_at.isoformat() if ch.created_at else None,
|
|
outline_title=outline_mapping.get(ch.outline_id) if ch.outline_id else None,
|
|
sub_index=ch.sub_index,
|
|
expansion_plan=expansion_plan
|
|
))
|
|
|
|
return exported_chapters
|
|
|
|
@staticmethod
|
|
async def _export_characters(project_id: str, db: AsyncSession) -> List[CharacterExportData]:
|
|
"""导出角色"""
|
|
result = await db.execute(
|
|
select(Character).where(Character.project_id == project_id)
|
|
)
|
|
characters = result.scalars().all()
|
|
|
|
exported = []
|
|
for char in characters:
|
|
# 解析traits JSON
|
|
traits = None
|
|
if char.traits:
|
|
try:
|
|
traits = json.loads(char.traits) if isinstance(char.traits, str) else char.traits
|
|
except:
|
|
traits = None
|
|
|
|
exported.append(CharacterExportData(
|
|
name=char.name,
|
|
age=char.age,
|
|
gender=char.gender,
|
|
is_organization=char.is_organization or False,
|
|
role_type=char.role_type,
|
|
personality=char.personality,
|
|
background=char.background,
|
|
appearance=char.appearance,
|
|
traits=traits,
|
|
organization_type=char.organization_type,
|
|
organization_purpose=char.organization_purpose,
|
|
created_at=char.created_at.isoformat() if char.created_at else None
|
|
))
|
|
|
|
return exported
|
|
|
|
@staticmethod
|
|
async def _export_outlines(project_id: str, db: AsyncSession) -> List[OutlineExportData]:
|
|
"""导出大纲"""
|
|
result = await db.execute(
|
|
select(Outline)
|
|
.where(Outline.project_id == project_id)
|
|
.order_by(Outline.order_index)
|
|
)
|
|
outlines = result.scalars().all()
|
|
|
|
return [
|
|
OutlineExportData(
|
|
title=ol.title,
|
|
content=ol.content,
|
|
structure=ol.structure,
|
|
order_index=ol.order_index,
|
|
created_at=ol.created_at.isoformat() if ol.created_at else None
|
|
)
|
|
for ol in outlines
|
|
]
|
|
|
|
@staticmethod
|
|
async def _export_relationships(project_id: str, db: AsyncSession) -> List[RelationshipExportData]:
|
|
"""导出关系"""
|
|
result = await db.execute(
|
|
select(CharacterRelationship, Character)
|
|
.join(Character, CharacterRelationship.character_from_id == Character.id)
|
|
.where(CharacterRelationship.project_id == project_id)
|
|
)
|
|
relationships = result.all()
|
|
|
|
exported = []
|
|
for rel, char_from in relationships:
|
|
# 获取目标角色名称
|
|
target_result = await db.execute(
|
|
select(Character).where(Character.id == rel.character_to_id)
|
|
)
|
|
char_to = target_result.scalar_one_or_none()
|
|
|
|
if char_to:
|
|
exported.append(RelationshipExportData(
|
|
source_name=char_from.name,
|
|
target_name=char_to.name,
|
|
relationship_name=rel.relationship_name,
|
|
intimacy_level=rel.intimacy_level or 50,
|
|
status=rel.status or "active",
|
|
description=rel.description,
|
|
started_at=rel.started_at
|
|
))
|
|
|
|
return exported
|
|
|
|
@staticmethod
|
|
async def _export_organizations(project_id: str, db: AsyncSession) -> List[OrganizationExportData]:
|
|
"""导出组织详情"""
|
|
result = await db.execute(
|
|
select(Organization, Character)
|
|
.join(Character, Organization.character_id == Character.id)
|
|
.where(Organization.project_id == project_id)
|
|
)
|
|
organizations = result.all()
|
|
|
|
exported = []
|
|
for org, char in organizations:
|
|
# 获取父组织名称
|
|
parent_name = None
|
|
if org.parent_org_id:
|
|
parent_result = await db.execute(
|
|
select(Organization, Character)
|
|
.join(Character, Organization.character_id == Character.id)
|
|
.where(Organization.id == org.parent_org_id)
|
|
)
|
|
parent_data = parent_result.first()
|
|
if parent_data:
|
|
parent_name = parent_data[1].name
|
|
|
|
exported.append(OrganizationExportData(
|
|
character_name=char.name,
|
|
parent_org_name=parent_name,
|
|
power_level=org.power_level or 50,
|
|
member_count=org.member_count or 0,
|
|
location=org.location,
|
|
motto=org.motto,
|
|
color=org.color
|
|
))
|
|
|
|
return exported
|
|
|
|
@staticmethod
|
|
async def _export_organization_members(project_id: str, db: AsyncSession) -> List[OrganizationMemberExportData]:
|
|
"""导出组织成员"""
|
|
result = await db.execute(
|
|
select(OrganizationMember, Organization, Character)
|
|
.join(Organization, OrganizationMember.organization_id == Organization.id)
|
|
.join(Character, Organization.character_id == Character.id)
|
|
.where(Organization.project_id == project_id)
|
|
)
|
|
members = result.all()
|
|
|
|
exported = []
|
|
for member, org, org_char in members:
|
|
# 获取成员角色名称
|
|
char_result = await db.execute(
|
|
select(Character).where(Character.id == member.character_id)
|
|
)
|
|
member_char = char_result.scalar_one_or_none()
|
|
|
|
if member_char:
|
|
exported.append(OrganizationMemberExportData(
|
|
organization_name=org_char.name,
|
|
character_name=member_char.name,
|
|
position=member.position,
|
|
rank=member.rank or 0,
|
|
status=member.status or "active",
|
|
joined_at=member.joined_at,
|
|
loyalty=member.loyalty or 50,
|
|
contribution=member.contribution or 0,
|
|
notes=member.notes
|
|
))
|
|
|
|
return exported
|
|
|
|
@staticmethod
|
|
async def _export_writing_styles(project_id: str, db: AsyncSession) -> List[WritingStyleExportData]:
|
|
"""导出写作风格(用户自定义风格)"""
|
|
# 获取项目所属用户
|
|
project_result = await db.execute(
|
|
select(Project).where(Project.id == project_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
if not project:
|
|
return []
|
|
|
|
# 导出该用户的自定义风格(不包括全局预设)
|
|
result = await db.execute(
|
|
select(WritingStyle)
|
|
.where(WritingStyle.user_id == project.user_id)
|
|
.order_by(WritingStyle.order_index)
|
|
)
|
|
styles = result.scalars().all()
|
|
|
|
return [
|
|
WritingStyleExportData(
|
|
name=style.name,
|
|
style_type=style.style_type,
|
|
preset_id=style.preset_id,
|
|
description=style.description,
|
|
prompt_content=style.prompt_content,
|
|
order_index=style.order_index or 0
|
|
)
|
|
for style in styles
|
|
]
|
|
|
|
@staticmethod
|
|
async def _export_generation_history(project_id: str, db: AsyncSession) -> List[GenerationHistoryExportData]:
|
|
"""导出生成历史"""
|
|
result = await db.execute(
|
|
select(GenerationHistory, Chapter)
|
|
.outerjoin(Chapter, GenerationHistory.chapter_id == Chapter.id)
|
|
.where(GenerationHistory.project_id == project_id)
|
|
.order_by(GenerationHistory.created_at.desc())
|
|
.limit(100) # 限制最多导出100条历史记录
|
|
)
|
|
histories = result.all()
|
|
|
|
return [
|
|
GenerationHistoryExportData(
|
|
chapter_title=chapter.title if chapter else None,
|
|
prompt=history.prompt,
|
|
generated_content=history.generated_content,
|
|
model=history.model,
|
|
tokens_used=history.tokens_used,
|
|
generation_time=history.generation_time,
|
|
created_at=history.created_at.isoformat() if history.created_at else None
|
|
)
|
|
for history, chapter in histories
|
|
]
|
|
|
|
@staticmethod
|
|
async def _export_careers(project_id: str, db: AsyncSession) -> List[CareerExportData]:
|
|
"""导出职业系统"""
|
|
result = await db.execute(
|
|
select(Career)
|
|
.where(Career.project_id == project_id)
|
|
.order_by(Career.type, Career.created_at)
|
|
)
|
|
careers = result.scalars().all()
|
|
|
|
return [
|
|
CareerExportData(
|
|
name=career.name,
|
|
type=career.type,
|
|
description=career.description,
|
|
category=career.category,
|
|
stages=career.stages,
|
|
max_stage=career.max_stage or 10,
|
|
requirements=career.requirements,
|
|
special_abilities=career.special_abilities,
|
|
worldview_rules=career.worldview_rules,
|
|
attribute_bonuses=career.attribute_bonuses,
|
|
source=career.source or "ai",
|
|
created_at=career.created_at.isoformat() if career.created_at else None
|
|
)
|
|
for career in careers
|
|
]
|
|
|
|
@staticmethod
|
|
async def _export_character_careers(project_id: str, db: AsyncSession) -> List[CharacterCareerExportData]:
|
|
"""导出角色职业关联"""
|
|
# 查询所有属于该项目的角色职业关联
|
|
result = await db.execute(
|
|
select(CharacterCareer, Character, Career)
|
|
.join(Character, CharacterCareer.character_id == Character.id)
|
|
.join(Career, CharacterCareer.career_id == Career.id)
|
|
.where(Character.project_id == project_id)
|
|
)
|
|
character_careers = result.all()
|
|
|
|
return [
|
|
CharacterCareerExportData(
|
|
character_name=char.name,
|
|
career_name=career.name,
|
|
career_type=cc.career_type,
|
|
current_stage=cc.current_stage or 1,
|
|
stage_progress=cc.stage_progress or 0,
|
|
started_at=cc.started_at,
|
|
reached_current_stage_at=cc.reached_current_stage_at,
|
|
notes=cc.notes
|
|
)
|
|
for cc, char, career in character_careers
|
|
]
|
|
|
|
@staticmethod
|
|
async def _export_story_memories(project_id: str, db: AsyncSession) -> List[StoryMemoryExportData]:
|
|
"""导出故事记忆"""
|
|
# 构建章节ID到标题的映射
|
|
chapter_result = await db.execute(
|
|
select(Chapter).where(Chapter.project_id == project_id)
|
|
)
|
|
chapters = chapter_result.scalars().all()
|
|
chapter_mapping = {ch.id: ch.title for ch in chapters}
|
|
|
|
# 构建角色ID到名称的映射
|
|
char_result = await db.execute(
|
|
select(Character).where(Character.project_id == project_id)
|
|
)
|
|
characters = char_result.scalars().all()
|
|
char_mapping = {char.id: char.name for char in characters}
|
|
|
|
result = await db.execute(
|
|
select(StoryMemory)
|
|
.where(StoryMemory.project_id == project_id)
|
|
.order_by(StoryMemory.story_timeline, StoryMemory.chapter_position)
|
|
)
|
|
memories = result.scalars().all()
|
|
|
|
exported = []
|
|
for mem in memories:
|
|
# 将角色ID列表转换为名称列表
|
|
related_char_names = None
|
|
if mem.related_characters:
|
|
related_char_names = [
|
|
char_mapping.get(char_id, char_id)
|
|
for char_id in mem.related_characters
|
|
]
|
|
|
|
exported.append(StoryMemoryExportData(
|
|
chapter_title=chapter_mapping.get(mem.chapter_id) if mem.chapter_id else None,
|
|
memory_type=mem.memory_type,
|
|
title=mem.title,
|
|
content=mem.content,
|
|
full_context=mem.full_context,
|
|
related_characters=related_char_names,
|
|
related_locations=mem.related_locations,
|
|
tags=mem.tags,
|
|
importance_score=mem.importance_score or 0.5,
|
|
story_timeline=mem.story_timeline,
|
|
chapter_position=mem.chapter_position or 0,
|
|
text_length=mem.text_length or 0,
|
|
is_foreshadow=mem.is_foreshadow or 0,
|
|
foreshadow_strength=mem.foreshadow_strength,
|
|
created_at=mem.created_at.isoformat() if mem.created_at else None
|
|
))
|
|
|
|
return exported
|
|
|
|
@staticmethod
|
|
async def _export_plot_analysis(project_id: str, db: AsyncSession) -> List[PlotAnalysisExportData]:
|
|
"""导出剧情分析"""
|
|
# 构建章节ID到标题的映射
|
|
chapter_result = await db.execute(
|
|
select(Chapter).where(Chapter.project_id == project_id)
|
|
)
|
|
chapters = chapter_result.scalars().all()
|
|
chapter_mapping = {ch.id: ch.title for ch in chapters}
|
|
|
|
result = await db.execute(
|
|
select(PlotAnalysis)
|
|
.where(PlotAnalysis.project_id == project_id)
|
|
)
|
|
analyses = result.scalars().all()
|
|
|
|
exported = []
|
|
for analysis in analyses:
|
|
chapter_title = chapter_mapping.get(analysis.chapter_id)
|
|
if not chapter_title:
|
|
continue # 跳过没有关联章节的分析
|
|
|
|
exported.append(PlotAnalysisExportData(
|
|
chapter_title=chapter_title,
|
|
plot_stage=analysis.plot_stage,
|
|
conflict_level=analysis.conflict_level,
|
|
conflict_types=analysis.conflict_types,
|
|
emotional_tone=analysis.emotional_tone,
|
|
emotional_intensity=analysis.emotional_intensity,
|
|
emotional_curve=analysis.emotional_curve,
|
|
hooks=analysis.hooks,
|
|
hooks_count=analysis.hooks_count or 0,
|
|
hooks_avg_strength=analysis.hooks_avg_strength,
|
|
foreshadows=analysis.foreshadows,
|
|
foreshadows_planted=analysis.foreshadows_planted or 0,
|
|
foreshadows_resolved=analysis.foreshadows_resolved or 0,
|
|
plot_points=analysis.plot_points,
|
|
plot_points_count=analysis.plot_points_count or 0,
|
|
character_states=analysis.character_states,
|
|
scenes=analysis.scenes,
|
|
pacing=analysis.pacing,
|
|
overall_quality_score=analysis.overall_quality_score,
|
|
pacing_score=analysis.pacing_score,
|
|
engagement_score=analysis.engagement_score,
|
|
coherence_score=analysis.coherence_score,
|
|
analysis_report=analysis.analysis_report,
|
|
suggestions=analysis.suggestions,
|
|
word_count=analysis.word_count,
|
|
dialogue_ratio=analysis.dialogue_ratio,
|
|
description_ratio=analysis.description_ratio,
|
|
created_at=analysis.created_at.isoformat() if analysis.created_at else None
|
|
))
|
|
|
|
return exported
|
|
|
|
@staticmethod
|
|
async def _export_project_default_style(project_id: str, db: AsyncSession) -> Optional[ProjectDefaultStyleExportData]:
|
|
"""导出项目默认风格"""
|
|
result = await db.execute(
|
|
select(ProjectDefaultStyle, WritingStyle)
|
|
.join(WritingStyle, ProjectDefaultStyle.style_id == WritingStyle.id)
|
|
.where(ProjectDefaultStyle.project_id == project_id)
|
|
)
|
|
row = result.first()
|
|
|
|
if row:
|
|
_, style = row
|
|
return ProjectDefaultStyleExportData(style_name=style.name)
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def validate_import_data(data: Dict) -> ImportValidationResult:
|
|
"""
|
|
验证导入数据
|
|
|
|
Args:
|
|
data: 导入的JSON数据
|
|
|
|
Returns:
|
|
ImportValidationResult: 验证结果
|
|
"""
|
|
errors = []
|
|
warnings = []
|
|
statistics = {}
|
|
|
|
# 检查版本
|
|
version = data.get("version", "")
|
|
if not version:
|
|
errors.append("缺少版本信息")
|
|
elif version not in ImportExportService.SUPPORTED_VERSIONS:
|
|
warnings.append(f"版本不匹配: 导入文件版本为 {version}, 当前支持版本为 {', '.join(ImportExportService.SUPPORTED_VERSIONS)}")
|
|
|
|
# 检查必需字段
|
|
if "project" not in data:
|
|
errors.append("缺少项目信息")
|
|
else:
|
|
project = data["project"]
|
|
if not project.get("title"):
|
|
errors.append("项目标题不能为空")
|
|
|
|
# 统计数据(包含新增字段)
|
|
statistics = {
|
|
"chapters": len(data.get("chapters", [])),
|
|
"characters": len(data.get("characters", [])),
|
|
"outlines": len(data.get("outlines", [])),
|
|
"relationships": len(data.get("relationships", [])),
|
|
"organizations": len(data.get("organizations", [])),
|
|
"organization_members": len(data.get("organization_members", [])),
|
|
"writing_styles": len(data.get("writing_styles", [])),
|
|
"generation_history": len(data.get("generation_history", [])),
|
|
"careers": len(data.get("careers", [])),
|
|
"character_careers": len(data.get("character_careers", [])),
|
|
"story_memories": len(data.get("story_memories", [])),
|
|
"plot_analysis": len(data.get("plot_analysis", [])),
|
|
"has_default_style": data.get("project_default_style") is not None
|
|
}
|
|
|
|
# 检查数据完整性
|
|
if statistics["chapters"] == 0:
|
|
warnings.append("项目没有章节数据")
|
|
|
|
if statistics["characters"] == 0:
|
|
warnings.append("项目没有角色数据")
|
|
|
|
project_name = data.get("project", {}).get("title", "未知项目")
|
|
|
|
return ImportValidationResult(
|
|
valid=len(errors) == 0,
|
|
version=version,
|
|
project_name=project_name,
|
|
statistics=statistics,
|
|
errors=errors,
|
|
warnings=warnings
|
|
)
|
|
|
|
@staticmethod
|
|
async def import_project(
|
|
data: Dict,
|
|
db: AsyncSession,
|
|
user_id: str
|
|
) -> ImportResult:
|
|
"""
|
|
导入项目数据(创建新项目)
|
|
|
|
Args:
|
|
data: 导入的JSON数据
|
|
db: 数据库会话
|
|
user_id: 目标用户ID(导入后的项目归属)
|
|
|
|
Returns:
|
|
ImportResult: 导入结果
|
|
"""
|
|
warnings = []
|
|
statistics = {}
|
|
|
|
try:
|
|
# 验证数据
|
|
validation = ImportExportService.validate_import_data(data)
|
|
if not validation.valid:
|
|
return ImportResult(
|
|
success=False,
|
|
message=f"数据验证失败: {', '.join(validation.errors)}",
|
|
statistics={},
|
|
warnings=validation.warnings
|
|
)
|
|
|
|
warnings.extend(validation.warnings)
|
|
|
|
logger.info(f"开始导入项目: {validation.project_name}")
|
|
|
|
# 创建项目
|
|
project_data = data["project"]
|
|
new_project = Project(
|
|
user_id=user_id, # 设置为当前用户ID
|
|
title=project_data.get("title"),
|
|
description=project_data.get("description"),
|
|
theme=project_data.get("theme"),
|
|
genre=project_data.get("genre"),
|
|
target_words=project_data.get("target_words"),
|
|
status=project_data.get("status", "planning"),
|
|
world_time_period=project_data.get("world_time_period"),
|
|
world_location=project_data.get("world_location"),
|
|
world_atmosphere=project_data.get("world_atmosphere"),
|
|
world_rules=project_data.get("world_rules"),
|
|
chapter_count=project_data.get("chapter_count"),
|
|
narrative_perspective=project_data.get("narrative_perspective"),
|
|
character_count=project_data.get("character_count"),
|
|
outline_mode=project_data.get("outline_mode", "one-to-many"), # ✅ 导入大纲模式,默认为一对多
|
|
current_words=project_data.get("current_words", 0), # 保留原项目的字数
|
|
wizard_step=4, # 导入的项目设置为向导完成状态
|
|
wizard_status="completed" # 标记向导已完成
|
|
)
|
|
db.add(new_project)
|
|
await db.flush() # 获取project_id
|
|
|
|
logger.info(f"创建项目成功: {new_project.id}")
|
|
|
|
# 导入角色(包括组织)- 需要先导入角色,因为大纲可能需要角色信息
|
|
char_mapping = await ImportExportService._import_characters(
|
|
new_project.id, data.get("characters", []), db
|
|
)
|
|
statistics["characters"] = len(char_mapping)
|
|
logger.info(f"导入角色数: {len(char_mapping)}")
|
|
|
|
# 导入大纲 - 需要在章节之前导入,以便建立关联
|
|
outline_mapping = await ImportExportService._import_outlines(
|
|
new_project.id, data.get("outlines", []), db
|
|
)
|
|
statistics["outlines"] = len(outline_mapping)
|
|
logger.info(f"导入大纲数: {len(outline_mapping)}")
|
|
|
|
# 导入章节 - 使用大纲映射重建关联关系
|
|
chapters_count = await ImportExportService._import_chapters(
|
|
new_project.id, data.get("chapters", []), outline_mapping, db
|
|
)
|
|
statistics["chapters"] = chapters_count
|
|
logger.info(f"导入章节数: {chapters_count}")
|
|
|
|
# 导入关系
|
|
relationships_count = await ImportExportService._import_relationships(
|
|
new_project.id, data.get("relationships", []), char_mapping, db
|
|
)
|
|
statistics["relationships"] = relationships_count
|
|
logger.info(f"导入关系数: {relationships_count}")
|
|
|
|
# 导入组织详情
|
|
org_mapping = await ImportExportService._import_organizations(
|
|
new_project.id, data.get("organizations", []), char_mapping, db
|
|
)
|
|
statistics["organizations"] = len(org_mapping)
|
|
logger.info(f"导入组织数: {len(org_mapping)}")
|
|
|
|
# 导入组织成员
|
|
org_members_count = await ImportExportService._import_organization_members(
|
|
data.get("organization_members", []), char_mapping, org_mapping, db
|
|
)
|
|
statistics["organization_members"] = org_members_count
|
|
logger.info(f"导入组织成员数: {org_members_count}")
|
|
|
|
# 导入写作风格
|
|
styles_count = await ImportExportService._import_writing_styles(
|
|
new_project.id, data.get("writing_styles", []), db
|
|
)
|
|
statistics["writing_styles"] = styles_count
|
|
logger.info(f"导入写作风格数: {styles_count}")
|
|
|
|
# 导入职业系统
|
|
career_mapping = await ImportExportService._import_careers(
|
|
new_project.id, data.get("careers", []), db
|
|
)
|
|
statistics["careers"] = len(career_mapping)
|
|
logger.info(f"导入职业数: {len(career_mapping)}")
|
|
|
|
# 导入角色职业关联
|
|
char_careers_count = await ImportExportService._import_character_careers(
|
|
data.get("character_careers", []), char_mapping, career_mapping, db
|
|
)
|
|
statistics["character_careers"] = char_careers_count
|
|
logger.info(f"导入角色职业关联数: {char_careers_count}")
|
|
|
|
# 导入故事记忆
|
|
# 需要先构建章节标题到ID的映射(使用章节号+标题组合确保唯一性)
|
|
chapter_title_to_id = {}
|
|
chapter_result = await db.execute(
|
|
select(Chapter).where(Chapter.project_id == new_project.id)
|
|
)
|
|
imported_chapters = chapter_result.scalars().all()
|
|
for ch in imported_chapters:
|
|
# 使用标题作为key,如果有重复标题则取第一个(已导入的顺序)
|
|
if ch.title and ch.title not in chapter_title_to_id:
|
|
chapter_title_to_id[ch.title] = ch.id
|
|
|
|
memories_count = await ImportExportService._import_story_memories(
|
|
new_project.id, data.get("story_memories", []), chapter_title_to_id, char_mapping, db
|
|
)
|
|
statistics["story_memories"] = memories_count
|
|
logger.info(f"导入故事记忆数: {memories_count}")
|
|
|
|
# 导入剧情分析(传入user_id以便创建分析任务记录)
|
|
plot_analysis_count = await ImportExportService._import_plot_analysis(
|
|
new_project.id, data.get("plot_analysis", []), chapter_title_to_id, db, user_id
|
|
)
|
|
statistics["plot_analysis"] = plot_analysis_count
|
|
logger.info(f"导入剧情分析数: {plot_analysis_count}")
|
|
|
|
# 导入项目默认风格
|
|
default_style_imported = await ImportExportService._import_project_default_style(
|
|
new_project.id, data.get("project_default_style"), db
|
|
)
|
|
statistics["project_default_style"] = 1 if default_style_imported else 0
|
|
if default_style_imported:
|
|
logger.info("导入项目默认风格成功")
|
|
|
|
# 提交事务
|
|
await db.commit()
|
|
|
|
logger.info(f"项目导入完成: {new_project.id}")
|
|
|
|
return ImportResult(
|
|
success=True,
|
|
project_id=new_project.id,
|
|
message="项目导入成功",
|
|
statistics=statistics,
|
|
warnings=warnings
|
|
)
|
|
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(f"导入项目失败: {str(e)}", exc_info=True)
|
|
return ImportResult(
|
|
success=False,
|
|
message=f"导入失败: {str(e)}",
|
|
statistics=statistics,
|
|
warnings=warnings
|
|
)
|
|
|
|
@staticmethod
|
|
async def _import_chapters(
|
|
project_id: str,
|
|
chapters_data: List[Dict],
|
|
outline_mapping: Dict[str, str],
|
|
db: AsyncSession
|
|
) -> int:
|
|
"""导入章节"""
|
|
count = 0
|
|
for ch_data in chapters_data:
|
|
# 根据大纲标题查找对应的新大纲ID
|
|
outline_id = None
|
|
outline_title = ch_data.get("outline_title")
|
|
if outline_title and outline_title in outline_mapping:
|
|
outline_id = outline_mapping[outline_title]
|
|
|
|
# 处理expansion_plan
|
|
expansion_plan = ch_data.get("expansion_plan")
|
|
if expansion_plan and isinstance(expansion_plan, dict):
|
|
expansion_plan = json.dumps(expansion_plan, ensure_ascii=False)
|
|
|
|
chapter = Chapter(
|
|
project_id=project_id,
|
|
title=ch_data.get("title"),
|
|
content=ch_data.get("content"),
|
|
summary=ch_data.get("summary"),
|
|
chapter_number=ch_data.get("chapter_number"),
|
|
word_count=ch_data.get("word_count", 0),
|
|
status=ch_data.get("status", "draft"),
|
|
outline_id=outline_id,
|
|
sub_index=ch_data.get("sub_index"),
|
|
expansion_plan=expansion_plan
|
|
)
|
|
db.add(chapter)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_characters(
|
|
project_id: str,
|
|
characters_data: List[Dict],
|
|
db: AsyncSession
|
|
) -> Dict[str, str]:
|
|
"""导入角色,返回名称到ID的映射"""
|
|
char_mapping = {}
|
|
|
|
for char_data in characters_data:
|
|
# 处理traits
|
|
traits = char_data.get("traits")
|
|
if isinstance(traits, list):
|
|
traits = json.dumps(traits, ensure_ascii=False)
|
|
|
|
character = Character(
|
|
project_id=project_id,
|
|
name=char_data.get("name"),
|
|
age=char_data.get("age"),
|
|
gender=char_data.get("gender"),
|
|
is_organization=char_data.get("is_organization", False),
|
|
role_type=char_data.get("role_type"),
|
|
personality=char_data.get("personality"),
|
|
background=char_data.get("background"),
|
|
appearance=char_data.get("appearance"),
|
|
traits=traits,
|
|
organization_type=char_data.get("organization_type"),
|
|
organization_purpose=char_data.get("organization_purpose")
|
|
)
|
|
db.add(character)
|
|
await db.flush() # 获取ID
|
|
char_mapping[char_data.get("name")] = character.id
|
|
|
|
return char_mapping
|
|
|
|
@staticmethod
|
|
async def _import_outlines(
|
|
project_id: str,
|
|
outlines_data: List[Dict],
|
|
db: AsyncSession
|
|
) -> Dict[str, str]:
|
|
"""导入大纲,返回标题到ID的映射"""
|
|
outline_mapping = {}
|
|
|
|
for ol_data in outlines_data:
|
|
outline = Outline(
|
|
project_id=project_id,
|
|
title=ol_data.get("title"),
|
|
content=ol_data.get("content"),
|
|
structure=ol_data.get("structure"),
|
|
order_index=ol_data.get("order_index")
|
|
)
|
|
db.add(outline)
|
|
await db.flush() # 获取ID
|
|
outline_mapping[ol_data.get("title")] = outline.id
|
|
|
|
return outline_mapping
|
|
|
|
@staticmethod
|
|
async def _import_relationships(
|
|
project_id: str,
|
|
relationships_data: List[Dict],
|
|
char_mapping: Dict[str, str],
|
|
db: AsyncSession
|
|
) -> int:
|
|
"""导入关系"""
|
|
count = 0
|
|
for rel_data in relationships_data:
|
|
source_name = rel_data.get("source_name")
|
|
target_name = rel_data.get("target_name")
|
|
|
|
# 查找角色ID
|
|
source_id = char_mapping.get(source_name)
|
|
target_id = char_mapping.get(target_name)
|
|
|
|
if source_id and target_id:
|
|
relationship = CharacterRelationship(
|
|
project_id=project_id,
|
|
character_from_id=source_id,
|
|
character_to_id=target_id,
|
|
relationship_name=rel_data.get("relationship_name"),
|
|
intimacy_level=rel_data.get("intimacy_level", 50),
|
|
status=rel_data.get("status", "active"),
|
|
description=rel_data.get("description"),
|
|
started_at=rel_data.get("started_at")
|
|
)
|
|
db.add(relationship)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_organizations(
|
|
project_id: str,
|
|
organizations_data: List[Dict],
|
|
char_mapping: Dict[str, str],
|
|
db: AsyncSession
|
|
) -> Dict[str, str]:
|
|
"""导入组织详情,返回名称到ID的映射"""
|
|
org_mapping = {}
|
|
|
|
# 第一遍:创建所有组织(不设置父组织)
|
|
temp_orgs = []
|
|
for org_data in organizations_data:
|
|
char_name = org_data.get("character_name")
|
|
char_id = char_mapping.get(char_name)
|
|
|
|
if char_id:
|
|
organization = Organization(
|
|
project_id=project_id,
|
|
character_id=char_id,
|
|
power_level=org_data.get("power_level", 50),
|
|
member_count=org_data.get("member_count", 0),
|
|
location=org_data.get("location"),
|
|
motto=org_data.get("motto"),
|
|
color=org_data.get("color")
|
|
)
|
|
db.add(organization)
|
|
temp_orgs.append((organization, org_data.get("parent_org_name")))
|
|
|
|
await db.flush() # 获取所有组织的ID
|
|
|
|
# 建立名称到ID的映射
|
|
for org, _ in temp_orgs:
|
|
# 通过character_id查找角色名
|
|
result = await db.execute(
|
|
select(Character).where(Character.id == org.character_id)
|
|
)
|
|
char = result.scalar_one_or_none()
|
|
if char:
|
|
org_mapping[char.name] = org.id
|
|
|
|
# 第二遍:设置父组织关系
|
|
for org, parent_name in temp_orgs:
|
|
if parent_name:
|
|
parent_id = org_mapping.get(parent_name)
|
|
if parent_id:
|
|
org.parent_org_id = parent_id
|
|
|
|
return org_mapping
|
|
|
|
@staticmethod
|
|
async def _import_organization_members(
|
|
org_members_data: List[Dict],
|
|
char_mapping: Dict[str, str],
|
|
org_mapping: Dict[str, str],
|
|
db: AsyncSession
|
|
) -> int:
|
|
"""导入组织成员"""
|
|
count = 0
|
|
for member_data in org_members_data:
|
|
org_name = member_data.get("organization_name")
|
|
char_name = member_data.get("character_name")
|
|
|
|
org_id = org_mapping.get(org_name)
|
|
char_id = char_mapping.get(char_name)
|
|
|
|
if org_id and char_id:
|
|
member = OrganizationMember(
|
|
organization_id=org_id,
|
|
character_id=char_id,
|
|
position=member_data.get("position"),
|
|
rank=member_data.get("rank", 0),
|
|
status=member_data.get("status", "active"),
|
|
joined_at=member_data.get("joined_at"),
|
|
loyalty=member_data.get("loyalty", 50),
|
|
contribution=member_data.get("contribution", 0),
|
|
notes=member_data.get("notes")
|
|
)
|
|
db.add(member)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_writing_styles(
|
|
project_id: str,
|
|
styles_data: List[Dict],
|
|
db: AsyncSession
|
|
) -> int:
|
|
"""导入写作风格(用户自定义风格)"""
|
|
# 获取项目所属用户
|
|
project_result = await db.execute(
|
|
select(Project).where(Project.id == project_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
if not project:
|
|
return 0
|
|
|
|
count = 0
|
|
for style_data in styles_data:
|
|
# 检查是否已存在同名风格(避免重复导入)
|
|
existing = await db.execute(
|
|
select(WritingStyle).where(
|
|
WritingStyle.user_id == project.user_id,
|
|
WritingStyle.name == style_data.get("name")
|
|
)
|
|
)
|
|
# 使用 first() 避免多行时报错
|
|
if existing.first():
|
|
logger.debug(f"风格 {style_data.get('name')} 已存在,跳过导入")
|
|
continue
|
|
|
|
style = WritingStyle(
|
|
user_id=project.user_id, # 使用 user_id 而不是 project_id
|
|
name=style_data.get("name"),
|
|
style_type=style_data.get("style_type"),
|
|
preset_id=style_data.get("preset_id"),
|
|
description=style_data.get("description"),
|
|
prompt_content=style_data.get("prompt_content"),
|
|
order_index=style_data.get("order_index", 0)
|
|
)
|
|
db.add(style)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_careers(
|
|
project_id: str,
|
|
careers_data: List[Dict],
|
|
db: AsyncSession
|
|
) -> Dict[str, str]:
|
|
"""导入职业,返回名称到ID的映射"""
|
|
career_mapping = {}
|
|
|
|
for career_data in careers_data:
|
|
career = Career(
|
|
project_id=project_id,
|
|
name=career_data.get("name"),
|
|
type=career_data.get("type", "main"),
|
|
description=career_data.get("description"),
|
|
category=career_data.get("category"),
|
|
stages=career_data.get("stages", "[]"),
|
|
max_stage=career_data.get("max_stage", 10),
|
|
requirements=career_data.get("requirements"),
|
|
special_abilities=career_data.get("special_abilities"),
|
|
worldview_rules=career_data.get("worldview_rules"),
|
|
attribute_bonuses=career_data.get("attribute_bonuses"),
|
|
source=career_data.get("source", "ai")
|
|
)
|
|
db.add(career)
|
|
await db.flush()
|
|
career_mapping[career_data.get("name")] = career.id
|
|
|
|
return career_mapping
|
|
|
|
@staticmethod
|
|
async def _import_character_careers(
|
|
character_careers_data: List[Dict],
|
|
char_mapping: Dict[str, str],
|
|
career_mapping: Dict[str, str],
|
|
db: AsyncSession
|
|
) -> int:
|
|
"""导入角色职业关联"""
|
|
count = 0
|
|
for cc_data in character_careers_data:
|
|
char_name = cc_data.get("character_name")
|
|
career_name = cc_data.get("career_name")
|
|
|
|
char_id = char_mapping.get(char_name)
|
|
career_id = career_mapping.get(career_name)
|
|
|
|
if char_id and career_id:
|
|
# 检查是否已存在(使用 first() 避免多行时报错)
|
|
existing = await db.execute(
|
|
select(CharacterCareer).where(
|
|
CharacterCareer.character_id == char_id,
|
|
CharacterCareer.career_id == career_id
|
|
)
|
|
)
|
|
if existing.first():
|
|
continue
|
|
|
|
char_career = CharacterCareer(
|
|
character_id=char_id,
|
|
career_id=career_id,
|
|
career_type=cc_data.get("career_type", "main"),
|
|
current_stage=cc_data.get("current_stage", 1),
|
|
stage_progress=cc_data.get("stage_progress", 0),
|
|
started_at=cc_data.get("started_at"),
|
|
reached_current_stage_at=cc_data.get("reached_current_stage_at"),
|
|
notes=cc_data.get("notes")
|
|
)
|
|
db.add(char_career)
|
|
count += 1
|
|
|
|
# 同时更新角色的主职业信息
|
|
if cc_data.get("career_type") == "main":
|
|
char_result = await db.execute(
|
|
select(Character).where(Character.id == char_id)
|
|
)
|
|
char = char_result.scalar_one_or_none()
|
|
if char:
|
|
char.main_career_id = career_id
|
|
char.main_career_stage = cc_data.get("current_stage", 1)
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_story_memories(
|
|
project_id: str,
|
|
memories_data: List[Dict],
|
|
chapter_mapping: Dict[str, str],
|
|
char_mapping: Dict[str, str],
|
|
db: AsyncSession
|
|
) -> int:
|
|
"""导入故事记忆"""
|
|
count = 0
|
|
for mem_data in memories_data:
|
|
# 将章节标题转换为ID
|
|
chapter_id = None
|
|
chapter_title = mem_data.get("chapter_title")
|
|
if chapter_title and chapter_title in chapter_mapping:
|
|
chapter_id = chapter_mapping[chapter_title]
|
|
|
|
# 将角色名称列表转换为ID列表
|
|
related_char_ids = None
|
|
related_char_names = mem_data.get("related_characters")
|
|
if related_char_names:
|
|
related_char_ids = [
|
|
char_mapping.get(name)
|
|
for name in related_char_names
|
|
if char_mapping.get(name)
|
|
]
|
|
|
|
memory = StoryMemory(
|
|
project_id=project_id,
|
|
chapter_id=chapter_id,
|
|
memory_type=mem_data.get("memory_type"),
|
|
title=mem_data.get("title"),
|
|
content=mem_data.get("content"),
|
|
full_context=mem_data.get("full_context"),
|
|
related_characters=related_char_ids,
|
|
related_locations=mem_data.get("related_locations"),
|
|
tags=mem_data.get("tags"),
|
|
importance_score=mem_data.get("importance_score", 0.5),
|
|
story_timeline=mem_data.get("story_timeline", 0),
|
|
chapter_position=mem_data.get("chapter_position", 0),
|
|
text_length=mem_data.get("text_length", 0),
|
|
is_foreshadow=mem_data.get("is_foreshadow", 0),
|
|
foreshadow_strength=mem_data.get("foreshadow_strength")
|
|
)
|
|
db.add(memory)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_plot_analysis(
|
|
project_id: str,
|
|
plot_data: List[Dict],
|
|
chapter_mapping: Dict[str, str],
|
|
db: AsyncSession,
|
|
user_id: str = None
|
|
) -> int:
|
|
"""导入剧情分析,同时创建已完成的分析任务记录"""
|
|
from datetime import datetime
|
|
|
|
count = 0
|
|
for analysis_data in plot_data:
|
|
chapter_title = analysis_data.get("chapter_title")
|
|
chapter_id = chapter_mapping.get(chapter_title)
|
|
|
|
if not chapter_id:
|
|
continue # 跳过找不到章节的分析
|
|
|
|
# 检查是否已存在该章节的分析(使用 first() 避免多行时报错)
|
|
existing = await db.execute(
|
|
select(PlotAnalysis).where(PlotAnalysis.chapter_id == chapter_id)
|
|
)
|
|
if existing.first():
|
|
continue
|
|
|
|
analysis = PlotAnalysis(
|
|
project_id=project_id,
|
|
chapter_id=chapter_id,
|
|
plot_stage=analysis_data.get("plot_stage"),
|
|
conflict_level=analysis_data.get("conflict_level"),
|
|
conflict_types=analysis_data.get("conflict_types"),
|
|
emotional_tone=analysis_data.get("emotional_tone"),
|
|
emotional_intensity=analysis_data.get("emotional_intensity"),
|
|
emotional_curve=analysis_data.get("emotional_curve"),
|
|
hooks=analysis_data.get("hooks"),
|
|
hooks_count=analysis_data.get("hooks_count", 0),
|
|
hooks_avg_strength=analysis_data.get("hooks_avg_strength"),
|
|
foreshadows=analysis_data.get("foreshadows"),
|
|
foreshadows_planted=analysis_data.get("foreshadows_planted", 0),
|
|
foreshadows_resolved=analysis_data.get("foreshadows_resolved", 0),
|
|
plot_points=analysis_data.get("plot_points"),
|
|
plot_points_count=analysis_data.get("plot_points_count", 0),
|
|
character_states=analysis_data.get("character_states"),
|
|
scenes=analysis_data.get("scenes"),
|
|
pacing=analysis_data.get("pacing"),
|
|
overall_quality_score=analysis_data.get("overall_quality_score"),
|
|
pacing_score=analysis_data.get("pacing_score"),
|
|
engagement_score=analysis_data.get("engagement_score"),
|
|
coherence_score=analysis_data.get("coherence_score"),
|
|
analysis_report=analysis_data.get("analysis_report"),
|
|
suggestions=analysis_data.get("suggestions"),
|
|
word_count=analysis_data.get("word_count"),
|
|
dialogue_ratio=analysis_data.get("dialogue_ratio"),
|
|
description_ratio=analysis_data.get("description_ratio")
|
|
)
|
|
db.add(analysis)
|
|
|
|
# 同时创建已完成的分析任务记录,这样章节管理页面会显示"已分析"状态
|
|
if user_id:
|
|
now = datetime.utcnow()
|
|
analysis_task = AnalysisTask(
|
|
chapter_id=chapter_id,
|
|
user_id=user_id,
|
|
project_id=project_id,
|
|
status='completed',
|
|
progress=100,
|
|
started_at=now,
|
|
completed_at=now
|
|
)
|
|
db.add(analysis_task)
|
|
|
|
count += 1
|
|
|
|
return count
|
|
|
|
@staticmethod
|
|
async def _import_project_default_style(
|
|
project_id: str,
|
|
default_style_data: Optional[Dict],
|
|
db: AsyncSession
|
|
) -> bool:
|
|
"""导入项目默认风格"""
|
|
if not default_style_data:
|
|
return False
|
|
|
|
style_name = default_style_data.get("style_name")
|
|
if not style_name:
|
|
return False
|
|
|
|
# 获取项目所属用户
|
|
project_result = await db.execute(
|
|
select(Project).where(Project.id == project_id)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
if not project:
|
|
return False
|
|
|
|
# 查找对应的风格(优先查找用户自定义风格,然后是全局预设风格)
|
|
# 先查用户自定义风格(使用 first() 避免多行时报错)
|
|
style_result = await db.execute(
|
|
select(WritingStyle).where(
|
|
WritingStyle.user_id == project.user_id,
|
|
WritingStyle.name == style_name
|
|
)
|
|
)
|
|
style_row = style_result.first()
|
|
style = style_row[0] if style_row else None
|
|
|
|
# 如果用户自定义风格不存在,查找全局预设风格
|
|
if not style:
|
|
style_result = await db.execute(
|
|
select(WritingStyle).where(
|
|
WritingStyle.user_id.is_(None),
|
|
WritingStyle.name == style_name
|
|
)
|
|
)
|
|
style_row = style_result.first()
|
|
style = style_row[0] if style_row else None
|
|
|
|
if not style:
|
|
logger.warning(f"导入项目默认风格时未找到风格: {style_name}")
|
|
return False
|
|
|
|
# 创建项目默认风格关联
|
|
default_style = ProjectDefaultStyle(
|
|
project_id=project_id,
|
|
style_id=style.id
|
|
)
|
|
db.add(default_style)
|
|
|
|
logger.info(f"项目默认风格导入成功: {style_name}, style_id={style.id}")
|
|
return True
|
|
|
|
@staticmethod
|
|
async def export_characters(
|
|
character_ids: List[str],
|
|
db: AsyncSession
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
导出角色/组织卡片
|
|
|
|
Args:
|
|
character_ids: 要导出的角色/组织ID列表
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
Dict: 导出的角色数据
|
|
"""
|
|
logger.info(f"开始导出角色/组织: {len(character_ids)} 个")
|
|
|
|
# 查询角色数据
|
|
result = await db.execute(
|
|
select(Character).where(Character.id.in_(character_ids))
|
|
)
|
|
characters = result.scalars().all()
|
|
|
|
if not characters:
|
|
raise ValueError("未找到指定的角色/组织")
|
|
|
|
# 导出角色数据
|
|
exported_characters = []
|
|
for char in characters:
|
|
# 解析 traits
|
|
traits = None
|
|
if char.traits:
|
|
try:
|
|
traits = json.loads(char.traits) if isinstance(char.traits, str) else char.traits
|
|
except:
|
|
traits = None
|
|
|
|
# 基础角色数据
|
|
char_data = {
|
|
"name": char.name,
|
|
"age": char.age,
|
|
"gender": char.gender,
|
|
"is_organization": char.is_organization or False,
|
|
"role_type": char.role_type,
|
|
"personality": char.personality,
|
|
"background": char.background,
|
|
"appearance": char.appearance,
|
|
"relationships": char.relationships,
|
|
"traits": traits,
|
|
"organization_type": char.organization_type,
|
|
"organization_purpose": char.organization_purpose,
|
|
"organization_members": char.organization_members,
|
|
"avatar_url": char.avatar_url,
|
|
"main_career_id": char.main_career_id,
|
|
"main_career_stage": char.main_career_stage,
|
|
"sub_careers": char.sub_careers,
|
|
"created_at": char.created_at.isoformat() if char.created_at else None
|
|
}
|
|
|
|
# 如果是组织,添加组织专属字段
|
|
if char.is_organization:
|
|
org_result = await db.execute(
|
|
select(Organization).where(Organization.character_id == char.id)
|
|
)
|
|
org = org_result.scalar_one_or_none()
|
|
|
|
if org:
|
|
char_data.update({
|
|
"power_level": org.power_level,
|
|
"location": org.location,
|
|
"motto": org.motto,
|
|
"color": org.color
|
|
})
|
|
|
|
exported_characters.append(char_data)
|
|
|
|
export_data = {
|
|
"version": ImportExportService.CURRENT_VERSION,
|
|
"export_time": datetime.utcnow().isoformat(),
|
|
"export_type": "characters",
|
|
"count": len(exported_characters),
|
|
"data": exported_characters
|
|
}
|
|
|
|
logger.info(f"角色/组织导出完成: {len(exported_characters)} 个")
|
|
return export_data
|
|
|
|
@staticmethod
|
|
async def import_characters(
|
|
data: Dict,
|
|
project_id: str,
|
|
user_id: str,
|
|
db: AsyncSession
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
导入角色/组织卡片
|
|
|
|
Args:
|
|
data: 导入的JSON数据
|
|
project_id: 目标项目ID
|
|
user_id: 用户ID
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
Dict: 导入结果
|
|
"""
|
|
from app.models.career import CharacterCareer, Career
|
|
|
|
warnings = []
|
|
imported_characters = []
|
|
imported_organizations = []
|
|
skipped = []
|
|
errors = []
|
|
|
|
try:
|
|
# 验证数据格式
|
|
if "data" not in data:
|
|
raise ValueError("导入数据格式错误:缺少data字段")
|
|
|
|
characters_data = data["data"]
|
|
if not isinstance(characters_data, list):
|
|
raise ValueError("导入数据格式错误:data字段必须是数组")
|
|
|
|
# 验证项目权限
|
|
project_result = await db.execute(
|
|
select(Project).where(
|
|
Project.id == project_id,
|
|
Project.user_id == user_id
|
|
)
|
|
)
|
|
project = project_result.scalar_one_or_none()
|
|
if not project:
|
|
raise ValueError("项目不存在或无权访问")
|
|
|
|
logger.info(f"开始导入 {len(characters_data)} 个角色/组织到项目 {project_id}")
|
|
|
|
# 处理每个角色/组织
|
|
for idx, char_data in enumerate(characters_data):
|
|
try:
|
|
name = char_data.get("name")
|
|
if not name:
|
|
errors.append(f"第{idx+1}个角色缺少name字段")
|
|
continue
|
|
|
|
# 检查重复名称(使用 first() 避免多行时报错)
|
|
existing_result = await db.execute(
|
|
select(Character).where(
|
|
Character.project_id == project_id,
|
|
Character.name == name
|
|
)
|
|
)
|
|
existing = existing_result.first()
|
|
|
|
if existing:
|
|
warnings.append(f"角色'{name}'已存在,已跳过")
|
|
skipped.append(name)
|
|
continue
|
|
|
|
# 处理traits
|
|
traits = char_data.get("traits")
|
|
if isinstance(traits, list):
|
|
traits = json.dumps(traits, ensure_ascii=False)
|
|
|
|
is_organization = char_data.get("is_organization", False)
|
|
|
|
# 创建角色
|
|
character = Character(
|
|
project_id=project_id,
|
|
name=name,
|
|
age=char_data.get("age"),
|
|
gender=char_data.get("gender"),
|
|
is_organization=is_organization,
|
|
role_type=char_data.get("role_type"),
|
|
personality=char_data.get("personality"),
|
|
background=char_data.get("background"),
|
|
appearance=char_data.get("appearance"),
|
|
relationships=char_data.get("relationships"),
|
|
traits=traits,
|
|
organization_type=char_data.get("organization_type"),
|
|
organization_purpose=char_data.get("organization_purpose"),
|
|
organization_members=char_data.get("organization_members"),
|
|
avatar_url=char_data.get("avatar_url"),
|
|
main_career_id=None, # 职业ID需要验证后再设置
|
|
main_career_stage=char_data.get("main_career_stage"),
|
|
sub_careers=None # 副职业需要验证后再设置
|
|
)
|
|
db.add(character)
|
|
await db.flush() # 获取character.id
|
|
|
|
# 处理主职业(如果有)
|
|
main_career_id = char_data.get("main_career_id")
|
|
main_career_stage = char_data.get("main_career_stage")
|
|
|
|
if main_career_id and not is_organization:
|
|
# 验证职业是否存在
|
|
career_result = await db.execute(
|
|
select(Career).where(
|
|
Career.id == main_career_id,
|
|
Career.project_id == project_id,
|
|
Career.type == 'main'
|
|
)
|
|
)
|
|
career = career_result.scalar_one_or_none()
|
|
|
|
if career:
|
|
character.main_career_id = main_career_id
|
|
character.main_career_stage = main_career_stage or 1
|
|
|
|
# 创建职业关联
|
|
char_career = CharacterCareer(
|
|
character_id=character.id,
|
|
career_id=main_career_id,
|
|
career_type='main',
|
|
current_stage=main_career_stage or 1,
|
|
stage_progress=0
|
|
)
|
|
db.add(char_career)
|
|
else:
|
|
warnings.append(f"角色'{name}'的主职业ID不存在,已忽略职业信息")
|
|
|
|
# 处理副职业(如果有)
|
|
sub_careers = char_data.get("sub_careers")
|
|
if sub_careers and not is_organization:
|
|
try:
|
|
sub_careers_data = json.loads(sub_careers) if isinstance(sub_careers, str) else sub_careers
|
|
|
|
if isinstance(sub_careers_data, list):
|
|
valid_sub_careers = []
|
|
|
|
for sub_data in sub_careers_data[:2]: # 最多2个副职业
|
|
if isinstance(sub_data, dict):
|
|
career_id = sub_data.get('career_id')
|
|
stage = sub_data.get('stage', 1)
|
|
|
|
if career_id:
|
|
# 验证副职业是否存在
|
|
career_result = await db.execute(
|
|
select(Career).where(
|
|
Career.id == career_id,
|
|
Career.project_id == project_id,
|
|
Career.type == 'sub'
|
|
)
|
|
)
|
|
career = career_result.scalar_one_or_none()
|
|
|
|
if career:
|
|
valid_sub_careers.append({
|
|
'career_id': career_id,
|
|
'stage': stage
|
|
})
|
|
|
|
# 创建副职业关联
|
|
char_career = CharacterCareer(
|
|
character_id=character.id,
|
|
career_id=career_id,
|
|
career_type='sub',
|
|
current_stage=stage,
|
|
stage_progress=0
|
|
)
|
|
db.add(char_career)
|
|
|
|
if valid_sub_careers:
|
|
character.sub_careers = json.dumps(valid_sub_careers, ensure_ascii=False)
|
|
elif sub_careers_data:
|
|
warnings.append(f"角色'{name}'的副职业ID不存在,已忽略副职业信息")
|
|
except Exception as e:
|
|
warnings.append(f"角色'{name}'的副职业数据解析失败: {str(e)}")
|
|
|
|
# 如果是组织,创建Organization记录
|
|
if is_organization:
|
|
organization = Organization(
|
|
character_id=character.id,
|
|
project_id=project_id,
|
|
member_count=0,
|
|
power_level=char_data.get("power_level", 50),
|
|
location=char_data.get("location"),
|
|
motto=char_data.get("motto"),
|
|
color=char_data.get("color")
|
|
)
|
|
db.add(organization)
|
|
await db.flush()
|
|
imported_organizations.append(name)
|
|
else:
|
|
imported_characters.append(name)
|
|
|
|
logger.info(f"导入{'组织' if is_organization else '角色'}成功: {name}")
|
|
|
|
except Exception as e:
|
|
error_msg = f"导入角色'{char_data.get('name', f'第{idx+1}个')}'失败: {str(e)}"
|
|
logger.error(error_msg)
|
|
errors.append(error_msg)
|
|
continue
|
|
|
|
# 提交事务
|
|
await db.commit()
|
|
|
|
total = len(imported_characters) + len(imported_organizations)
|
|
|
|
result = {
|
|
"success": True,
|
|
"message": f"成功导入 {total} 个角色/组织",
|
|
"statistics": {
|
|
"total": len(characters_data),
|
|
"imported": total,
|
|
"skipped": len(skipped),
|
|
"errors": len(errors)
|
|
},
|
|
"details": {
|
|
"imported_characters": imported_characters,
|
|
"imported_organizations": imported_organizations,
|
|
"skipped": skipped,
|
|
"errors": errors
|
|
},
|
|
"warnings": warnings
|
|
}
|
|
|
|
logger.info(f"角色/组织导入完成: 成功{total}个,跳过{len(skipped)}个,失败{len(errors)}个")
|
|
return result
|
|
|
|
except Exception as e:
|
|
await db.rollback()
|
|
logger.error(f"导入角色/组织失败: {str(e)}", exc_info=True)
|
|
return {
|
|
"success": False,
|
|
"message": f"导入失败: {str(e)}",
|
|
"statistics": {
|
|
"total": len(characters_data) if "data" in data else 0,
|
|
"imported": len(imported_characters) + len(imported_organizations),
|
|
"skipped": len(skipped),
|
|
"errors": len(errors)
|
|
},
|
|
"details": {
|
|
"imported_characters": imported_characters,
|
|
"imported_organizations": imported_organizations,
|
|
"skipped": skipped,
|
|
"errors": errors
|
|
},
|
|
"warnings": warnings
|
|
}
|
|
|
|
@staticmethod
|
|
def validate_characters_import(data: Dict) -> Dict[str, Any]:
|
|
"""
|
|
验证角色/组织导入数据
|
|
|
|
Args:
|
|
data: 导入的JSON数据
|
|
|
|
Returns:
|
|
Dict: 验证结果
|
|
"""
|
|
errors = []
|
|
warnings = []
|
|
|
|
# 检查版本
|
|
version = data.get("version", "")
|
|
if not version:
|
|
errors.append("缺少版本信息")
|
|
elif version not in ImportExportService.SUPPORTED_VERSIONS:
|
|
warnings.append(f"版本不匹配: 导入文件版本为 {version}, 当前支持版本为 {', '.join(ImportExportService.SUPPORTED_VERSIONS)}")
|
|
|
|
# 检查导出类型
|
|
export_type = data.get("export_type", "")
|
|
if export_type != "characters":
|
|
errors.append(f"导出类型错误: 期望'characters',实际'{export_type}'")
|
|
|
|
# 检查数据字段
|
|
if "data" not in data:
|
|
errors.append("缺少data字段")
|
|
elif not isinstance(data["data"], list):
|
|
errors.append("data字段必须是数组")
|
|
else:
|
|
characters_data = data["data"]
|
|
|
|
# 统计信息
|
|
character_count = sum(1 for c in characters_data if not c.get("is_organization", False))
|
|
org_count = sum(1 for c in characters_data if c.get("is_organization", False))
|
|
|
|
# 检查必填字段
|
|
for idx, char_data in enumerate(characters_data):
|
|
if not char_data.get("name"):
|
|
errors.append(f"第{idx+1}个角色缺少name字段")
|
|
|
|
statistics = {
|
|
"characters": character_count,
|
|
"organizations": org_count
|
|
}
|
|
|
|
if "data" not in data or errors:
|
|
statistics = {"characters": 0, "organizations": 0}
|
|
|
|
return {
|
|
"valid": len(errors) == 0,
|
|
"version": version,
|
|
"statistics": statistics,
|
|
"errors": errors,
|
|
"warnings": warnings
|
|
} |