Files
MuMuAINovel/backend/app/services/import_export_service.py
T
2026-02-25 04:23:42 +00:00

1840 lines
75 KiB
Python

"""导入导出服务"""
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_
from app.models.project import Project
from app.models.chapter import Chapter
from app.models.character import Character
from app.models.outline import Outline
from app.models.relationship import CharacterRelationship, Organization, OrganizationMember
from app.models.writing_style import WritingStyle
from app.models.generation_history import GenerationHistory
from app.models.career import Career, CharacterCareer
from app.models.memory import StoryMemory, PlotAnalysis
from app.models.analysis_task import AnalysisTask
from app.models.project_default_style import ProjectDefaultStyle
from app.schemas.import_export import (
ProjectExportData,
ChapterExportData,
CharacterExportData,
OutlineExportData,
RelationshipExportData,
OrganizationExportData,
OrganizationMemberExportData,
WritingStyleExportData,
GenerationHistoryExportData,
CareerExportData,
CharacterCareerExportData,
StoryMemoryExportData,
PlotAnalysisExportData,
ProjectDefaultStyleExportData,
ImportValidationResult,
ImportResult
)
from app.logger import get_logger
logger = get_logger(__name__)
class ImportExportService:
"""导入导出服务类"""
SUPPORTED_VERSIONS = ["1.0.0", "1.1.0"] # 支持的版本列表
CURRENT_VERSION = "1.1.0" # 当前导出版本
@staticmethod
async def export_project(
project_id: str,
db: AsyncSession,
include_generation_history: bool = False,
include_writing_styles: bool = True,
include_careers: bool = True,
include_memories: bool = False,
include_plot_analysis: bool = False
) -> ProjectExportData:
"""
导出项目完整数据
Args:
project_id: 项目ID
db: 数据库会话
include_generation_history: 是否包含生成历史
include_writing_styles: 是否包含写作风格
include_careers: 是否包含职业系统
include_memories: 是否包含故事记忆
include_plot_analysis: 是否包含剧情分析
Returns:
ProjectExportData: 导出的项目数据
"""
logger.info(f"开始导出项目: {project_id}")
# 获取项目基本信息
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise ValueError(f"项目不存在: {project_id}")
# 项目基本信息
project_data = {
"title": project.title,
"description": project.description,
"theme": project.theme,
"genre": project.genre,
"target_words": project.target_words,
"current_words": project.current_words,
"status": project.status,
"world_time_period": project.world_time_period,
"world_location": project.world_location,
"world_atmosphere": project.world_atmosphere,
"world_rules": project.world_rules,
"chapter_count": project.chapter_count,
"narrative_perspective": project.narrative_perspective,
"character_count": project.character_count,
"outline_mode": project.outline_mode,
"user_id": project.user_id,
"created_at": project.created_at.isoformat() if project.created_at else None,
}
# 导出章节
chapters = await ImportExportService._export_chapters(project_id, db)
logger.info(f"导出章节数: {len(chapters)}")
# 导出角色
characters = await ImportExportService._export_characters(project_id, db)
logger.info(f"导出角色数: {len(characters)}")
# 导出大纲
outlines = await ImportExportService._export_outlines(project_id, db)
logger.info(f"导出大纲数: {len(outlines)}")
# 导出关系
relationships = await ImportExportService._export_relationships(project_id, db)
logger.info(f"导出关系数: {len(relationships)}")
# 导出组织详情
organizations = await ImportExportService._export_organizations(project_id, db)
logger.info(f"导出组织数: {len(organizations)}")
# 导出组织成员
org_members = await ImportExportService._export_organization_members(project_id, db)
logger.info(f"导出组织成员数: {len(org_members)}")
# 导出写作风格(可选)
writing_styles = []
if include_writing_styles:
writing_styles = await ImportExportService._export_writing_styles(project_id, db)
logger.info(f"导出写作风格数: {len(writing_styles)}")
# 导出生成历史(可选)
generation_history = []
if include_generation_history:
generation_history = await ImportExportService._export_generation_history(project_id, db)
logger.info(f"导出生成历史数: {len(generation_history)}")
# 导出职业系统(可选)
careers = []
character_careers = []
if include_careers:
careers = await ImportExportService._export_careers(project_id, db)
logger.info(f"导出职业数: {len(careers)}")
character_careers = await ImportExportService._export_character_careers(project_id, db)
logger.info(f"导出角色职业关联数: {len(character_careers)}")
# 导出故事记忆(可选)
story_memories = []
if include_memories:
story_memories = await ImportExportService._export_story_memories(project_id, db)
logger.info(f"导出故事记忆数: {len(story_memories)}")
# 导出剧情分析(可选)
plot_analysis = []
if include_plot_analysis:
plot_analysis = await ImportExportService._export_plot_analysis(project_id, db)
logger.info(f"导出剧情分析数: {len(plot_analysis)}")
# 导出项目默认风格
project_default_style = await ImportExportService._export_project_default_style(project_id, db)
if project_default_style:
logger.info(f"导出项目默认风格: {project_default_style.style_name}")
export_data = ProjectExportData(
version=ImportExportService.CURRENT_VERSION,
export_time=datetime.utcnow().isoformat(),
project=project_data,
chapters=chapters,
characters=characters,
outlines=outlines,
relationships=relationships,
organizations=organizations,
organization_members=org_members,
writing_styles=writing_styles,
generation_history=generation_history,
careers=careers,
character_careers=character_careers,
story_memories=story_memories,
plot_analysis=plot_analysis,
project_default_style=project_default_style
)
logger.info(f"项目导出完成: {project_id}")
return export_data
@staticmethod
async def _export_chapters(project_id: str, db: AsyncSession) -> List[ChapterExportData]:
"""导出章节"""
result = await db.execute(
select(Chapter)
.where(Chapter.project_id == project_id)
.order_by(Chapter.chapter_number)
)
chapters = result.scalars().all()
# 构建大纲ID到标题的映射
outline_mapping = {}
if chapters:
outline_ids = [ch.outline_id for ch in chapters if ch.outline_id]
if outline_ids:
outline_result = await db.execute(
select(Outline).where(Outline.id.in_(outline_ids))
)
outlines = outline_result.scalars().all()
outline_mapping = {ol.id: ol.title for ol in outlines}
exported_chapters = []
for ch in chapters:
# 解析expansion_plan JSON
expansion_plan = None
if ch.expansion_plan:
try:
expansion_plan = json.loads(ch.expansion_plan) if isinstance(ch.expansion_plan, str) else ch.expansion_plan
except Exception:
expansion_plan = None
exported_chapters.append(ChapterExportData(
title=ch.title,
content=ch.content,
summary=ch.summary,
chapter_number=ch.chapter_number,
word_count=ch.word_count or 0,
status=ch.status,
created_at=ch.created_at.isoformat() if ch.created_at else None,
outline_title=outline_mapping.get(ch.outline_id) if ch.outline_id else None,
sub_index=ch.sub_index,
expansion_plan=expansion_plan
))
return exported_chapters
@staticmethod
async def _export_characters(project_id: str, db: AsyncSession) -> List[CharacterExportData]:
"""导出角色"""
result = await db.execute(
select(Character).where(Character.project_id == project_id)
)
characters = result.scalars().all()
exported = []
for char in characters:
# 解析traits JSON
traits = None
if char.traits:
try:
traits = json.loads(char.traits) if isinstance(char.traits, str) else char.traits
except Exception:
traits = None
exported.append(CharacterExportData(
name=char.name,
age=char.age,
gender=char.gender,
is_organization=char.is_organization or False,
role_type=char.role_type,
personality=char.personality,
background=char.background,
appearance=char.appearance,
traits=traits,
organization_type=char.organization_type,
organization_purpose=char.organization_purpose,
created_at=char.created_at.isoformat() if char.created_at else None
))
return exported
@staticmethod
async def _export_outlines(project_id: str, db: AsyncSession) -> List[OutlineExportData]:
"""导出大纲"""
result = await db.execute(
select(Outline)
.where(Outline.project_id == project_id)
.order_by(Outline.order_index)
)
outlines = result.scalars().all()
return [
OutlineExportData(
title=ol.title,
content=ol.content,
structure=ol.structure,
order_index=ol.order_index,
created_at=ol.created_at.isoformat() if ol.created_at else None
)
for ol in outlines
]
@staticmethod
async def _export_relationships(project_id: str, db: AsyncSession) -> List[RelationshipExportData]:
"""导出关系"""
result = await db.execute(
select(CharacterRelationship, Character)
.join(Character, CharacterRelationship.character_from_id == Character.id)
.where(CharacterRelationship.project_id == project_id)
)
relationships = result.all()
exported = []
for rel, char_from in relationships:
# 获取目标角色名称
target_result = await db.execute(
select(Character).where(Character.id == rel.character_to_id)
)
char_to = target_result.scalar_one_or_none()
if char_to:
exported.append(RelationshipExportData(
source_name=char_from.name,
target_name=char_to.name,
relationship_name=rel.relationship_name,
intimacy_level=rel.intimacy_level or 50,
status=rel.status or "active",
description=rel.description,
started_at=rel.started_at
))
return exported
@staticmethod
async def _export_organizations(project_id: str, db: AsyncSession) -> List[OrganizationExportData]:
"""导出组织详情"""
result = await db.execute(
select(Organization, Character)
.join(Character, Organization.character_id == Character.id)
.where(Organization.project_id == project_id)
)
organizations = result.all()
exported = []
for org, char in organizations:
# 获取父组织名称
parent_name = None
if org.parent_org_id:
parent_result = await db.execute(
select(Organization, Character)
.join(Character, Organization.character_id == Character.id)
.where(Organization.id == org.parent_org_id)
)
parent_data = parent_result.first()
if parent_data:
parent_name = parent_data[1].name
exported.append(OrganizationExportData(
character_name=char.name,
parent_org_name=parent_name,
power_level=org.power_level or 50,
member_count=org.member_count or 0,
location=org.location,
motto=org.motto,
color=org.color
))
return exported
@staticmethod
async def _export_organization_members(project_id: str, db: AsyncSession) -> List[OrganizationMemberExportData]:
"""导出组织成员"""
result = await db.execute(
select(OrganizationMember, Organization, Character)
.join(Organization, OrganizationMember.organization_id == Organization.id)
.join(Character, Organization.character_id == Character.id)
.where(Organization.project_id == project_id)
)
members = result.all()
exported = []
for member, org, org_char in members:
# 获取成员角色名称
char_result = await db.execute(
select(Character).where(Character.id == member.character_id)
)
member_char = char_result.scalar_one_or_none()
if member_char:
exported.append(OrganizationMemberExportData(
organization_name=org_char.name,
character_name=member_char.name,
position=member.position,
rank=member.rank or 0,
status=member.status or "active",
joined_at=member.joined_at,
loyalty=member.loyalty or 50,
contribution=member.contribution or 0,
notes=member.notes
))
return exported
@staticmethod
async def _export_writing_styles(project_id: str, db: AsyncSession) -> List[WritingStyleExportData]:
"""导出写作风格(用户自定义风格)"""
# 获取项目所属用户
project_result = await db.execute(
select(Project).where(Project.id == project_id)
)
project = project_result.scalar_one_or_none()
if not project:
return []
# 导出该用户的自定义风格(不包括全局预设)
result = await db.execute(
select(WritingStyle)
.where(WritingStyle.user_id == project.user_id)
.order_by(WritingStyle.order_index)
)
styles = result.scalars().all()
return [
WritingStyleExportData(
name=style.name,
style_type=style.style_type,
preset_id=style.preset_id,
description=style.description,
prompt_content=style.prompt_content,
order_index=style.order_index or 0
)
for style in styles
]
@staticmethod
async def _export_generation_history(project_id: str, db: AsyncSession) -> List[GenerationHistoryExportData]:
"""导出生成历史"""
result = await db.execute(
select(GenerationHistory, Chapter)
.outerjoin(Chapter, GenerationHistory.chapter_id == Chapter.id)
.where(GenerationHistory.project_id == project_id)
.order_by(GenerationHistory.created_at.desc())
.limit(100) # 限制最多导出100条历史记录
)
histories = result.all()
return [
GenerationHistoryExportData(
chapter_title=chapter.title if chapter else None,
prompt=history.prompt,
generated_content=history.generated_content,
model=history.model,
tokens_used=history.tokens_used,
generation_time=history.generation_time,
created_at=history.created_at.isoformat() if history.created_at else None
)
for history, chapter in histories
]
@staticmethod
async def _export_careers(project_id: str, db: AsyncSession) -> List[CareerExportData]:
"""导出职业系统"""
result = await db.execute(
select(Career)
.where(Career.project_id == project_id)
.order_by(Career.type, Career.created_at)
)
careers = result.scalars().all()
return [
CareerExportData(
name=career.name,
type=career.type,
description=career.description,
category=career.category,
stages=career.stages,
max_stage=career.max_stage or 10,
requirements=career.requirements,
special_abilities=career.special_abilities,
worldview_rules=career.worldview_rules,
attribute_bonuses=career.attribute_bonuses,
source=career.source or "ai",
created_at=career.created_at.isoformat() if career.created_at else None
)
for career in careers
]
@staticmethod
async def _export_character_careers(project_id: str, db: AsyncSession) -> List[CharacterCareerExportData]:
"""导出角色职业关联"""
# 查询所有属于该项目的角色职业关联
result = await db.execute(
select(CharacterCareer, Character, Career)
.join(Character, CharacterCareer.character_id == Character.id)
.join(Career, CharacterCareer.career_id == Career.id)
.where(Character.project_id == project_id)
)
character_careers = result.all()
return [
CharacterCareerExportData(
character_name=char.name,
career_name=career.name,
career_type=cc.career_type,
current_stage=cc.current_stage or 1,
stage_progress=cc.stage_progress or 0,
started_at=cc.started_at,
reached_current_stage_at=cc.reached_current_stage_at,
notes=cc.notes
)
for cc, char, career in character_careers
]
@staticmethod
async def _export_story_memories(project_id: str, db: AsyncSession) -> List[StoryMemoryExportData]:
"""导出故事记忆"""
# 构建章节ID到标题的映射
chapter_result = await db.execute(
select(Chapter).where(Chapter.project_id == project_id)
)
chapters = chapter_result.scalars().all()
chapter_mapping = {ch.id: ch.title for ch in chapters}
# 构建角色ID到名称的映射
char_result = await db.execute(
select(Character).where(Character.project_id == project_id)
)
characters = char_result.scalars().all()
char_mapping = {char.id: char.name for char in characters}
result = await db.execute(
select(StoryMemory)
.where(StoryMemory.project_id == project_id)
.order_by(StoryMemory.story_timeline, StoryMemory.chapter_position)
)
memories = result.scalars().all()
exported = []
for mem in memories:
# 将角色ID列表转换为名称列表
related_char_names = None
if mem.related_characters:
related_char_names = [
char_mapping.get(char_id, char_id)
for char_id in mem.related_characters
]
exported.append(StoryMemoryExportData(
chapter_title=chapter_mapping.get(mem.chapter_id) if mem.chapter_id else None,
memory_type=mem.memory_type,
title=mem.title,
content=mem.content,
full_context=mem.full_context,
related_characters=related_char_names,
related_locations=mem.related_locations,
tags=mem.tags,
importance_score=mem.importance_score or 0.5,
story_timeline=mem.story_timeline,
chapter_position=mem.chapter_position or 0,
text_length=mem.text_length or 0,
is_foreshadow=mem.is_foreshadow or 0,
foreshadow_strength=mem.foreshadow_strength,
created_at=mem.created_at.isoformat() if mem.created_at else None
))
return exported
@staticmethod
async def _export_plot_analysis(project_id: str, db: AsyncSession) -> List[PlotAnalysisExportData]:
"""导出剧情分析"""
# 构建章节ID到标题的映射
chapter_result = await db.execute(
select(Chapter).where(Chapter.project_id == project_id)
)
chapters = chapter_result.scalars().all()
chapter_mapping = {ch.id: ch.title for ch in chapters}
result = await db.execute(
select(PlotAnalysis)
.where(PlotAnalysis.project_id == project_id)
)
analyses = result.scalars().all()
exported = []
for analysis in analyses:
chapter_title = chapter_mapping.get(analysis.chapter_id)
if not chapter_title:
continue # 跳过没有关联章节的分析
exported.append(PlotAnalysisExportData(
chapter_title=chapter_title,
plot_stage=analysis.plot_stage,
conflict_level=analysis.conflict_level,
conflict_types=analysis.conflict_types,
emotional_tone=analysis.emotional_tone,
emotional_intensity=analysis.emotional_intensity,
emotional_curve=analysis.emotional_curve,
hooks=analysis.hooks,
hooks_count=analysis.hooks_count or 0,
hooks_avg_strength=analysis.hooks_avg_strength,
foreshadows=analysis.foreshadows,
foreshadows_planted=analysis.foreshadows_planted or 0,
foreshadows_resolved=analysis.foreshadows_resolved or 0,
plot_points=analysis.plot_points,
plot_points_count=analysis.plot_points_count or 0,
character_states=analysis.character_states,
scenes=analysis.scenes,
pacing=analysis.pacing,
overall_quality_score=analysis.overall_quality_score,
pacing_score=analysis.pacing_score,
engagement_score=analysis.engagement_score,
coherence_score=analysis.coherence_score,
analysis_report=analysis.analysis_report,
suggestions=analysis.suggestions,
word_count=analysis.word_count,
dialogue_ratio=analysis.dialogue_ratio,
description_ratio=analysis.description_ratio,
created_at=analysis.created_at.isoformat() if analysis.created_at else None
))
return exported
@staticmethod
async def _export_project_default_style(project_id: str, db: AsyncSession) -> Optional[ProjectDefaultStyleExportData]:
"""导出项目默认风格"""
result = await db.execute(
select(ProjectDefaultStyle, WritingStyle)
.join(WritingStyle, ProjectDefaultStyle.style_id == WritingStyle.id)
.where(ProjectDefaultStyle.project_id == project_id)
)
row = result.first()
if row:
_, style = row
return ProjectDefaultStyleExportData(style_name=style.name)
return None
@staticmethod
def validate_import_data(data: Dict) -> ImportValidationResult:
"""
验证导入数据
Args:
data: 导入的JSON数据
Returns:
ImportValidationResult: 验证结果
"""
errors = []
warnings = []
statistics = {}
# 检查版本
version = data.get("version", "")
if not version:
errors.append("缺少版本信息")
elif version not in ImportExportService.SUPPORTED_VERSIONS:
warnings.append(f"版本不匹配: 导入文件版本为 {version}, 当前支持版本为 {', '.join(ImportExportService.SUPPORTED_VERSIONS)}")
# 检查必需字段
if "project" not in data:
errors.append("缺少项目信息")
else:
project = data["project"]
if not project.get("title"):
errors.append("项目标题不能为空")
# 统计数据(包含新增字段)
statistics = {
"chapters": len(data.get("chapters", [])),
"characters": len(data.get("characters", [])),
"outlines": len(data.get("outlines", [])),
"relationships": len(data.get("relationships", [])),
"organizations": len(data.get("organizations", [])),
"organization_members": len(data.get("organization_members", [])),
"writing_styles": len(data.get("writing_styles", [])),
"generation_history": len(data.get("generation_history", [])),
"careers": len(data.get("careers", [])),
"character_careers": len(data.get("character_careers", [])),
"story_memories": len(data.get("story_memories", [])),
"plot_analysis": len(data.get("plot_analysis", [])),
"has_default_style": data.get("project_default_style") is not None
}
# 检查数据完整性
if statistics["chapters"] == 0:
warnings.append("项目没有章节数据")
if statistics["characters"] == 0:
warnings.append("项目没有角色数据")
project_name = data.get("project", {}).get("title", "未知项目")
return ImportValidationResult(
valid=len(errors) == 0,
version=version,
project_name=project_name,
statistics=statistics,
errors=errors,
warnings=warnings
)
@staticmethod
async def import_project(
data: Dict,
db: AsyncSession,
user_id: str
) -> ImportResult:
"""
导入项目数据(创建新项目)
Args:
data: 导入的JSON数据
db: 数据库会话
user_id: 目标用户ID(导入后的项目归属)
Returns:
ImportResult: 导入结果
"""
warnings = []
statistics = {}
try:
# 验证数据
validation = ImportExportService.validate_import_data(data)
if not validation.valid:
return ImportResult(
success=False,
message=f"数据验证失败: {', '.join(validation.errors)}",
statistics={},
warnings=validation.warnings
)
warnings.extend(validation.warnings)
logger.info(f"开始导入项目: {validation.project_name}")
# 创建项目
project_data = data["project"]
new_project = Project(
user_id=user_id, # 设置为当前用户ID
title=project_data.get("title"),
description=project_data.get("description"),
theme=project_data.get("theme"),
genre=project_data.get("genre"),
target_words=project_data.get("target_words"),
status=project_data.get("status", "planning"),
world_time_period=project_data.get("world_time_period"),
world_location=project_data.get("world_location"),
world_atmosphere=project_data.get("world_atmosphere"),
world_rules=project_data.get("world_rules"),
chapter_count=project_data.get("chapter_count"),
narrative_perspective=project_data.get("narrative_perspective"),
character_count=project_data.get("character_count"),
outline_mode=project_data.get("outline_mode", "one-to-many"), # ✅ 导入大纲模式,默认为一对多
current_words=project_data.get("current_words", 0), # 保留原项目的字数
wizard_step=4, # 导入的项目设置为向导完成状态
wizard_status="completed" # 标记向导已完成
)
db.add(new_project)
await db.flush() # 获取project_id
logger.info(f"创建项目成功: {new_project.id}")
# 导入角色(包括组织)- 需要先导入角色,因为大纲可能需要角色信息
char_mapping = await ImportExportService._import_characters(
new_project.id, data.get("characters", []), db
)
statistics["characters"] = len(char_mapping)
logger.info(f"导入角色数: {len(char_mapping)}")
# 导入大纲 - 需要在章节之前导入,以便建立关联
outline_mapping = await ImportExportService._import_outlines(
new_project.id, data.get("outlines", []), db
)
statistics["outlines"] = len(outline_mapping)
logger.info(f"导入大纲数: {len(outline_mapping)}")
# 导入章节 - 使用大纲映射重建关联关系
chapters_count = await ImportExportService._import_chapters(
new_project.id, data.get("chapters", []), outline_mapping, db
)
statistics["chapters"] = chapters_count
logger.info(f"导入章节数: {chapters_count}")
# 导入关系
relationships_count = await ImportExportService._import_relationships(
new_project.id, data.get("relationships", []), char_mapping, db
)
statistics["relationships"] = relationships_count
logger.info(f"导入关系数: {relationships_count}")
# 导入组织详情
org_mapping = await ImportExportService._import_organizations(
new_project.id, data.get("organizations", []), char_mapping, db
)
statistics["organizations"] = len(org_mapping)
logger.info(f"导入组织数: {len(org_mapping)}")
# 导入组织成员
org_members_count = await ImportExportService._import_organization_members(
data.get("organization_members", []), char_mapping, org_mapping, db
)
statistics["organization_members"] = org_members_count
logger.info(f"导入组织成员数: {org_members_count}")
# 导入写作风格
styles_count = await ImportExportService._import_writing_styles(
new_project.id, data.get("writing_styles", []), db
)
statistics["writing_styles"] = styles_count
logger.info(f"导入写作风格数: {styles_count}")
# 导入职业系统
career_mapping = await ImportExportService._import_careers(
new_project.id, data.get("careers", []), db
)
statistics["careers"] = len(career_mapping)
logger.info(f"导入职业数: {len(career_mapping)}")
# 导入角色职业关联
char_careers_count = await ImportExportService._import_character_careers(
data.get("character_careers", []), char_mapping, career_mapping, db
)
statistics["character_careers"] = char_careers_count
logger.info(f"导入角色职业关联数: {char_careers_count}")
# 导入故事记忆
# 需要先构建章节标题到ID的映射(使用章节号+标题组合确保唯一性)
chapter_title_to_id = {}
chapter_result = await db.execute(
select(Chapter).where(Chapter.project_id == new_project.id)
)
imported_chapters = chapter_result.scalars().all()
for ch in imported_chapters:
# 使用标题作为key,如果有重复标题则取第一个(已导入的顺序)
if ch.title and ch.title not in chapter_title_to_id:
chapter_title_to_id[ch.title] = ch.id
memories_count = await ImportExportService._import_story_memories(
new_project.id, data.get("story_memories", []), chapter_title_to_id, char_mapping, db
)
statistics["story_memories"] = memories_count
logger.info(f"导入故事记忆数: {memories_count}")
# 导入剧情分析(传入user_id以便创建分析任务记录)
plot_analysis_count = await ImportExportService._import_plot_analysis(
new_project.id, data.get("plot_analysis", []), chapter_title_to_id, db, user_id
)
statistics["plot_analysis"] = plot_analysis_count
logger.info(f"导入剧情分析数: {plot_analysis_count}")
# 导入项目默认风格
default_style_imported = await ImportExportService._import_project_default_style(
new_project.id, data.get("project_default_style"), db
)
statistics["project_default_style"] = 1 if default_style_imported else 0
if default_style_imported:
logger.info("导入项目默认风格成功")
# 提交事务
await db.commit()
logger.info(f"项目导入完成: {new_project.id}")
return ImportResult(
success=True,
project_id=new_project.id,
message="项目导入成功",
statistics=statistics,
warnings=warnings
)
except Exception as e:
await db.rollback()
logger.error(f"导入项目失败: {str(e)}", exc_info=True)
return ImportResult(
success=False,
message=f"导入失败: {str(e)}",
statistics=statistics,
warnings=warnings
)
@staticmethod
async def _import_chapters(
project_id: str,
chapters_data: List[Dict],
outline_mapping: Dict[str, str],
db: AsyncSession
) -> int:
"""导入章节"""
count = 0
for ch_data in chapters_data:
# 根据大纲标题查找对应的新大纲ID
outline_id = None
outline_title = ch_data.get("outline_title")
if outline_title and outline_title in outline_mapping:
outline_id = outline_mapping[outline_title]
# 处理expansion_plan
expansion_plan = ch_data.get("expansion_plan")
if expansion_plan and isinstance(expansion_plan, dict):
expansion_plan = json.dumps(expansion_plan, ensure_ascii=False)
chapter = Chapter(
project_id=project_id,
title=ch_data.get("title"),
content=ch_data.get("content"),
summary=ch_data.get("summary"),
chapter_number=ch_data.get("chapter_number"),
word_count=ch_data.get("word_count", 0),
status=ch_data.get("status", "draft"),
outline_id=outline_id,
sub_index=ch_data.get("sub_index"),
expansion_plan=expansion_plan
)
db.add(chapter)
count += 1
return count
@staticmethod
async def _import_characters(
project_id: str,
characters_data: List[Dict],
db: AsyncSession
) -> Dict[str, str]:
"""导入角色,返回名称到ID的映射"""
char_mapping = {}
for char_data in characters_data:
# 处理traits
traits = char_data.get("traits")
if isinstance(traits, list):
traits = json.dumps(traits, ensure_ascii=False)
character = Character(
project_id=project_id,
name=char_data.get("name"),
age=char_data.get("age"),
gender=char_data.get("gender"),
is_organization=char_data.get("is_organization", False),
role_type=char_data.get("role_type"),
personality=char_data.get("personality"),
background=char_data.get("background"),
appearance=char_data.get("appearance"),
traits=traits,
organization_type=char_data.get("organization_type"),
organization_purpose=char_data.get("organization_purpose")
)
db.add(character)
await db.flush() # 获取ID
char_mapping[char_data.get("name")] = character.id
return char_mapping
@staticmethod
async def _import_outlines(
project_id: str,
outlines_data: List[Dict],
db: AsyncSession
) -> Dict[str, str]:
"""导入大纲,返回标题到ID的映射"""
outline_mapping = {}
for ol_data in outlines_data:
outline = Outline(
project_id=project_id,
title=ol_data.get("title"),
content=ol_data.get("content"),
structure=ol_data.get("structure"),
order_index=ol_data.get("order_index")
)
db.add(outline)
await db.flush() # 获取ID
outline_mapping[ol_data.get("title")] = outline.id
return outline_mapping
@staticmethod
async def _import_relationships(
project_id: str,
relationships_data: List[Dict],
char_mapping: Dict[str, str],
db: AsyncSession
) -> int:
"""导入关系"""
count = 0
for rel_data in relationships_data:
source_name = rel_data.get("source_name")
target_name = rel_data.get("target_name")
# 查找角色ID
source_id = char_mapping.get(source_name)
target_id = char_mapping.get(target_name)
if source_id and target_id:
relationship = CharacterRelationship(
project_id=project_id,
character_from_id=source_id,
character_to_id=target_id,
relationship_name=rel_data.get("relationship_name"),
intimacy_level=rel_data.get("intimacy_level", 50),
status=rel_data.get("status", "active"),
description=rel_data.get("description"),
started_at=rel_data.get("started_at")
)
db.add(relationship)
count += 1
return count
@staticmethod
async def _import_organizations(
project_id: str,
organizations_data: List[Dict],
char_mapping: Dict[str, str],
db: AsyncSession
) -> Dict[str, str]:
"""导入组织详情,返回名称到ID的映射"""
org_mapping = {}
# 第一遍:创建所有组织(不设置父组织)
temp_orgs = []
for org_data in organizations_data:
char_name = org_data.get("character_name")
char_id = char_mapping.get(char_name)
if char_id:
organization = Organization(
project_id=project_id,
character_id=char_id,
power_level=org_data.get("power_level", 50),
member_count=org_data.get("member_count", 0),
location=org_data.get("location"),
motto=org_data.get("motto"),
color=org_data.get("color")
)
db.add(organization)
temp_orgs.append((organization, org_data.get("parent_org_name")))
await db.flush() # 获取所有组织的ID
# 建立名称到ID的映射
for org, _ in temp_orgs:
# 通过character_id查找角色名
result = await db.execute(
select(Character).where(Character.id == org.character_id)
)
char = result.scalar_one_or_none()
if char:
org_mapping[char.name] = org.id
# 第二遍:设置父组织关系
for org, parent_name in temp_orgs:
if parent_name:
parent_id = org_mapping.get(parent_name)
if parent_id:
org.parent_org_id = parent_id
return org_mapping
@staticmethod
async def _import_organization_members(
org_members_data: List[Dict],
char_mapping: Dict[str, str],
org_mapping: Dict[str, str],
db: AsyncSession
) -> int:
"""导入组织成员"""
count = 0
for member_data in org_members_data:
org_name = member_data.get("organization_name")
char_name = member_data.get("character_name")
org_id = org_mapping.get(org_name)
char_id = char_mapping.get(char_name)
if org_id and char_id:
member = OrganizationMember(
organization_id=org_id,
character_id=char_id,
position=member_data.get("position"),
rank=member_data.get("rank", 0),
status=member_data.get("status", "active"),
joined_at=member_data.get("joined_at"),
loyalty=member_data.get("loyalty", 50),
contribution=member_data.get("contribution", 0),
notes=member_data.get("notes")
)
db.add(member)
count += 1
return count
@staticmethod
async def _import_writing_styles(
project_id: str,
styles_data: List[Dict],
db: AsyncSession
) -> int:
"""导入写作风格(用户自定义风格)"""
# 获取项目所属用户
project_result = await db.execute(
select(Project).where(Project.id == project_id)
)
project = project_result.scalar_one_or_none()
if not project:
return 0
count = 0
for style_data in styles_data:
# 检查是否已存在同名风格(避免重复导入)
existing = await db.execute(
select(WritingStyle).where(
WritingStyle.user_id == project.user_id,
WritingStyle.name == style_data.get("name")
)
)
# 使用 first() 避免多行时报错
if existing.first():
logger.debug(f"风格 {style_data.get('name')} 已存在,跳过导入")
continue
style = WritingStyle(
user_id=project.user_id, # 使用 user_id 而不是 project_id
name=style_data.get("name"),
style_type=style_data.get("style_type"),
preset_id=style_data.get("preset_id"),
description=style_data.get("description"),
prompt_content=style_data.get("prompt_content"),
order_index=style_data.get("order_index", 0)
)
db.add(style)
count += 1
return count
@staticmethod
async def _import_careers(
project_id: str,
careers_data: List[Dict],
db: AsyncSession
) -> Dict[str, str]:
"""导入职业,返回名称到ID的映射"""
career_mapping = {}
for career_data in careers_data:
career = Career(
project_id=project_id,
name=career_data.get("name"),
type=career_data.get("type", "main"),
description=career_data.get("description"),
category=career_data.get("category"),
stages=career_data.get("stages", "[]"),
max_stage=career_data.get("max_stage", 10),
requirements=career_data.get("requirements"),
special_abilities=career_data.get("special_abilities"),
worldview_rules=career_data.get("worldview_rules"),
attribute_bonuses=career_data.get("attribute_bonuses"),
source=career_data.get("source", "ai")
)
db.add(career)
await db.flush()
career_mapping[career_data.get("name")] = career.id
return career_mapping
@staticmethod
async def _import_character_careers(
character_careers_data: List[Dict],
char_mapping: Dict[str, str],
career_mapping: Dict[str, str],
db: AsyncSession
) -> int:
"""导入角色职业关联"""
count = 0
for cc_data in character_careers_data:
char_name = cc_data.get("character_name")
career_name = cc_data.get("career_name")
char_id = char_mapping.get(char_name)
career_id = career_mapping.get(career_name)
if char_id and career_id:
# 检查是否已存在(使用 first() 避免多行时报错)
existing = await db.execute(
select(CharacterCareer).where(
CharacterCareer.character_id == char_id,
CharacterCareer.career_id == career_id
)
)
if existing.first():
continue
char_career = CharacterCareer(
character_id=char_id,
career_id=career_id,
career_type=cc_data.get("career_type", "main"),
current_stage=cc_data.get("current_stage", 1),
stage_progress=cc_data.get("stage_progress", 0),
started_at=cc_data.get("started_at"),
reached_current_stage_at=cc_data.get("reached_current_stage_at"),
notes=cc_data.get("notes")
)
db.add(char_career)
count += 1
# 同时更新角色的主职业信息
if cc_data.get("career_type") == "main":
char_result = await db.execute(
select(Character).where(Character.id == char_id)
)
char = char_result.scalar_one_or_none()
if char:
char.main_career_id = career_id
char.main_career_stage = cc_data.get("current_stage", 1)
return count
@staticmethod
async def _import_story_memories(
project_id: str,
memories_data: List[Dict],
chapter_mapping: Dict[str, str],
char_mapping: Dict[str, str],
db: AsyncSession
) -> int:
"""导入故事记忆"""
count = 0
for mem_data in memories_data:
# 将章节标题转换为ID
chapter_id = None
chapter_title = mem_data.get("chapter_title")
if chapter_title and chapter_title in chapter_mapping:
chapter_id = chapter_mapping[chapter_title]
# 将角色名称列表转换为ID列表
related_char_ids = None
related_char_names = mem_data.get("related_characters")
if related_char_names:
related_char_ids = [
char_mapping.get(name)
for name in related_char_names
if char_mapping.get(name)
]
memory = StoryMemory(
project_id=project_id,
chapter_id=chapter_id,
memory_type=mem_data.get("memory_type"),
title=mem_data.get("title"),
content=mem_data.get("content"),
full_context=mem_data.get("full_context"),
related_characters=related_char_ids,
related_locations=mem_data.get("related_locations"),
tags=mem_data.get("tags"),
importance_score=mem_data.get("importance_score", 0.5),
story_timeline=mem_data.get("story_timeline", 0),
chapter_position=mem_data.get("chapter_position", 0),
text_length=mem_data.get("text_length", 0),
is_foreshadow=mem_data.get("is_foreshadow", 0),
foreshadow_strength=mem_data.get("foreshadow_strength")
)
db.add(memory)
count += 1
return count
@staticmethod
async def _import_plot_analysis(
project_id: str,
plot_data: List[Dict],
chapter_mapping: Dict[str, str],
db: AsyncSession,
user_id: str = None
) -> int:
"""导入剧情分析,同时创建已完成的分析任务记录"""
from datetime import datetime
count = 0
for analysis_data in plot_data:
chapter_title = analysis_data.get("chapter_title")
chapter_id = chapter_mapping.get(chapter_title)
if not chapter_id:
continue # 跳过找不到章节的分析
# 检查是否已存在该章节的分析(使用 first() 避免多行时报错)
existing = await db.execute(
select(PlotAnalysis).where(PlotAnalysis.chapter_id == chapter_id)
)
if existing.first():
continue
analysis = PlotAnalysis(
project_id=project_id,
chapter_id=chapter_id,
plot_stage=analysis_data.get("plot_stage"),
conflict_level=analysis_data.get("conflict_level"),
conflict_types=analysis_data.get("conflict_types"),
emotional_tone=analysis_data.get("emotional_tone"),
emotional_intensity=analysis_data.get("emotional_intensity"),
emotional_curve=analysis_data.get("emotional_curve"),
hooks=analysis_data.get("hooks"),
hooks_count=analysis_data.get("hooks_count", 0),
hooks_avg_strength=analysis_data.get("hooks_avg_strength"),
foreshadows=analysis_data.get("foreshadows"),
foreshadows_planted=analysis_data.get("foreshadows_planted", 0),
foreshadows_resolved=analysis_data.get("foreshadows_resolved", 0),
plot_points=analysis_data.get("plot_points"),
plot_points_count=analysis_data.get("plot_points_count", 0),
character_states=analysis_data.get("character_states"),
scenes=analysis_data.get("scenes"),
pacing=analysis_data.get("pacing"),
overall_quality_score=analysis_data.get("overall_quality_score"),
pacing_score=analysis_data.get("pacing_score"),
engagement_score=analysis_data.get("engagement_score"),
coherence_score=analysis_data.get("coherence_score"),
analysis_report=analysis_data.get("analysis_report"),
suggestions=analysis_data.get("suggestions"),
word_count=analysis_data.get("word_count"),
dialogue_ratio=analysis_data.get("dialogue_ratio"),
description_ratio=analysis_data.get("description_ratio")
)
db.add(analysis)
# 同时创建已完成的分析任务记录,这样章节管理页面会显示"已分析"状态
if user_id:
now = datetime.utcnow()
analysis_task = AnalysisTask(
chapter_id=chapter_id,
user_id=user_id,
project_id=project_id,
status='completed',
progress=100,
started_at=now,
completed_at=now
)
db.add(analysis_task)
count += 1
return count
@staticmethod
async def _import_project_default_style(
project_id: str,
default_style_data: Optional[Dict],
db: AsyncSession
) -> bool:
"""导入项目默认风格"""
if not default_style_data:
return False
style_name = default_style_data.get("style_name")
if not style_name:
return False
# 获取项目所属用户
project_result = await db.execute(
select(Project).where(Project.id == project_id)
)
project = project_result.scalar_one_or_none()
if not project:
return False
# 查找对应的风格(优先查找用户自定义风格,然后是全局预设风格)
# 先查用户自定义风格(使用 first() 避免多行时报错)
style_result = await db.execute(
select(WritingStyle).where(
WritingStyle.user_id == project.user_id,
WritingStyle.name == style_name
)
)
style_row = style_result.first()
style = style_row[0] if style_row else None
# 如果用户自定义风格不存在,查找全局预设风格
if not style:
style_result = await db.execute(
select(WritingStyle).where(
WritingStyle.user_id.is_(None),
WritingStyle.name == style_name
)
)
style_row = style_result.first()
style = style_row[0] if style_row else None
if not style:
logger.warning(f"导入项目默认风格时未找到风格: {style_name}")
return False
# 创建项目默认风格关联
default_style = ProjectDefaultStyle(
project_id=project_id,
style_id=style.id
)
db.add(default_style)
logger.info(f"项目默认风格导入成功: {style_name}, style_id={style.id}")
return True
@staticmethod
async def export_characters(
character_ids: List[str],
db: AsyncSession
) -> Dict[str, Any]:
"""
导出角色/组织卡片
Args:
character_ids: 要导出的角色/组织ID列表
db: 数据库会话
Returns:
Dict: 导出的角色数据
"""
logger.info(f"开始导出角色/组织: {len(character_ids)}")
# 查询角色数据
result = await db.execute(
select(Character).where(Character.id.in_(character_ids))
)
characters = result.scalars().all()
if not characters:
raise ValueError("未找到指定的角色/组织")
# 导出角色数据
exported_characters = []
for char in characters:
# 解析 traits
traits = None
if char.traits:
try:
traits = json.loads(char.traits) if isinstance(char.traits, str) else char.traits
except Exception:
traits = None
# 基础角色数据
char_data = {
"name": char.name,
"age": char.age,
"gender": char.gender,
"is_organization": char.is_organization or False,
"role_type": char.role_type,
"personality": char.personality,
"background": char.background,
"appearance": char.appearance,
"traits": traits,
"organization_type": char.organization_type,
"organization_purpose": char.organization_purpose,
"avatar_url": char.avatar_url,
"main_career_id": char.main_career_id,
"main_career_stage": char.main_career_stage,
"sub_careers": char.sub_careers,
"created_at": char.created_at.isoformat() if char.created_at else None
}
# 如果是组织,添加组织专属字段
if char.is_organization:
org_result = await db.execute(
select(Organization).where(Organization.character_id == char.id)
)
org = org_result.scalar_one_or_none()
if org:
char_data.update({
"power_level": org.power_level,
"location": org.location,
"motto": org.motto,
"color": org.color
})
# 从 OrganizationMember 表导出结构化成员数据
members_result = await db.execute(
select(OrganizationMember).where(OrganizationMember.organization_id == org.id)
)
members = members_result.scalars().all()
if members:
char_data["organization_members_data"] = [
{
"character_id": m.character_id,
"position": m.position,
"rank": m.rank,
"loyalty": m.loyalty,
"contribution": m.contribution,
"status": m.status,
"joined_at": m.joined_at,
"source": m.source
}
for m in members
]
exported_characters.append(char_data)
export_data = {
"version": ImportExportService.CURRENT_VERSION,
"export_time": datetime.utcnow().isoformat(),
"export_type": "characters",
"count": len(exported_characters),
"data": exported_characters
}
logger.info(f"角色/组织导出完成: {len(exported_characters)}")
return export_data
@staticmethod
async def import_characters(
data: Dict,
project_id: str,
user_id: str,
db: AsyncSession
) -> Dict[str, Any]:
"""
导入角色/组织卡片
Args:
data: 导入的JSON数据
project_id: 目标项目ID
user_id: 用户ID
db: 数据库会话
Returns:
Dict: 导入结果
"""
from app.models.career import CharacterCareer, Career
warnings = []
imported_characters = []
imported_organizations = []
skipped = []
errors = []
try:
# 验证数据格式
if "data" not in data:
raise ValueError("导入数据格式错误:缺少data字段")
characters_data = data["data"]
if not isinstance(characters_data, list):
raise ValueError("导入数据格式错误:data字段必须是数组")
# 验证项目权限
project_result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.user_id == user_id
)
)
project = project_result.scalar_one_or_none()
if not project:
raise ValueError("项目不存在或无权访问")
logger.info(f"开始导入 {len(characters_data)} 个角色/组织到项目 {project_id}")
# 处理每个角色/组织
for idx, char_data in enumerate(characters_data):
try:
name = char_data.get("name")
if not name:
errors.append(f"{idx+1}个角色缺少name字段")
continue
# 检查重复名称(使用 first() 避免多行时报错)
existing_result = await db.execute(
select(Character).where(
Character.project_id == project_id,
Character.name == name
)
)
existing = existing_result.first()
if existing:
warnings.append(f"角色'{name}'已存在,已跳过")
skipped.append(name)
continue
# 处理traits
traits = char_data.get("traits")
if isinstance(traits, list):
traits = json.dumps(traits, ensure_ascii=False)
is_organization = char_data.get("is_organization", False)
# 创建角色
character = Character(
project_id=project_id,
name=name,
age=char_data.get("age"),
gender=char_data.get("gender"),
is_organization=is_organization,
role_type=char_data.get("role_type"),
personality=char_data.get("personality"),
background=char_data.get("background"),
appearance=char_data.get("appearance"),
traits=traits,
organization_type=char_data.get("organization_type"),
organization_purpose=char_data.get("organization_purpose"),
avatar_url=char_data.get("avatar_url"),
main_career_id=None, # 职业ID需要验证后再设置
main_career_stage=char_data.get("main_career_stage"),
sub_careers=None # 副职业需要验证后再设置
)
db.add(character)
await db.flush() # 获取character.id
# 处理主职业(如果有)
main_career_id = char_data.get("main_career_id")
main_career_stage = char_data.get("main_career_stage")
if main_career_id and not is_organization:
# 验证职业是否存在
career_result = await db.execute(
select(Career).where(
Career.id == main_career_id,
Career.project_id == project_id,
Career.type == 'main'
)
)
career = career_result.scalar_one_or_none()
if career:
character.main_career_id = main_career_id
character.main_career_stage = main_career_stage or 1
# 创建职业关联
char_career = CharacterCareer(
character_id=character.id,
career_id=main_career_id,
career_type='main',
current_stage=main_career_stage or 1,
stage_progress=0
)
db.add(char_career)
else:
warnings.append(f"角色'{name}'的主职业ID不存在,已忽略职业信息")
# 处理副职业(如果有)
sub_careers = char_data.get("sub_careers")
if sub_careers and not is_organization:
try:
sub_careers_data = json.loads(sub_careers) if isinstance(sub_careers, str) else sub_careers
if isinstance(sub_careers_data, list):
valid_sub_careers = []
for sub_data in sub_careers_data[:2]: # 最多2个副职业
if isinstance(sub_data, dict):
career_id = sub_data.get('career_id')
stage = sub_data.get('stage', 1)
if career_id:
# 验证副职业是否存在
career_result = await db.execute(
select(Career).where(
Career.id == career_id,
Career.project_id == project_id,
Career.type == 'sub'
)
)
career = career_result.scalar_one_or_none()
if career:
valid_sub_careers.append({
'career_id': career_id,
'stage': stage
})
# 创建副职业关联
char_career = CharacterCareer(
character_id=character.id,
career_id=career_id,
career_type='sub',
current_stage=stage,
stage_progress=0
)
db.add(char_career)
if valid_sub_careers:
character.sub_careers = json.dumps(valid_sub_careers, ensure_ascii=False)
elif sub_careers_data:
warnings.append(f"角色'{name}'的副职业ID不存在,已忽略副职业信息")
except Exception as e:
warnings.append(f"角色'{name}'的副职业数据解析失败: {str(e)}")
# 如果是组织,创建Organization记录
if is_organization:
organization = Organization(
character_id=character.id,
project_id=project_id,
member_count=0,
power_level=char_data.get("power_level", 50),
location=char_data.get("location"),
motto=char_data.get("motto"),
color=char_data.get("color")
)
db.add(organization)
await db.flush()
# 导入组织成员数据(如果有)
members_data = char_data.get("organization_members_data", [])
if members_data and isinstance(members_data, list):
imported_member_count = 0
for m_data in members_data:
try:
member_char_id = m_data.get("character_id")
if not member_char_id:
continue
# 验证成员角色是否存在于目标项目
member_char_result = await db.execute(
select(Character).where(
Character.id == member_char_id,
Character.project_id == project_id
)
)
if member_char_result.scalar_one_or_none():
member = OrganizationMember(
organization_id=organization.id,
character_id=member_char_id,
position=m_data.get("position", "成员"),
rank=m_data.get("rank", 0),
loyalty=m_data.get("loyalty", 50),
contribution=m_data.get("contribution", 0),
status=m_data.get("status", "active"),
joined_at=m_data.get("joined_at"),
source=m_data.get("source", "imported")
)
db.add(member)
imported_member_count += 1
except Exception as me:
logger.warning(f"导入组织成员失败: {str(me)}")
if imported_member_count > 0:
organization.member_count = imported_member_count
logger.info(f"导入组织'{name}'{imported_member_count} 个成员")
imported_organizations.append(name)
else:
imported_characters.append(name)
logger.info(f"导入{'组织' if is_organization else '角色'}成功: {name}")
except Exception as e:
error_msg = f"导入角色'{char_data.get('name', f'{idx+1}')}'失败: {str(e)}"
logger.error(error_msg)
errors.append(error_msg)
continue
# 提交事务
await db.commit()
total = len(imported_characters) + len(imported_organizations)
result = {
"success": True,
"message": f"成功导入 {total} 个角色/组织",
"statistics": {
"total": len(characters_data),
"imported": total,
"skipped": len(skipped),
"errors": len(errors)
},
"details": {
"imported_characters": imported_characters,
"imported_organizations": imported_organizations,
"skipped": skipped,
"errors": errors
},
"warnings": warnings
}
logger.info(f"角色/组织导入完成: 成功{total}个,跳过{len(skipped)}个,失败{len(errors)}")
return result
except Exception as e:
await db.rollback()
logger.error(f"导入角色/组织失败: {str(e)}", exc_info=True)
return {
"success": False,
"message": f"导入失败: {str(e)}",
"statistics": {
"total": len(characters_data) if "data" in data else 0,
"imported": len(imported_characters) + len(imported_organizations),
"skipped": len(skipped),
"errors": len(errors)
},
"details": {
"imported_characters": imported_characters,
"imported_organizations": imported_organizations,
"skipped": skipped,
"errors": errors
},
"warnings": warnings
}
@staticmethod
def validate_characters_import(data: Dict) -> Dict[str, Any]:
"""
验证角色/组织导入数据
Args:
data: 导入的JSON数据
Returns:
Dict: 验证结果
"""
errors = []
warnings = []
# 检查版本
version = data.get("version", "")
if not version:
errors.append("缺少版本信息")
elif version not in ImportExportService.SUPPORTED_VERSIONS:
warnings.append(f"版本不匹配: 导入文件版本为 {version}, 当前支持版本为 {', '.join(ImportExportService.SUPPORTED_VERSIONS)}")
# 检查导出类型
export_type = data.get("export_type", "")
if export_type != "characters":
errors.append(f"导出类型错误: 期望'characters',实际'{export_type}'")
# 检查数据字段
if "data" not in data:
errors.append("缺少data字段")
elif not isinstance(data["data"], list):
errors.append("data字段必须是数组")
else:
characters_data = data["data"]
# 统计信息
character_count = sum(1 for c in characters_data if not c.get("is_organization", False))
org_count = sum(1 for c in characters_data if c.get("is_organization", False))
# 检查必填字段
for idx, char_data in enumerate(characters_data):
if not char_data.get("name"):
errors.append(f"{idx+1}个角色缺少name字段")
statistics = {
"characters": character_count,
"organizations": org_count
}
if "data" not in data or errors:
statistics = {"characters": 0, "organizations": 0}
return {
"valid": len(errors) == 0,
"version": version,
"statistics": statistics,
"errors": errors,
"warnings": warnings
}