mcp_tools.py 8.0 KB


  1. import json
  2. import subprocess
  3. import asyncio
  4. import sys
  5. import os
  6. from mcp import ClientSession, StdioServerParameters
  7. from mcp.client.stdio import stdio_client
  8. from pydantic import BaseModel
  9. class MCPTools:
  10. def __init__(self, config_file="mcp_config.json"):
  11. # 从配置文件加载MCP服务配置
  12. self.mcp_servers = self._load_config(config_file)
  13. def _load_config(self, config_file):
  14. """
  15. 从配置文件加载MCP服务配置
  16. Args:
  17. config_file (str): 配置文件路径
  18. Returns:
  19. dict: MCP服务配置
  20. """
  21. try:
  22. # 加载配置文件
  23. with open(config_file, 'r', encoding='utf-8') as f:
  24. config = json.load(f)
  25. return config.get("mcpServers", {})
  26. except Exception as e:
  27. print(f"加载MCP配置文件时出错: {e}")
  28. # 返回空配置
  29. return {}
  30. async def call_mcp_server(self, server_name, method, params=None):
  31. """
  32. 调用指定的MCP服务器
  33. Args:
  34. server_name (str): 服务器名称
  35. method (str): 要调用的方法
  36. params (dict): 方法参数
  37. Returns:
  38. dict: MCP服务器的响应
  39. """
  40. if server_name not in self.mcp_servers:
  41. return {"error": f"MCP server '{server_name}' not found"}
  42. server_config = self.mcp_servers[server_name]
  43. try:
  44. # 检查命令是否存在
  45. try:
  46. process = subprocess.Popen([server_config["command"]] + server_config["args"],
  47. stdout=subprocess.PIPE,
  48. stderr=subprocess.PIPE,
  49. shell=(sys.platform == 'win32'))
  50. process.terminate() # 立即终止,只是检查命令是否存在
  51. except FileNotFoundError:
  52. return {"error": f"命令未找到: {server_config['command']}"}
  53. # 检查service_mcp脚本是否存在 (仅当它是配置中的服务时)
  54. if server_name == "service_mcp" and server_config.get("args"):
  55. script_path = server_config["args"][0] if server_config["args"] else None
  56. if script_path and not os.path.exists(script_path):
  57. return {"error": f"Service MCP脚本不存在: {script_path}"}
  58. async with stdio_client(
  59. StdioServerParameters(
  60. command=server_config["command"],
  61. args=server_config["args"],
  62. env=None
  63. )
  64. ) as (read, write):
  65. async with ClientSession(read, write) as session:
  66. # 初始化MCP服务器
  67. await session.initialize()
  68. # 调用指定方法
  69. if method == "prompts/list":
  70. result = await session.list_prompts()
  71. elif method == "prompts/get" and params:
  72. result = await session.get_prompt(params["name"])
  73. elif method == "resources/list":
  74. result = await session.list_resources()
  75. elif method == "resources/read" and params:
  76. result = await session.read_resource(params["uri"])
  77. elif method == "tools/list":
  78. result = await session.list_tools()
  79. elif method == "tools/call" and params:
  80. result = await session.call_tool(params["name"], params.get("arguments", {}))
  81. else:
  82. # 通用方法调用
  83. result = await session.send_request(method, params or {})
  84. # 将结果转换为可序列化的字典
  85. return self._serialize_result(result)
  86. except Exception as e:
  87. return {"error": f"调用MCP服务时出错: {str(e)}"}
  88. def _serialize_result(self, result):
  89. """
  90. 将MCP结果转换为可JSON序列化的格式
  91. """
  92. if isinstance(result, BaseModel):
  93. return result.model_dump()
  94. elif isinstance(result, dict):
  95. return {key: self._serialize_result(value) for key, value in result.items()}
  96. elif isinstance(result, list):
  97. return [self._serialize_result(item) for item in result]
  98. else:
  99. return result
  100. def get_mcp_tool_list(self):
  101. """
  102. 获取MCP工具列表,用于添加到AI工具中
  103. 根据配置动态生成工具列表
  104. """
  105. tools = []
  106. # 为每个配置的MCP服务创建对应的工具
  107. for server_name, server_config in self.mcp_servers.items():
  108. tool_name = f"call_{server_name}_mcp"
  109. description = server_config.get("description", f"调用{server_name} MCP服务")
  110. tools.append({
  111. "type": "function",
  112. "function": {
  113. "name": tool_name,
  114. "description": description,
  115. "parameters": {
  116. "type": "object",
  117. "properties": {
  118. "method": {
  119. "type": "string",
  120. "description": "要调用的MCP方法,如tools/list, resources/list等"
  121. },
  122. "params": {
  123. "type": "object",
  124. "description": "方法参数"
  125. }
  126. },
  127. "required": ["method"]
  128. }
  129. }
  130. })
  131. return tools
  132. def call_mcp_tool(self, tool_name, parameters):
  133. """
  134. 执行MCP工具调用
  135. Args:
  136. tool_name (str): 工具名称
  137. parameters (dict): 工具参数
  138. Returns:
  139. str: 工具执行结果
  140. """
  141. try:
  142. # 从工具名称中提取服务器名称
  143. if tool_name.startswith("call_") and tool_name.endswith("_mcp"):
  144. server_name = tool_name[5:-4] # 移除 "call_" 前缀和 "_mcp" 后缀
  145. else:
  146. return f"未知的MCP工具格式: {tool_name}"
  147. if server_name not in self.mcp_servers:
  148. return f"未配置的MCP服务: {server_name}"
  149. # 在异步环境中运行
  150. result = asyncio.run(
  151. self.call_mcp_server(
  152. server_name,
  153. parameters.get("method", ""),
  154. parameters.get("params")
  155. )
  156. )
  157. return json.dumps(result, ensure_ascii=False)
  158. except Exception as e:
  159. return f"MCP工具调用失败: {str(e)}"
  160. async def test_mcp_connection(self, server_name):
  161. """
  162. 测试MCP服务连接
  163. Args:
  164. server_name (str): 服务器名称
  165. Returns:
  166. dict: 测试结果
  167. """
  168. if server_name not in self.mcp_servers:
  169. return {"error": f"MCP server '{server_name}' not found"}
  170. try:
  171. result = await self.call_mcp_server(server_name, "tools/list")
  172. return {
  173. "status": "success",
  174. "server": server_name,
  175. "tools_available": len(result.get("tools", [])) if isinstance(result, dict) else 0,
  176. "result": result
  177. }
  178. except Exception as e:
  179. return {
  180. "status": "error",
  181. "server": server_name,
  182. "error": str(e)
  183. }
  184. def get_available_servers(self):
  185. """
  186. 获取所有可用的MCP服务器名称
  187. Returns:
  188. list: 可用的MCP服务器名称列表
  189. """
  190. return list(self.mcp_servers.keys())