From a6e6df5073d89febf0f2dd1f13f91427d13277f2 Mon Sep 17 00:00:00 2001 From: xiamuceer Date: Wed, 18 Mar 2026 12:35:13 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20=E6=96=B0=E5=A2=9EAPI=E8=B0=83?= =?UTF-8?q?=E7=94=A8=E6=97=A5=E5=BF=97=E7=BB=9F=E8=AE=A1=EF=BC=8C=E9=A6=96?= =?UTF-8?q?=E5=AD=97=EF=BC=8C=E6=80=BB=E8=80=97=E6=97=B6=EF=BC=8Ctoken?= =?UTF-8?q?=E6=B6=88=E8=80=97=E7=AD=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/logger.py | 4 +- .../services/ai_clients/anthropic_client.py | 9 + .../app/services/ai_clients/gemini_client.py | 20 +- .../app/services/ai_clients/openai_client.py | 16 + backend/app/services/ai_metrics.py | 186 ++++++++++++ .../ai_providers/anthropic_provider.py | 15 +- .../services/ai_providers/gemini_provider.py | 15 +- .../services/ai_providers/openai_provider.py | 15 +- backend/app/services/ai_service.py | 280 ++++++++++++++---- 9 files changed, 491 insertions(+), 69 deletions(-) create mode 100644 backend/app/services/ai_metrics.py diff --git a/backend/app/logger.py b/backend/app/logger.py index 475451d..f33f957 100644 --- a/backend/app/logger.py +++ b/backend/app/logger.py @@ -144,8 +144,8 @@ def _configure_third_party_loggers(): logging.getLogger('openai').setLevel(logging.WARNING) logging.getLogger('anthropic').setLevel(logging.WARNING) - # 应用模块 - 可根据需要调整 - logging.getLogger('app.services.ai_service').setLevel(logging.WARNING) + # 应用模块 - AI 统计日志需要保留 INFO 级别输出 + logging.getLogger('app.services.ai_service').setLevel(logging.INFO) logging.getLogger('app.api.wizard').setLevel(logging.WARNING) diff --git a/backend/app/services/ai_clients/anthropic_client.py b/backend/app/services/ai_clients/anthropic_client.py index 7bde5ff..ff37b75 100644 --- a/backend/app/services/ai_clients/anthropic_client.py +++ b/backend/app/services/ai_clients/anthropic_client.py @@ -58,10 +58,19 @@ class AnthropicClient: elif block.type == "text": content += block.text + usage = getattr(response, "usage", None) return { "content": content, "tool_calls": tool_calls if tool_calls else None, "finish_reason": response.stop_reason, + "usage": { + "prompt_tokens": getattr(usage, "input_tokens", None), + "completion_tokens": getattr(usage, "output_tokens", None), + "total_tokens": ( + (getattr(usage, "input_tokens", 0) or 0) + + (getattr(usage, "output_tokens", 0) or 0) + ) if usage else None, + }, } async def chat_completion_stream( diff --git a/backend/app/services/ai_clients/gemini_client.py b/backend/app/services/ai_clients/gemini_client.py index 8497652..b52fc3f 100644 --- a/backend/app/services/ai_clients/gemini_client.py +++ b/backend/app/services/ai_clients/gemini_client.py @@ -98,10 +98,19 @@ class GeminiClient: "function": {"name": fc["name"], "arguments": fc.get("args", {})} }) + usage = data.get("usageMetadata") or {} + prompt_tokens = usage.get("promptTokenCount") + completion_tokens = usage.get("candidatesTokenCount") + total_tokens = usage.get("totalTokenCount") return { "content": text, "tool_calls": tool_calls if tool_calls else None, - "finish_reason": "tool_calls" if tool_calls else "stop" + "finish_reason": "tool_calls" if tool_calls else "stop", + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + } } async def chat_completion_stream( @@ -148,6 +157,15 @@ class GeminiClient: import json try: data = json.loads(line[6:]) + usage = data.get("usageMetadata") or {} + if usage: + yield { + "usage": { + "prompt_tokens": usage.get("promptTokenCount"), + "completion_tokens": usage.get("candidatesTokenCount"), + "total_tokens": usage.get("totalTokenCount"), + } + } candidates = data.get("candidates", []) if candidates and len(candidates) > 0: parts = candidates[0].get("content", {}).get("parts", []) diff --git a/backend/app/services/ai_clients/openai_client.py b/backend/app/services/ai_clients/openai_client.py index 6b6c690..db0a013 100644 --- a/backend/app/services/ai_clients/openai_client.py +++ b/backend/app/services/ai_clients/openai_client.py @@ -74,10 +74,16 @@ class OpenAIClient(BaseAIClient): choice = choices[0] message = choice.get("message", {}) + usage = data.get("usage") or {} return { "content": message.get("content", ""), "tool_calls": message.get("tool_calls"), "finish_reason": choice.get("finish_reason"), + "usage": { + "prompt_tokens": usage.get("prompt_tokens"), + "completion_tokens": usage.get("completion_tokens"), + "total_tokens": usage.get("total_tokens"), + }, } async def chat_completion_stream( @@ -138,6 +144,16 @@ class OpenAIClient(BaseAIClient): existing["function"].get("arguments", "") + tc["function"]["arguments"] ) + + usage = data.get("usage") + if usage: + yield { + "usage": { + "prompt_tokens": usage.get("prompt_tokens"), + "completion_tokens": usage.get("completion_tokens"), + "total_tokens": usage.get("total_tokens"), + } + } if content: yield {"content": content} diff --git a/backend/app/services/ai_metrics.py b/backend/app/services/ai_metrics.py new file mode 100644 index 0000000..416cc44 --- /dev/null +++ b/backend/app/services/ai_metrics.py @@ -0,0 +1,186 @@ +"""AI 调用统计与中文日志格式化工具""" +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +@dataclass +class TokenUsage: + """Token 使用量统计""" + + prompt_tokens: Optional[int] = None + completion_tokens: Optional[int] = None + total_tokens: Optional[int] = None + + @classmethod + def from_response(cls, response: Optional[Dict[str, Any]]) -> "TokenUsage": + """从响应中提取 usage 信息""" + if not response: + return cls() + + usage = response.get("usage") or {} + prompt_tokens = cls._to_int(usage.get("prompt_tokens")) + completion_tokens = cls._to_int(usage.get("completion_tokens")) + total_tokens = cls._to_int(usage.get("total_tokens")) + + return cls( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + ) + + @staticmethod + def _to_int(value: Any) -> Optional[int]: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + def add(self, other: "TokenUsage") -> None: + """累加另一个 usage""" + self.prompt_tokens = self._sum_optional(self.prompt_tokens, other.prompt_tokens) + self.completion_tokens = self._sum_optional(self.completion_tokens, other.completion_tokens) + self.total_tokens = self._sum_optional(self.total_tokens, other.total_tokens) + + @staticmethod + def _sum_optional(left: Optional[int], right: Optional[int]) -> Optional[int]: + if left is None and right is None: + return None + return (left or 0) + (right or 0) + + +@dataclass +class ToolCallMetrics: + """MCP 工具调用统计""" + + tool_calls_count: int = 0 + mcp_rounds: int = 0 + tool_error_count: int = 0 + tool_names: List[str] = field(default_factory=list) + usage: TokenUsage = field(default_factory=TokenUsage) + + def add_tool_name(self, tool_name: str) -> None: + if tool_name and tool_name not in self.tool_names: + self.tool_names.append(tool_name) + + +@dataclass +class AICallMetrics: + """单次 AI 调用统计""" + + request_mode: str + provider: str + model: str + user_id: Optional[str] = None + stream: bool = False + auto_mcp: bool = False + tools_count: int = 0 + prompt_length: int = 0 + response_length: int = 0 + chunk_count: int = 0 + retry_count: int = 0 + json_parse_success: Optional[bool] = None + finish_reason: Optional[str] = None + success: bool = False + error_type: Optional[str] = None + error_message: Optional[str] = None + ttft_ms: Optional[int] = None + duration_ms: Optional[int] = None + has_output: bool = False + usage: TokenUsage = field(default_factory=TokenUsage) + tool_metrics: ToolCallMetrics = field(default_factory=ToolCallMetrics) + started_at: float = field(default_factory=time.perf_counter) + first_chunk_at: Optional[float] = None + + def mark_first_chunk(self) -> None: + if self.first_chunk_at is None: + self.first_chunk_at = time.perf_counter() + self.ttft_ms = int((self.first_chunk_at - self.started_at) * 1000) + + def finish( + self, + *, + success: bool, + response_length: Optional[int] = None, + finish_reason: Optional[str] = None, + usage: Optional[TokenUsage] = None, + error: Optional[BaseException] = None, + ) -> None: + self.success = success + self.duration_ms = int((time.perf_counter() - self.started_at) * 1000) + if response_length is not None: + self.response_length = response_length + self.has_output = self.response_length > 0 + if finish_reason is not None: + self.finish_reason = finish_reason + if usage is not None: + self.usage = usage + if error is not None: + self.error_type = type(error).__name__ + self.error_message = self._truncate(str(error), 180) + + def merge_tool_metrics(self, tool_metrics: ToolCallMetrics) -> None: + self.tool_metrics = tool_metrics + self.usage.add(tool_metrics.usage) + + def to_log_message(self, title: str) -> str: + fields = [ + ("请求类型", self.request_mode), + ("提供商", self.provider), + ("模型", self.model), + ("状态", "成功" if self.success else "失败"), + ("首字耗时", self._format_latency(self.ttft_ms, allow_empty=True)), + ("总耗时", self._format_latency(self.duration_ms, allow_empty=False)), + ("输入字符数", str(self.prompt_length)), + ("输出字符数", str(self.response_length)), + ("输入Token", self._format_optional_number(self.usage.prompt_tokens)), + ("输出Token", self._format_optional_number(self.usage.completion_tokens)), + ("总Token", self._format_optional_number(self.usage.total_tokens)), + ("流式块数", str(self.chunk_count) if self.stream else "不适用"), + ("启用MCP", "是" if self.auto_mcp else "否"), + ("工具数", str(self.tools_count)), + ("工具调用次数", str(self.tool_metrics.tool_calls_count)), + ("MCP轮次", str(self.tool_metrics.mcp_rounds)), + ("重试次数", str(self.retry_count) if self.retry_count else "0"), + ("JSON解析", self._format_json_parse_result()), + ("结束原因", self.finish_reason or "未知"), + ] + + if self.user_id: + fields.append(("用户ID", self.user_id)) + if self.tool_metrics.tool_names: + fields.append(("工具名称", ",".join(self.tool_metrics.tool_names))) + if self.error_type: + fields.append(("异常类型", self.error_type)) + if self.error_message: + fields.append(("异常摘要", self.error_message)) + + formatted = "|".join(f"{key}={value}" for key, value in fields) + return f"{title}|{formatted}" + + def _format_json_parse_result(self) -> str: + if self.json_parse_success is None: + return "不适用" + return "成功" if self.json_parse_success else "失败" + + @staticmethod + def _format_optional_number(value: Optional[int]) -> str: + return str(value) if value is not None else "未知" + + @staticmethod + def _format_latency(value: Optional[int], allow_empty: bool) -> str: + if value is None: + return "无" if allow_empty else "未知" + if value < 1000: + return f"{value}ms" + return f"{value / 1000:.2f}s" + + @staticmethod + def _truncate(text: str, limit: int) -> str: + if len(text) <= limit: + return text + return f"{text[:limit]}..." diff --git a/backend/app/services/ai_providers/anthropic_provider.py b/backend/app/services/ai_providers/anthropic_provider.py index ce11207..66b1e4a 100644 --- a/backend/app/services/ai_providers/anthropic_provider.py +++ b/backend/app/services/ai_providers/anthropic_provider.py @@ -90,7 +90,12 @@ class AnthropicProvider(BaseAIProvider): final_messages, model, temperature, max_tokens, system_prompt, tools, user_id ): yield final_chunk + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason"), "done": True} break + + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} # 输出文本内容 if chunk.get("content"): @@ -106,8 +111,11 @@ class AnthropicProvider(BaseAIProvider): max_tokens=max_tokens, system_prompt=system_prompt, ): - # 确保只 yield 字符串内容,避免 yield 字典导致类型错误 if isinstance(chunk, dict): + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason")} if chunk.get("content"): yield chunk["content"] else: @@ -155,7 +163,12 @@ class AnthropicProvider(BaseAIProvider): messages, model, temperature, max_tokens, system_prompt, tools, user_id ): yield final_chunk + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason"), "done": True} break + + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} if chunk.get("content"): yield chunk["content"] \ No newline at end of file diff --git a/backend/app/services/ai_providers/gemini_provider.py b/backend/app/services/ai_providers/gemini_provider.py index 6efc04d..e38af40 100644 --- a/backend/app/services/ai_providers/gemini_provider.py +++ b/backend/app/services/ai_providers/gemini_provider.py @@ -88,7 +88,12 @@ class GeminiProvider(BaseAIProvider): final_messages, model, temperature, max_tokens, system_prompt, tools, user_id ): yield final_chunk + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason"), "done": True} break + + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} # 输出文本内容 if chunk.get("content"): @@ -104,8 +109,11 @@ class GeminiProvider(BaseAIProvider): max_tokens=max_tokens, system_prompt=system_prompt, ): - # 确保只 yield 字符串内容,避免 yield 字典导致类型错误 if isinstance(chunk, dict): + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason")} if chunk.get("content"): yield chunk["content"] else: @@ -153,7 +161,12 @@ class GeminiProvider(BaseAIProvider): messages, model, temperature, max_tokens, system_prompt, tools, user_id ): yield final_chunk + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason"), "done": True} break + + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} if chunk.get("content"): yield chunk["content"] \ No newline at end of file diff --git a/backend/app/services/ai_providers/openai_provider.py b/backend/app/services/ai_providers/openai_provider.py index ddf5d90..9089429 100644 --- a/backend/app/services/ai_providers/openai_provider.py +++ b/backend/app/services/ai_providers/openai_provider.py @@ -97,8 +97,13 @@ class OpenAIProvider(BaseAIProvider): final_messages, model, temperature, max_tokens, tools, user_id ): yield final_chunk + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason"), "done": True} break + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} + # 输出文本内容 if chunk.get("content"): yield chunk["content"] @@ -111,8 +116,11 @@ class OpenAIProvider(BaseAIProvider): temperature=temperature, max_tokens=max_tokens, ): - # 确保只 yield 字符串内容,避免 yield 字典导致类型错误 if isinstance(chunk, dict): + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason")} if chunk.get("content"): yield chunk["content"] else: @@ -155,7 +163,12 @@ class OpenAIProvider(BaseAIProvider): break if chunk.get("done"): + if chunk.get("finish_reason"): + yield {"finish_reason": chunk.get("finish_reason"), "done": True} break + + if chunk.get("usage"): + yield {"usage": chunk.get("usage")} if chunk.get("content"): yield chunk["content"] \ No newline at end of file diff --git a/backend/app/services/ai_service.py b/backend/app/services/ai_service.py index f3eb706..d31e6f5 100644 --- a/backend/app/services/ai_service.py +++ b/backend/app/services/ai_service.py @@ -10,6 +10,7 @@ from typing import Optional, AsyncGenerator, List, Dict, Any, Union from app.config import settings as app_settings from app.logger import get_logger from app.services.ai_config import AIClientConfig, default_config +from app.services.ai_metrics import AICallMetrics, TokenUsage, ToolCallMetrics from app.services.ai_clients.openai_client import OpenAIClient from app.services.ai_clients.anthropic_client import AnthropicClient from app.services.ai_clients.gemini_client import GeminiClient @@ -163,6 +164,36 @@ class AIService: return self._gemini_provider raise ValueError(f"Provider {p} 未初始化") + def _build_call_metrics( + self, + *, + request_mode: str, + provider: Optional[str], + model: Optional[str], + prompt: str, + auto_mcp: bool, + tools_count: int, + stream: bool, + ) -> AICallMetrics: + return AICallMetrics( + request_mode=request_mode, + provider=normalize_provider(provider or self.api_provider) or "unknown", + model=model or self.default_model, + user_id=self.user_id, + stream=stream, + auto_mcp=auto_mcp, + tools_count=tools_count, + prompt_length=len(prompt or ""), + ) + + def _log_call_metrics(self, metrics: AICallMetrics, title: Optional[str] = None): + log_title = title or ("AI调用完成" if metrics.success else "AI调用失败") + message = metrics.to_log_message(log_title) + if metrics.success: + logger.info(message) + else: + logger.error(message) + async def _prepare_mcp_tools(self, auto_mcp: bool = True, force_refresh: bool = False) -> Optional[List[Dict]]: """ 预处理MCP工具 @@ -255,19 +286,24 @@ class AIService: tool_calls = response.get("tool_calls", []) if not tool_calls or not self.user_id: return response + + tool_metrics = ToolCallMetrics() + tool_metrics.usage.add(TokenUsage.from_response(response)) result = { "content": response.get("content", ""), "tool_calls_made": 0, "tools_used": [], "finish_reason": response.get("finish_reason", ""), - "mcp_enhanced": True + "mcp_enhanced": True, + "usage": response.get("usage"), } prompt = original_prompt for round_num in range(max_rounds): logger.info(f"🔧 工具调用 - 第{round_num+1}/{max_rounds}轮,{len(tool_calls)}个工具") + tool_metrics.mcp_rounds += 1 try: # 批量执行工具调用 @@ -279,9 +315,11 @@ class AIService: # 记录使用的工具 for tc in tool_calls: name = tc["function"]["name"] + tool_metrics.add_tool_name(name) if name not in result["tools_used"]: result["tools_used"].append(name) result["tool_calls_made"] += len(tool_calls) + tool_metrics.tool_calls_count += len(tool_calls) # 构建工具上下文 tool_context = mcp_client.build_tool_context(tool_results, format="markdown") @@ -306,6 +344,7 @@ class AIService: tools=None if tool_choice == "none" else self._cached_tools, tool_choice=tool_choice, ) + tool_metrics.usage.add(TokenUsage.from_response(next_response)) tool_calls = next_response.get("tool_calls", []) @@ -313,13 +352,26 @@ class AIService: # 没有更多工具调用,返回结果 result["content"] = next_response.get("content", "") result["finish_reason"] = next_response.get("finish_reason", "stop") + result["usage"] = { + "prompt_tokens": tool_metrics.usage.prompt_tokens, + "completion_tokens": tool_metrics.usage.completion_tokens, + "total_tokens": tool_metrics.usage.total_tokens, + } break except Exception as e: logger.error(f"❌ 工具调用失败: {e}") + tool_metrics.tool_error_count += 1 result["content"] = response.get("content", "") result["finish_reason"] = "tool_error" + result["usage"] = { + "prompt_tokens": tool_metrics.usage.prompt_tokens, + "completion_tokens": tool_metrics.usage.completion_tokens, + "total_tokens": tool_metrics.usage.total_tokens, + } break + + result["__tool_metrics"] = tool_metrics return result @@ -363,33 +415,60 @@ class AIService: # 自动加载MCP工具 if auto_mcp and tools is None: tools = await self._prepare_mcp_tools(auto_mcp=auto_mcp) - - prov = self._get_provider(provider) - response = await prov.generate( + + metrics = self._build_call_metrics( + request_mode="文本", + provider=provider, + model=model, prompt=prompt, - model=model or self.default_model, - temperature=temperature or self.default_temperature, - max_tokens=max_tokens or self.default_max_tokens, - system_prompt=system_prompt or self.default_system_prompt, - tools=tools, - tool_choice=tool_choice, + auto_mcp=auto_mcp, + tools_count=len(tools) if tools else 0, + stream=False, ) - # 处理工具调用 - if handle_tool_calls and response.get("tool_calls"): - return await self._handle_tool_calls( - original_prompt=prompt, - response=response, - provider=provider, - model=model, - temperature=temperature, - max_tokens=max_tokens, - system_prompt=system_prompt, + try: + prov = self._get_provider(provider) + response = await prov.generate( + prompt=prompt, + model=model or self.default_model, + temperature=temperature or self.default_temperature, + max_tokens=max_tokens or self.default_max_tokens, + system_prompt=system_prompt or self.default_system_prompt, + tools=tools, tool_choice=tool_choice, - max_rounds=mcp_max_rounds, ) - - return response + usage = TokenUsage.from_response(response) + + # 处理工具调用 + if handle_tool_calls and response.get("tool_calls"): + response = await self._handle_tool_calls( + original_prompt=prompt, + response=response, + provider=provider, + model=model, + temperature=temperature, + max_tokens=max_tokens, + system_prompt=system_prompt, + tool_choice=tool_choice, + max_rounds=mcp_max_rounds, + ) + usage = TokenUsage.from_response(response) + tool_metrics = response.get("__tool_metrics") + if tool_metrics: + metrics.merge_tool_metrics(tool_metrics) + + metrics.finish( + success=True, + response_length=len(response.get("content", "") or ""), + finish_reason=response.get("finish_reason"), + usage=usage, + ) + self._log_call_metrics(metrics) + return response + except Exception as e: + metrics.finish(success=False, error=e) + self._log_call_metrics(metrics) + raise async def generate_text_stream( self, @@ -431,21 +510,64 @@ class AIService: tools_to_use = await self._prepare_mcp_tools(auto_mcp=auto_mcp) if tools_to_use: logger.info(f"🔧 已获取 {len(tools_to_use)} 个MCP工具") - - # 流式生成(Provider 层处理工具调用) - prov = self._get_provider(provider) - logger.debug(f"🔧 开始流式生成,provider={provider or self.api_provider}, tools_count={len(tools_to_use) if tools_to_use else 0}") - async for chunk in prov.generate_stream( + + metrics = self._build_call_metrics( + request_mode="流式文本", + provider=provider, + model=model, prompt=prompt, - model=model or self.default_model, - temperature=temperature or self.default_temperature, - max_tokens=max_tokens or self.default_max_tokens, - system_prompt=system_prompt or self.default_system_prompt, - tools=tools_to_use, - tool_choice=tool_choice, - user_id=self.user_id, - ): - yield chunk + auto_mcp=auto_mcp, + tools_count=len(tools_to_use) if tools_to_use else 0, + stream=True, + ) + response_parts: List[str] = [] + latest_usage = TokenUsage() + finish_reason = "stop" + + try: + # 流式生成(Provider 层处理工具调用) + prov = self._get_provider(provider) + logger.debug(f"🔧 开始流式生成,provider={provider or self.api_provider}, tools_count={len(tools_to_use) if tools_to_use else 0}") + async for chunk in prov.generate_stream( + prompt=prompt, + model=model or self.default_model, + temperature=temperature or self.default_temperature, + max_tokens=max_tokens or self.default_max_tokens, + system_prompt=system_prompt or self.default_system_prompt, + tools=tools_to_use, + tool_choice=tool_choice, + user_id=self.user_id, + ): + if isinstance(chunk, dict): + if chunk.get("usage"): + latest_usage = TokenUsage.from_response({"usage": chunk.get("usage")}) + if chunk.get("finish_reason"): + finish_reason = chunk.get("finish_reason") or finish_reason + continue + + if chunk: + metrics.mark_first_chunk() + metrics.chunk_count += 1 + response_parts.append(chunk) + yield chunk + + metrics.finish( + success=True, + response_length=len("".join(response_parts)), + finish_reason=finish_reason, + usage=latest_usage, + ) + self._log_call_metrics(metrics) + except Exception as e: + metrics.finish( + success=False, + response_length=len("".join(response_parts)), + finish_reason=finish_reason, + usage=latest_usage, + error=e, + ) + self._log_call_metrics(metrics) + raise async def call_with_json_retry( self, @@ -477,35 +599,67 @@ class AIService: 解析后的JSON数据 """ last_response = "" + aggregate_usage = TokenUsage() + metrics = self._build_call_metrics( + request_mode="JSON重试", + provider=provider, + model=model, + prompt=prompt, + auto_mcp=auto_mcp, + tools_count=0, + stream=False, + ) - for attempt in range(1, max_retries + 1): - current_prompt = prompt if attempt == 1 else self._add_json_hint(prompt, last_response, attempt) + try: + for attempt in range(1, max_retries + 1): + current_prompt = prompt if attempt == 1 else self._add_json_hint(prompt, last_response, attempt) + + result = await self.generate_text( + prompt=current_prompt, + provider=provider, + model=model, + temperature=temperature, + max_tokens=max_tokens, + system_prompt=system_prompt, + auto_mcp=auto_mcp, + handle_tool_calls=True, + ) + aggregate_usage.add(TokenUsage.from_response(result)) + metrics.retry_count = attempt + metrics.tools_count = max(metrics.tools_count, len(self._cached_tools) if self._cached_tools else 0) + + last_response = result.get("content", "") + + try: + data = parse_json(last_response) + if expected_type == "object" and not isinstance(data, dict): + raise ValueError("期望对象") + if expected_type == "array" and not isinstance(data, list): + raise ValueError("期望数组") + metrics.json_parse_success = True + metrics.finish( + success=True, + response_length=len(last_response), + finish_reason=result.get("finish_reason"), + usage=aggregate_usage, + ) + self._log_call_metrics(metrics, title="AI调用汇总") + return data + except Exception as e: + metrics.json_parse_success = False + if attempt == max_retries: + raise ValueError(f"JSON 解析失败: {e}") - result = await self.generate_text( - prompt=current_prompt, - provider=provider, - model=model, - temperature=temperature, - max_tokens=max_tokens, - system_prompt=system_prompt, - auto_mcp=auto_mcp, - handle_tool_calls=True, + raise ValueError("JSON 调用失败") + except Exception as e: + metrics.finish( + success=False, + response_length=len(last_response), + usage=aggregate_usage, + error=e, ) - - last_response = result.get("content", "") - - try: - data = parse_json(last_response) - if expected_type == "object" and not isinstance(data, dict): - raise ValueError("期望对象") - if expected_type == "array" and not isinstance(data, list): - raise ValueError("期望数组") - return data - except Exception as e: - if attempt == max_retries: - raise ValueError(f"JSON 解析失败: {e}") - - raise ValueError("JSON 调用失败") + self._log_call_metrics(metrics, title="AI调用汇总") + raise @staticmethod def _add_json_hint(prompt: str, failed: str, attempt: int) -> str: