关习习 1 hónapja
szülő
commit
71bc5b3b3c
9 módosított fájl, 741 hozzáadás és 2 törlés
  1. 2 0
      .gitignore
  2. 123 2
      README.md
  3. 5 0
      key.example.json
  4. 197 0
      main.py
  5. 24 0
      mcp_config.json
  6. 220 0
      mcp_tools.py
  7. 8 0
      requirements.txt
  8. 77 0
      test_mcp_services.py
  9. 85 0
      tools.py

+ 2 - 0
.gitignore

@@ -58,3 +58,5 @@ docs/_build/
 # PyBuilder
 target/
 
+.idea
+key.json

+ 123 - 2
README.md

@@ -1,3 +1,124 @@
-# agent_bot
+# Agent Bot MCP智能体助手
 
-智能体demo样例
+一个基于大语言模型和MCP工具调用的智能体助手示例项目。
+
+### 安装依赖
+
+```bash
+pip install -r requirements.txt
+```
+
+### 配置
+
+1. 复制 [key.example.json](file:///D:/我的项目/myai_pack/agent_bot/key.example.json) 文件并重命名为 [key.json](file:///D:/我的项目/myai_pack/agent_bot/key.json):
+```json
+{
+  "openai_url": "你的API地址",
+  "api_key": "你的API密钥",
+  "model": "使用的模型名称"
+}
+```
+
+2. MCP服务配置在 [mcp_config.json](file:///D:/我的项目/myai_pack/agent_bot/mcp_config.json) 中,默认包含:
+   - context7: 上下文相关工具服务
+   - fetch: 网络请求服务
+   - bingcn: 必应中文搜索服务
+
+### 运行项目
+
+```bash
+python main.py
+```
+
+输入"退出"、"quit"或"exit"结束对话。
+
+## 工具说明
+
+### 内置工具
+
+1. **duck_search**
+   - 功能:基于DuckDuckGo的网络搜索
+   - 参数:`word` (搜索关键词)
+
+2. **read_webpage**
+   - 功能:读取指定URL网页的文本内容
+   - 参数:`url` (网页地址)
+
+### MCP工具
+
+项目支持通过MCP协议调用外部服务:
+
+1. **context7**
+   - 功能:上下文相关工具和服务
+   - 可用工具数:2个
+
+2. **fetch**
+   - 功能:网络请求工具
+   - 可用工具数:4个
+
+3. **bingcn**
+   - 功能:必应中文搜索
+   - 可用工具数:2个
+
+### 测试MCP服务
+
+使用以下命令测试所有MCP服务的可用性:
+
+```bash
+python test_mcp_services.py
+```
+
+测试结果将保存在 `mcp_test_results.json` 文件中。
+
+## 项目结构
+
+```
+.
+├── main.py                 # 主程序入口
+├── tools.py                # 工具实现
+├── mcp_tools.py            # MCP工具集成
+├── test_mcp_services.py    # MCP服务测试工具
+├── mcp_config.json         # MCP服务配置
+├── key.json               # API配置文件
+├── key.example.json       # API配置示例文件
+└── README.md              # 项目说明文档
+```
+
+## 使用示例
+
+启动程序后,可以与智能体进行对话,例如:
+
+```
+用户:帮我搜索一下人工智能的发展历史
+助手:正在为您搜索人工智能的发展历史...
+[执行搜索并返回结果]
+```
+
+```
+用户:帮我读取这个网页的内容:https://example.com
+助手:正在读取网页内容...
+[读取并返回网页文本内容]
+```
+
+## 扩展开发
+
+### 添加新工具
+
+1. 在 [tools.py](file:///D:/我的项目/myai_pack/agent_bot/tools.py) 中的 [get_tool_list](file:///D:/我的项目/myai_pack/agent_bot/tools.py#L25-L45) 方法中添加工具定义
+2. 在 [call_tool](file:///D:/我的项目/myai_pack/agent_bot/tools.py#L53-L101) 方法中实现工具逻辑
+
+### 添加MCP服务
+
+1. 在 [mcp_config.json](file:///D:/我的项目/myai_pack/agent_bot/mcp_config.json) 中添加服务配置
+2. 服务将自动在工具列表中可用
+
+## 注意事项
+
+1. 确保网络连接正常
+2. 确保API密钥有效
+3. MCP服务需要相应的npm包支持
+4. 某些网站可能有反爬虫机制,影响网页读取功能
+
+## 许可证
+
+本项目基于MIT许可证发布,详情请查看 [LICENSE](file:///D:/我的项目/myai_pack/agent_bot/LICENSE) 文件。

+ 5 - 0
key.example.json

@@ -0,0 +1,5 @@
+{
+  "openai_url": "YOUR_OPENAI_URL",
+  "api_key": "YOUR_API_KEY",
+  "model": "YOUR_MODEL"
+}

+ 197 - 0
main.py

@@ -0,0 +1,197 @@
+import openai
+import json
+from tools import Tools
+
+with open('key.json', 'r', encoding='utf-8') as f:
+    config = json.load(f)
+
+openai_url = config.get("openai_url")
+api_key = config.get("api_key")
+model = config.get("model")
+
+
+client = openai.OpenAI(
+    base_url=openai_url,
+    api_key=api_key,
+)
+
+tool = Tools()
+
+# 流式输出处理函数
+def stream_response(messages, tools=None):
+    full_response = ""
+    tool_calls = []
+
+    # 使用client对象调用API
+    response = client.chat.completions.create(
+        model=model,
+        messages=messages,
+        tools=tools,
+        tool_choice="auto",
+        stream=True
+    )
+
+    # 处理流式响应
+    for chunk in response:
+        # 提取delta内容
+        delta = chunk.choices[0].delta
+
+        # 处理文本内容(最终回答)
+        if hasattr(delta, 'content') and delta.content:
+            content = delta.content
+            full_response += content
+            print(content, end="", flush=True)  # 实时输出到控制台
+
+        # 处理工具调用(tool_calls)
+        if hasattr(delta, 'tool_calls') and delta.tool_calls:
+            for tool_call in delta.tool_calls:
+                # 初始化tool_calls数组(如果需要)
+                while len(tool_calls) <= tool_call.index:
+                    tool_calls.append({
+                        "id": "",
+                        "type": "function",
+                        "function": {
+                            "name": "",
+                            "arguments": ""
+                        }
+                    })
+
+                # 更新工具调用信息
+                if tool_call.id:
+                    tool_calls[tool_call.index]["id"] = tool_call.id
+
+                if tool_call.function:
+                    if tool_call.function.name:
+                        tool_calls[tool_call.index]["function"]["name"] = tool_call.function.name
+                    if tool_call.function.arguments:
+                        tool_calls[tool_call.index]["function"]["arguments"] += tool_call.function.arguments
+
+    # 如果有工具调用,处理并返回
+    if tool_calls:
+        # 解析工具调用参数并执行
+        executed_tools = []
+        for tool_call in tool_calls:
+            try:
+                arguments = json.loads(tool_call["function"]["arguments"])
+            except json.JSONDecodeError:
+                arguments = {}
+
+            tool_name = tool_call["function"]["name"]
+            tool_id = tool_call["id"]
+
+            # 执行工具
+            tool_result = tool.call_tool(tool_name, arguments)
+
+            executed_tools.append({
+                "name": tool_name,
+                "arguments": arguments,
+                "result": tool_result,
+                "id": tool_id
+            })
+
+        return {
+            "content": full_response,
+            "tool_calls": executed_tools
+        }
+
+    # 返回完整响应和工具调用信息
+    return {
+        "content": full_response,
+        "tool_calls": None
+    }
+
+
+# 主函数:处理多轮工具调用的流式输出
+def run_conversation(messages):
+    # 系统提示
+    system_prompt = """
+你是一个可以调用工具的AI助手。请根据用户需求,判断是否需要调用工具:
+1. 若需要,可调用工具列表中的工具(可多次调用,直到获取足够信息);
+2. 调用工具后,需等待工具返回结果,再决定是否继续调用或整理结果回答用户;
+3. 工具返回结果会以函数消息的形式提供给你,请基于所有信息生成最终回答。
+你的回答需要保持严谨和专业,假如你无法从工具调用结果和系统提示中获取足够的信息,请不要随意回答,请直接告诉用户暂无详情
+"""
+
+    # 添加系统提示到消息历史(如果还没有)
+    if not any(msg.get("role") == "system" for msg in messages):
+        messages.insert(0, {"role": "system", "content": system_prompt})
+
+    print("\n🤖 助手: ", end="", flush=True)
+
+    # 多轮工具调用循环
+    while True:
+        # 流式获取模型响应
+        response = stream_response(messages, tool.get_tool_list())
+
+        # 检查是否需要调用工具
+        if response["tool_calls"]:
+            tool_calls = response["tool_calls"]
+
+            # 添加助手的工具调用到消息历史
+            tool_call_list = []
+            for tool_call in tool_calls:
+                tool_call_list.append({
+                    "id": tool_call["id"],
+                    "type": "function",
+                    "function": {
+                        "name": tool_call["name"],
+                        "arguments": json.dumps(tool_call["arguments"])
+                    }
+                })
+
+            messages.append({
+                "role": "assistant",
+                "tool_calls": tool_call_list
+            })
+
+            # 执行所有工具并将结果添加到对话历史
+            for tool_call in tool_calls:
+                tool_name = tool_call["name"]
+                tool_result = tool_call["result"]
+                tool_id = tool_call["id"]
+
+                messages.append({
+                    "role": "tool",
+                    "tool_call_id": tool_id,
+                    "name": tool_name,
+                    "content": tool_result
+                })
+
+                print(f"\n\n🔍 工具结果 ({tool_name}): {tool_result}")
+
+            print("\n🤖 助手: ", end="", flush=True)
+        else:
+            # 没有工具调用,对话结束
+            messages.append({
+                "role": "assistant",
+                "content": response["content"]
+            })
+            break
+
+    return response["content"]
+
+
+# 多轮对话主循环
+def main():
+    # 初始化对话历史
+    messages = []
+    
+    print("开始多轮对话(输入'exit'结束对话)")
+    
+    while True:
+        user_input = input("\n👨‍💻 用户: ").strip()
+        
+        if user_input.lower() in ['退出', 'quit', 'exit']:
+            print("对话结束")
+            break
+            
+        if user_input:
+            messages.append({"role": "user", "content": user_input})
+            run_conversation(messages)
+        else:
+            print("请输入有效内容")
+
+
+# 示例:运行对话
+if __name__ == "__main__":
+    main()

+ 24 - 0
mcp_config.json

@@ -0,0 +1,24 @@
+{
+  "mcpServers": {
+    "context7": {
+      "command": "npx",
+      "args": [
+        "-y",
+        "@upstash/context7-mcp@latest"
+      ]
+    },
+    "fetch": {
+      "command": "npx",
+      "args": [
+        "-y",
+        "@tokenizin/mcp-npx-fetch"
+      ]
+    },
+    "bingcn": {
+      "command": "npx",
+      "args": [
+        "bing-cn-mcp"
+      ]
+    }
+  }
+}

+ 220 - 0
mcp_tools.py

@@ -0,0 +1,220 @@
+import json
+import subprocess
+import asyncio
+import sys
+import os
+from mcp import ClientSession, StdioServerParameters
+from mcp.client.stdio import stdio_client
+from pydantic import BaseModel
+
+class MCPTools:
+    def __init__(self, config_file="mcp_config.json"):
+        # 从配置文件加载MCP服务配置
+        self.mcp_servers = self._load_config(config_file)
+
+    def _load_config(self, config_file):
+        """
+        从配置文件加载MCP服务配置
+        
+        Args:
+            config_file (str): 配置文件路径
+            
+        Returns:
+            dict: MCP服务配置
+        """
+        try:
+            # 加载配置文件
+            with open(config_file, 'r', encoding='utf-8') as f:
+                config = json.load(f)
+            return config.get("mcpServers", {})
+        except Exception as e:
+            print(f"加载MCP配置文件时出错: {e}")
+            # 返回空配置
+            return {}
+
+    async def call_mcp_server(self, server_name, method, params=None):
+        """
+        调用指定的MCP服务器
+        
+        Args:
+            server_name (str): 服务器名称
+            method (str): 要调用的方法
+            params (dict): 方法参数
+            
+        Returns:
+            dict: MCP服务器的响应
+        """
+        if server_name not in self.mcp_servers:
+            return {"error": f"MCP server '{server_name}' not found"}
+
+        server_config = self.mcp_servers[server_name]
+        
+        try:
+            # 检查命令是否存在
+            try:
+                process = subprocess.Popen([server_config["command"]] + server_config["args"], 
+                                         stdout=subprocess.PIPE, 
+                                         stderr=subprocess.PIPE,
+                                         shell=(sys.platform == 'win32'))
+                process.terminate()  # 立即终止,只是检查命令是否存在
+            except FileNotFoundError:
+                return {"error": f"命令未找到: {server_config['command']}"}
+            
+            # 检查service_mcp脚本是否存在 (仅当它是配置中的服务时)
+            if server_name == "service_mcp" and server_config.get("args"):
+                script_path = server_config["args"][0] if server_config["args"] else None
+                if script_path and not os.path.exists(script_path):
+                    return {"error": f"Service MCP脚本不存在: {script_path}"}
+            
+            async with stdio_client(
+                StdioServerParameters(
+                    command=server_config["command"],
+                    args=server_config["args"],
+                    env=None
+                )
+            ) as (read, write):
+                async with ClientSession(read, write) as session:
+                    # 初始化MCP服务器
+                    await session.initialize()
+                    
+                    # 调用指定方法
+                    if method == "prompts/list":
+                        result = await session.list_prompts()
+                    elif method == "prompts/get" and params:
+                        result = await session.get_prompt(params["name"])
+                    elif method == "resources/list":
+                        result = await session.list_resources()
+                    elif method == "resources/read" and params:
+                        result = await session.read_resource(params["uri"])
+                    elif method == "tools/list":
+                        result = await session.list_tools()
+                    elif method == "tools/call" and params:
+                        result = await session.call_tool(params["name"], params.get("arguments", {}))
+                    else:
+                        # 通用方法调用
+                        result = await session.send_request(method, params or {})
+                    
+                    # 将结果转换为可序列化的字典
+                    return self._serialize_result(result)
+        except Exception as e:
+            return {"error": f"调用MCP服务时出错: {str(e)}"}
+
+    def _serialize_result(self, result):
+        """
+        将MCP结果转换为可JSON序列化的格式
+        """
+        if isinstance(result, BaseModel):
+            return result.model_dump()
+        elif isinstance(result, dict):
+            return {key: self._serialize_result(value) for key, value in result.items()}
+        elif isinstance(result, list):
+            return [self._serialize_result(item) for item in result]
+        else:
+            return result
+
+    def get_mcp_tool_list(self):
+        """
+        获取MCP工具列表,用于添加到AI工具中
+        根据配置动态生成工具列表
+        """
+        tools = []
+        
+        # 为每个配置的MCP服务创建对应的工具
+        for server_name, server_config in self.mcp_servers.items():
+            tool_name = f"call_{server_name}_mcp"
+            description = server_config.get("description", f"调用{server_name} MCP服务")
+            
+            tools.append({
+                "type": "function",
+                "function": {
+                    "name": tool_name,
+                    "description": description,
+                    "parameters": {
+                        "type": "object",
+                        "properties": {
+                            "method": {
+                                "type": "string",
+                                "description": "要调用的MCP方法,如tools/list, resources/list等"
+                            },
+                            "params": {
+                                "type": "object",
+                                "description": "方法参数"
+                            }
+                        },
+                        "required": ["method"]
+                    }
+                }
+            })
+        
+        return tools
+
+    def call_mcp_tool(self, tool_name, parameters):
+        """
+        执行MCP工具调用
+        
+        Args:
+            tool_name (str): 工具名称
+            parameters (dict): 工具参数
+            
+        Returns:
+            str: 工具执行结果
+        """
+        try:
+            # 从工具名称中提取服务器名称
+            if tool_name.startswith("call_") and tool_name.endswith("_mcp"):
+                server_name = tool_name[5:-4]  # 移除 "call_" 前缀和 "_mcp" 后缀
+            else:
+                return f"未知的MCP工具格式: {tool_name}"
+
+            if server_name not in self.mcp_servers:
+                return f"未配置的MCP服务: {server_name}"
+
+            # 在异步环境中运行
+            result = asyncio.run(
+                self.call_mcp_server(
+                    server_name, 
+                    parameters.get("method", ""), 
+                    parameters.get("params")
+                )
+            )
+            
+            return json.dumps(result, ensure_ascii=False)
+        except Exception as e:
+            return f"MCP工具调用失败: {str(e)}"
+
+    async def test_mcp_connection(self, server_name):
+        """
+        测试MCP服务连接
+        
+        Args:
+            server_name (str): 服务器名称
+            
+        Returns:
+            dict: 测试结果
+        """
+        if server_name not in self.mcp_servers:
+            return {"error": f"MCP server '{server_name}' not found"}
+            
+        try:
+            result = await self.call_mcp_server(server_name, "tools/list")
+            return {
+                "status": "success",
+                "server": server_name,
+                "tools_available": len(result.get("tools", [])) if isinstance(result, dict) else 0,
+                "result": result
+            }
+        except Exception as e:
+            return {
+                "status": "error",
+                "server": server_name,
+                "error": str(e)
+            }
+            
+    def get_available_servers(self):
+        """
+        获取所有可用的MCP服务器名称
+        
+        Returns:
+            list: 可用的MCP服务器名称列表
+        """
+        return list(self.mcp_servers.keys())

+ 8 - 0
requirements.txt

@@ -0,0 +1,8 @@
+openai
+json
+subprocess
+asyncio
+sys
+os
+mcp
+pydantic

+ 77 - 0
test_mcp_services.py

@@ -0,0 +1,77 @@
+import asyncio
+from mcp_tools import MCPTools
+
+async def test_all_mcp_services():
+    """
+    测试所有配置的MCP服务
+    """
+    print("开始测试MCP服务...")
+    print("=" * 50)
+    
+    # 初始化MCP工具
+    mcp_tools = MCPTools()
+    
+    # 获取所有配置的服务器
+    servers = mcp_tools.get_available_servers()
+    
+    if not servers:
+        print("未找到任何配置的MCP服务")
+        return
+    
+    print(f"找到 {len(servers)} 个MCP服务: {', '.join(servers)}")
+    print()
+    
+    # 测试每个服务器
+    results = []
+    for server_name in servers:
+        print(f"正在测试服务: {server_name}")
+        try:
+            result = await mcp_tools.test_mcp_connection(server_name)
+            results.append(result)
+            
+            if result.get("status") == "success":
+                tools_count = result.get("tools_available", 0)
+                print(f"  ✓ 连接成功 - 可用工具数: {tools_count}")
+            else:
+                print(f"  ✗ 连接失败 - 错误: {result.get('error', '未知错误')}")
+        except Exception as e:
+            error_result = {
+                "status": "error",
+                "server": server_name,
+                "error": str(e)
+            }
+            results.append(error_result)
+            print(f"  ✗ 测试异常 - 错误: {str(e)}")
+        print()
+    
+    # 输出汇总结果
+    print("=" * 50)
+    print("测试结果汇总:")
+    success_count = sum(1 for r in results if r.get("status") == "success")
+    print(f"总服务数: {len(servers)}")
+    print(f"成功连接: {success_count}")
+    print(f"连接失败: {len(servers) - success_count}")
+    
+    # 详细结果
+    print("\n详细结果:")
+    for result in results:
+        server = result.get("server", "未知")
+        status = result.get("status", "未知")
+        if status == "success":
+            print(f"  {server}: ✓ 成功")
+        else:
+            print(f"  {server}: ✗ 失败 - {result.get('error', '未知错误')}")
+def main():
+    """
+    主函数
+    """
+    try:
+        asyncio.run(test_all_mcp_services())
+    except KeyboardInterrupt:
+        print("\n测试被用户中断")
+    except Exception as e:
+        print(f"测试过程中发生错误: {e}")
+
+
+if __name__ == "__main__":
+    main()

+ 85 - 0
tools.py

@@ -0,0 +1,85 @@
+import json
+import urllib.request
+import urllib.parse
+from html.parser import HTMLParser
+from mcp_tools import MCPTools
+
+class MLStripper(HTMLParser):
+    def __init__(self):
+        super().__init__()
+        self.reset()
+        self.fed = []
+    
+    def handle_data(self, d):
+        self.fed.append(d)
+    
+    def get_data(self):
+        return ''.join(self.fed)
+
+def strip_tags(html):
+    s = MLStripper()
+    s.feed(html)
+    return s.get_data()
+
+class Tools:
+    def __init__(self):
+        self.mcp_tools = MCPTools()
+
+    def get_tool_list(self):
+        tools = [
+            {
+                "type": "function",
+                "function": {
+                    "name": "read_webpage",
+                    "description": "读取指定URL网页的文本内容",
+                    "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "要读取的网页URL"}}}
+                }
+            }
+        ]
+
+        # 添加MCP工具
+        tools.extend(self.mcp_tools.get_mcp_tool_list())
+        
+        return tools
+
+    def call_tool(self, tool_name, parameters):
+        print(f"🔧 正在执行工具: {tool_name}({parameters})")
+
+        # 处理原有的工具
+        if tool_name == "read_webpage":
+            # 实现读取网页内容功能
+            try:
+                url = parameters["url"]
+                req = urllib.request.Request(url, headers={
+                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
+                })
+                with urllib.request.urlopen(req) as response:
+                    html = response.read().decode('utf-8')
+                    text = strip_tags(html)
+                    return text
+            except Exception as e:
+                return f"读取网页错误: {str(e)}"
+        
+        # 处理MCP工具
+        elif tool_name.endswith("_mcp"):
+            result = self.mcp_tools.call_mcp_tool(tool_name, parameters)
+            # 解析结果并检查是否有错误
+            try:
+                result_dict = json.loads(result)
+                if "error" in result_dict:
+                    error_msg = result_dict['error']
+                    # 提供更详细的错误信息和建议
+                    if "unhandled errors in a TaskGroup" in error_msg:
+                        server_name = tool_name[5:-4]  # 提取服务名
+                        return f"MCP调用错误: {server_name}服务内部出现未处理的异常。这通常表示服务端存在问题。错误详情: {error_msg}"
+                    return f"MCP调用错误: {error_msg}"
+            except json.JSONDecodeError:
+                pass  # 如果不是JSON格式,直接返回原始结果
+            return result
+
+        return f"工具 {tool_name} 执行完成"
+
+if __name__ == "__main__":
+    tools = Tools()
+    print(tools.get_tool_list())
+    print(tools.call_tool("read_webpage", {"url": "https://www.baidu.com"}))