123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- import json
- import subprocess
- import asyncio
- import sys
- import os
- from mcp import ClientSession, StdioServerParameters
- from mcp.client.stdio import stdio_client
- from pydantic import BaseModel
- class MCPTools:
- def __init__(self, config_file="mcp_config.json"):
- # 从配置文件加载MCP服务配置
- self.mcp_servers = self._load_config(config_file)
- # 缓存每个服务的工具列表
- self._tools_cache = {}
- def _load_config(self, config_file):
- """
- 从配置文件加载MCP服务配置
-
- Args:
- config_file (str): 配置文件路径
-
- Returns:
- dict: MCP服务配置
- """
- try:
- # 加载配置文件
- with open(config_file, 'r', encoding='utf-8') as f:
- config = json.load(f)
- return config.get("mcpServers", {})
- except Exception as e:
- print(f"加载MCP配置文件时出错: {e}")
- # 返回空配置
- return {}
- async def call_mcp_server(self, server_name, method, params=None):
- """
- 调用指定的MCP服务器
-
- Args:
- server_name (str): 服务器名称
- method (str): 要调用的方法
- params (dict): 方法参数
-
- Returns:
- dict: MCP服务器的响应
- """
- if server_name not in self.mcp_servers:
- return {"error": f"MCP server '{server_name}' not found"}
- server_config = self.mcp_servers[server_name]
-
- try:
- # 检查命令是否存在
- try:
- process = subprocess.Popen([server_config["command"]] + server_config["args"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=(sys.platform == 'win32'))
- process.terminate() # 立即终止,只是检查命令是否存在
- except FileNotFoundError:
- return {"error": f"命令未找到: {server_config['command']}"}
-
- # 检查service_mcp脚本是否存在 (仅当它是配置中的服务时)
- if server_name == "service_mcp" and server_config.get("args"):
- script_path = server_config["args"][0] if server_config["args"] else None
- if script_path and not os.path.exists(script_path):
- return {"error": f"Service MCP脚本不存在: {script_path}"}
-
- async with stdio_client(
- StdioServerParameters(
- command=server_config["command"],
- args=server_config["args"],
- env=None
- )
- ) as (read, write):
- async with ClientSession(read, write) as session:
- # 初始化MCP服务器
- await session.initialize()
-
- # 调用指定方法
- if method == "prompts/list":
- result = await session.list_prompts()
- elif method == "prompts/get" and params:
- result = await session.get_prompt(params["name"])
- elif method == "resources/list":
- result = await session.list_resources()
- elif method == "resources/read" and params:
- result = await session.read_resource(params["uri"])
- elif method == "tools/list":
- result = await session.list_tools()
- elif method == "tools/call" and params:
- result = await session.call_tool(params["name"], params.get("arguments", {}))
- else:
- # 通用方法调用
- result = await session.send_request(method, params or {})
-
- # 将结果转换为可序列化的字典
- return self._serialize_result(result)
- except Exception as e:
- return {"error": f"调用MCP服务时出错: {str(e)}"}
- def _serialize_result(self, result):
- """
- 将MCP结果转换为可JSON序列化的格式
- """
- if isinstance(result, BaseModel):
- return result.model_dump()
- elif isinstance(result, dict):
- return {key: self._serialize_result(value) for key, value in result.items()}
- elif isinstance(result, list):
- return [self._serialize_result(item) for item in result]
- else:
- return result
- def get_mcp_tool_list(self):
- """
- 获取MCP工具列表,用于添加到AI工具中
- 根据配置动态生成工具列表
- """
- tools = []
-
- # 为每个配置的MCP服务创建对应的工具
- for server_name, server_config in self.mcp_servers.items():
- tool_name = f"call_{server_name}_mcp"
- description = server_config.get("description", f"调用{server_name} MCP服务")
-
- tools.append({
- "type": "function",
- "function": {
- "name": tool_name,
- "description": description,
- "parameters": {
- "type": "object",
- "properties": {
- "method": {
- "type": "string",
- "description": "要调用的MCP方法,如tools/list, tools/call等"
- },
- "params": {
- "type": "object",
- "description": "方法参数"
- }
- },
- "required": ["method"]
- }
- }
- })
-
- return tools
- async def get_actual_mcp_tools(self, server_name):
- """
- 获取指定MCP服务的实际工具列表
-
- Args:
- server_name (str): 服务器名称
-
- Returns:
- list: 实际的工具列表
- """
- if server_name not in self.mcp_servers:
- return []
-
- try:
- result = await self.call_mcp_server(server_name, "tools/list")
- if isinstance(result, dict) and "tools" in result:
- return result["tools"]
- return []
- except Exception as e:
- print(f"获取 {server_name} 工具列表时出错: {e}")
- return []
- def get_all_mcp_tools_sync(self):
- """
- 同步获取所有MCP服务的实际工具列表
-
- Returns:
- list: 所有MCP服务的工具列表
- """
- all_tools = []
- for server_name in self.mcp_servers.keys():
- try:
- tools = asyncio.run(self.get_actual_mcp_tools(server_name))
- # 缓存工具列表及对应的服务器名称
- self._tools_cache[server_name] = tools
- all_tools.extend(tools)
- except Exception as e:
- print(f"获取 {server_name} 工具列表时出错: {e}")
- return all_tools
- def get_server_for_tool(self, tool_name):
- """
- 根据工具名称获取对应的服务器名称
-
- Args:
- tool_name (str): 工具名称
-
- Returns:
- str or None: 服务器名称或None
- """
- # 遍历缓存的工具列表查找对应的服务器
- for server_name, tools in self._tools_cache.items():
- for tool in tools:
- if tool.get("name") == tool_name:
- return server_name
- return None
- def call_mcp_tool(self, tool_name, parameters):
- """
- 执行MCP工具调用
-
- Args:
- tool_name (str): 工具名称
- parameters (dict): 工具参数
-
- Returns:
- str: 工具执行结果
- """
- try:
- # 从工具名称中提取服务器名称
- if tool_name.startswith("call_") and tool_name.endswith("_mcp"):
- server_name = tool_name[5:-4] # 移除 "call_" 前缀和 "_mcp" 后缀
- else:
- return f"未知的MCP工具格式: {tool_name}"
- if server_name not in self.mcp_servers:
- return f"未配置的MCP服务: {server_name}"
- # 在异步环境中运行
- result = asyncio.run(
- self.call_mcp_server(
- server_name,
- parameters.get("method", ""),
- parameters.get("params")
- )
- )
-
- return json.dumps(result, ensure_ascii=False)
- except Exception as e:
- return f"MCP工具调用失败: {str(e)}"
- async def test_mcp_connection(self, server_name):
- """
- 测试MCP服务连接
-
- Args:
- server_name (str): 服务器名称
-
- Returns:
- dict: 测试结果
- """
- if server_name not in self.mcp_servers:
- return {"error": f"MCP server '{server_name}' not found"}
-
- try:
- result = await self.call_mcp_server(server_name, "tools/list")
- # 缓存工具列表
- if isinstance(result, dict) and "tools" in result:
- self._tools_cache[server_name] = result["tools"]
-
- return {
- "status": "success",
- "server": server_name,
- "tools_available": len(result.get("tools", [])) if isinstance(result, dict) else 0,
- "result": result
- }
- except Exception as e:
- return {
- "status": "error",
- "server": server_name,
- "error": str(e)
- }
-
- def get_available_servers(self):
- """
- 获取所有可用的MCP服务器名称
-
- Returns:
- list: 可用的MCP服务器名称列表
- """
- return list(self.mcp_servers.keys())
|