mcp_api_server.py 15 KB


  1. import json
  2. import subprocess
  3. import asyncio
  4. import sys
  5. from mcp import ClientSession, StdioServerParameters
  6. from mcp.client.stdio import stdio_client
  7. from pydantic import BaseModel
  8. from http.server import HTTPServer, BaseHTTPRequestHandler
  9. from urllib.parse import urlparse, parse_qs
  10. class MCPServerManager:
  11. def __init__(self, config_file="mcp_config.json"):
  12. # 从配置文件加载MCP服务配置
  13. self.mcp_servers = self._load_config(config_file)
  14. # 缓存每个服务的工具列表
  15. self._tools_cache = {}
  16. # 存储活动的会话
  17. self._active_sessions = {}
  18. def _load_config(self, config_file):
  19. """
  20. 从配置文件加载MCP服务配置
  21. Args:
  22. config_file (str): 配置文件路径
  23. Returns:
  24. dict: MCP服务配置
  25. """
  26. try:
  27. # 加载配置文件
  28. with open(config_file, 'r', encoding='utf-8') as f:
  29. config = json.load(f)
  30. return config.get("mcpServers", {})
  31. except Exception as e:
  32. print(f"加载MCP配置文件时出错: {e}")
  33. # 返回空配置
  34. return {}
  35. async def _test_server_command(self, server_name):
  36. """
  37. 测试服务器命令是否可用
  38. Args:
  39. server_name (str): 服务器名称
  40. Returns:
  41. bool: 命令是否可用
  42. """
  43. if server_name not in self.mcp_servers:
  44. return False
  45. server_config = self.mcp_servers[server_name]
  46. try:
  47. # 检查命令是否存在
  48. process = subprocess.Popen([server_config["command"]] + server_config["args"],
  49. stdout=subprocess.PIPE,
  50. stderr=subprocess.PIPE,
  51. shell=(sys.platform == 'win32'))
  52. # 给进程一点时间启动
  53. await asyncio.sleep(0.5)
  54. # 检查进程是否仍在运行
  55. if process.poll() is None:
  56. # 进程仍在运行,终止它
  57. process.terminate()
  58. try:
  59. process.wait(timeout=1)
  60. except subprocess.TimeoutExpired:
  61. process.kill()
  62. return True
  63. else:
  64. # 进程已退出,检查返回码
  65. _, stderr = process.communicate()
  66. if process.returncode == 0 or process.returncode == 1:
  67. # 返回码为0(成功)或1(一般错误)表示命令存在
  68. return True
  69. else:
  70. print(f"命令执行失败 {server_name}: {stderr.decode()}")
  71. return False
  72. except FileNotFoundError:
  73. print(f"命令未找到: {server_config['command']}")
  74. return False
  75. except Exception as e:
  76. print(f"测试命令时出错 {server_name}: {e}")
  77. return False
  78. async def call_mcp_server(self, server_name, method, params=None):
  79. """
  80. 调用指定的MCP服务器
  81. Args:
  82. server_name (str): 服务器名称
  83. method (str): 要调用的方法
  84. params (dict): 方法参数
  85. Returns:
  86. dict: MCP服务器的响应
  87. """
  88. if server_name not in self.mcp_servers:
  89. return {"error": f"MCP server '{server_name}' not found"}
  90. # 检查命令是否可用
  91. if not await self._test_server_command(server_name):
  92. return {"error": f"MCP server '{server_name}' 命令不可用"}
  93. try:
  94. # 创建临时会话(每次调用都创建新会话)
  95. server_config = self.mcp_servers[server_name]
  96. async with stdio_client(
  97. StdioServerParameters(
  98. command=server_config["command"],
  99. args=server_config["args"],
  100. env=None
  101. )
  102. ) as (read, write):
  103. async with ClientSession(read, write) as session:
  104. # 初始化MCP服务器
  105. await session.initialize()
  106. # 调用指定方法
  107. if method == "prompts/list":
  108. result = await session.list_prompts()
  109. elif method == "prompts/get" and params:
  110. result = await session.get_prompt(params["name"])
  111. elif method == "resources/list":
  112. result = await session.list_resources()
  113. elif method == "resources/read" and params:
  114. result = await session.read_resource(params["uri"])
  115. elif method == "tools/list":
  116. result = await session.list_tools()
  117. elif method == "tools/call" and params:
  118. result = await session.call_tool(params["name"], params.get("arguments", {}))
  119. else:
  120. # 通用方法调用
  121. result = await session.send_request(method, params or {})
  122. # 将结果转换为可序列化的字典
  123. return self._serialize_result(result)
  124. except Exception as e:
  125. return {"error": f"调用MCP服务时出错: {str(e)}"}
  126. def _serialize_result(self, result):
  127. """
  128. 将MCP结果转换为可JSON序列化的格式
  129. """
  130. if isinstance(result, BaseModel):
  131. return result.model_dump()
  132. elif isinstance(result, dict):
  133. return {key: self._serialize_result(value) for key, value in result.items()}
  134. elif isinstance(result, list):
  135. return [self._serialize_result(item) for item in result]
  136. else:
  137. return result
  138. async def get_actual_mcp_tools(self, server_name):
  139. """
  140. 获取指定MCP服务的实际工具列表
  141. Args:
  142. server_name (str): 服务器名称
  143. Returns:
  144. list: 实际的工具列表
  145. """
  146. if server_name not in self.mcp_servers:
  147. return []
  148. try:
  149. result = await self.call_mcp_server(server_name, "tools/list")
  150. if isinstance(result, dict) and "tools" in result:
  151. # 缓存工具列表
  152. self._tools_cache[server_name] = result["tools"]
  153. return result["tools"]
  154. return []
  155. except Exception as e:
  156. print(f"获取 {server_name} 工具列表时出错: {e}")
  157. return []
  158. async def get_all_mcp_tools(self):
  159. """
  160. 获取所有MCP服务的实际工具列表
  161. Returns:
  162. list: 所有MCP服务的工具列表,每个工具名称后缀添加_mcp
  163. """
  164. all_tools = []
  165. for server_name in self.mcp_servers.keys():
  166. try:
  167. tools = await self.get_actual_mcp_tools(server_name)
  168. # 为每个工具名称添加_mcp后缀以区分
  169. for tool in tools:
  170. # 创建工具副本并修改名称
  171. tool_copy = tool.copy()
  172. tool_copy['name'] = f"{tool['name']}_mcp"
  173. all_tools.append(tool_copy)
  174. except Exception as e:
  175. print(f"获取 {server_name} 工具列表时出错: {e}")
  176. return all_tools
  177. async def call_mcp_tool(self, tool_name, arguments):
  178. """
  179. 执行MCP工具调用
  180. Args:
  181. tool_name (str): 工具名称(需要以_mcp结尾)
  182. arguments (dict): 工具参数
  183. Returns:
  184. dict: 工具执行结果
  185. """
  186. try:
  187. # 验证工具名称格式
  188. if not tool_name.endswith("_mcp"):
  189. return {"error": f"无效的工具名称格式: {tool_name}"}
  190. # 移除_mcp后缀获取原始工具名称
  191. original_tool_name = tool_name[:-4]
  192. # 查找工具所属的服务器
  193. server_name = self._find_server_for_tool(original_tool_name)
  194. if not server_name:
  195. return {"error": f"未找到工具 {tool_name} 对应的MCP服务器"}
  196. # 调用工具
  197. result = await self.call_mcp_server(
  198. server_name,
  199. "tools/call",
  200. {
  201. "name": original_tool_name,
  202. "arguments": arguments
  203. }
  204. )
  205. return result
  206. except Exception as e:
  207. return {"error": f"MCP工具调用失败: {str(e)}"}
  208. def _find_server_for_tool(self, tool_name):
  209. """
  210. 根据工具名称查找对应的服务器名称
  211. Args:
  212. tool_name (str): 工具名称
  213. Returns:
  214. str or None: 服务器名称或None
  215. """
  216. # 遍历缓存的工具列表查找对应的服务器
  217. for server_name, tools in self._tools_cache.items():
  218. for tool in tools:
  219. if tool.get("name") == tool_name:
  220. return server_name
  221. return None
  222. async def initialize_all_servers(self):
  223. """
  224. 初始化所有MCP服务器并获取工具列表
  225. """
  226. print("正在测试MCP服务器连接...")
  227. for server_name in self.mcp_servers.keys():
  228. if await self._test_server_command(server_name):
  229. print(f"✓ {server_name} 命令可用")
  230. else:
  231. print(f"✗ {server_name} 命令不可用")
  232. class MCPAPIHandler(BaseHTTPRequestHandler):
  233. """
  234. MCP API HTTP请求处理器
  235. """
  236. # 类变量,用于存储MCPServerManager实例
  237. server_manager = None
  238. def _set_headers(self, status_code=200, content_type='application/json'):
  239. """
  240. 设置HTTP响应头
  241. """
  242. self.send_response(status_code)
  243. self.send_header('Content-type', content_type)
  244. self.send_header('Access-Control-Allow-Origin', '*')
  245. self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
  246. self.send_header('Access-Control-Allow-Headers', 'Content-Type')
  247. self.end_headers()
  248. def do_OPTIONS(self):
  249. """
  250. 处理CORS预检请求
  251. """
  252. self._set_headers()
  253. def do_GET(self):
  254. """
  255. 处理GET请求
  256. """
  257. parsed_path = urlparse(self.path)
  258. # 处理 /tools 接口
  259. if parsed_path.path == '/tools':
  260. self._handle_tools_request()
  261. else:
  262. self._set_headers(404)
  263. self.wfile.write(json.dumps({"error": "未找到接口"}).encode('utf-8'))
  264. def do_POST(self):
  265. """
  266. 处理POST请求
  267. """
  268. parsed_path = urlparse(self.path)
  269. # 处理 /call 接口
  270. if parsed_path.path == '/call':
  271. self._handle_call_request()
  272. else:
  273. self._set_headers(404)
  274. self.wfile.write(json.dumps({"error": "未找到接口"}).encode('utf-8'))
  275. def _handle_tools_request(self):
  276. """
  277. 处理 /tools 请求,返回所有MCP工具列表
  278. """
  279. try:
  280. # 在新的事件循环中运行异步代码
  281. loop = asyncio.new_event_loop()
  282. asyncio.set_event_loop(loop)
  283. tools = loop.run_until_complete(
  284. self.server_manager.get_all_mcp_tools()
  285. )
  286. loop.close()
  287. self._set_headers()
  288. self.wfile.write(json.dumps(tools, ensure_ascii=False).encode('utf-8'))
  289. except Exception as e:
  290. self._set_headers(500)
  291. self.wfile.write(json.dumps({"error": str(e)}).encode('utf-8'))
  292. def _handle_call_request(self):
  293. """
  294. 处理 /call 请求,调用指定的MCP工具
  295. """
  296. try:
  297. # 读取请求体
  298. content_length = int(self.headers['Content-Length'])
  299. post_data = self.rfile.read(content_length)
  300. # 解析JSON数据
  301. data = json.loads(post_data.decode('utf-8'))
  302. # 获取必要参数
  303. tool_name = data.get('name')
  304. arguments = data.get('arguments', {})
  305. if not tool_name:
  306. self._set_headers(400)
  307. self.wfile.write(json.dumps({"error": "缺少工具名称"}).encode('utf-8'))
  308. return
  309. # 在新的事件循环中运行异步代码
  310. loop = asyncio.new_event_loop()
  311. asyncio.set_event_loop(loop)
  312. result = loop.run_until_complete(
  313. self.server_manager.call_mcp_tool(tool_name, arguments)
  314. )
  315. loop.close()
  316. # 检查是否有错误
  317. if isinstance(result, dict) and "error" in result:
  318. self._set_headers(500)
  319. else:
  320. self._set_headers()
  321. self.wfile.write(json.dumps(result, ensure_ascii=False).encode('utf-8'))
  322. except json.JSONDecodeError:
  323. self._set_headers(400)
  324. self.wfile.write(json.dumps({"error": "无效的JSON格式"}).encode('utf-8'))
  325. except Exception as e:
  326. self._set_headers(500)
  327. self.wfile.write(json.dumps({"error": str(e)}).encode('utf-8'))
  328. class MCPAPIServer:
  329. """
  330. MCP API服务器主类
  331. """
  332. def __init__(self, host='localhost', port=8000, config_file='mcp_config.json'):
  333. self.host = host
  334. self.port = port
  335. self.server_manager = MCPServerManager(config_file)
  336. # 将server_manager设置为请求处理器的类变量
  337. MCPAPIHandler.server_manager = self.server_manager
  338. def start(self):
  339. """
  340. 启动MCP API服务器
  341. """
  342. # 初始化所有服务器
  343. print("正在初始化MCP服务器...")
  344. loop = asyncio.new_event_loop()
  345. asyncio.set_event_loop(loop)
  346. loop.run_until_complete(self.server_manager.initialize_all_servers())
  347. loop.close()
  348. print("MCP服务器初始化完成")
  349. # 启动HTTP服务器
  350. server_address = (self.host, self.port)
  351. httpd = HTTPServer(server_address, MCPAPIHandler)
  352. print(f"启动MCP API服务器: http://{self.host}:{self.port}")
  353. httpd.serve_forever()
  354. if __name__ == "__main__":
  355. # 创建并启动MCP API服务器
  356. api_server = MCPAPIServer()
  357. api_server.start()