Files
MuMuAINovel/backend/app/services/json_helper.py
T

354 lines
12 KiB
Python
Raw Normal View History

"""JSON 处理工具类"""
import json
import re
from typing import Any, Dict, List, Union
from app.logger import get_logger
try:
import json5
HAS_JSON5 = True
except ImportError:
HAS_JSON5 = False
logger = get_logger(__name__)
# 中文引号/括号到ASCII的映射
_QUOTE_MAP = {
'\u201c': '"', # " → "
'\u201d': '"', # " → "
'\u2018': "'", # ' → '
'\u2019': "'", # ' → '
'\u300e': '"', # 『 → "
'\u300f': '"', # 』 → "
'\u300c': '"', # 「 → "
'\u300d': '"', # 」 → "
}
def _fix_json_string_values(text: str) -> str:
"""
修复JSON字符串值中的常见问题:
1. 裸换行符/制表符 → 转义
2. 字符串值内的中文引号 → 转义为ASCII引号(避免破坏JSON结构)
3. 结构位置的中文引号 → 直接替换为ASCII引号
AI生成的JSON常在字符串值中插入未转义的换行符和中文引号。
此函数遍历文本,区分字符串内外,分别处理。
"""
if not text or '"' not in text:
return text
result = []
i = 0
in_string = False
fixed_count = 0
while i < len(text):
c = text[i]
if c == '"' and not in_string:
# 进入字符串
in_string = True
result.append(c)
i += 1
continue
if in_string:
if c == '\\':
# 转义字符,检查下一个字符是否合法
if i + 1 < len(text):
next_c = text[i + 1]
# JSON 合法转义:\" \\ \/ \b \f \n \r \t \uXXXX
if next_c in ('"', '\\', '/', 'b', 'f', 'n', 'r', 't'):
# 合法转义,直接保留
result.append(c)
result.append(next_c)
i += 2
continue
elif next_c == 'u':
# Unicode 转义 \uXXXX,检查是否有4个十六进制字符
if i + 5 < len(text) and all(text[i+2+k] in '0123456789abcdefABCDEF' for k in range(4)):
result.append(text[i:i+6])
i += 6
continue
else:
# 不完整的unicode转义,去掉反斜杠
result.append(next_c)
fixed_count += 1
i += 2
continue
else:
# 非法转义字符(如 \c \p \d 等),去掉反斜杠只保留字符
result.append(next_c)
fixed_count += 1
i += 2
continue
else:
# 末尾孤立的反斜杠,去掉
fixed_count += 1
i += 1
continue
if c == '"':
# 字符串结束
in_string = False
result.append(c)
i += 1
continue
if c == '\n':
# 裸换行符 → 替换为转义换行
result.append('\\')
result.append('n')
fixed_count += 1
i += 1
continue
if c == '\r':
# 裸回车符 → 忽略或替换
if i + 1 < len(text) and text[i + 1] == '\n':
result.append('\\')
result.append('n')
fixed_count += 1
i += 2
else:
result.append('\\')
result.append('n')
fixed_count += 1
i += 1
continue
if c == '\t':
# 裸制表符 → 替换为转义制表符
result.append('\\')
result.append('t')
fixed_count += 1
i += 1
continue
# 字符串值内的中文引号 → 转义为 \"(避免破坏JSON结构)
if c in _QUOTE_MAP:
result.append('\\')
result.append(_QUOTE_MAP[c])
fixed_count += 1
i += 1
continue
# 非字符串内的字符
# 结构位置的中文引号 → 直接替换
if not in_string and c in _QUOTE_MAP:
result.append(_QUOTE_MAP[c])
fixed_count += 1
i += 1
continue
result.append(c)
i += 1
if fixed_count > 0:
logger.debug(f"✅ 修复了{fixed_count}个JSON问题(裸控制字符/中文引号)")
return ''.join(result)
def clean_json_response(text: str) -> str:
"""清洗 AI 返回的 JSON(改进版 - 流式安全)"""
try:
if not text:
logger.warning("⚠️ clean_json_response: 输入为空")
return text
original_length = len(text)
logger.debug(f"🔍 开始清洗JSON,原始长度: {original_length}")
# 替换中文逗号/冒号(AI可能在JSON结构位置使用,全局替换是安全的)
text = text.replace('\uff0c', ',') # ,→ ,
text = text.replace('\uff1a', ':') # :→ :
# 修复JSON中的中文引号和裸控制字符(上下文感知,区分字符串内外)
text = _fix_json_string_values(text)
# 去除 markdown 代码块
text = re.sub(r'^```json\s*\n?', '', text, flags=re.MULTILINE | re.IGNORECASE)
text = re.sub(r'^```\s*\n?', '', text, flags=re.MULTILINE)
text = re.sub(r'\n?```\s*$', '', text, flags=re.MULTILINE)
text = text.strip()
if len(text) != original_length:
logger.debug(f" 移除markdown后长度: {len(text)}")
# 尝试直接解析(快速路径)
try:
json.loads(text)
logger.debug(f"✅ 直接解析成功,无需清洗")
return text
except Exception:
pass
# 找到第一个 { 或 [
start = -1
for i, c in enumerate(text):
if c in ('{', '['):
start = i
break
if start == -1:
logger.warning(f"⚠️ 未找到JSON起始符号 {{ 或 [")
logger.debug(f" 文本预览: {text[:200]}")
return text
if start > 0:
logger.debug(f" 跳过前{start}个字符")
text = text[start:]
# 改进的括号匹配算法(更严格的字符串处理)
stack = []
i = 0
end = -1
2025-12-31 12:02:36 +08:00
in_string = False
while i < len(text):
c = text[i]
2025-12-31 12:02:36 +08:00
# 处理字符串状态
if c == '"':
2025-12-31 12:02:36 +08:00
if not in_string:
# 进入字符串
in_string = True
else:
# 检查是否是转义的引号
num_backslashes = 0
j = i - 1
while j >= 0 and text[j] == '\\':
num_backslashes += 1
j -= 1
# 偶数个反斜杠表示引号未被转义,字符串结束
if num_backslashes % 2 == 0:
in_string = False
2025-12-31 12:02:36 +08:00
i += 1
continue
# 在字符串内部,跳过所有字符
if in_string:
i += 1
continue
# 处理括号(只有在字符串外部才有效)
if c == '{' or c == '[':
stack.append(c)
elif c == '}':
if len(stack) > 0 and stack[-1] == '{':
stack.pop()
if len(stack) == 0:
end = i + 1
logger.debug(f"✅ 找到JSON结束位置: {end}")
break
2025-12-31 12:02:36 +08:00
elif len(stack) > 0:
# 括号不匹配,可能是损坏的JSON,尝试继续
logger.warning(f"⚠️ 括号不匹配:遇到 }} 但栈顶是 {stack[-1]}")
else:
2025-12-31 12:02:36 +08:00
# 栈为空遇到 },忽略多余的闭合括号
logger.warning(f"⚠️ 遇到多余的 }},忽略")
elif c == ']':
if len(stack) > 0 and stack[-1] == '[':
stack.pop()
if len(stack) == 0:
end = i + 1
logger.debug(f"✅ 找到JSON结束位置: {end}")
break
2025-12-31 12:02:36 +08:00
elif len(stack) > 0:
# 括号不匹配,可能是损坏的JSON,尝试继续
logger.warning(f"⚠️ 括号不匹配:遇到 ] 但栈顶是 {stack[-1]}")
else:
2025-12-31 12:02:36 +08:00
# 栈为空遇到 ],忽略多余的闭合括号
logger.warning(f"⚠️ 遇到多余的 ],忽略")
i += 1
2025-12-31 12:02:36 +08:00
# 检查未闭合的字符串
if in_string:
logger.warning(f"⚠️ 字符串未闭合,JSON可能不完整")
# 提取结果
if end > 0:
result = text[:end]
logger.debug(f"✅ JSON清洗完成,结果长度: {len(result)}")
else:
result = text
logger.warning(f"⚠️ 未找到JSON结束位置,返回全部内容(长度: {len(result)}")
logger.debug(f" 栈状态: {stack}")
# 验证清洗后的结果
try:
json.loads(result)
logger.debug(f"✅ 清洗后JSON验证成功")
except json.JSONDecodeError as e:
logger.error(f"❌ 清洗后JSON仍然无效: {e}")
logger.debug(f" 结果预览: {result[:500]}")
logger.debug(f" 结果结尾: ...{result[-200:]}")
return result
except Exception as e:
logger.error(f"❌ clean_json_response 出错: {e}")
logger.error(f" 文本长度: {len(text) if text else 0}")
logger.error(f" 文本预览: {text[:200] if text else 'None'}")
raise
def parse_json(text: str) -> Union[Dict, List]:
"""解析 JSON,优先使用标准json,失败后用json5容错解析"""
cleaned = clean_json_response(text)
# 优先使用标准 json
try:
return json.loads(cleaned)
except (json.JSONDecodeError, Exception):
pass
# json5 容错解析(处理单引号、多余逗号、宽松格式等)
if HAS_JSON5:
try:
logger.info("🔄 标准JSON解析失败,使用json5容错解析")
result = json5.loads(cleaned)
logger.info("✅ json5容错解析成功")
return result
except Exception as e5:
logger.error(f"❌ json5容错解析也失败: {e5}")
# 最终失败
logger.error(f"❌ parse_json 完全失败")
logger.error(f" 原始文本长度: {len(text) if text else 0}")
logger.error(f" 清洗后文本长度: {len(cleaned) if cleaned else 0}")
logger.debug(f" 清洗后文本预览: {cleaned[:500] if cleaned else 'None'}")
raise json.JSONDecodeError("JSON解析失败(标准和json5均失败)", cleaned, 0)
def loads_json(text: str) -> Any:
"""
json.loads 的容错替代品,可直接替换 json.loads()。
优先用标准 json.loads,失败后自动降级到 json5。
适用于解析 AI 返回的、可能包含不规范格式的 JSON。
"""
# 优先使用标准 json
try:
return json.loads(text)
except (json.JSONDecodeError, Exception):
pass
# json5 容错解析
if HAS_JSON5:
try:
logger.info("🔄 json.loads失败,使用json5容错解析")
result = json5.loads(text)
logger.info("✅ json5容错解析成功")
return result
except Exception as e5:
logger.error(f"❌ json5容错解析也失败: {e5}")
# 最终失败,抛出标准异常
raise json.JSONDecodeError("JSON解析失败(标准和json5均失败)", text, 0)