#!/usr/bin/env python3 """ 改进的MCP服务器测试脚本 """ import asyncio import json import subprocess import sys from typing import Dict, Any, List, Optional async def test_mcp_server(server_name: str, command: List[str], env: Dict[str, str] = None): """测试MCP服务器""" print(f"\n=== 测试 {server_name} 服务器 ===") # 设置环境变量 process_env = {} if env: process_env.update(env) try: # 启动服务器进程 process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=process_env ) # 读取并忽略所有非JSON输出 buffer = "" while True: line = await process.stdout.readline() if not line: break line_str = line.decode().strip() buffer += line_str + "\n" # 尝试解析JSON try: data = json.loads(line_str) if "jsonrpc" in data: print(f"收到JSON响应: {json.dumps(data, indent=2, ensure_ascii=False)}") break except json.JSONDecodeError: # 不是JSON,继续读取 continue # 如果没有找到JSON响应,显示缓冲区内容 if "jsonrpc" not in locals(): print(f"未找到JSON响应,原始输出: {buffer}") return # 初始化请求 init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} } } } # 发送初始化请求 process.stdin.write((json.dumps(init_request) + "\n").encode()) await process.stdin.drain() # 读取初始化响应 init_response = await read_json_response(process) if init_response: print(f"初始化成功") # 获取工具列表 tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list" } # 发送工具列表请求 process.stdin.write((json.dumps(tools_request) + "\n").encode()) await process.stdin.drain() # 读取工具列表响应 tools_response = await read_json_response(process) if tools_response: print(f"工具列表获取成功") # 如果有搜索工具,测试搜索功能 if "result" in tools_response and "tools" in tools_response["result"]: for tool in tools_response["result"]["tools"]: tool_name = tool.get("name") if tool_name and ("search" in tool_name or "document" in tool_name): print(f"\n测试工具: {tool_name}") # 测试搜索工具 search_request = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": tool_name, "arguments": { "query": "测试查询", "limit": 3 } } } # 发送搜索请求 process.stdin.write((json.dumps(search_request) + "\n").encode()) await process.stdin.drain() # 读取搜索响应 search_response = await read_json_response(process) if search_response: print(f"搜索测试成功") if "result" in search_response and "content" in search_response["result"]: for content in search_response["result"]["content"]: if content.get("type") == "text": print(f"搜索结果: {content.get('text', '')[:100]}...") break # 关闭进程 process.stdin.close() await process.wait() except Exception as e: print(f"测试 {server_name} 服务器时出错: {e}") async def read_json_response(process): """读取JSON响应""" buffer = "" while True: line = await process.stdout.readline() if not line: break line_str = line.decode().strip() buffer += line_str + "\n" # 尝试解析JSON try: data = json.loads(line_str) if "jsonrpc" in data: return data except json.JSONDecodeError: # 不是JSON,继续读取 continue # 如果没有找到JSON响应,返回None return None async def main(): """主函数""" print("开始测试MCP服务器...") # 测试context7服务器 await test_mcp_server( "context7", ["npx", "-y", "@upstash/context7-mcp"], {"DEFAULT_MINIMUM_TOKENS": ""} ) # 测试qdrant服务器 await test_mcp_server( "qdrant", ["ssh", "ben@dev1", "cd /home/ben/qdrant && source venv/bin/activate && python qdrant_mcp_server.py"], { "QDRANT_URL": "http://dev1:6333", "QDRANT_API_KEY": "313131", "COLLECTION_NAME": "mcp", "EMBEDDING_MODEL": "bge-m3" } ) # 测试qdrant-ollama服务器 await test_mcp_server( "qdrant-ollama", ["ssh", "ben@dev1", "cd /home/ben/qdrant && source venv/bin/activate && ./start_mcp_server.sh"], { "QDRANT_URL": "http://dev1:6333", "QDRANT_API_KEY": "313131", "COLLECTION_NAME": "ollama_mcp", "OLLAMA_MODEL": "nomic-embed-text", "OLLAMA_URL": "http://dev1:11434" } ) print("\n所有测试完成。") if __name__ == "__main__": asyncio.run(main())