Files
未来 17e78955a9 fix: MCP插件TimeoutError修复 + 多项Bug修复和性能优化
- fix: MCP插件管理接口改为后台任务,修复TimeoutError
- fix: MCP连接失败后上下文清理的cancel scope错误
- feat: MCP插件后台注册添加重试机制
- fix: 限制每章自动创建伏笔数量上限
- fix: 修复JSON非法转义字符清洗
- fix: SSE流式生成添加心跳保活
- fix: 职业生成改用POST请求避免URL长度限制
- perf: 使用torch CPU版本加速Docker构建
- fix: 自动修复JSON字符串值中的裸换行符
- feat: 集成json5容错解析器
2026-04-26 13:58:15 +08:00

958 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP插件管理API
重构后使用统一的MCPClientFacade门面来管理所有MCP操作。
"""
import asyncio
from fastapi import APIRouter, HTTPException, Depends, Query, Request, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy import select, update
from typing import List, Optional
from datetime import datetime
from app.database import get_db, get_engine
from app.models.mcp_plugin import MCPPlugin
from app.schemas.mcp_plugin import (
MCPPluginCreate,
MCPPluginSimpleCreate,
MCPPluginUpdate,
MCPPluginResponse,
MCPToolCall,
MCPTestResult
)
import json
from app.user_manager import User
from app.mcp import mcp_client, MCPPluginConfig, PluginStatus
from app.services.mcp_test_service import mcp_test_service
from app.logger import get_logger
from app.security import validate_public_http_url
logger = get_logger(__name__)
router = APIRouter(prefix="/mcp/plugins", tags=["MCP插件管理"])
HTTP_PLUGIN_TYPES = {"http", "streamable_http", "sse"}
def _validate_mcp_server_url(plugin_type: str, server_url: Optional[str]) -> Optional[str]:
if plugin_type in HTTP_PLUGIN_TYPES:
if not server_url:
raise HTTPException(status_code=400, detail=f"{plugin_type}类型插件必须提供server_url")
return validate_public_http_url(server_url)
return server_url
def require_login(request: Request) -> User:
"""依赖:要求用户已登录"""
if not hasattr(request.state, "user") or not request.state.user:
raise HTTPException(status_code=401, detail="需要登录")
return request.state.user
async def _register_plugin_background(
user_id: str,
plugin_name: str,
plugin_type: str,
server_url: str,
headers: Optional[dict],
config: Optional[dict],
max_retries: int = 2,
retry_delay: float = 3.0
):
"""
后台任务:注册MCP插件并更新数据库状态(带重试)
在独立的任务中执行MCP连接,避免阻塞请求处理。
连接失败时会自动重试,提高对临时网络问题的容错性。
"""
last_error = None
for attempt in range(max_retries + 1):
try:
if attempt > 0:
logger.info(f"后台注册MCP插件重试 ({attempt}/{max_retries}): {plugin_name}")
await asyncio.sleep(retry_delay)
else:
logger.info(f"后台注册MCP插件: {plugin_name}")
if plugin_type in HTTP_PLUGIN_TYPES and server_url:
server_url = _validate_mcp_server_url(plugin_type, server_url)
success = await mcp_client.register(MCPPluginConfig(
user_id=user_id,
plugin_name=plugin_name,
url=server_url,
plugin_type=plugin_type,
headers=headers,
timeout=config.get('timeout', 60.0) if config else 60.0
))
else:
success = False
if success:
# 更新数据库状态为active
engine = await get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as db:
stmt = (
update(MCPPlugin)
.where(MCPPlugin.user_id == user_id, MCPPlugin.plugin_name == plugin_name)
.values(status="active", last_error=None)
)
await db.execute(stmt)
await db.commit()
logger.info(f"后台注册MCP插件成功: {plugin_name}")
return
else:
last_error = "连接失败"
except Exception as e:
last_error = str(e)
logger.warning(f"后台注册MCP插件异常 (尝试 {attempt + 1}/{max_retries + 1}): {plugin_name}, 错误: {e}")
# 所有重试都失败,更新数据库状态为error
logger.error(f"后台注册MCP插件最终失败 (已重试{max_retries}次): {plugin_name}, 错误: {last_error}")
try:
engine = await get_engine(user_id)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with AsyncSessionLocal() as db:
stmt = (
update(MCPPlugin)
.where(MCPPlugin.user_id == user_id, MCPPlugin.plugin_name == plugin_name)
.values(status="error", last_error=str(last_error)[:500] if last_error else "连接失败")
)
await db.execute(stmt)
await db.commit()
except Exception as db_error:
logger.error(f"更新插件状态失败: {db_error}")
async def _unregister_plugin_safe(user_id: str, plugin_name: str):
"""安全地在后台注销MCP插件"""
try:
await mcp_client.unregister(user_id, plugin_name)
logger.info(f"后台注销MCP插件成功: {plugin_name}")
except Exception as e:
logger.warning(f"后台注销MCP插件出错: {plugin_name}, 错误: {e}")
async def _register_plugin_to_facade(plugin: MCPPlugin, user_id: str) -> bool:
"""
将插件注册到统一门面
Args:
plugin: 插件对象
user_id: 用户ID
Returns:
是否注册成功
"""
if plugin.plugin_type in HTTP_PLUGIN_TYPES and plugin.server_url:
server_url = _validate_mcp_server_url(plugin.plugin_type, plugin.server_url)
return await mcp_client.register(MCPPluginConfig(
user_id=user_id,
plugin_name=plugin.plugin_name,
url=server_url,
plugin_type=plugin.plugin_type,
headers=plugin.headers,
timeout=plugin.config.get('timeout', 60.0) if plugin.config else 60.0
))
else:
logger.warning(f"暂不支持的插件类型: {plugin.plugin_type}")
return False
@router.get("", response_model=List[MCPPluginResponse])
async def list_plugins(
enabled_only: bool = Query(False, description="只返回启用的插件"),
category: Optional[str] = Query(None, description="按分类筛选"),
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
获取用户的所有MCP插件
"""
query = select(MCPPlugin).where(MCPPlugin.user_id == user.user_id)
if enabled_only:
query = query.where(MCPPlugin.enabled == True)
if category:
query = query.where(MCPPlugin.category == category)
query = query.order_by(MCPPlugin.sort_order, MCPPlugin.created_at)
result = await db.execute(query)
plugins = result.scalars().all()
logger.info(f"用户 {user.user_id} 查询插件列表,共 {len(plugins)}")
return plugins
@router.post("", response_model=MCPPluginResponse)
async def create_plugin(
data: MCPPluginCreate,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
创建新的MCP插件
"""
# 检查插件名是否已存在
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.user_id == user.user_id,
MCPPlugin.plugin_name == data.plugin_name
)
)
existing = result.scalar_one_or_none()
if existing:
raise HTTPException(status_code=400, detail=f"插件名已存在: {data.plugin_name}")
# 创建插件数据
plugin_data = data.model_dump()
plugin_data["server_url"] = _validate_mcp_server_url(
plugin_data.get("plugin_type", "http"),
plugin_data.get("server_url")
)
# 如果没有提供display_name,使用plugin_name作为默认值
if not plugin_data.get("display_name"):
plugin_data["display_name"] = plugin_data["plugin_name"]
# 创建插件
plugin = MCPPlugin(
user_id=user.user_id,
**plugin_data
)
# 如果启用,设为pending状态等待后台连接
if plugin.enabled:
plugin.status = "pending"
db.add(plugin)
await db.commit()
await db.refresh(plugin)
# 如果启用,后台注册到统一门面(避免MCP操作阻塞导致超时)
if plugin.enabled:
asyncio.create_task(_register_plugin_background(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
plugin_type=plugin.plugin_type,
server_url=plugin.server_url,
headers=plugin.headers,
config=plugin.config
))
logger.info(f"用户 {user.user_id} 创建插件: {plugin.plugin_name}MCP注册在后台执行)")
return plugin
@router.post("/simple", response_model=MCPPluginResponse)
async def create_plugin_simple(
data: MCPPluginSimpleCreate,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
通过标准MCP配置JSON创建或更新插件(简化版)
接受格式:
{
"config_json": '{"mcpServers": {"exa": {"type": "http", "url": "...", "headers": {}}}}',
"category": "search"
}
自动从mcpServers中提取插件名称(取第一个键)
如果插件已存在,则更新;否则创建新插件
"""
try:
# 解析配置JSON
config = json.loads(data.config_json)
# 验证格式
if "mcpServers" not in config:
raise HTTPException(status_code=400, detail="配置JSON必须包含mcpServers字段")
servers = config["mcpServers"]
if not servers or len(servers) == 0:
raise HTTPException(status_code=400, detail="mcpServers不能为空")
# 自动提取第一个插件名称
plugin_name = list(servers.keys())[0]
server_config = servers[plugin_name]
logger.info(f"从配置中提取插件名称: {plugin_name}")
# 提取配置
server_type = server_config.get("type", "http")
if server_type not in ["http", "stdio", "streamable_http", "sse"]:
raise HTTPException(status_code=400, detail=f"不支持的服务器类型: {server_type}")
# 检查插件名是否已存在
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.user_id == user.user_id,
MCPPlugin.plugin_name == plugin_name
)
)
existing = result.scalar_one_or_none()
# 构建插件数据
plugin_data = {
"plugin_name": plugin_name,
"display_name": plugin_name,
"plugin_type": server_type,
"enabled": data.enabled,
"category": data.category,
"sort_order": 0
}
if server_type in HTTP_PLUGIN_TYPES:
plugin_data["server_url"] = _validate_mcp_server_url(server_type, server_config.get("url"))
plugin_data["headers"] = server_config.get("headers", {})
elif server_type == "stdio":
plugin_data["command"] = server_config.get("command")
plugin_data["args"] = server_config.get("args", [])
plugin_data["env"] = server_config.get("env", {})
if not plugin_data["command"]:
raise HTTPException(status_code=400, detail="Stdio类型插件必须提供command字段")
if existing:
# 更新现有插件
logger.info(f"插件 {plugin_name} 已存在,执行更新操作")
# 保存旧状态
old_enabled = existing.enabled
old_plugin_name = existing.plugin_name
# 更新字段
for key, value in plugin_data.items():
setattr(existing, key, value)
# 设置为pending状态,等待后台连接
if plugin_data.get("enabled"):
existing.status = "pending"
plugin = existing
await db.commit()
await db.refresh(plugin)
# 后台执行MCP操作(不阻塞请求)
if old_enabled:
# 注销旧插件(使用create_task在后台执行)
asyncio.create_task(_unregister_plugin_safe(user.user_id, old_plugin_name))
if plugin.enabled:
# 后台注册新插件
asyncio.create_task(_register_plugin_background(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
plugin_type=plugin.plugin_type,
server_url=plugin.server_url,
headers=plugin.headers,
config=plugin.config
))
logger.info(f"用户 {user.user_id} 更新插件: {plugin_name}")
else:
# 创建新插件
plugin = MCPPlugin(
user_id=user.user_id,
**plugin_data
)
# 设置为pending状态,等待后台连接
if plugin_data.get("enabled"):
plugin.status = "pending"
db.add(plugin)
await db.commit()
await db.refresh(plugin)
# 后台执行MCP注册(不阻塞请求)
if plugin.enabled:
asyncio.create_task(_register_plugin_background(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
plugin_type=plugin.plugin_type,
server_url=plugin.server_url,
headers=plugin.headers,
config=plugin.config
))
logger.info(f"用户 {user.user_id} 通过简化配置创建插件: {plugin_name}")
return plugin
except json.JSONDecodeError as e:
raise HTTPException(status_code=400, detail=f"配置JSON格式错误: {str(e)}")
except HTTPException:
raise
except Exception as e:
logger.error(f"创建插件失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"创建插件失败: {str(e)}")
@router.get("/{plugin_id}", response_model=MCPPluginResponse)
async def get_plugin(
plugin_id: str,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
获取插件详情
"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
return plugin
@router.put("/{plugin_id}", response_model=MCPPluginResponse)
async def update_plugin(
plugin_id: str,
data: MCPPluginUpdate,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
更新插件配置
"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
# 更新字段
update_data = data.model_dump(exclude_unset=True)
target_type = update_data.get("plugin_type", plugin.plugin_type)
if "server_url" in update_data or target_type in HTTP_PLUGIN_TYPES:
update_data["server_url"] = _validate_mcp_server_url(
target_type,
update_data.get("server_url", plugin.server_url)
)
for key, value in update_data.items():
setattr(plugin, key, value)
# 如果启用,设为pending状态等待后台连接
if plugin.enabled:
plugin.status = "pending"
plugin.last_error = None
await db.commit()
await db.refresh(plugin)
# 如果插件已启用,后台重新注册MCP连接
if plugin.enabled:
# 先后台注销旧连接
asyncio.create_task(_unregister_plugin_safe(user.user_id, plugin.plugin_name))
# 再后台注册新连接
asyncio.create_task(_register_plugin_background(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
plugin_type=plugin.plugin_type,
server_url=plugin.server_url,
headers=plugin.headers,
config=plugin.config
))
logger.info(f"用户 {user.user_id} 更新插件: {plugin.plugin_name}MCP操作在后台执行)")
return plugin
@router.delete("/{plugin_id}")
async def delete_plugin(
plugin_id: str,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
删除插件
"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
# 保存插件信息用于后台注销
plugin_name = plugin.plugin_name
user_id = user.user_id
# 先删除数据库记录
await db.delete(plugin)
await db.commit()
# 后台从统一门面注销(避免MCP操作阻塞导致超时)
asyncio.create_task(_unregister_plugin_safe(user_id, plugin_name))
logger.info(f"用户 {user.user_id} 删除插件: {plugin_name}MCP注销在后台执行)")
return {"message": "插件已删除", "plugin_name": plugin_name}
@router.post("/{plugin_id}/toggle", response_model=MCPPluginResponse)
async def toggle_plugin(
plugin_id: str,
enabled: bool = Query(..., description="启用或禁用"),
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
启用或禁用插件
启用时:先更新数据库状态为pending,再通过后台任务注册MCP连接,
避免长时间持有数据库会话导致超时。
禁用时:先更新数据库状态,再通过后台任务注销MCP连接。
"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
# 保存插件信息用于后续MCP操作
plugin_name = plugin.plugin_name
plugin_type = plugin.plugin_type
server_url = plugin.server_url
headers = plugin.headers
config = plugin.config
# 更新数据库状态
plugin.enabled = enabled
if enabled:
# 启用时先设为pending状态,等待后台MCP连接完成
plugin.status = "pending"
plugin.last_error = None
else:
plugin.status = "inactive"
await db.commit()
await db.refresh(plugin)
# 数据库操作完成后,通过后台任务进行MCP操作(避免长时间持有数据库会话)
if enabled:
# 启用:后台注册到统一门面
asyncio.create_task(_register_plugin_background(
user_id=user.user_id,
plugin_name=plugin_name,
plugin_type=plugin_type,
server_url=server_url,
headers=headers,
config=config
))
else:
# 禁用:后台从统一门面注销(不影响数据库状态)
asyncio.create_task(_unregister_plugin_safe(user.user_id, plugin_name))
action = "启用" if enabled else "禁用"
logger.info(f"用户 {user.user_id} {action}插件: {plugin_name}MCP操作在后台执行)")
return plugin
@router.post("/{plugin_id}/test", response_model=MCPTestResult)
async def test_plugin(
plugin_id: str,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
测试插件连接并调用工具验证功能
使用MCPTestService进行测试。
如果插件会话尚未建立,会先在后台注册,需要再次调用测试。
"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
if not plugin.enabled:
return MCPTestResult(
success=False,
message="插件未启用",
error="请先启用插件",
suggestions=["点击开关按钮启用插件"]
)
# 检查会话是否已注册
is_registered = mcp_client.is_registered(user.user_id, plugin.plugin_name)
session_status = mcp_client.get_session_status(user.user_id, plugin.plugin_name)
if not is_registered:
# 会话不存在或状态异常,需要在后台注册
logger.info(f"插件 {plugin.plugin_name} 会话不存在(状态: {session_status}),启动后台注册")
# 更新数据库状态为pending
plugin.status = "pending"
plugin.last_error = None
await db.commit()
# 在后台注册插件
asyncio.create_task(_register_plugin_background(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
plugin_type=plugin.plugin_type,
server_url=plugin.server_url,
headers=plugin.headers,
config=plugin.config
))
return MCPTestResult(
success=False,
message="正在建立连接...",
error="插件会话正在初始化,请稍后重试",
suggestions=[
"插件正在连接MCP服务器",
"请等待2-3秒后再次点击测试",
"如果持续失败,请检查服务器地址是否正确"
]
)
# 会话已存在,直接执行测试
try:
test_result = await mcp_test_service.test_plugin_with_ai(plugin, user, db)
# 更新插件状态
if test_result.success:
plugin.status = "active"
plugin.last_error = None
else:
plugin.status = "error"
plugin.last_error = test_result.error
plugin.last_test_at = datetime.now()
await db.commit()
return test_result
except Exception as e:
logger.error(f"测试插件失败: {plugin.plugin_name}, 错误: {e}")
plugin.status = "error"
plugin.last_error = str(e)
plugin.last_test_at = datetime.now()
await db.commit()
raise HTTPException(status_code=500, detail=f"测试失败: {str(e)}")
async def _ensure_plugin_registered(
plugin: MCPPlugin,
user_id: str
) -> bool:
"""
确保插件已注册到统一门面
Args:
plugin: 插件对象
user_id: 用户ID
Returns:
是否成功
Raises:
HTTPException: 注册失败
"""
try:
# 使用ensure_registered方法,它会检查是否已注册
if plugin.plugin_type in HTTP_PLUGIN_TYPES and plugin.server_url:
server_url = _validate_mcp_server_url(plugin.plugin_type, plugin.server_url)
return await mcp_client.ensure_registered(
user_id=user_id,
plugin_name=plugin.plugin_name,
url=server_url,
plugin_type=plugin.plugin_type,
headers=plugin.headers
)
return False
except ValueError as e:
logger.info(f"插件 {plugin.plugin_name} 未注册,自动注册中...")
success = await _register_plugin_to_facade(plugin, user_id)
if not success:
raise HTTPException(
status_code=500,
detail=f"插件注册失败: {plugin.plugin_name}"
)
return True
@router.get("/{plugin_id}/status")
async def get_plugin_status(
plugin_id: str,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""获取插件的实时状态(包括内存中的会话状态)"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
session_stats = mcp_client.get_session_stats()
session_key = f"{user.user_id}:{plugin.plugin_name}"
session_info = next((s for s in session_stats.get("sessions", []) if s["key"] == session_key), None)
return {
"plugin_id": plugin_id,
"plugin_name": plugin.plugin_name,
"db_status": plugin.status,
"session_status": session_info["status"] if session_info else None,
"is_registered": session_info is not None,
"error_rate": session_info["error_rate"] if session_info else 0,
"in_sync": (plugin.status == session_info["status"]) if session_info else (plugin.status == "inactive"),
"timestamp": datetime.now().isoformat()
}
@router.get("/metrics")
async def get_metrics(
tool_name: Optional[str] = Query(None, description="工具名称(可选,获取特定工具的指标)"),
user: User = Depends(require_login)
):
"""
获取MCP工具调用指标
Query参数:
- tool_name: 可选,指定工具名称获取特定工具的指标
Returns:
工具调用指标字典,包含:
- total_calls: 总调用次数
- success_calls: 成功调用次数
- failed_calls: 失败调用次数
- success_rate: 成功率
- avg_duration_ms: 平均耗时(毫秒)
- last_call_time: 最后调用时间
"""
# 使用统一门面获取指标
metrics = mcp_client.get_metrics(tool_name)
return {
"metrics": metrics,
"tool_name": tool_name,
"timestamp": datetime.now().isoformat()
}
@router.get("/cache/stats")
async def get_cache_stats(
user: User = Depends(require_login)
):
"""
获取工具缓存统计信息
Returns:
缓存统计信息,包含:
- total_entries: 缓存条目总数
- total_hits: 缓存总命中次数
- cache_ttl_minutes: 缓存TTL(分钟)
- entries: 各缓存条目详情
"""
# 使用统一门面获取缓存统计
stats = mcp_client.get_cache_stats()
return {
"cache_stats": stats,
"timestamp": datetime.now().isoformat()
}
@router.get("/sessions/stats")
async def get_session_stats(
user: User = Depends(require_login)
):
"""
获取MCP会话统计信息
Returns:
会话统计信息,包含:
- total_sessions: 会话总数
- sessions: 各会话详情
"""
# 使用统一门面获取会话统计
stats = mcp_client.get_session_stats()
return {
"session_stats": stats,
"timestamp": datetime.now().isoformat()
}
@router.post("/cache/clear")
async def clear_cache(
user_id: Optional[str] = Query(None, description="用户ID(可选)"),
plugin_name: Optional[str] = Query(None, description="插件名称(可选)"),
user: User = Depends(require_login)
):
"""
清理工具缓存
Query参数:
- user_id: 可选,清理特定用户的缓存
- plugin_name: 可选,清理特定插件的缓存
说明:
- 不提供任何参数:清理所有缓存
- 只提供user_id:清理该用户的所有缓存
- 提供user_id和plugin_name:清理特定插件的缓存
"""
# 非管理员只能清理自己的缓存
if user_id and user_id != user.user_id:
raise HTTPException(status_code=403, detail="无权清理其他用户的缓存")
# 如果没有指定user_id,使用当前用户
target_user_id = user_id or user.user_id
# 使用统一门面清理缓存
mcp_client.clear_cache(target_user_id, plugin_name)
message = "已清理"
if plugin_name:
message += f"插件 {plugin_name} 的缓存"
elif target_user_id:
message += f"用户 {target_user_id} 的所有缓存"
else:
message += "所有缓存"
logger.info(f"用户 {user.user_id} {message}")
return {
"success": True,
"message": message,
"timestamp": datetime.now().isoformat()
}
@router.get("/{plugin_id}/tools")
async def get_plugin_tools(
plugin_id: str,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
获取插件提供的工具列表
"""
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
if not plugin.enabled:
raise HTTPException(status_code=400, detail="插件未启用")
try:
# 确保插件已注册
await _ensure_plugin_registered(plugin, user.user_id)
# 使用统一门面获取工具列表
tools = await mcp_client.get_tools(user.user_id, plugin.plugin_name)
# 更新数据库中的工具缓存
plugin.tools = tools
await db.commit()
return {
"plugin_name": plugin.plugin_name,
"tools": tools,
"count": len(tools)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取工具列表失败: {plugin.plugin_name}, 错误: {e}")
raise HTTPException(status_code=500, detail=f"获取工具列表失败: {str(e)}")
@router.post("/call")
async def call_mcp_tool(
data: MCPToolCall,
user: User = Depends(require_login),
db: AsyncSession = Depends(get_db)
):
"""
调用MCP工具
"""
# 获取插件
result = await db.execute(
select(MCPPlugin).where(
MCPPlugin.id == data.plugin_id,
MCPPlugin.user_id == user.user_id
)
)
plugin = result.scalar_one_or_none()
if not plugin:
raise HTTPException(status_code=404, detail="插件不存在")
if not plugin.enabled:
raise HTTPException(status_code=400, detail="插件未启用")
try:
# 确保插件已注册
await _ensure_plugin_registered(plugin, user.user_id)
# 使用统一门面调用工具
tool_result = await mcp_client.call_tool(
user_id=user.user_id,
plugin_name=plugin.plugin_name,
tool_name=data.tool_name,
arguments=data.arguments
)
return {
"success": True,
"plugin_name": plugin.plugin_name,
"tool_name": data.tool_name,
"result": tool_result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"调用工具失败: {plugin.plugin_name}.{data.tool_name}, 错误: {e}")
raise HTTPException(status_code=500, detail=f"工具调用失败: {str(e)}")