liurenchaxin/modules/legacy-support/litellm/comprehensive_mcp_test.py

247 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
综合MCP测试脚本
测试LiteLLM与MCP服务器的集成
"""
import asyncio
import aiohttp
import json
import time
from typing import Dict, Any, Optional
class MCPTester:
    """Smoke-test client for a LiteLLM proxy and the MCP servers behind it.

    Use it as an async context manager so the aiohttp session is opened and
    closed around the run. Every ``test_*`` coroutine prints its outcome and
    reports success through its return value instead of raising, so one
    failing step never aborts the remaining steps.
    """

    # Returned by ``_first_sse_json`` when no data line could be parsed. A
    # dedicated sentinel is required because JSON ``null`` decodes to ``None``.
    _NO_PAYLOAD = object()

    # Arguments used when one of these well-known sample tools is advertised.
    _SAMPLE_TOOL_ARGUMENTS = {
        "echo": {"message": "Hello MCP!"},
        "get_time": {},
        "calculate": {"expression": "2+2*3"},
    }

    def __init__(self, litellm_base_url: str = "http://localhost:12168", master_key: str = "sk-1234567890abcdef"):
        """Store connection settings; the HTTP session is created on enter.

        Args:
            litellm_base_url: Base URL of the LiteLLM proxy.
            master_key: Key sent as a Bearer token on proxied MCP requests.
        """
        self.litellm_base_url = litellm_base_url
        self.master_key = master_key
        # Populated by ``__aenter__``; stays None outside the context manager.
        self.session = None

    async def __aenter__(self):
        """Open the shared aiohttp session and return this tester."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the aiohttp session if one was opened."""
        if self.session:
            await self.session.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _auth_headers(self) -> Dict[str, str]:
        """Build the headers sent on every request routed through LiteLLM."""
        return {
            "Authorization": f"Bearer {self.master_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _pretty(payload: Any) -> str:
        """Format *payload* as indented JSON, keeping non-ASCII text readable."""
        return json.dumps(payload, indent=2, ensure_ascii=False)

    @staticmethod
    def _sse_data_field(raw_line: bytes) -> Optional[str]:
        """Return the value of an SSE ``data`` line, or None for other lines.

        The space after ``data:`` is optional in the event-stream format, so
        both ``data: {...}`` and ``data:{...}`` are accepted. Lines with an
        empty value are treated as "no data".
        """
        line_str = raw_line.decode('utf-8').strip()
        if not line_str.startswith('data:'):
            return None
        data = line_str[len('data:'):].strip()
        return data or None

    @classmethod
    async def _first_sse_json(cls, lines, stop_on_invalid: bool = False) -> Any:
        """Scan an SSE byte-line stream for the first JSON ``data`` payload.

        Args:
            lines: Async iterable yielding the raw response lines as bytes.
            stop_on_invalid: When True, give up at the first data line that
                is not valid JSON; when False, keep scanning later lines.

        Returns:
            The decoded JSON value, or ``_NO_PAYLOAD`` if the stream ended
            (or scanning stopped) before any data line could be parsed.
        """
        async for raw_line in lines:
            data = cls._sse_data_field(raw_line)
            if data is None:
                continue
            try:
                return json.loads(data)
            except json.JSONDecodeError:
                print(f"⚠️ 无法解析SSE数据: {data}")
                if stop_on_invalid:
                    break
        return cls._NO_PAYLOAD

    @staticmethod
    def _extract_tools(tools_result: Any) -> Optional[list]:
        """Return the ``result.tools`` list of a tools/list reply, else None.

        Anything that does not have the expected dict/dict/list shape yields
        None so a malformed reply cannot crash the test run.
        """
        if not isinstance(tools_result, dict):
            return None
        result = tools_result.get('result')
        if not isinstance(result, dict):
            return None
        tools = result.get('tools')
        return tools if isinstance(tools, list) else None

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------

    async def test_litellm_health(self) -> bool:
        """Check the proxy's ``/health`` endpoint; True only on HTTP 200."""
        try:
            async with self.session.get(f"{self.litellm_base_url}/health") as response:
                if response.status == 200:
                    print("✅ LiteLLM服务器健康检查通过")
                    return True
                print(f"❌ LiteLLM服务器健康检查失败: {response.status}")
                return False
        except Exception as e:
            # Reporting boundary: any failure is printed, never raised.
            print(f"❌ 无法连接到LiteLLM服务器: {e}")
            return False

    async def test_mcp_endpoint_direct(self, mcp_alias: str) -> bool:
        """GET ``/mcp/<alias>`` through the proxy and report reachability.

        Args:
            mcp_alias: Alias of the MCP server as appended to ``/mcp/``.

        Returns:
            True when the endpoint answers 200 with either a non-SSE body or
            an SSE stream containing at least one JSON data line; otherwise
            False (never None).
        """
        try:
            async with self.session.get(
                f"{self.litellm_base_url}/mcp/{mcp_alias}",
                headers=self._auth_headers(),
            ) as response:
                print(f"MCP端点 {mcp_alias} 响应状态: {response.status}")
                if response.status != 200:
                    text = await response.text()
                    print(f"❌ MCP {mcp_alias} 请求失败: {text}")
                    return False

                content_type = response.headers.get('content-type', '')
                if 'text/event-stream' not in content_type:
                    text = await response.text()
                    print(f"✅ MCP {mcp_alias} 响应: {text}")
                    return True

                # SSE response: keep scanning past unparseable data lines.
                payload = await self._first_sse_json(response.content)
                if payload is self._NO_PAYLOAD:
                    print(f"❌ MCP {mcp_alias} SSE响应中没有可解析的JSON数据")
                    return False
                print(f"✅ MCP {mcp_alias} SSE响应: {self._pretty(payload)}")
                return True
        except Exception as e:
            print(f"❌ 测试MCP端点 {mcp_alias} 时出错: {e}")
            return False

    async def test_mcp_tools_list(self, mcp_alias: str) -> Optional[Dict[str, Any]]:
        """POST a JSON-RPC ``tools/list`` request to ``/mcp/<alias>``.

        Returns:
            The decoded JSON reply on HTTP 200, otherwise None.
        """
        try:
            # JSON-RPC 2.0 request envelope.
            jsonrpc_request = {
                "jsonrpc": "2.0",
                "method": "tools/list",
                "params": {},
                "id": 1,
            }
            async with self.session.post(
                f"{self.litellm_base_url}/mcp/{mcp_alias}",
                headers=self._auth_headers(),
                json=jsonrpc_request,
            ) as response:
                print(f"工具列表请求状态: {response.status}")
                if response.status != 200:
                    text = await response.text()
                    print(f"❌ 获取工具列表失败: {text}")
                    return None
                result = await response.json()
                print(f"✅ MCP {mcp_alias} 工具列表: {self._pretty(result)}")
                return result
        except Exception as e:
            print(f"❌ 测试工具列表时出错: {e}")
            return None

    async def test_mcp_tool_call(self, mcp_alias: str, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """POST a JSON-RPC ``tools/call`` request to ``/mcp/<alias>``.

        Args:
            mcp_alias: Alias of the MCP server as appended to ``/mcp/``.
            tool_name: Name of the tool to invoke.
            arguments: Arguments object forwarded to the tool.

        Returns:
            The decoded JSON reply on HTTP 200, otherwise None.
        """
        try:
            # JSON-RPC 2.0 request envelope.
            jsonrpc_request = {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {
                    "name": tool_name,
                    "arguments": arguments,
                },
                "id": 2,
            }
            async with self.session.post(
                f"{self.litellm_base_url}/mcp/{mcp_alias}",
                headers=self._auth_headers(),
                json=jsonrpc_request,
            ) as response:
                print(f"工具调用请求状态: {response.status}")
                if response.status != 200:
                    text = await response.text()
                    print(f"❌ 工具调用失败: {text}")
                    return None
                result = await response.json()
                print(f"✅ MCP {mcp_alias} 工具调用结果: {self._pretty(result)}")
                return result
        except Exception as e:
            print(f"❌ 测试工具调用时出错: {e}")
            return None

    async def test_direct_mcp_server(self, url: str) -> bool:
        """GET an MCP server URL directly, bypassing the proxy and its auth.

        Returns:
            True when the server answers 200 with either a non-SSE body or an
            SSE stream whose first data line is valid JSON; otherwise False
            (never None).
        """
        try:
            print(f"\n🔍 直接测试MCP服务器: {url}")
            async with self.session.get(url) as response:
                print(f"直接MCP服务器响应状态: {response.status}")
                if response.status != 200:
                    text = await response.text()
                    print(f"❌ 直接MCP服务器请求失败: {text}")
                    return False

                content_type = response.headers.get('content-type', '')
                if 'text/event-stream' not in content_type:
                    text = await response.text()
                    print(f"✅ 直接MCP服务器响应: {text}")
                    return True

                # Only the first data line is inspected so an open-ended
                # event stream cannot keep this check waiting for more data.
                payload = await self._first_sse_json(response.content, stop_on_invalid=True)
                if payload is self._NO_PAYLOAD:
                    print("❌ 直接MCP服务器SSE响应中没有可解析的JSON数据")
                    return False
                print(f"✅ 直接MCP服务器SSE响应: {self._pretty(payload)}")
                return True
        except Exception as e:
            print(f"❌ 直接测试MCP服务器时出错: {e}")
            return False

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    async def _exercise_local_tools(self, mcp_alias: str) -> None:
        """List the tools of *mcp_alias* and call up to three known ones."""
        # Step 4: tool listing.
        print("\n4⃣ 测试本地MCP工具列表")
        tools = self._extract_tools(await self.test_mcp_tools_list(mcp_alias))
        if tools is None:
            return
        print(f"发现 {len(tools)} 个工具")

        # Step 5: tool calls, limited to the first three advertised tools.
        print("\n5⃣ 测试工具调用")
        for tool in tools[:3]:
            tool_name = tool.get('name') if isinstance(tool, dict) else None
            if not isinstance(tool_name, str):
                continue  # malformed entry; nothing to call
            print(f"\n测试工具: {tool_name}")
            arguments = self._SAMPLE_TOOL_ARGUMENTS.get(tool_name)
            if arguments is not None:
                # Copy so a callee can never mutate the shared sample table.
                await self.test_mcp_tool_call(mcp_alias, tool_name, dict(arguments))

    async def run_comprehensive_test(self):
        """Run every check in order, printing progress for each step.

        The run stops early only when the proxy health check fails; all other
        failures are reported and the remaining steps still execute.
        """
        print("🚀 开始MCP综合测试\n")

        # Step 1: proxy health.
        print("1⃣ 测试LiteLLM服务器健康状态")
        if not await self.test_litellm_health():
            print("❌ LiteLLM服务器不可用停止测试")
            return

        # Step 2: local MCP server, contacted directly.
        print("\n2⃣ 测试本地MCP服务器")
        await self.test_direct_mcp_server("http://localhost:8080/mcp")

        # Step 3: the same local MCP server, reached through the proxy.
        print("\n3⃣ 测试通过LiteLLM访问本地MCP")
        if await self.test_mcp_endpoint_direct("test"):
            await self._exercise_local_tools("test")

        # Step 6: DeepWiki MCP through the proxy.
        print("\n6⃣ 测试DeepWiki MCP")
        await self.test_mcp_endpoint_direct("deepwiki")
        print("\n🎉 MCP综合测试完成")
async def main():
    """Run the comprehensive MCP test inside a managed tester session."""
    tester = MCPTester()
    # Entering the context opens the HTTP session; leaving it closes it.
    async with tester:
        await tester.run_comprehensive_test()
if __name__ == "__main__":
    # Script entry point: drive the async main() on a fresh event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)