Files
MuMuAINovel/backend/app/services/json_helper.py
T
2026-02-25 04:23:42 +00:00

159 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""JSON 处理工具类"""
import json
import re
from typing import Any, Dict, List, Union
from app.logger import get_logger
logger = get_logger(__name__)
def clean_json_response(text: str) -> str:
"""清洗 AI 返回的 JSON(改进版 - 流式安全)"""
try:
if not text:
logger.warning("⚠️ clean_json_response: 输入为空")
return text
original_length = len(text)
logger.debug(f"🔍 开始清洗JSON,原始长度: {original_length}")
# 去除 markdown 代码块
text = re.sub(r'^```json\s*\n?', '', text, flags=re.MULTILINE | re.IGNORECASE)
text = re.sub(r'^```\s*\n?', '', text, flags=re.MULTILINE)
text = re.sub(r'\n?```\s*$', '', text, flags=re.MULTILINE)
text = text.strip()
if len(text) != original_length:
logger.debug(f" 移除markdown后长度: {len(text)}")
# 尝试直接解析(快速路径)
try:
json.loads(text)
logger.debug(f"✅ 直接解析成功,无需清洗")
return text
except Exception:
pass
# 找到第一个 { 或 [
start = -1
for i, c in enumerate(text):
if c in ('{', '['):
start = i
break
if start == -1:
logger.warning(f"⚠️ 未找到JSON起始符号 {{ 或 [")
logger.debug(f" 文本预览: {text[:200]}")
return text
if start > 0:
logger.debug(f" 跳过前{start}个字符")
text = text[start:]
# 改进的括号匹配算法(更严格的字符串处理)
stack = []
i = 0
end = -1
in_string = False
while i < len(text):
c = text[i]
# 处理字符串状态
if c == '"':
if not in_string:
# 进入字符串
in_string = True
else:
# 检查是否是转义的引号
num_backslashes = 0
j = i - 1
while j >= 0 and text[j] == '\\':
num_backslashes += 1
j -= 1
# 偶数个反斜杠表示引号未被转义,字符串结束
if num_backslashes % 2 == 0:
in_string = False
i += 1
continue
# 在字符串内部,跳过所有字符
if in_string:
i += 1
continue
# 处理括号(只有在字符串外部才有效)
if c == '{' or c == '[':
stack.append(c)
elif c == '}':
if len(stack) > 0 and stack[-1] == '{':
stack.pop()
if len(stack) == 0:
end = i + 1
logger.debug(f"✅ 找到JSON结束位置: {end}")
break
elif len(stack) > 0:
# 括号不匹配,可能是损坏的JSON,尝试继续
logger.warning(f"⚠️ 括号不匹配:遇到 }} 但栈顶是 {stack[-1]}")
else:
# 栈为空遇到 },忽略多余的闭合括号
logger.warning(f"⚠️ 遇到多余的 }},忽略")
elif c == ']':
if len(stack) > 0 and stack[-1] == '[':
stack.pop()
if len(stack) == 0:
end = i + 1
logger.debug(f"✅ 找到JSON结束位置: {end}")
break
elif len(stack) > 0:
# 括号不匹配,可能是损坏的JSON,尝试继续
logger.warning(f"⚠️ 括号不匹配:遇到 ] 但栈顶是 {stack[-1]}")
else:
# 栈为空遇到 ],忽略多余的闭合括号
logger.warning(f"⚠️ 遇到多余的 ],忽略")
i += 1
# 检查未闭合的字符串
if in_string:
logger.warning(f"⚠️ 字符串未闭合,JSON可能不完整")
# 提取结果
if end > 0:
result = text[:end]
logger.debug(f"✅ JSON清洗完成,结果长度: {len(result)}")
else:
result = text
logger.warning(f"⚠️ 未找到JSON结束位置,返回全部内容(长度: {len(result)}")
logger.debug(f" 栈状态: {stack}")
# 验证清洗后的结果
try:
json.loads(result)
logger.debug(f"✅ 清洗后JSON验证成功")
except json.JSONDecodeError as e:
logger.error(f"❌ 清洗后JSON仍然无效: {e}")
logger.debug(f" 结果预览: {result[:500]}")
logger.debug(f" 结果结尾: ...{result[-200:]}")
return result
except Exception as e:
logger.error(f"❌ clean_json_response 出错: {e}")
logger.error(f" 文本长度: {len(text) if text else 0}")
logger.error(f" 文本预览: {text[:200] if text else 'None'}")
raise
def parse_json(text: str) -> Union[Dict, List]:
"""解析 JSON"""
try:
cleaned = clean_json_response(text)
return json.loads(cleaned)
except Exception as e:
logger.error(f"❌ parse_json 出错: {e}")
logger.error(f" 原始文本长度: {len(text) if text else 0}")
logger.error(f" 清洗后文本长度: {len(cleaned) if cleaned else 0}")
raise