import asyncio from mcp_tools import MCPTools async def test_all_mcp_services(): """ 测试所有配置的MCP服务 """ print("开始测试MCP服务...") print("=" * 50) # 初始化MCP工具 mcp_tools = MCPTools() # 获取所有配置的服务器 servers = mcp_tools.get_available_servers() if not servers: print("未找到任何配置的MCP服务") return print(f"找到 {len(servers)} 个MCP服务: {', '.join(servers)}") print() # 测试每个服务器 results = [] for server_name in servers: print(f"正在测试服务: {server_name}") try: result = await mcp_tools.test_mcp_connection(server_name) results.append(result) if result.get("status") == "success": tools_count = result.get("tools_available", 0) print(f" ✓ 连接成功 - 可用工具数: {tools_count}") else: print(f" ✗ 连接失败 - 错误: {result.get('error', '未知错误')}") except Exception as e: error_result = { "status": "error", "server": server_name, "error": str(e) } results.append(error_result) print(f" ✗ 测试异常 - 错误: {str(e)}") print() # 输出汇总结果 print("=" * 50) print("测试结果汇总:") success_count = sum(1 for r in results if r.get("status") == "success") print(f"总服务数: {len(servers)}") print(f"成功连接: {success_count}") print(f"连接失败: {len(servers) - success_count}") # 详细结果 print("\n详细结果:") for result in results: server = result.get("server", "未知") status = result.get("status", "未知") if status == "success": print(f" {server}: ✓ 成功") else: print(f" {server}: ✗ 失败 - {result.get('error', '未知错误')}") def main(): """ 主函数 """ try: asyncio.run(test_all_mcp_services()) except KeyboardInterrupt: print("\n测试被用户中断") except Exception as e: print(f"测试过程中发生错误: {e}") if __name__ == "__main__": main()