fix: 修复MCP插件创建和测试时的500错误

问题:MCP SDK使用anyio TaskGroup,与FastAPI请求上下文不兼容,
导致在请求中直接await MCP操作时报RuntimeError: No response returned

解决方案:
- 将MCP连接操作放到后台任务执行,避免阻塞请求
- 添加is_registered()和get_session_status()同步检查方法
- 测试时先检查会话是否存在,不存在则触发后台注册
- 改进ExceptionGroup错误处理,显示详细错误信息
- 状态同步改用异步队列,避免阻塞

修改文件:
- backend/app/api/mcp_plugins.py: 重写test_plugin和create_plugin_simple
- backend/app/mcp/facade.py: 添加同步检查方法和改进错误处理
- backend/app/mcp/status_sync.py: 使用异步队列同步状态
- backend/app/services/mcp_test_service.py: 使用同步检查代替异步ensure
This commit is contained in:
snemc
2026-01-24 10:03:59 +08:00
parent 165a02ea75
commit 980cc5b0e5
4 changed files with 282 additions and 95 deletions
+47 -7
View File
@@ -307,18 +307,18 @@ class MCPClientFacade:
是否注册成功
"""
self._ensure_background_tasks()
key = self._get_key(config.user_id, config.plugin_name)
user_lock = await self._get_user_lock(config.user_id)
async with user_lock:
# 如果已存在,先关闭
if key in self._sessions:
await self._close_session_unsafe(key)
try:
logger.info(f"🔗 连接MCP服务器: {config.plugin_name} -> {config.url} (类型: {config.plugin_type})")
# 根据类型选择客户端
if config.plugin_type == "sse":
# SSE 客户端 - 返回 2 个值
@@ -357,9 +357,19 @@ class MCPClientFacade:
logger.info(f"✅ MCP会话建立成功: {key}")
await self._emit_status_change(config.user_id, config.plugin_name, "inactive", "active", "连接成功")
return True
except ExceptionGroup as eg:
# 处理 TaskGroup 的异常组,提取详细错误信息
error_details = []
for exc in eg.exceptions:
error_details.append(f"{type(exc).__name__}: {exc}")
error_msg = "; ".join(error_details)
logger.error(f"❌ MCP连接失败 {key}: TaskGroup异常 - {error_msg}")
await self._emit_status_change(config.user_id, config.plugin_name, "inactive", "error", error_msg)
return False
except Exception as e:
logger.error(f"❌ MCP连接失败 {key}: {e}")
logger.error(f"❌ MCP连接失败 {key}: {type(e).__name__}: {e}")
await self._emit_status_change(config.user_id, config.plugin_name, "inactive", "error", str(e))
return False
@@ -428,7 +438,37 @@ class MCPClientFacade:
info.last_access = time.time()
info.request_count += 1
return info.session
def is_registered(self, user_id: str, plugin_name: str) -> bool:
"""
检查插件是否已注册(同步方法,仅检查内存状态)
Args:
user_id: 用户ID
plugin_name: 插件名称
Returns:
是否已注册且状态正常
"""
key = self._get_key(user_id, plugin_name)
info = self._sessions.get(key)
return info is not None and info.status != "error"
def get_session_status(self, user_id: str, plugin_name: str) -> Optional[str]:
"""
获取会话状态(同步方法)
Args:
user_id: 用户ID
plugin_name: 插件名称
Returns:
会话状态,如果不存在返回 None
"""
key = self._get_key(user_id, plugin_name)
info = self._sessions.get(key)
return info.status if info else None
async def ensure_registered(
self,
user_id: str,
+54 -10
View File
@@ -3,6 +3,7 @@
将内存中的会话状态变更同步到数据库,确保状态一致性。
"""
import asyncio
from typing import Dict, Any
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
@@ -12,22 +13,42 @@ from app.logger import get_logger
logger = get_logger(__name__)
# 状态同步队列
_sync_queue: asyncio.Queue = None
_sync_task: asyncio.Task = None
async def sync_status_to_db(event: Dict[str, Any]):
"""
状态变更回调 - 同步到数据库
"""
async def _sync_worker():
"""后台状态同步工作线程"""
global _sync_queue
while True:
try:
event = await _sync_queue.get()
if event is None: # 停止信号
break
await _do_sync_status(event)
_sync_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"状态同步工作线程异常: {e}")
async def _do_sync_status(event: Dict[str, Any]):
"""实际执行状态同步"""
user_id = event["user_id"]
plugin_name = event["plugin_name"]
new_status = event["new_status"]
reason = event.get("reason", "")
try:
from app.database import get_engine
engine = await get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as db:
stmt = (
update(MCPPlugin)
@@ -36,15 +57,38 @@ async def sync_status_to_db(event: Dict[str, Any]):
)
await db.execute(stmt)
await db.commit()
logger.debug(f"✅ 状态已同步到数据库: {plugin_name} -> {new_status}")
except Exception as e:
logger.error(f"❌ 状态同步失败: {plugin_name}, 错误: {e}")
async def sync_status_to_db(event: Dict[str, Any]):
"""
状态变更回调 - 将事件加入队列异步同步到数据库
使用队列异步处理,避免在请求处理过程中阻塞或产生数据库连接冲突
"""
global _sync_queue, _sync_task
# 延迟初始化队列和工作线程
if _sync_queue is None:
_sync_queue = asyncio.Queue()
if _sync_task is None or _sync_task.done():
_sync_task = asyncio.create_task(_sync_worker())
logger.info("✅ MCP状态同步工作线程已启动")
# 将事件加入队列(非阻塞)
try:
_sync_queue.put_nowait(event)
except asyncio.QueueFull:
logger.warning(f"状态同步队列已满,丢弃事件: {event['plugin_name']}")
def register_status_sync():
"""注册状态同步回调到MCP客户端"""
from app.mcp import mcp_client
mcp_client.register_status_callback(sync_status_to_db)
logger.info("✅ MCP状态同步服务已注册")
logger.info("✅ MCP状态同步服务已注册")