"""导入导出服务""" import json from datetime import datetime from typing import Dict, List, Optional, Tuple, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, or_ from app.models.project import Project from app.models.chapter import Chapter from app.models.character import Character from app.models.outline import Outline from app.models.relationship import CharacterRelationship, Organization, OrganizationMember from app.models.writing_style import WritingStyle from app.models.generation_history import GenerationHistory from app.models.career import Career, CharacterCareer from app.models.memory import StoryMemory, PlotAnalysis from app.models.analysis_task import AnalysisTask from app.models.project_default_style import ProjectDefaultStyle from app.schemas.import_export import ( ProjectExportData, ChapterExportData, CharacterExportData, OutlineExportData, RelationshipExportData, OrganizationExportData, OrganizationMemberExportData, WritingStyleExportData, GenerationHistoryExportData, CareerExportData, CharacterCareerExportData, StoryMemoryExportData, PlotAnalysisExportData, ProjectDefaultStyleExportData, ImportValidationResult, ImportResult ) from app.logger import get_logger logger = get_logger(__name__) class ImportExportService: """导入导出服务类""" SUPPORTED_VERSIONS = ["1.0.0", "1.1.0"] # 支持的版本列表 CURRENT_VERSION = "1.1.0" # 当前导出版本 @staticmethod async def export_project( project_id: str, db: AsyncSession, include_generation_history: bool = False, include_writing_styles: bool = True, include_careers: bool = True, include_memories: bool = False, include_plot_analysis: bool = False ) -> ProjectExportData: """ 导出项目完整数据 Args: project_id: 项目ID db: 数据库会话 include_generation_history: 是否包含生成历史 include_writing_styles: 是否包含写作风格 include_careers: 是否包含职业系统 include_memories: 是否包含故事记忆 include_plot_analysis: 是否包含剧情分析 Returns: ProjectExportData: 导出的项目数据 """ logger.info(f"开始导出项目: {project_id}") # 获取项目基本信息 result = await db.execute(select(Project).where(Project.id == project_id)) project = result.scalar_one_or_none() if not project: raise ValueError(f"项目不存在: {project_id}") # 项目基本信息 project_data = { "title": project.title, "description": project.description, "theme": project.theme, "genre": project.genre, "target_words": project.target_words, "current_words": project.current_words, "status": project.status, "world_time_period": project.world_time_period, "world_location": project.world_location, "world_atmosphere": project.world_atmosphere, "world_rules": project.world_rules, "chapter_count": project.chapter_count, "narrative_perspective": project.narrative_perspective, "character_count": project.character_count, "outline_mode": project.outline_mode, "user_id": project.user_id, "created_at": project.created_at.isoformat() if project.created_at else None, } # 导出章节 chapters = await ImportExportService._export_chapters(project_id, db) logger.info(f"导出章节数: {len(chapters)}") # 导出角色 characters = await ImportExportService._export_characters(project_id, db) logger.info(f"导出角色数: {len(characters)}") # 导出大纲 outlines = await ImportExportService._export_outlines(project_id, db) logger.info(f"导出大纲数: {len(outlines)}") # 导出关系 relationships = await ImportExportService._export_relationships(project_id, db) logger.info(f"导出关系数: {len(relationships)}") # 导出组织详情 organizations = await ImportExportService._export_organizations(project_id, db) logger.info(f"导出组织数: {len(organizations)}") # 导出组织成员 org_members = await ImportExportService._export_organization_members(project_id, db) logger.info(f"导出组织成员数: {len(org_members)}") # 导出写作风格(可选) writing_styles = [] if include_writing_styles: writing_styles = await ImportExportService._export_writing_styles(project_id, db) logger.info(f"导出写作风格数: {len(writing_styles)}") # 导出生成历史(可选) generation_history = [] if include_generation_history: generation_history = await ImportExportService._export_generation_history(project_id, db) logger.info(f"导出生成历史数: {len(generation_history)}") # 导出职业系统(可选) careers = [] character_careers = [] if include_careers: careers = await ImportExportService._export_careers(project_id, db) logger.info(f"导出职业数: {len(careers)}") character_careers = await ImportExportService._export_character_careers(project_id, db) logger.info(f"导出角色职业关联数: {len(character_careers)}") # 导出故事记忆(可选) story_memories = [] if include_memories: story_memories = await ImportExportService._export_story_memories(project_id, db) logger.info(f"导出故事记忆数: {len(story_memories)}") # 导出剧情分析(可选) plot_analysis = [] if include_plot_analysis: plot_analysis = await ImportExportService._export_plot_analysis(project_id, db) logger.info(f"导出剧情分析数: {len(plot_analysis)}") # 导出项目默认风格 project_default_style = await ImportExportService._export_project_default_style(project_id, db) if project_default_style: logger.info(f"导出项目默认风格: {project_default_style.style_name}") export_data = ProjectExportData( version=ImportExportService.CURRENT_VERSION, export_time=datetime.utcnow().isoformat(), project=project_data, chapters=chapters, characters=characters, outlines=outlines, relationships=relationships, organizations=organizations, organization_members=org_members, writing_styles=writing_styles, generation_history=generation_history, careers=careers, character_careers=character_careers, story_memories=story_memories, plot_analysis=plot_analysis, project_default_style=project_default_style ) logger.info(f"项目导出完成: {project_id}") return export_data @staticmethod async def _export_chapters(project_id: str, db: AsyncSession) -> List[ChapterExportData]: """导出章节""" result = await db.execute( select(Chapter) .where(Chapter.project_id == project_id) .order_by(Chapter.chapter_number) ) chapters = result.scalars().all() # 构建大纲ID到标题的映射 outline_mapping = {} if chapters: outline_ids = [ch.outline_id for ch in chapters if ch.outline_id] if outline_ids: outline_result = await db.execute( select(Outline).where(Outline.id.in_(outline_ids)) ) outlines = outline_result.scalars().all() outline_mapping = {ol.id: ol.title for ol in outlines} exported_chapters = [] for ch in chapters: # 解析expansion_plan JSON expansion_plan = None if ch.expansion_plan: try: expansion_plan = json.loads(ch.expansion_plan) if isinstance(ch.expansion_plan, str) else ch.expansion_plan except Exception: expansion_plan = None exported_chapters.append(ChapterExportData( title=ch.title, content=ch.content, summary=ch.summary, chapter_number=ch.chapter_number, word_count=ch.word_count or 0, status=ch.status, created_at=ch.created_at.isoformat() if ch.created_at else None, outline_title=outline_mapping.get(ch.outline_id) if ch.outline_id else None, sub_index=ch.sub_index, expansion_plan=expansion_plan )) return exported_chapters @staticmethod async def _export_characters(project_id: str, db: AsyncSession) -> List[CharacterExportData]: """导出角色""" result = await db.execute( select(Character).where(Character.project_id == project_id) ) characters = result.scalars().all() exported = [] for char in characters: # 解析traits JSON traits = None if char.traits: try: traits = json.loads(char.traits) if isinstance(char.traits, str) else char.traits except Exception: traits = None exported.append(CharacterExportData( name=char.name, age=char.age, gender=char.gender, is_organization=char.is_organization or False, role_type=char.role_type, personality=char.personality, background=char.background, appearance=char.appearance, traits=traits, organization_type=char.organization_type, organization_purpose=char.organization_purpose, created_at=char.created_at.isoformat() if char.created_at else None )) return exported @staticmethod async def _export_outlines(project_id: str, db: AsyncSession) -> List[OutlineExportData]: """导出大纲""" result = await db.execute( select(Outline) .where(Outline.project_id == project_id) .order_by(Outline.order_index) ) outlines = result.scalars().all() return [ OutlineExportData( title=ol.title, content=ol.content, structure=ol.structure, order_index=ol.order_index, created_at=ol.created_at.isoformat() if ol.created_at else None ) for ol in outlines ] @staticmethod async def _export_relationships(project_id: str, db: AsyncSession) -> List[RelationshipExportData]: """导出关系""" result = await db.execute( select(CharacterRelationship, Character) .join(Character, CharacterRelationship.character_from_id == Character.id) .where(CharacterRelationship.project_id == project_id) ) relationships = result.all() exported = [] for rel, char_from in relationships: # 获取目标角色名称 target_result = await db.execute( select(Character).where(Character.id == rel.character_to_id) ) char_to = target_result.scalar_one_or_none() if char_to: exported.append(RelationshipExportData( source_name=char_from.name, target_name=char_to.name, relationship_name=rel.relationship_name, intimacy_level=rel.intimacy_level or 50, status=rel.status or "active", description=rel.description, started_at=rel.started_at )) return exported @staticmethod async def _export_organizations(project_id: str, db: AsyncSession) -> List[OrganizationExportData]: """导出组织详情""" result = await db.execute( select(Organization, Character) .join(Character, Organization.character_id == Character.id) .where(Organization.project_id == project_id) ) organizations = result.all() exported = [] for org, char in organizations: # 获取父组织名称 parent_name = None if org.parent_org_id: parent_result = await db.execute( select(Organization, Character) .join(Character, Organization.character_id == Character.id) .where(Organization.id == org.parent_org_id) ) parent_data = parent_result.first() if parent_data: parent_name = parent_data[1].name exported.append(OrganizationExportData( character_name=char.name, parent_org_name=parent_name, power_level=org.power_level or 50, member_count=org.member_count or 0, location=org.location, motto=org.motto, color=org.color )) return exported @staticmethod async def _export_organization_members(project_id: str, db: AsyncSession) -> List[OrganizationMemberExportData]: """导出组织成员""" result = await db.execute( select(OrganizationMember, Organization, Character) .join(Organization, OrganizationMember.organization_id == Organization.id) .join(Character, Organization.character_id == Character.id) .where(Organization.project_id == project_id) ) members = result.all() exported = [] for member, org, org_char in members: # 获取成员角色名称 char_result = await db.execute( select(Character).where(Character.id == member.character_id) ) member_char = char_result.scalar_one_or_none() if member_char: exported.append(OrganizationMemberExportData( organization_name=org_char.name, character_name=member_char.name, position=member.position, rank=member.rank or 0, status=member.status or "active", joined_at=member.joined_at, loyalty=member.loyalty or 50, contribution=member.contribution or 0, notes=member.notes )) return exported @staticmethod async def _export_writing_styles(project_id: str, db: AsyncSession) -> List[WritingStyleExportData]: """导出写作风格(用户自定义风格)""" # 获取项目所属用户 project_result = await db.execute( select(Project).where(Project.id == project_id) ) project = project_result.scalar_one_or_none() if not project: return [] # 导出该用户的自定义风格(不包括全局预设) result = await db.execute( select(WritingStyle) .where(WritingStyle.user_id == project.user_id) .order_by(WritingStyle.order_index) ) styles = result.scalars().all() return [ WritingStyleExportData( name=style.name, style_type=style.style_type, preset_id=style.preset_id, description=style.description, prompt_content=style.prompt_content, order_index=style.order_index or 0 ) for style in styles ] @staticmethod async def _export_generation_history(project_id: str, db: AsyncSession) -> List[GenerationHistoryExportData]: """导出生成历史""" result = await db.execute( select(GenerationHistory, Chapter) .outerjoin(Chapter, GenerationHistory.chapter_id == Chapter.id) .where(GenerationHistory.project_id == project_id) .order_by(GenerationHistory.created_at.desc()) .limit(100) # 限制最多导出100条历史记录 ) histories = result.all() return [ GenerationHistoryExportData( chapter_title=chapter.title if chapter else None, prompt=history.prompt, generated_content=history.generated_content, model=history.model, tokens_used=history.tokens_used, generation_time=history.generation_time, created_at=history.created_at.isoformat() if history.created_at else None ) for history, chapter in histories ] @staticmethod async def _export_careers(project_id: str, db: AsyncSession) -> List[CareerExportData]: """导出职业系统""" result = await db.execute( select(Career) .where(Career.project_id == project_id) .order_by(Career.type, Career.created_at) ) careers = result.scalars().all() return [ CareerExportData( name=career.name, type=career.type, description=career.description, category=career.category, stages=career.stages, max_stage=career.max_stage or 10, requirements=career.requirements, special_abilities=career.special_abilities, worldview_rules=career.worldview_rules, attribute_bonuses=career.attribute_bonuses, source=career.source or "ai", created_at=career.created_at.isoformat() if career.created_at else None ) for career in careers ] @staticmethod async def _export_character_careers(project_id: str, db: AsyncSession) -> List[CharacterCareerExportData]: """导出角色职业关联""" # 查询所有属于该项目的角色职业关联 result = await db.execute( select(CharacterCareer, Character, Career) .join(Character, CharacterCareer.character_id == Character.id) .join(Career, CharacterCareer.career_id == Career.id) .where(Character.project_id == project_id) ) character_careers = result.all() return [ CharacterCareerExportData( character_name=char.name, career_name=career.name, career_type=cc.career_type, current_stage=cc.current_stage or 1, stage_progress=cc.stage_progress or 0, started_at=cc.started_at, reached_current_stage_at=cc.reached_current_stage_at, notes=cc.notes ) for cc, char, career in character_careers ] @staticmethod async def _export_story_memories(project_id: str, db: AsyncSession) -> List[StoryMemoryExportData]: """导出故事记忆""" # 构建章节ID到标题的映射 chapter_result = await db.execute( select(Chapter).where(Chapter.project_id == project_id) ) chapters = chapter_result.scalars().all() chapter_mapping = {ch.id: ch.title for ch in chapters} # 构建角色ID到名称的映射 char_result = await db.execute( select(Character).where(Character.project_id == project_id) ) characters = char_result.scalars().all() char_mapping = {char.id: char.name for char in characters} result = await db.execute( select(StoryMemory) .where(StoryMemory.project_id == project_id) .order_by(StoryMemory.story_timeline, StoryMemory.chapter_position) ) memories = result.scalars().all() exported = [] for mem in memories: # 将角色ID列表转换为名称列表 related_char_names = None if mem.related_characters: related_char_names = [ char_mapping.get(char_id, char_id) for char_id in mem.related_characters ] exported.append(StoryMemoryExportData( chapter_title=chapter_mapping.get(mem.chapter_id) if mem.chapter_id else None, memory_type=mem.memory_type, title=mem.title, content=mem.content, full_context=mem.full_context, related_characters=related_char_names, related_locations=mem.related_locations, tags=mem.tags, importance_score=mem.importance_score or 0.5, story_timeline=mem.story_timeline, chapter_position=mem.chapter_position or 0, text_length=mem.text_length or 0, is_foreshadow=mem.is_foreshadow or 0, foreshadow_strength=mem.foreshadow_strength, created_at=mem.created_at.isoformat() if mem.created_at else None )) return exported @staticmethod async def _export_plot_analysis(project_id: str, db: AsyncSession) -> List[PlotAnalysisExportData]: """导出剧情分析""" # 构建章节ID到标题的映射 chapter_result = await db.execute( select(Chapter).where(Chapter.project_id == project_id) ) chapters = chapter_result.scalars().all() chapter_mapping = {ch.id: ch.title for ch in chapters} result = await db.execute( select(PlotAnalysis) .where(PlotAnalysis.project_id == project_id) ) analyses = result.scalars().all() exported = [] for analysis in analyses: chapter_title = chapter_mapping.get(analysis.chapter_id) if not chapter_title: continue # 跳过没有关联章节的分析 exported.append(PlotAnalysisExportData( chapter_title=chapter_title, plot_stage=analysis.plot_stage, conflict_level=analysis.conflict_level, conflict_types=analysis.conflict_types, emotional_tone=analysis.emotional_tone, emotional_intensity=analysis.emotional_intensity, emotional_curve=analysis.emotional_curve, hooks=analysis.hooks, hooks_count=analysis.hooks_count or 0, hooks_avg_strength=analysis.hooks_avg_strength, foreshadows=analysis.foreshadows, foreshadows_planted=analysis.foreshadows_planted or 0, foreshadows_resolved=analysis.foreshadows_resolved or 0, plot_points=analysis.plot_points, plot_points_count=analysis.plot_points_count or 0, character_states=analysis.character_states, scenes=analysis.scenes, pacing=analysis.pacing, overall_quality_score=analysis.overall_quality_score, pacing_score=analysis.pacing_score, engagement_score=analysis.engagement_score, coherence_score=analysis.coherence_score, analysis_report=analysis.analysis_report, suggestions=analysis.suggestions, word_count=analysis.word_count, dialogue_ratio=analysis.dialogue_ratio, description_ratio=analysis.description_ratio, created_at=analysis.created_at.isoformat() if analysis.created_at else None )) return exported @staticmethod async def _export_project_default_style(project_id: str, db: AsyncSession) -> Optional[ProjectDefaultStyleExportData]: """导出项目默认风格""" result = await db.execute( select(ProjectDefaultStyle, WritingStyle) .join(WritingStyle, ProjectDefaultStyle.style_id == WritingStyle.id) .where(ProjectDefaultStyle.project_id == project_id) ) row = result.first() if row: _, style = row return ProjectDefaultStyleExportData(style_name=style.name) return None @staticmethod def validate_import_data(data: Dict) -> ImportValidationResult: """ 验证导入数据 Args: data: 导入的JSON数据 Returns: ImportValidationResult: 验证结果 """ errors = [] warnings = [] statistics = {} # 检查版本 version = data.get("version", "") if not version: errors.append("缺少版本信息") elif version not in ImportExportService.SUPPORTED_VERSIONS: warnings.append(f"版本不匹配: 导入文件版本为 {version}, 当前支持版本为 {', '.join(ImportExportService.SUPPORTED_VERSIONS)}") # 检查必需字段 if "project" not in data: errors.append("缺少项目信息") else: project = data["project"] if not project.get("title"): errors.append("项目标题不能为空") # 统计数据(包含新增字段) statistics = { "chapters": len(data.get("chapters", [])), "characters": len(data.get("characters", [])), "outlines": len(data.get("outlines", [])), "relationships": len(data.get("relationships", [])), "organizations": len(data.get("organizations", [])), "organization_members": len(data.get("organization_members", [])), "writing_styles": len(data.get("writing_styles", [])), "generation_history": len(data.get("generation_history", [])), "careers": len(data.get("careers", [])), "character_careers": len(data.get("character_careers", [])), "story_memories": len(data.get("story_memories", [])), "plot_analysis": len(data.get("plot_analysis", [])), "has_default_style": data.get("project_default_style") is not None } # 检查数据完整性 if statistics["chapters"] == 0: warnings.append("项目没有章节数据") if statistics["characters"] == 0: warnings.append("项目没有角色数据") project_name = data.get("project", {}).get("title", "未知项目") return ImportValidationResult( valid=len(errors) == 0, version=version, project_name=project_name, statistics=statistics, errors=errors, warnings=warnings ) @staticmethod async def import_project( data: Dict, db: AsyncSession, user_id: str ) -> ImportResult: """ 导入项目数据(创建新项目) Args: data: 导入的JSON数据 db: 数据库会话 user_id: 目标用户ID(导入后的项目归属) Returns: ImportResult: 导入结果 """ warnings = [] statistics = {} try: # 验证数据 validation = ImportExportService.validate_import_data(data) if not validation.valid: return ImportResult( success=False, message=f"数据验证失败: {', '.join(validation.errors)}", statistics={}, warnings=validation.warnings ) warnings.extend(validation.warnings) logger.info(f"开始导入项目: {validation.project_name}") # 创建项目 project_data = data["project"] new_project = Project( user_id=user_id, # 设置为当前用户ID title=project_data.get("title"), description=project_data.get("description"), theme=project_data.get("theme"), genre=project_data.get("genre"), target_words=project_data.get("target_words"), status=project_data.get("status", "planning"), world_time_period=project_data.get("world_time_period"), world_location=project_data.get("world_location"), world_atmosphere=project_data.get("world_atmosphere"), world_rules=project_data.get("world_rules"), chapter_count=project_data.get("chapter_count"), narrative_perspective=project_data.get("narrative_perspective"), character_count=project_data.get("character_count"), outline_mode=project_data.get("outline_mode", "one-to-many"), # ✅ 导入大纲模式,默认为一对多 current_words=project_data.get("current_words", 0), # 保留原项目的字数 wizard_step=4, # 导入的项目设置为向导完成状态 wizard_status="completed" # 标记向导已完成 ) db.add(new_project) await db.flush() # 获取project_id logger.info(f"创建项目成功: {new_project.id}") # 导入角色(包括组织)- 需要先导入角色,因为大纲可能需要角色信息 char_mapping = await ImportExportService._import_characters( new_project.id, data.get("characters", []), db ) statistics["characters"] = len(char_mapping) logger.info(f"导入角色数: {len(char_mapping)}") # 导入大纲 - 需要在章节之前导入,以便建立关联 outline_mapping = await ImportExportService._import_outlines( new_project.id, data.get("outlines", []), db ) statistics["outlines"] = len(outline_mapping) logger.info(f"导入大纲数: {len(outline_mapping)}") # 导入章节 - 使用大纲映射重建关联关系 chapters_count = await ImportExportService._import_chapters( new_project.id, data.get("chapters", []), outline_mapping, db ) statistics["chapters"] = chapters_count logger.info(f"导入章节数: {chapters_count}") # 导入关系 relationships_count = await ImportExportService._import_relationships( new_project.id, data.get("relationships", []), char_mapping, db ) statistics["relationships"] = relationships_count logger.info(f"导入关系数: {relationships_count}") # 导入组织详情 org_mapping = await ImportExportService._import_organizations( new_project.id, data.get("organizations", []), char_mapping, db ) statistics["organizations"] = len(org_mapping) logger.info(f"导入组织数: {len(org_mapping)}") # 导入组织成员 org_members_count = await ImportExportService._import_organization_members( data.get("organization_members", []), char_mapping, org_mapping, db ) statistics["organization_members"] = org_members_count logger.info(f"导入组织成员数: {org_members_count}") # 导入写作风格 styles_count = await ImportExportService._import_writing_styles( new_project.id, data.get("writing_styles", []), db ) statistics["writing_styles"] = styles_count logger.info(f"导入写作风格数: {styles_count}") # 导入职业系统 career_mapping = await ImportExportService._import_careers( new_project.id, data.get("careers", []), db ) statistics["careers"] = len(career_mapping) logger.info(f"导入职业数: {len(career_mapping)}") # 导入角色职业关联 char_careers_count = await ImportExportService._import_character_careers( data.get("character_careers", []), char_mapping, career_mapping, db ) statistics["character_careers"] = char_careers_count logger.info(f"导入角色职业关联数: {char_careers_count}") # 导入故事记忆 # 需要先构建章节标题到ID的映射(使用章节号+标题组合确保唯一性) chapter_title_to_id = {} chapter_result = await db.execute( select(Chapter).where(Chapter.project_id == new_project.id) ) imported_chapters = chapter_result.scalars().all() for ch in imported_chapters: # 使用标题作为key,如果有重复标题则取第一个(已导入的顺序) if ch.title and ch.title not in chapter_title_to_id: chapter_title_to_id[ch.title] = ch.id memories_count = await ImportExportService._import_story_memories( new_project.id, data.get("story_memories", []), chapter_title_to_id, char_mapping, db ) statistics["story_memories"] = memories_count logger.info(f"导入故事记忆数: {memories_count}") # 导入剧情分析(传入user_id以便创建分析任务记录) plot_analysis_count = await ImportExportService._import_plot_analysis( new_project.id, data.get("plot_analysis", []), chapter_title_to_id, db, user_id ) statistics["plot_analysis"] = plot_analysis_count logger.info(f"导入剧情分析数: {plot_analysis_count}") # 导入项目默认风格 default_style_imported = await ImportExportService._import_project_default_style( new_project.id, data.get("project_default_style"), db ) statistics["project_default_style"] = 1 if default_style_imported else 0 if default_style_imported: logger.info("导入项目默认风格成功") # 提交事务 await db.commit() logger.info(f"项目导入完成: {new_project.id}") return ImportResult( success=True, project_id=new_project.id, message="项目导入成功", statistics=statistics, warnings=warnings ) except Exception as e: await db.rollback() logger.error(f"导入项目失败: {str(e)}", exc_info=True) return ImportResult( success=False, message=f"导入失败: {str(e)}", statistics=statistics, warnings=warnings ) @staticmethod async def _import_chapters( project_id: str, chapters_data: List[Dict], outline_mapping: Dict[str, str], db: AsyncSession ) -> int: """导入章节""" count = 0 for ch_data in chapters_data: # 根据大纲标题查找对应的新大纲ID outline_id = None outline_title = ch_data.get("outline_title") if outline_title and outline_title in outline_mapping: outline_id = outline_mapping[outline_title] # 处理expansion_plan expansion_plan = ch_data.get("expansion_plan") if expansion_plan and isinstance(expansion_plan, dict): expansion_plan = json.dumps(expansion_plan, ensure_ascii=False) chapter = Chapter( project_id=project_id, title=ch_data.get("title"), content=ch_data.get("content"), summary=ch_data.get("summary"), chapter_number=ch_data.get("chapter_number"), word_count=ch_data.get("word_count", 0), status=ch_data.get("status", "draft"), outline_id=outline_id, sub_index=ch_data.get("sub_index"), expansion_plan=expansion_plan ) db.add(chapter) count += 1 return count @staticmethod async def _import_characters( project_id: str, characters_data: List[Dict], db: AsyncSession ) -> Dict[str, str]: """导入角色,返回名称到ID的映射""" char_mapping = {} for char_data in characters_data: # 处理traits traits = char_data.get("traits") if isinstance(traits, list): traits = json.dumps(traits, ensure_ascii=False) character = Character( project_id=project_id, name=char_data.get("name"), age=char_data.get("age"), gender=char_data.get("gender"), is_organization=char_data.get("is_organization", False), role_type=char_data.get("role_type"), personality=char_data.get("personality"), background=char_data.get("background"), appearance=char_data.get("appearance"), traits=traits, organization_type=char_data.get("organization_type"), organization_purpose=char_data.get("organization_purpose") ) db.add(character) await db.flush() # 获取ID char_mapping[char_data.get("name")] = character.id return char_mapping @staticmethod async def _import_outlines( project_id: str, outlines_data: List[Dict], db: AsyncSession ) -> Dict[str, str]: """导入大纲,返回标题到ID的映射""" outline_mapping = {} for ol_data in outlines_data: outline = Outline( project_id=project_id, title=ol_data.get("title"), content=ol_data.get("content"), structure=ol_data.get("structure"), order_index=ol_data.get("order_index") ) db.add(outline) await db.flush() # 获取ID outline_mapping[ol_data.get("title")] = outline.id return outline_mapping @staticmethod async def _import_relationships( project_id: str, relationships_data: List[Dict], char_mapping: Dict[str, str], db: AsyncSession ) -> int: """导入关系""" count = 0 for rel_data in relationships_data: source_name = rel_data.get("source_name") target_name = rel_data.get("target_name") # 查找角色ID source_id = char_mapping.get(source_name) target_id = char_mapping.get(target_name) if source_id and target_id: relationship = CharacterRelationship( project_id=project_id, character_from_id=source_id, character_to_id=target_id, relationship_name=rel_data.get("relationship_name"), intimacy_level=rel_data.get("intimacy_level", 50), status=rel_data.get("status", "active"), description=rel_data.get("description"), started_at=rel_data.get("started_at") ) db.add(relationship) count += 1 return count @staticmethod async def _import_organizations( project_id: str, organizations_data: List[Dict], char_mapping: Dict[str, str], db: AsyncSession ) -> Dict[str, str]: """导入组织详情,返回名称到ID的映射""" org_mapping = {} # 第一遍:创建所有组织(不设置父组织) temp_orgs = [] for org_data in organizations_data: char_name = org_data.get("character_name") char_id = char_mapping.get(char_name) if char_id: organization = Organization( project_id=project_id, character_id=char_id, power_level=org_data.get("power_level", 50), member_count=org_data.get("member_count", 0), location=org_data.get("location"), motto=org_data.get("motto"), color=org_data.get("color") ) db.add(organization) temp_orgs.append((organization, org_data.get("parent_org_name"))) await db.flush() # 获取所有组织的ID # 建立名称到ID的映射 for org, _ in temp_orgs: # 通过character_id查找角色名 result = await db.execute( select(Character).where(Character.id == org.character_id) ) char = result.scalar_one_or_none() if char: org_mapping[char.name] = org.id # 第二遍:设置父组织关系 for org, parent_name in temp_orgs: if parent_name: parent_id = org_mapping.get(parent_name) if parent_id: org.parent_org_id = parent_id return org_mapping @staticmethod async def _import_organization_members( org_members_data: List[Dict], char_mapping: Dict[str, str], org_mapping: Dict[str, str], db: AsyncSession ) -> int: """导入组织成员""" count = 0 for member_data in org_members_data: org_name = member_data.get("organization_name") char_name = member_data.get("character_name") org_id = org_mapping.get(org_name) char_id = char_mapping.get(char_name) if org_id and char_id: member = OrganizationMember( organization_id=org_id, character_id=char_id, position=member_data.get("position"), rank=member_data.get("rank", 0), status=member_data.get("status", "active"), joined_at=member_data.get("joined_at"), loyalty=member_data.get("loyalty", 50), contribution=member_data.get("contribution", 0), notes=member_data.get("notes") ) db.add(member) count += 1 return count @staticmethod async def _import_writing_styles( project_id: str, styles_data: List[Dict], db: AsyncSession ) -> int: """导入写作风格(用户自定义风格)""" # 获取项目所属用户 project_result = await db.execute( select(Project).where(Project.id == project_id) ) project = project_result.scalar_one_or_none() if not project: return 0 count = 0 for style_data in styles_data: # 检查是否已存在同名风格(避免重复导入) existing = await db.execute( select(WritingStyle).where( WritingStyle.user_id == project.user_id, WritingStyle.name == style_data.get("name") ) ) # 使用 first() 避免多行时报错 if existing.first(): logger.debug(f"风格 {style_data.get('name')} 已存在,跳过导入") continue style = WritingStyle( user_id=project.user_id, # 使用 user_id 而不是 project_id name=style_data.get("name"), style_type=style_data.get("style_type"), preset_id=style_data.get("preset_id"), description=style_data.get("description"), prompt_content=style_data.get("prompt_content"), order_index=style_data.get("order_index", 0) ) db.add(style) count += 1 return count @staticmethod async def _import_careers( project_id: str, careers_data: List[Dict], db: AsyncSession ) -> Dict[str, str]: """导入职业,返回名称到ID的映射""" career_mapping = {} for career_data in careers_data: career = Career( project_id=project_id, name=career_data.get("name"), type=career_data.get("type", "main"), description=career_data.get("description"), category=career_data.get("category"), stages=career_data.get("stages", "[]"), max_stage=career_data.get("max_stage", 10), requirements=career_data.get("requirements"), special_abilities=career_data.get("special_abilities"), worldview_rules=career_data.get("worldview_rules"), attribute_bonuses=career_data.get("attribute_bonuses"), source=career_data.get("source", "ai") ) db.add(career) await db.flush() career_mapping[career_data.get("name")] = career.id return career_mapping @staticmethod async def _import_character_careers( character_careers_data: List[Dict], char_mapping: Dict[str, str], career_mapping: Dict[str, str], db: AsyncSession ) -> int: """导入角色职业关联""" count = 0 for cc_data in character_careers_data: char_name = cc_data.get("character_name") career_name = cc_data.get("career_name") char_id = char_mapping.get(char_name) career_id = career_mapping.get(career_name) if char_id and career_id: # 检查是否已存在(使用 first() 避免多行时报错) existing = await db.execute( select(CharacterCareer).where( CharacterCareer.character_id == char_id, CharacterCareer.career_id == career_id ) ) if existing.first(): continue char_career = CharacterCareer( character_id=char_id, career_id=career_id, career_type=cc_data.get("career_type", "main"), current_stage=cc_data.get("current_stage", 1), stage_progress=cc_data.get("stage_progress", 0), started_at=cc_data.get("started_at"), reached_current_stage_at=cc_data.get("reached_current_stage_at"), notes=cc_data.get("notes") ) db.add(char_career) count += 1 # 同时更新角色的主职业信息 if cc_data.get("career_type") == "main": char_result = await db.execute( select(Character).where(Character.id == char_id) ) char = char_result.scalar_one_or_none() if char: char.main_career_id = career_id char.main_career_stage = cc_data.get("current_stage", 1) return count @staticmethod async def _import_story_memories( project_id: str, memories_data: List[Dict], chapter_mapping: Dict[str, str], char_mapping: Dict[str, str], db: AsyncSession ) -> int: """导入故事记忆""" count = 0 for mem_data in memories_data: # 将章节标题转换为ID chapter_id = None chapter_title = mem_data.get("chapter_title") if chapter_title and chapter_title in chapter_mapping: chapter_id = chapter_mapping[chapter_title] # 将角色名称列表转换为ID列表 related_char_ids = None related_char_names = mem_data.get("related_characters") if related_char_names: related_char_ids = [ char_mapping.get(name) for name in related_char_names if char_mapping.get(name) ] memory = StoryMemory( project_id=project_id, chapter_id=chapter_id, memory_type=mem_data.get("memory_type"), title=mem_data.get("title"), content=mem_data.get("content"), full_context=mem_data.get("full_context"), related_characters=related_char_ids, related_locations=mem_data.get("related_locations"), tags=mem_data.get("tags"), importance_score=mem_data.get("importance_score", 0.5), story_timeline=mem_data.get("story_timeline", 0), chapter_position=mem_data.get("chapter_position", 0), text_length=mem_data.get("text_length", 0), is_foreshadow=mem_data.get("is_foreshadow", 0), foreshadow_strength=mem_data.get("foreshadow_strength") ) db.add(memory) count += 1 return count @staticmethod async def _import_plot_analysis( project_id: str, plot_data: List[Dict], chapter_mapping: Dict[str, str], db: AsyncSession, user_id: str = None ) -> int: """导入剧情分析,同时创建已完成的分析任务记录""" from datetime import datetime count = 0 for analysis_data in plot_data: chapter_title = analysis_data.get("chapter_title") chapter_id = chapter_mapping.get(chapter_title) if not chapter_id: continue # 跳过找不到章节的分析 # 检查是否已存在该章节的分析(使用 first() 避免多行时报错) existing = await db.execute( select(PlotAnalysis).where(PlotAnalysis.chapter_id == chapter_id) ) if existing.first(): continue analysis = PlotAnalysis( project_id=project_id, chapter_id=chapter_id, plot_stage=analysis_data.get("plot_stage"), conflict_level=analysis_data.get("conflict_level"), conflict_types=analysis_data.get("conflict_types"), emotional_tone=analysis_data.get("emotional_tone"), emotional_intensity=analysis_data.get("emotional_intensity"), emotional_curve=analysis_data.get("emotional_curve"), hooks=analysis_data.get("hooks"), hooks_count=analysis_data.get("hooks_count", 0), hooks_avg_strength=analysis_data.get("hooks_avg_strength"), foreshadows=analysis_data.get("foreshadows"), foreshadows_planted=analysis_data.get("foreshadows_planted", 0), foreshadows_resolved=analysis_data.get("foreshadows_resolved", 0), plot_points=analysis_data.get("plot_points"), plot_points_count=analysis_data.get("plot_points_count", 0), character_states=analysis_data.get("character_states"), scenes=analysis_data.get("scenes"), pacing=analysis_data.get("pacing"), overall_quality_score=analysis_data.get("overall_quality_score"), pacing_score=analysis_data.get("pacing_score"), engagement_score=analysis_data.get("engagement_score"), coherence_score=analysis_data.get("coherence_score"), analysis_report=analysis_data.get("analysis_report"), suggestions=analysis_data.get("suggestions"), word_count=analysis_data.get("word_count"), dialogue_ratio=analysis_data.get("dialogue_ratio"), description_ratio=analysis_data.get("description_ratio") ) db.add(analysis) # 同时创建已完成的分析任务记录,这样章节管理页面会显示"已分析"状态 if user_id: now = datetime.utcnow() analysis_task = AnalysisTask( chapter_id=chapter_id, user_id=user_id, project_id=project_id, status='completed', progress=100, started_at=now, completed_at=now ) db.add(analysis_task) count += 1 return count @staticmethod async def _import_project_default_style( project_id: str, default_style_data: Optional[Dict], db: AsyncSession ) -> bool: """导入项目默认风格""" if not default_style_data: return False style_name = default_style_data.get("style_name") if not style_name: return False # 获取项目所属用户 project_result = await db.execute( select(Project).where(Project.id == project_id) ) project = project_result.scalar_one_or_none() if not project: return False # 查找对应的风格(优先查找用户自定义风格,然后是全局预设风格) # 先查用户自定义风格(使用 first() 避免多行时报错) style_result = await db.execute( select(WritingStyle).where( WritingStyle.user_id == project.user_id, WritingStyle.name == style_name ) ) style_row = style_result.first() style = style_row[0] if style_row else None # 如果用户自定义风格不存在,查找全局预设风格 if not style: style_result = await db.execute( select(WritingStyle).where( WritingStyle.user_id.is_(None), WritingStyle.name == style_name ) ) style_row = style_result.first() style = style_row[0] if style_row else None if not style: logger.warning(f"导入项目默认风格时未找到风格: {style_name}") return False # 创建项目默认风格关联 default_style = ProjectDefaultStyle( project_id=project_id, style_id=style.id ) db.add(default_style) logger.info(f"项目默认风格导入成功: {style_name}, style_id={style.id}") return True @staticmethod async def export_characters( character_ids: List[str], db: AsyncSession ) -> Dict[str, Any]: """ 导出角色/组织卡片 Args: character_ids: 要导出的角色/组织ID列表 db: 数据库会话 Returns: Dict: 导出的角色数据 """ logger.info(f"开始导出角色/组织: {len(character_ids)} 个") # 查询角色数据 result = await db.execute( select(Character).where(Character.id.in_(character_ids)) ) characters = result.scalars().all() if not characters: raise ValueError("未找到指定的角色/组织") # 导出角色数据 exported_characters = [] for char in characters: # 解析 traits traits = None if char.traits: try: traits = json.loads(char.traits) if isinstance(char.traits, str) else char.traits except Exception: traits = None # 基础角色数据 char_data = { "name": char.name, "age": char.age, "gender": char.gender, "is_organization": char.is_organization or False, "role_type": char.role_type, "personality": char.personality, "background": char.background, "appearance": char.appearance, "traits": traits, "organization_type": char.organization_type, "organization_purpose": char.organization_purpose, "avatar_url": char.avatar_url, "main_career_id": char.main_career_id, "main_career_stage": char.main_career_stage, "sub_careers": char.sub_careers, "created_at": char.created_at.isoformat() if char.created_at else None } # 如果是组织,添加组织专属字段 if char.is_organization: org_result = await db.execute( select(Organization).where(Organization.character_id == char.id) ) org = org_result.scalar_one_or_none() if org: char_data.update({ "power_level": org.power_level, "location": org.location, "motto": org.motto, "color": org.color }) # 从 OrganizationMember 表导出结构化成员数据 members_result = await db.execute( select(OrganizationMember).where(OrganizationMember.organization_id == org.id) ) members = members_result.scalars().all() if members: char_data["organization_members_data"] = [ { "character_id": m.character_id, "position": m.position, "rank": m.rank, "loyalty": m.loyalty, "contribution": m.contribution, "status": m.status, "joined_at": m.joined_at, "source": m.source } for m in members ] exported_characters.append(char_data) export_data = { "version": ImportExportService.CURRENT_VERSION, "export_time": datetime.utcnow().isoformat(), "export_type": "characters", "count": len(exported_characters), "data": exported_characters } logger.info(f"角色/组织导出完成: {len(exported_characters)} 个") return export_data @staticmethod async def import_characters( data: Dict, project_id: str, user_id: str, db: AsyncSession ) -> Dict[str, Any]: """ 导入角色/组织卡片 Args: data: 导入的JSON数据 project_id: 目标项目ID user_id: 用户ID db: 数据库会话 Returns: Dict: 导入结果 """ from app.models.career import CharacterCareer, Career warnings = [] imported_characters = [] imported_organizations = [] skipped = [] errors = [] try: # 验证数据格式 if "data" not in data: raise ValueError("导入数据格式错误:缺少data字段") characters_data = data["data"] if not isinstance(characters_data, list): raise ValueError("导入数据格式错误:data字段必须是数组") # 验证项目权限 project_result = await db.execute( select(Project).where( Project.id == project_id, Project.user_id == user_id ) ) project = project_result.scalar_one_or_none() if not project: raise ValueError("项目不存在或无权访问") logger.info(f"开始导入 {len(characters_data)} 个角色/组织到项目 {project_id}") # 处理每个角色/组织 for idx, char_data in enumerate(characters_data): try: name = char_data.get("name") if not name: errors.append(f"第{idx+1}个角色缺少name字段") continue # 检查重复名称(使用 first() 避免多行时报错) existing_result = await db.execute( select(Character).where( Character.project_id == project_id, Character.name == name ) ) existing = existing_result.first() if existing: warnings.append(f"角色'{name}'已存在,已跳过") skipped.append(name) continue # 处理traits traits = char_data.get("traits") if isinstance(traits, list): traits = json.dumps(traits, ensure_ascii=False) is_organization = char_data.get("is_organization", False) # 创建角色 character = Character( project_id=project_id, name=name, age=char_data.get("age"), gender=char_data.get("gender"), is_organization=is_organization, role_type=char_data.get("role_type"), personality=char_data.get("personality"), background=char_data.get("background"), appearance=char_data.get("appearance"), traits=traits, organization_type=char_data.get("organization_type"), organization_purpose=char_data.get("organization_purpose"), avatar_url=char_data.get("avatar_url"), main_career_id=None, # 职业ID需要验证后再设置 main_career_stage=char_data.get("main_career_stage"), sub_careers=None # 副职业需要验证后再设置 ) db.add(character) await db.flush() # 获取character.id # 处理主职业(如果有) main_career_id = char_data.get("main_career_id") main_career_stage = char_data.get("main_career_stage") if main_career_id and not is_organization: # 验证职业是否存在 career_result = await db.execute( select(Career).where( Career.id == main_career_id, Career.project_id == project_id, Career.type == 'main' ) ) career = career_result.scalar_one_or_none() if career: character.main_career_id = main_career_id character.main_career_stage = main_career_stage or 1 # 创建职业关联 char_career = CharacterCareer( character_id=character.id, career_id=main_career_id, career_type='main', current_stage=main_career_stage or 1, stage_progress=0 ) db.add(char_career) else: warnings.append(f"角色'{name}'的主职业ID不存在,已忽略职业信息") # 处理副职业(如果有) sub_careers = char_data.get("sub_careers") if sub_careers and not is_organization: try: sub_careers_data = json.loads(sub_careers) if isinstance(sub_careers, str) else sub_careers if isinstance(sub_careers_data, list): valid_sub_careers = [] for sub_data in sub_careers_data[:2]: # 最多2个副职业 if isinstance(sub_data, dict): career_id = sub_data.get('career_id') stage = sub_data.get('stage', 1) if career_id: # 验证副职业是否存在 career_result = await db.execute( select(Career).where( Career.id == career_id, Career.project_id == project_id, Career.type == 'sub' ) ) career = career_result.scalar_one_or_none() if career: valid_sub_careers.append({ 'career_id': career_id, 'stage': stage }) # 创建副职业关联 char_career = CharacterCareer( character_id=character.id, career_id=career_id, career_type='sub', current_stage=stage, stage_progress=0 ) db.add(char_career) if valid_sub_careers: character.sub_careers = json.dumps(valid_sub_careers, ensure_ascii=False) elif sub_careers_data: warnings.append(f"角色'{name}'的副职业ID不存在,已忽略副职业信息") except Exception as e: warnings.append(f"角色'{name}'的副职业数据解析失败: {str(e)}") # 如果是组织,创建Organization记录 if is_organization: organization = Organization( character_id=character.id, project_id=project_id, member_count=0, power_level=char_data.get("power_level", 50), location=char_data.get("location"), motto=char_data.get("motto"), color=char_data.get("color") ) db.add(organization) await db.flush() # 导入组织成员数据(如果有) members_data = char_data.get("organization_members_data", []) if members_data and isinstance(members_data, list): imported_member_count = 0 for m_data in members_data: try: member_char_id = m_data.get("character_id") if not member_char_id: continue # 验证成员角色是否存在于目标项目 member_char_result = await db.execute( select(Character).where( Character.id == member_char_id, Character.project_id == project_id ) ) if member_char_result.scalar_one_or_none(): member = OrganizationMember( organization_id=organization.id, character_id=member_char_id, position=m_data.get("position", "成员"), rank=m_data.get("rank", 0), loyalty=m_data.get("loyalty", 50), contribution=m_data.get("contribution", 0), status=m_data.get("status", "active"), joined_at=m_data.get("joined_at"), source=m_data.get("source", "imported") ) db.add(member) imported_member_count += 1 except Exception as me: logger.warning(f"导入组织成员失败: {str(me)}") if imported_member_count > 0: organization.member_count = imported_member_count logger.info(f"导入组织'{name}'的 {imported_member_count} 个成员") imported_organizations.append(name) else: imported_characters.append(name) logger.info(f"导入{'组织' if is_organization else '角色'}成功: {name}") except Exception as e: error_msg = f"导入角色'{char_data.get('name', f'第{idx+1}个')}'失败: {str(e)}" logger.error(error_msg) errors.append(error_msg) continue # 提交事务 await db.commit() total = len(imported_characters) + len(imported_organizations) result = { "success": True, "message": f"成功导入 {total} 个角色/组织", "statistics": { "total": len(characters_data), "imported": total, "skipped": len(skipped), "errors": len(errors) }, "details": { "imported_characters": imported_characters, "imported_organizations": imported_organizations, "skipped": skipped, "errors": errors }, "warnings": warnings } logger.info(f"角色/组织导入完成: 成功{total}个,跳过{len(skipped)}个,失败{len(errors)}个") return result except Exception as e: await db.rollback() logger.error(f"导入角色/组织失败: {str(e)}", exc_info=True) return { "success": False, "message": f"导入失败: {str(e)}", "statistics": { "total": len(characters_data) if "data" in data else 0, "imported": len(imported_characters) + len(imported_organizations), "skipped": len(skipped), "errors": len(errors) }, "details": { "imported_characters": imported_characters, "imported_organizations": imported_organizations, "skipped": skipped, "errors": errors }, "warnings": warnings } @staticmethod def validate_characters_import(data: Dict) -> Dict[str, Any]: """ 验证角色/组织导入数据 Args: data: 导入的JSON数据 Returns: Dict: 验证结果 """ errors = [] warnings = [] # 检查版本 version = data.get("version", "") if not version: errors.append("缺少版本信息") elif version not in ImportExportService.SUPPORTED_VERSIONS: warnings.append(f"版本不匹配: 导入文件版本为 {version}, 当前支持版本为 {', '.join(ImportExportService.SUPPORTED_VERSIONS)}") # 检查导出类型 export_type = data.get("export_type", "") if export_type != "characters": errors.append(f"导出类型错误: 期望'characters',实际'{export_type}'") # 检查数据字段 if "data" not in data: errors.append("缺少data字段") elif not isinstance(data["data"], list): errors.append("data字段必须是数组") else: characters_data = data["data"] # 统计信息 character_count = sum(1 for c in characters_data if not c.get("is_organization", False)) org_count = sum(1 for c in characters_data if c.get("is_organization", False)) # 检查必填字段 for idx, char_data in enumerate(characters_data): if not char_data.get("name"): errors.append(f"第{idx+1}个角色缺少name字段") statistics = { "characters": character_count, "organizations": org_count } if "data" not in data or errors: statistics = {"characters": 0, "organizations": 0} return { "valid": len(errors) == 0, "version": version, "statistics": statistics, "errors": errors, "warnings": warnings }