import json import subprocess import asyncio import sys import os from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from pydantic import BaseModel class MCPTools: def __init__(self, config_file="mcp_config.json"): # 从配置文件加载MCP服务配置 self.mcp_servers = self._load_config(config_file) def _load_config(self, config_file): """ 从配置文件加载MCP服务配置 Args: config_file (str): 配置文件路径 Returns: dict: MCP服务配置 """ try: # 加载配置文件 with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) return config.get("mcpServers", {}) except Exception as e: print(f"加载MCP配置文件时出错: {e}") # 返回空配置 return {} async def call_mcp_server(self, server_name, method, params=None): """ 调用指定的MCP服务器 Args: server_name (str): 服务器名称 method (str): 要调用的方法 params (dict): 方法参数 Returns: dict: MCP服务器的响应 """ if server_name not in self.mcp_servers: return {"error": f"MCP server '{server_name}' not found"} server_config = self.mcp_servers[server_name] try: # 检查命令是否存在 try: process = subprocess.Popen([server_config["command"]] + server_config["args"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(sys.platform == 'win32')) process.terminate() # 立即终止,只是检查命令是否存在 except FileNotFoundError: return {"error": f"命令未找到: {server_config['command']}"} # 检查service_mcp脚本是否存在 (仅当它是配置中的服务时) if server_name == "service_mcp" and server_config.get("args"): script_path = server_config["args"][0] if server_config["args"] else None if script_path and not os.path.exists(script_path): return {"error": f"Service MCP脚本不存在: {script_path}"} async with stdio_client( StdioServerParameters( command=server_config["command"], args=server_config["args"], env=None ) ) as (read, write): async with ClientSession(read, write) as session: # 初始化MCP服务器 await session.initialize() # 调用指定方法 if method == "prompts/list": result = await session.list_prompts() elif method == "prompts/get" and params: result = await session.get_prompt(params["name"]) elif method == "resources/list": result = await session.list_resources() elif method == "resources/read" and params: result = await session.read_resource(params["uri"]) elif method == "tools/list": result = await session.list_tools() elif method == "tools/call" and params: result = await session.call_tool(params["name"], params.get("arguments", {})) else: # 通用方法调用 result = await session.send_request(method, params or {}) # 将结果转换为可序列化的字典 return self._serialize_result(result) except Exception as e: return {"error": f"调用MCP服务时出错: {str(e)}"} def _serialize_result(self, result): """ 将MCP结果转换为可JSON序列化的格式 """ if isinstance(result, BaseModel): return result.model_dump() elif isinstance(result, dict): return {key: self._serialize_result(value) for key, value in result.items()} elif isinstance(result, list): return [self._serialize_result(item) for item in result] else: return result def get_mcp_tool_list(self): """ 获取MCP工具列表,用于添加到AI工具中 根据配置动态生成工具列表 """ tools = [] # 为每个配置的MCP服务创建对应的工具 for server_name, server_config in self.mcp_servers.items(): tool_name = f"call_{server_name}_mcp" description = server_config.get("description", f"调用{server_name} MCP服务") tools.append({ "type": "function", "function": { "name": tool_name, "description": description, "parameters": { "type": "object", "properties": { "method": { "type": "string", "description": "要调用的MCP方法,如tools/list, resources/list等" }, "params": { "type": "object", "description": "方法参数" } }, "required": ["method"] } } }) return tools def call_mcp_tool(self, tool_name, parameters): """ 执行MCP工具调用 Args: tool_name (str): 工具名称 parameters (dict): 工具参数 Returns: str: 工具执行结果 """ try: # 从工具名称中提取服务器名称 if tool_name.startswith("call_") and tool_name.endswith("_mcp"): server_name = tool_name[5:-4] # 移除 "call_" 前缀和 "_mcp" 后缀 else: return f"未知的MCP工具格式: {tool_name}" if server_name not in self.mcp_servers: return f"未配置的MCP服务: {server_name}" # 在异步环境中运行 result = asyncio.run( self.call_mcp_server( server_name, parameters.get("method", ""), parameters.get("params") ) ) return json.dumps(result, ensure_ascii=False) except Exception as e: return f"MCP工具调用失败: {str(e)}" async def test_mcp_connection(self, server_name): """ 测试MCP服务连接 Args: server_name (str): 服务器名称 Returns: dict: 测试结果 """ if server_name not in self.mcp_servers: return {"error": f"MCP server '{server_name}' not found"} try: result = await self.call_mcp_server(server_name, "tools/list") return { "status": "success", "server": server_name, "tools_available": len(result.get("tools", [])) if isinstance(result, dict) else 0, "result": result } except Exception as e: return { "status": "error", "server": server_name, "error": str(e) } def get_available_servers(self): """ 获取所有可用的MCP服务器名称 Returns: list: 可用的MCP服务器名称列表 """ return list(self.mcp_servers.keys())