tools.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import json
  2. import urllib.request
  3. import urllib.parse
  4. from html.parser import HTMLParser
  5. from mcp_tools import MCPTools
  6. class MLStripper(HTMLParser):
  7. def __init__(self):
  8. super().__init__()
  9. self.reset()
  10. self.fed = []
  11. def handle_data(self, d):
  12. self.fed.append(d)
  13. def get_data(self):
  14. return ''.join(self.fed)
  15. def strip_tags(html):
  16. s = MLStripper()
  17. s.feed(html)
  18. return s.get_data()
  19. class Tools:
  20. def __init__(self):
  21. self.mcp_tools = MCPTools()
  22. def get_tool_list(self):
  23. # 基础工具
  24. tools = [
  25. {
  26. "type": "function",
  27. "function": {
  28. "name": "read_webpage",
  29. "description": "读取指定URL网页的文本内容",
  30. "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "要读取的网页URL"}}}
  31. }
  32. }
  33. ]
  34. # 获取所有MCP服务的实际工具
  35. mcp_tools = self.mcp_tools.get_all_mcp_tools_sync()
  36. # 为每个MCP工具创建独立的工具定义
  37. for tool in mcp_tools:
  38. tools.append({
  39. "type": "function",
  40. "function": {
  41. "name": tool.get("name", "unknown_tool"),
  42. "description": tool.get("description", "未知工具"),
  43. "parameters": tool.get("inputSchema", {
  44. "type": "object",
  45. "properties": {},
  46. "required": []
  47. })
  48. }
  49. })
  50. return tools
  51. def call_tool(self, tool_name, parameters):
  52. print(f"🔧 正在执行工具: {tool_name}({parameters})")
  53. # 处理原有的工具
  54. if tool_name == "read_webpage":
  55. # 实现读取网页内容功能
  56. try:
  57. url = parameters["url"]
  58. req = urllib.request.Request(url, headers={
  59. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
  60. })
  61. with urllib.request.urlopen(req) as response:
  62. html = response.read().decode('utf-8')
  63. text = strip_tags(html)
  64. return text
  65. except Exception as e:
  66. return f"读取网页错误: {str(e)}"
  67. # 处理MCP工具
  68. else:
  69. # 根据工具名称确定所属的MCP服务
  70. server_name = self.mcp_tools.get_server_for_tool(tool_name)
  71. if server_name:
  72. # 构造正确的MCP调用参数
  73. mcp_params = {
  74. "method": "tools/call",
  75. "params": {
  76. "name": tool_name,
  77. "arguments": parameters
  78. }
  79. }
  80. mcp_tool_name = f"call_{server_name}_mcp"
  81. result = self.mcp_tools.call_mcp_tool(mcp_tool_name, mcp_params)
  82. # 解析结果并检查是否有错误
  83. try:
  84. result_dict = json.loads(result)
  85. if "error" in result_dict:
  86. error_msg = result_dict['error']
  87. return f"MCP调用错误: {error_msg}"
  88. except json.JSONDecodeError:
  89. pass # 如果不是JSON格式,直接返回原始结果
  90. return result
  91. else:
  92. return f"未知工具: {tool_name}"
  93. if __name__ == "__main__":
  94. tools = Tools()
  95. print(tools.get_tool_list())