#!/usr/bin/env python3
"""Final MCP functionality test.

Probes three things against local services:
  1. the LiteLLM proxy's basic OpenAI-compatible API (port 4000),
  2. a standalone MCP server speaking SSE (port 8080),
  3. LiteLLM's MCP integration endpoints.

Each probe is best-effort: any exception is caught, reported, and turned
into a boolean result that the summary in ``main`` prints.
"""
import asyncio
import json

import httpx
from openai import AsyncOpenAI


async def test_litellm_basic():
    """Check the LiteLLM proxy by listing models.

    Returns:
        bool: True if the models endpoint answered, False on any error.
    """
    print("=== 测试LiteLLM基本功能 ===")
    try:
        client = AsyncOpenAI(
            api_key="sk-1234567890abcdef",
            base_url="http://localhost:4000/v1"
        )
        # Listing models is a cheap liveness probe for the proxy.
        models = await client.models.list()
        print(f"可用模型: {[model.id for model in models.data]}")
        return True
    except Exception as e:
        # Best-effort diagnostic script: report and continue.
        print(f"LiteLLM基本功能测试失败: {e}")
        return False


async def test_simple_mcp_server():
    """Check the standalone MCP server's SSE endpoint.

    Returns:
        bool: True if a 200 response containing an SSE ``data:`` payload
        was received and parsed, False otherwise.
    """
    print("\n=== 测试简单MCP服务器 ===")
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "http://localhost:8080/mcp",
                headers={"Accept": "text/event-stream"},
                timeout=5.0
            )
            if response.status_code == 200:
                content = response.text
                print(f"MCP服务器响应: {content}")
                # SSE frames carry JSON after a "data:" prefix; try to
                # extract and parse the first payload.
                if "data:" in content:
                    json_part = content.split("data:")[1].strip()
                    data = json.loads(json_part)
                    print(f"解析的工具: {data.get('result', {}).get('tools', [])}")
                    return True
                # Fix: the original fell through here and implicitly
                # returned None; make the failure explicit.
                return False
            else:
                print(f"MCP服务器返回错误: {response.status_code}")
                return False
    except Exception as e:
        print(f"简单MCP服务器测试失败: {e}")
        return False


async def test_litellm_mcp_integration():
    """Probe several candidate LiteLLM MCP endpoints.

    Returns:
        bool: True as soon as any endpoint answers with 200,
        False if all of them fail.
    """
    print("\n=== 测试LiteLLM MCP集成 ===")
    try:
        async with httpx.AsyncClient() as client:
            # The MCP mount point varies between LiteLLM versions, so try
            # several likely paths until one responds.
            endpoints = [
                "http://localhost:4000/mcp/test",
                "http://localhost:4000/mcp/tools",
                "http://localhost:4000/v1/mcp"
            ]
            for endpoint in endpoints:
                try:
                    print(f"测试端点: {endpoint}")
                    response = await client.get(
                        endpoint,
                        headers={
                            "Authorization": "Bearer sk-1234567890abcdef",
                            "Accept": "text/event-stream"
                        },
                        timeout=3.0
                    )
                    print(f"状态码: {response.status_code}")
                    if response.status_code == 200:
                        # Truncate the body so large SSE streams do not
                        # flood the console.
                        print(f"响应: {response.text[:200]}...")
                        return True
                except Exception as e:
                    # Per-endpoint failure is expected; keep probing.
                    print(f"端点 {endpoint} 失败: {e}")
            return False
    except Exception as e:
        print(f"LiteLLM MCP集成测试失败: {e}")
        return False


async def main():
    """Run all probes sequentially and print a summary."""
    print("开始MCP功能综合测试...\n")

    # Run each component check in turn.
    litellm_ok = await test_litellm_basic()
    mcp_server_ok = await test_simple_mcp_server()
    integration_ok = await test_litellm_mcp_integration()

    print("\n=== 测试结果总结 ===")
    print(f"LiteLLM基本功能: {'✓' if litellm_ok else '✗'}")
    print(f"简单MCP服务器: {'✓' if mcp_server_ok else '✗'}")
    print(f"LiteLLM MCP集成: {'✓' if integration_ok else '✗'}")

    if litellm_ok and mcp_server_ok:
        print("\n结论: LiteLLM和MCP服务器都正常工作,但LiteLLM的MCP集成可能需要额外配置。")
    elif litellm_ok:
        print("\n结论: LiteLLM正常工作,但MCP功能有问题。")
    else:
        print("\n结论: LiteLLM基本功能有问题。")


if __name__ == "__main__":
    asyncio.run(main())