#!/usr/bin/env python3
"""Comprehensive MCP test script.

Exercises the integration between a LiteLLM proxy and MCP servers: a health
check, a direct probe of a local MCP server, and MCP endpoint / tool-list /
tool-call requests routed through the proxy.
"""

import asyncio
import json
import time
from typing import Any, Dict, Optional

import aiohttp

# Prefix that marks a payload line in a Server-Sent Events stream.
_SSE_DATA_PREFIX = "data: "

# Arguments sent to each tool this script knows how to exercise. Tools that
# are not listed here are announced but not called.
_SAMPLE_TOOL_ARGUMENTS: Dict[str, Dict[str, Any]] = {
    "echo": {"message": "Hello MCP!"},
    "get_time": {},
    "calculate": {"expression": "2+2*3"},
}


class MCPTester:
    """Smoke-test client for a LiteLLM proxy and the MCP servers behind it.

    Intended to be used as an async context manager: the HTTP session is
    created in ``__aenter__`` and closed in ``__aexit__``. Every ``test_*``
    method reports its outcome on stdout and converts any exception into a
    failure result instead of raising.
    """

    def __init__(self, litellm_base_url: str = "http://localhost:12168", master_key: str = "sk-1234567890abcdef"):
        """Store connection settings; the session itself is opened lazily.

        Args:
            litellm_base_url: Base URL of the LiteLLM proxy.
            master_key: Key sent as a bearer token on proxied MCP requests.
        """
        self.litellm_base_url = litellm_base_url
        self.master_key = master_key
        # Populated by __aenter__; stays None outside an ``async with`` block.
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open the HTTP session and return this tester."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    def _auth_headers(self) -> Dict[str, str]:
        """Return the headers sent with every request routed through LiteLLM."""
        return {
            "Authorization": f"Bearer {self.master_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    async def _first_sse_event_ok(
        response: "aiohttp.ClientResponse",
        success_prefix: str,
        *,
        stop_on_bad_data: bool,
    ) -> bool:
        """Scan an SSE response for the first ``data:`` line holding valid JSON.

        Args:
            response: Response whose body is read line by line.
            success_prefix: Text printed in front of the pretty-printed event.
            stop_on_bad_data: When true, give up at the first ``data:`` line
                that is not valid JSON; otherwise keep reading further lines.

        Returns:
            True once a JSON event has been printed; False if the stream ends
            or scanning stops without one (never an implicit ``None``).
        """
        async for raw_line in response.content:
            line = raw_line.decode('utf-8').strip()
            if not line.startswith(_SSE_DATA_PREFIX):
                continue

            payload = line[len(_SSE_DATA_PREFIX):]
            try:
                event = json.loads(payload)
            except json.JSONDecodeError:
                print(f"⚠️ 无法解析SSE数据: {payload}")
                if stop_on_bad_data:
                    return False
                continue

            print(f"{success_prefix}{json.dumps(event, indent=2)}")
            return True

        # The stream finished without delivering a usable event.
        print("⚠️ SSE流已结束,未收到有效数据")
        return False

    async def test_litellm_health(self) -> bool:
        """Check the LiteLLM ``/health`` endpoint.

        Returns:
            True when the endpoint answers with HTTP 200, False on any other
            status or on a connection error.
        """
        try:
            async with self.session.get(f"{self.litellm_base_url}/health") as response:
                if response.status == 200:
                    print("✅ LiteLLM服务器健康检查通过")
                    return True
                print(f"❌ LiteLLM服务器健康检查失败: {response.status}")
                return False
        except Exception as e:
            print(f"❌ 无法连接到LiteLLM服务器: {e}")
            return False

    async def test_mcp_endpoint_direct(self, mcp_alias: str) -> bool:
        """Send a GET to ``/mcp/<mcp_alias>`` on the LiteLLM proxy.

        An SSE response counts as success once a ``data:`` line containing
        valid JSON has been received; any other HTTP 200 response counts as
        success immediately.

        Args:
            mcp_alias: Alias of the MCP server as configured in LiteLLM.

        Returns:
            True on success, False on a non-200 status, on an SSE stream that
            yields no parseable event, or on any exception.
        """
        try:
            async with self.session.get(
                f"{self.litellm_base_url}/mcp/{mcp_alias}",
                headers=self._auth_headers(),
            ) as response:
                print(f"MCP端点 {mcp_alias} 响应状态: {response.status}")

                if response.status != 200:
                    text = await response.text()
                    print(f"❌ MCP {mcp_alias} 请求失败: {text}")
                    return False

                content_type = response.headers.get('content-type', '')
                if 'text/event-stream' in content_type:
                    # Keep reading past malformed data lines until one parses.
                    return await self._first_sse_event_ok(
                        response,
                        f"✅ MCP {mcp_alias} SSE响应: ",
                        stop_on_bad_data=False,
                    )

                text = await response.text()
                print(f"✅ MCP {mcp_alias} 响应: {text}")
                return True
        except Exception as e:
            print(f"❌ 测试MCP端点 {mcp_alias} 时出错: {e}")
            return False

    async def test_mcp_tools_list(self, mcp_alias: str) -> Optional[Dict[str, Any]]:
        """Request the tool list of an MCP server through LiteLLM.

        Posts a JSON-RPC ``tools/list`` request to ``/mcp/<mcp_alias>``.

        Args:
            mcp_alias: Alias of the MCP server as configured in LiteLLM.

        Returns:
            The decoded JSON response on HTTP 200, otherwise None (also on
            any exception, including a body that is not JSON).
        """
        try:
            # JSON-RPC request envelope.
            jsonrpc_request = {
                "jsonrpc": "2.0",
                "method": "tools/list",
                "params": {},
                "id": 1,
            }

            async with self.session.post(
                f"{self.litellm_base_url}/mcp/{mcp_alias}",
                headers=self._auth_headers(),
                json=jsonrpc_request,
            ) as response:
                print(f"工具列表请求状态: {response.status}")

                if response.status == 200:
                    result = await response.json()
                    print(f"✅ MCP {mcp_alias} 工具列表: {json.dumps(result, indent=2)}")
                    return result

                text = await response.text()
                print(f"❌ 获取工具列表失败: {text}")
                return None
        except Exception as e:
            print(f"❌ 测试工具列表时出错: {e}")
            return None

    async def test_mcp_tool_call(self, mcp_alias: str, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Invoke one tool of an MCP server through LiteLLM.

        Posts a JSON-RPC ``tools/call`` request to ``/mcp/<mcp_alias>``.

        Args:
            mcp_alias: Alias of the MCP server as configured in LiteLLM.
            tool_name: Name of the tool to call.
            arguments: Arguments passed to the tool.

        Returns:
            The decoded JSON response on HTTP 200, otherwise None (also on
            any exception, including a body that is not JSON).
        """
        try:
            # JSON-RPC request envelope.
            jsonrpc_request = {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {
                    "name": tool_name,
                    "arguments": arguments,
                },
                "id": 2,
            }

            async with self.session.post(
                f"{self.litellm_base_url}/mcp/{mcp_alias}",
                headers=self._auth_headers(),
                json=jsonrpc_request,
            ) as response:
                print(f"工具调用请求状态: {response.status}")

                if response.status == 200:
                    result = await response.json()
                    print(f"✅ MCP {mcp_alias} 工具调用结果: {json.dumps(result, indent=2)}")
                    return result

                text = await response.text()
                print(f"❌ 工具调用失败: {text}")
                return None
        except Exception as e:
            print(f"❌ 测试工具调用时出错: {e}")
            return None

    async def test_direct_mcp_server(self, url: str) -> bool:
        """Send an unauthenticated GET straight to an MCP server.

        An SSE response counts as success only if its first ``data:`` line
        contains valid JSON; any other HTTP 200 response counts as success
        immediately.

        Args:
            url: Full URL of the MCP server endpoint.

        Returns:
            True on success, False on a non-200 status, on an SSE stream
            without a parseable first event, or on any exception.
        """
        try:
            print(f"\n🔍 直接测试MCP服务器: {url}")

            # Probe the endpoint.
            async with self.session.get(url) as response:
                print(f"直接MCP服务器响应状态: {response.status}")

                if response.status != 200:
                    text = await response.text()
                    print(f"❌ 直接MCP服务器请求失败: {text}")
                    return False

                content_type = response.headers.get('content-type', '')
                if 'text/event-stream' in content_type:
                    # Only the first data line is examined for this probe.
                    return await self._first_sse_event_ok(
                        response,
                        "✅ 直接MCP服务器SSE响应: ",
                        stop_on_bad_data=True,
                    )

                text = await response.text()
                print(f"✅ 直接MCP服务器响应: {text}")
                return True
        except Exception as e:
            print(f"❌ 直接测试MCP服务器时出错: {e}")
            return False

    async def run_comprehensive_test(self):
        """Run every check in sequence, printing progress along the way.

        Stops early when the LiteLLM health check fails. The tool-list and
        tool-call steps run only when the proxied ``test`` endpoint responds
        successfully.
        """
        print("🚀 开始MCP综合测试\n")

        # 1. LiteLLM health check.
        print("1️⃣ 测试LiteLLM服务器健康状态")
        if not await self.test_litellm_health():
            print("❌ LiteLLM服务器不可用,停止测试")
            return

        # 2. Local MCP server, contacted directly.
        print("\n2️⃣ 测试本地MCP服务器")
        await self.test_direct_mcp_server("http://localhost:8080/mcp")

        # 3. Local MCP server, reached through LiteLLM.
        print("\n3️⃣ 测试通过LiteLLM访问本地MCP")
        if await self.test_mcp_endpoint_direct("test"):
            # 4. Tool list.
            print("\n4️⃣ 测试本地MCP工具列表")
            tools_result = await self.test_mcp_tools_list("test")

            # Validate the response shape so a malformed reply cannot abort
            # the remaining steps with an unhandled exception.
            result = tools_result.get('result') if isinstance(tools_result, dict) else None
            tools = result.get('tools') if isinstance(result, dict) else None

            if isinstance(tools, list):
                print(f"发现 {len(tools)} 个工具")

                # 5. Tool calls, limited to the first three tools.
                print("\n5️⃣ 测试工具调用")
                for tool in tools[:3]:
                    tool_name = tool.get('name') if isinstance(tool, dict) else None
                    print(f"\n测试工具: {tool_name}")

                    if tool_name in _SAMPLE_TOOL_ARGUMENTS:
                        await self.test_mcp_tool_call(
                            "test", tool_name, _SAMPLE_TOOL_ARGUMENTS[tool_name]
                        )

        # 6. DeepWiki MCP.
        print("\n6️⃣ 测试DeepWiki MCP")
        await self.test_mcp_endpoint_direct("deepwiki")

        print("\n🎉 MCP综合测试完成")


async def main():
    """Entry point: run the comprehensive test inside a managed session."""
    async with MCPTester() as tester:
        await tester.run_comprehensive_test()


if __name__ == "__main__":
    asyncio.run(main())