"""JSON 处理工具类""" import json import re from typing import Any, Dict, List, Union from app.logger import get_logger logger = get_logger(__name__) def clean_json_response(text: str) -> str: """清洗 AI 返回的 JSON(改进版 - 流式安全)""" try: if not text: logger.warning("⚠️ clean_json_response: 输入为空") return text original_length = len(text) logger.debug(f"🔍 开始清洗JSON,原始长度: {original_length}") # 去除 markdown 代码块 text = re.sub(r'^```json\s*\n?', '', text, flags=re.MULTILINE | re.IGNORECASE) text = re.sub(r'^```\s*\n?', '', text, flags=re.MULTILINE) text = re.sub(r'\n?```\s*$', '', text, flags=re.MULTILINE) text = text.strip() if len(text) != original_length: logger.debug(f" 移除markdown后长度: {len(text)}") # 尝试直接解析(快速路径) try: json.loads(text) logger.debug(f"✅ 直接解析成功,无需清洗") return text except: pass # 找到第一个 { 或 [ start = -1 for i, c in enumerate(text): if c in ('{', '['): start = i break if start == -1: logger.warning(f"⚠️ 未找到JSON起始符号 {{ 或 [") logger.debug(f" 文本预览: {text[:200]}") return text if start > 0: logger.debug(f" 跳过前{start}个字符") text = text[start:] # 改进的括号匹配算法(更严格的字符串处理) stack = [] i = 0 end = -1 while i < len(text): c = text[i] # 处理字符串(关键:正确处理转义) if c == '"': # 计算前面有多少个连续的反斜杠 num_backslashes = 0 j = i - 1 while j >= 0 and text[j] == '\\': num_backslashes += 1 j -= 1 # 偶数个反斜杠(包括0)表示引号未被转义 if num_backslashes % 2 == 0: # 这是字符串边界,跳过整个字符串 i += 1 while i < len(text): if text[i] == '"': # 再次检查转义 num_backslashes = 0 j = i - 1 while j >= 0 and text[j] == '\\': num_backslashes += 1 j -= 1 if num_backslashes % 2 == 0: # 字符串结束 break i += 1 i += 1 continue # 处理括号(只有在字符串外部才有效) if c == '{' or c == '[': stack.append(c) elif c == '}': if len(stack) > 0 and stack[-1] == '{': stack.pop() if len(stack) == 0: end = i + 1 logger.debug(f"✅ 找到JSON结束位置: {end}") break else: logger.warning(f"⚠️ 括号不匹配:遇到 }} 但栈顶是 {stack[-1] if stack else 'empty'}") elif c == ']': if len(stack) > 0 and stack[-1] == '[': stack.pop() if len(stack) == 0: end = i + 1 logger.debug(f"✅ 找到JSON结束位置: {end}") break else: logger.warning(f"⚠️ 括号不匹配:遇到 ] 但栈顶是 {stack[-1] if stack else 'empty'}") i += 1 # 提取结果 if end > 0: result = text[:end] logger.debug(f"✅ JSON清洗完成,结果长度: {len(result)}") else: result = text logger.warning(f"⚠️ 未找到JSON结束位置,返回全部内容(长度: {len(result)})") logger.debug(f" 栈状态: {stack}") # 验证清洗后的结果 try: json.loads(result) logger.debug(f"✅ 清洗后JSON验证成功") except json.JSONDecodeError as e: logger.error(f"❌ 清洗后JSON仍然无效: {e}") logger.debug(f" 结果预览: {result[:500]}") logger.debug(f" 结果结尾: ...{result[-200:]}") return result except Exception as e: logger.error(f"❌ clean_json_response 出错: {e}") logger.error(f" 文本长度: {len(text) if text else 0}") logger.error(f" 文本预览: {text[:200] if text else 'None'}") raise def parse_json(text: str) -> Union[Dict, List]: """解析 JSON""" try: cleaned = clean_json_response(text) return json.loads(cleaned) except Exception as e: logger.error(f"❌ parse_json 出错: {e}") logger.error(f" 原始文本长度: {len(text) if text else 0}") logger.error(f" 清洗后文本长度: {len(cleaned) if cleaned else 0}") raise