247 lines
9.8 KiB
Python
247 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
综合MCP测试脚本
|
||
测试LiteLLM与MCP服务器的集成
|
||
"""
|
||
|
||
import asyncio
|
||
import aiohttp
|
||
import json
|
||
import time
|
||
from typing import Dict, Any, Optional
|
||
|
||
class MCPTester:
|
||
def __init__(self, litellm_base_url: str = "http://localhost:12168", master_key: str = "sk-1234567890abcdef"):
|
||
self.litellm_base_url = litellm_base_url
|
||
self.master_key = master_key
|
||
self.session = None
|
||
|
||
async def __aenter__(self):
|
||
self.session = aiohttp.ClientSession()
|
||
return self
|
||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||
if self.session:
|
||
await self.session.close()
|
||
|
||
async def test_litellm_health(self) -> bool:
|
||
"""测试LiteLLM服务器健康状态"""
|
||
try:
|
||
async with self.session.get(f"{self.litellm_base_url}/health") as response:
|
||
if response.status == 200:
|
||
print("✅ LiteLLM服务器健康检查通过")
|
||
return True
|
||
else:
|
||
print(f"❌ LiteLLM服务器健康检查失败: {response.status}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ 无法连接到LiteLLM服务器: {e}")
|
||
return False
|
||
|
||
async def test_mcp_endpoint_direct(self, mcp_alias: str) -> bool:
|
||
"""直接测试MCP端点"""
|
||
try:
|
||
headers = {
|
||
"Authorization": f"Bearer {self.master_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
async with self.session.get(
|
||
f"{self.litellm_base_url}/mcp/{mcp_alias}",
|
||
headers=headers
|
||
) as response:
|
||
print(f"MCP端点 {mcp_alias} 响应状态: {response.status}")
|
||
|
||
if response.status == 200:
|
||
content_type = response.headers.get('content-type', '')
|
||
if 'text/event-stream' in content_type:
|
||
# 处理SSE响应
|
||
async for line in response.content:
|
||
line_str = line.decode('utf-8').strip()
|
||
if line_str.startswith('data: '):
|
||
data = line_str[6:] # 移除 'data: ' 前缀
|
||
try:
|
||
parsed_data = json.loads(data)
|
||
print(f"✅ MCP {mcp_alias} SSE响应: {json.dumps(parsed_data, indent=2)}")
|
||
return True
|
||
except json.JSONDecodeError:
|
||
print(f"⚠️ 无法解析SSE数据: {data}")
|
||
else:
|
||
text = await response.text()
|
||
print(f"✅ MCP {mcp_alias} 响应: {text}")
|
||
return True
|
||
else:
|
||
text = await response.text()
|
||
print(f"❌ MCP {mcp_alias} 请求失败: {text}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 测试MCP端点 {mcp_alias} 时出错: {e}")
|
||
return False
|
||
|
||
async def test_mcp_tools_list(self, mcp_alias: str) -> Optional[Dict[str, Any]]:
|
||
"""测试MCP工具列表"""
|
||
try:
|
||
headers = {
|
||
"Authorization": f"Bearer {self.master_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 构造JSON-RPC请求
|
||
jsonrpc_request = {
|
||
"jsonrpc": "2.0",
|
||
"method": "tools/list",
|
||
"params": {},
|
||
"id": 1
|
||
}
|
||
|
||
async with self.session.post(
|
||
f"{self.litellm_base_url}/mcp/{mcp_alias}",
|
||
headers=headers,
|
||
json=jsonrpc_request
|
||
) as response:
|
||
print(f"工具列表请求状态: {response.status}")
|
||
|
||
if response.status == 200:
|
||
result = await response.json()
|
||
print(f"✅ MCP {mcp_alias} 工具列表: {json.dumps(result, indent=2)}")
|
||
return result
|
||
else:
|
||
text = await response.text()
|
||
print(f"❌ 获取工具列表失败: {text}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"❌ 测试工具列表时出错: {e}")
|
||
return None
|
||
|
||
async def test_mcp_tool_call(self, mcp_alias: str, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
"""测试MCP工具调用"""
|
||
try:
|
||
headers = {
|
||
"Authorization": f"Bearer {self.master_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 构造JSON-RPC请求
|
||
jsonrpc_request = {
|
||
"jsonrpc": "2.0",
|
||
"method": "tools/call",
|
||
"params": {
|
||
"name": tool_name,
|
||
"arguments": arguments
|
||
},
|
||
"id": 2
|
||
}
|
||
|
||
async with self.session.post(
|
||
f"{self.litellm_base_url}/mcp/{mcp_alias}",
|
||
headers=headers,
|
||
json=jsonrpc_request
|
||
) as response:
|
||
print(f"工具调用请求状态: {response.status}")
|
||
|
||
if response.status == 200:
|
||
result = await response.json()
|
||
print(f"✅ MCP {mcp_alias} 工具调用结果: {json.dumps(result, indent=2)}")
|
||
return result
|
||
else:
|
||
text = await response.text()
|
||
print(f"❌ 工具调用失败: {text}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"❌ 测试工具调用时出错: {e}")
|
||
return None
|
||
|
||
async def test_direct_mcp_server(self, url: str) -> bool:
|
||
"""直接测试MCP服务器"""
|
||
try:
|
||
print(f"\n🔍 直接测试MCP服务器: {url}")
|
||
|
||
# 测试初始化
|
||
async with self.session.get(url) as response:
|
||
print(f"直接MCP服务器响应状态: {response.status}")
|
||
|
||
if response.status == 200:
|
||
content_type = response.headers.get('content-type', '')
|
||
if 'text/event-stream' in content_type:
|
||
async for line in response.content:
|
||
line_str = line.decode('utf-8').strip()
|
||
if line_str.startswith('data: '):
|
||
data = line_str[6:]
|
||
try:
|
||
parsed_data = json.loads(data)
|
||
print(f"✅ 直接MCP服务器SSE响应: {json.dumps(parsed_data, indent=2)}")
|
||
return True
|
||
except json.JSONDecodeError:
|
||
print(f"⚠️ 无法解析SSE数据: {data}")
|
||
break
|
||
else:
|
||
text = await response.text()
|
||
print(f"✅ 直接MCP服务器响应: {text}")
|
||
return True
|
||
else:
|
||
text = await response.text()
|
||
print(f"❌ 直接MCP服务器请求失败: {text}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 直接测试MCP服务器时出错: {e}")
|
||
return False
|
||
|
||
async def run_comprehensive_test(self):
|
||
"""运行综合测试"""
|
||
print("🚀 开始MCP综合测试\n")
|
||
|
||
# 1. 测试LiteLLM健康状态
|
||
print("1️⃣ 测试LiteLLM服务器健康状态")
|
||
health_ok = await self.test_litellm_health()
|
||
|
||
if not health_ok:
|
||
print("❌ LiteLLM服务器不可用,停止测试")
|
||
return
|
||
|
||
# 2. 测试本地MCP服务器
|
||
print("\n2️⃣ 测试本地MCP服务器")
|
||
await self.test_direct_mcp_server("http://localhost:8080/mcp")
|
||
|
||
# 3. 测试通过LiteLLM访问本地MCP
|
||
print("\n3️⃣ 测试通过LiteLLM访问本地MCP")
|
||
test_endpoint_ok = await self.test_mcp_endpoint_direct("test")
|
||
|
||
if test_endpoint_ok:
|
||
# 4. 测试工具列表
|
||
print("\n4️⃣ 测试本地MCP工具列表")
|
||
tools_result = await self.test_mcp_tools_list("test")
|
||
|
||
if tools_result and 'result' in tools_result and 'tools' in tools_result['result']:
|
||
tools = tools_result['result']['tools']
|
||
print(f"发现 {len(tools)} 个工具")
|
||
|
||
# 5. 测试工具调用
|
||
print("\n5️⃣ 测试工具调用")
|
||
for tool in tools[:3]: # 测试前3个工具
|
||
tool_name = tool['name']
|
||
print(f"\n测试工具: {tool_name}")
|
||
|
||
if tool_name == "echo":
|
||
await self.test_mcp_tool_call("test", "echo", {"message": "Hello MCP!"})
|
||
elif tool_name == "get_time":
|
||
await self.test_mcp_tool_call("test", "get_time", {})
|
||
elif tool_name == "calculate":
|
||
await self.test_mcp_tool_call("test", "calculate", {"expression": "2+2*3"})
|
||
|
||
# 6. 测试DeepWiki MCP
|
||
print("\n6️⃣ 测试DeepWiki MCP")
|
||
await self.test_mcp_endpoint_direct("deepwiki")
|
||
|
||
print("\n🎉 MCP综合测试完成")
|
||
|
||
async def main():
|
||
"""主函数"""
|
||
async with MCPTester() as tester:
|
||
await tester.run_comprehensive_test()
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main()) |