Files
snemc 980cc5b0e5 fix: 修复MCP插件创建和测试时的500错误
问题:MCP SDK使用anyio TaskGroup,与FastAPI请求上下文不兼容,
导致在请求中直接await MCP操作时报RuntimeError: No response returned

解决方案:
- 将MCP连接操作放到后台任务执行,避免阻塞请求
- 添加is_registered()和get_session_status()同步检查方法
- 测试时先检查会话是否存在,不存在则触发后台注册
- 改进ExceptionGroup错误处理,显示详细错误信息
- 状态同步改用异步队列,避免阻塞

修改文件:
- backend/app/api/mcp_plugins.py: 重写test_plugin和create_plugin_simple
- backend/app/mcp/facade.py: 添加同步检查方法和改进错误处理
- backend/app/mcp/status_sync.py: 使用异步队列同步状态
- backend/app/services/mcp_test_service.py: 使用同步检查代替异步ensure
2026-01-24 10:03:59 +08:00

336 lines
13 KiB
Python

"""MCP插件测试服务 - 专门处理插件测试逻辑
重构后使用统一的MCPClientFacade门面来管理所有MCP操作。
"""
import time
import json
from typing import Dict, Any, Optional
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.mcp_plugin import MCPPlugin
from app.models.settings import Settings as UserSettings
from app.mcp import mcp_client, MCPPluginConfig # 使用新的统一门面
from app.services.ai_service import create_user_ai_service
from app.schemas.mcp_plugin import MCPTestResult
from app.services.prompt_service import prompt_service
from app.logger import get_logger
from app.user_manager import User
logger = get_logger(__name__)
class MCPTestService:
"""MCP插件测试服务(使用统一门面重构)"""
def _check_plugin_registered(self, plugin: MCPPlugin, user_id: str) -> bool:
"""
检查插件是否已注册(同步方法,不触发新的连接)
Args:
plugin: 插件配置
user_id: 用户ID
Returns:
是否已注册
"""
return mcp_client.is_registered(user_id, plugin.plugin_name)
async def test_plugin_connection(
self,
plugin: MCPPlugin,
user_id: str
) -> MCPTestResult:
"""
简单连接测试
注意:调用此方法前,需要确保插件已通过后台任务注册。
Args:
plugin: 插件配置
user_id: 用户ID
Returns:
测试结果
"""
start_time = time.time()
try:
# 检查插件是否已注册(不触发新连接)
if not self._check_plugin_registered(plugin, user_id):
return MCPTestResult(
success=False,
message="插件未注册",
error="MCP会话不存在,请先启用插件",
suggestions=["请先启用插件", "如果已启用,请稍等片刻后重试"]
)
# 使用统一门面测试连接
test_result = await mcp_client.test_connection(user_id, plugin.plugin_name)
end_time = time.time()
response_time = round((end_time - start_time) * 1000, 2)
if test_result["success"]:
return MCPTestResult(
success=True,
message=f"✅ 连接测试成功",
response_time_ms=response_time,
tools_count=test_result.get("tools_count", 0),
suggestions=[
f"响应时间: {response_time}ms",
f"可用工具数: {test_result.get('tools_count', 0)}"
]
)
else:
return MCPTestResult(
success=False,
message="❌ 连接测试失败",
response_time_ms=response_time,
error=test_result.get("message", "未知错误"),
error_type=test_result.get("error_type"),
suggestions=[
"请检查服务器是否在线",
"请确认配置正确",
"请检查API Key是否有效"
]
)
except Exception as e:
end_time = time.time()
response_time = round((end_time - start_time) * 1000, 2)
logger.error(f"测试插件失败: {plugin.plugin_name}, 错误: {e}")
return MCPTestResult(
success=False,
message="❌ 测试失败",
response_time_ms=response_time,
error=str(e),
error_type=type(e).__name__,
suggestions=[
"请检查服务器是否在线",
"请确认配置正确",
"请检查API Key是否有效"
]
)
async def test_plugin_with_ai(
self,
plugin: MCPPlugin,
user: User,
db_session: AsyncSession
) -> MCPTestResult:
"""
使用AI进行智能工具调用测试
Args:
plugin: 插件配置
user: 用户对象
db_session: 数据库会话
Returns:
测试结果
"""
start_time = time.time()
try:
# 1. 先进行连接测试
connection_result = await self.test_plugin_connection(plugin, user.user_id)
if not connection_result.success:
return connection_result
# 2. 使用统一门面获取工具列表
tools = await mcp_client.get_tools(user.user_id, plugin.plugin_name)
if not tools:
return MCPTestResult(
success=False,
message="插件没有提供任何工具",
error="工具列表为空",
response_time_ms=connection_result.response_time_ms,
suggestions=["请检查插件配置", "请确认MCP服务器正常运行"]
)
# 3. 获取用户的AI设置
settings_result = await db_session.execute(
select(UserSettings).where(UserSettings.user_id == user.user_id)
)
user_settings = settings_result.scalar_one_or_none()
if not user_settings or not user_settings.api_key:
# 没有AI配置,返回简单测试结果
logger.warning("用户未配置AI服务,跳过智能测试")
return MCPTestResult(
success=True,
message=f"✅ 连接测试成功(未配置AI,跳过工具调用测试)",
response_time_ms=connection_result.response_time_ms,
tools_count=len(tools),
suggestions=[
f"连接测试: 成功",
f"可用工具数: {len(tools)}",
"提示: 配置AI服务后可进行智能工具调用测试"
]
)
# 4. 使用AI选择工具并生成测试参数
logger.info(f"使用AI分析工具并生成测试计划...")
ai_service = create_user_ai_service(
api_provider=user_settings.api_provider,
api_key=user_settings.api_key,
api_base_url=user_settings.api_base_url,
model_name=user_settings.llm_model,
temperature=0.3,
max_tokens=1000
)
# 使用统一门面转换为OpenAI Function Calling格式
openai_tools = mcp_client.format_tools_for_openai(tools, plugin.plugin_name)
logger.info(f"📋 转换后的OpenAI工具数量: {len(openai_tools)}")
logger.debug(f"📋 OpenAI工具列表: {[t['function']['name'] for t in openai_tools]}")
# 调用AI选择工具(使用自定义模板系统)
prompts = await prompt_service.get_mcp_tool_test_prompts(
plugin_name=plugin.plugin_name,
user_id=user.user_id,
db=db_session
)
# 使用 generate_text 进行 Function Calling(非流式)
ai_response = await ai_service.generate_text(
prompt=prompts["user"],
system_prompt=prompts["system"],
tools=openai_tools,
tool_choice="auto"
)
accumulated_text = ai_response.get("content", "")
tool_calls = ai_response.get("tool_calls")
# 5. 检查AI是否返回工具调用
if not tool_calls:
logger.error(f"❌ AI未返回工具调用")
return MCPTestResult(
success=False,
message="❌ AI Function Calling失败",
error=f"AI未返回工具调用请求。响应: {accumulated_text[:200] if accumulated_text else 'N/A'}",
tools_count=len(tools),
suggestions=[
"请确认使用的AI模型支持Function Calling",
f"当前Provider: {user_settings.api_provider}",
f"当前模型: {user_settings.llm_model}"
]
)
# 6. 解析工具调用
tool_call = tool_calls[0]
function = tool_call["function"]
tool_name_with_prefix = function["name"]
test_arguments = function["arguments"]
if isinstance(test_arguments, str):
try:
# 使用统一的JSON清洗方法
cleaned_args = ai_service._clean_json_response(test_arguments)
test_arguments = json.loads(cleaned_args)
except json.JSONDecodeError as e:
logger.error(f"❌ 解析AI参数失败: {e}")
return MCPTestResult(
success=False,
message="❌ AI返回的参数格式错误",
error=f"无法解析参数JSON: {str(e)}",
tools_count=len(tools)
)
# 解析插件名和工具名
try:
_, tool_name = mcp_client.parse_function_name(tool_name_with_prefix)
except ValueError:
tool_name = tool_name_with_prefix
logger.info(f"🤖 AI选择的工具: {tool_name}")
logger.info(f"📝 AI生成的参数: {test_arguments}")
# 7. 使用统一门面调用MCP工具
call_start = time.time()
try:
tool_result = await mcp_client.call_tool(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
tool_name=tool_name,
arguments=test_arguments
)
call_end = time.time()
call_time = round((call_end - call_start) * 1000, 2)
total_time = round((call_end - start_time) * 1000, 2)
# 格式化结果
result_str = str(tool_result)
if len(result_str) > 800:
result_preview = result_str[:800] + "\n...(结果已截断)"
else:
result_preview = result_str
return MCPTestResult(
success=True,
message=f"✅ Function Calling测试成功!工具 '{tool_name}' 调用正常",
response_time_ms=total_time,
tools_count=len(tools),
suggestions=[
f"🤖 AI选择: {tool_name}",
f"📝 参数: {json.dumps(test_arguments, ensure_ascii=False)}",
f"⏱️ 耗时: {call_time}ms",
f"📊 结果:\n{result_preview}"
]
)
except Exception as call_error:
call_end = time.time()
total_time = round((call_end - start_time) * 1000, 2)
logger.warning(f"工具调用失败: {tool_name}, 错误: {call_error}")
return MCPTestResult(
success=True, # 连接成功就算测试通过
message=f"⚠️ 连接成功,但工具调用失败",
response_time_ms=total_time,
tools_count=len(tools),
error=f"工具 '{tool_name}' 调用失败: {str(call_error)}",
suggestions=[
f"✅ 连接测试: 成功",
f"❌ 工具调用测试: 失败",
f"🤖 AI选择: {tool_name}",
f"❌ 错误: {str(call_error)}",
"💡 可能原因: API Key无效、参数错误或服务限制"
]
)
except Exception as e:
end_time = time.time()
total_time = round((end_time - start_time) * 1000, 2)
logger.error(f"测试插件失败: {plugin.plugin_name}, 错误: {e}")
return MCPTestResult(
success=False,
message="❌ 测试失败",
response_time_ms=total_time,
error=str(e),
error_type=type(e).__name__,
suggestions=[
"请检查服务器是否在线",
"请确认配置正确",
"请检查API Key是否有效"
]
)
# 全局单例
mcp_test_service = MCPTestService()