356 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			356 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
| #!/usr/bin/env python3
 | ||
| """
 | ||
| MongoDB Swarm Integration Example
 | ||
| MongoDB与Swarm集成的完整使用示例
 | ||
| 
 | ||
| 这个示例展示了如何:
 | ||
| 1. 设置MongoDB MCP服务器
 | ||
| 2. 创建Swarm代理
 | ||
| 3. 执行各种数据库操作
 | ||
| 4. 处理错误和异常情况
 | ||
| """
 | ||
| 
 | ||
| import asyncio
 | ||
| import json
 | ||
| import os
 | ||
| import sys
 | ||
| import time
 | ||
| from datetime import datetime
 | ||
| from typing import Dict, List, Any
 | ||
| 
 | ||
| # 添加项目路径
 | ||
| sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
 | ||
| 
 | ||
| try:
 | ||
|     from src.mcp.swarm_mongodb_client import SwarmMongoDBClient, create_mongodb_functions
 | ||
|     from src.mcp.mongodb_mcp_config import MongoDBMCPConfig, SwarmMongoDBIntegration
 | ||
| except ImportError as e:
 | ||
|     print(f"导入错误: {e}")
 | ||
|     print("请确保已安装所需依赖: pip install pymongo requests")
 | ||
|     sys.exit(1)
 | ||
| 
 | ||
| # 模拟Swarm框架(如果没有实际的Swarm库)
 | ||
| class MockSwarm:
 | ||
|     """模拟Swarm客户端"""
 | ||
|     
 | ||
|     def __init__(self):
 | ||
|         self.agents = {}
 | ||
|     
 | ||
|     def register_agent(self, agent):
 | ||
|         """注册代理"""
 | ||
|         self.agents[agent.name] = agent
 | ||
|         print(f"✅ 注册代理: {agent.name}")
 | ||
|     
 | ||
|     def run(self, agent_name: str, message: str) -> str:
 | ||
|         """运行代理"""
 | ||
|         if agent_name not in self.agents:
 | ||
|             return f"错误: 代理 '{agent_name}' 不存在"
 | ||
|         
 | ||
|         agent = self.agents[agent_name]
 | ||
|         return agent.process_message(message)
 | ||
| 
 | ||
| class MockAgent:
 | ||
|     """模拟Swarm代理"""
 | ||
|     
 | ||
|     def __init__(self, name: str, instructions: str, functions: List[callable]):
 | ||
|         self.name = name
 | ||
|         self.instructions = instructions
 | ||
|         self.functions = {func.__name__: func for func in functions}
 | ||
|         self.conversation_history = []
 | ||
|     
 | ||
|     def process_message(self, message: str) -> str:
 | ||
|         """处理用户消息"""
 | ||
|         self.conversation_history.append({"role": "user", "content": message})
 | ||
|         
 | ||
|         # 简单的意图识别和函数调用
 | ||
|         response = self._analyze_and_execute(message)
 | ||
|         
 | ||
|         self.conversation_history.append({"role": "assistant", "content": response})
 | ||
|         return response
 | ||
|     
 | ||
|     def _analyze_and_execute(self, message: str) -> str:
 | ||
|         """分析消息并执行相应函数"""
 | ||
|         message_lower = message.lower()
 | ||
|         
 | ||
|         try:
 | ||
|             # 查询操作
 | ||
|             if any(keyword in message_lower for keyword in ['查询', '查找', '搜索', 'find', 'query', '显示']):
 | ||
|                 if '集合' in message_lower or 'collection' in message_lower:
 | ||
|                     return self.functions['mongodb_collections']()
 | ||
|                 else:
 | ||
|                     # 提取集合名称(简化处理)
 | ||
|                     collection_name = self._extract_collection_name(message)
 | ||
|                     return self.functions['mongodb_query'](collection_name, message)
 | ||
|             
 | ||
|             # 插入操作
 | ||
|             elif any(keyword in message_lower for keyword in ['插入', '添加', '创建', 'insert', 'add', 'create']):
 | ||
|                 collection_name = self._extract_collection_name(message)
 | ||
|                 # 这里需要更复杂的解析来提取文档内容
 | ||
|                 sample_doc = {"message": message, "timestamp": datetime.now().isoformat()}
 | ||
|                 return self.functions['mongodb_insert'](collection_name, sample_doc, "用户请求插入")
 | ||
|             
 | ||
|             # 统计操作
 | ||
|             elif any(keyword in message_lower for keyword in ['统计', '状态', 'stats', 'status', '信息']):
 | ||
|                 collection_name = self._extract_collection_name(message)
 | ||
|                 return self.functions['mongodb_stats'](collection_name)
 | ||
|             
 | ||
|             # 更新操作
 | ||
|             elif any(keyword in message_lower for keyword in ['更新', '修改', 'update', 'modify']):
 | ||
|                 collection_name = self._extract_collection_name(message)
 | ||
|                 query = {"message": {"$regex": "test"}}
 | ||
|                 update = {"$set": {"updated_at": datetime.now().isoformat()}}
 | ||
|                 return self.functions['mongodb_update'](collection_name, query, update, "用户请求更新")
 | ||
|             
 | ||
|             else:
 | ||
|                 return f"我理解您想要进行数据库操作,但需要更具体的指令。\n\n可用操作:\n- 查询数据: '查询users集合'\n- 插入数据: '向users集合插入数据'\n- 查看统计: '显示users集合统计信息'\n- 列出集合: '显示所有集合'"
 | ||
|         
 | ||
|         except Exception as e:
 | ||
|             return f"执行操作时出错: {str(e)}"
 | ||
|     
 | ||
|     def _extract_collection_name(self, message: str) -> str:
 | ||
|         """从消息中提取集合名称(简化实现)"""
 | ||
|         # 简单的关键词匹配
 | ||
|         common_collections = ['users', 'products', 'orders', 'logs', 'test', 'data']
 | ||
|         
 | ||
|         for collection in common_collections:
 | ||
|             if collection in message.lower():
 | ||
|                 return collection
 | ||
|         
 | ||
|         # 默认返回test集合
 | ||
|         return 'test'
 | ||
| 
 | ||
| 
 | ||
| class MongoDBSwarmDemo:
 | ||
|     """MongoDB Swarm集成演示"""
 | ||
|     
 | ||
|     def __init__(self):
 | ||
|         self.config = MongoDBMCPConfig.from_env()
 | ||
|         self.mongodb_client = None
 | ||
|         self.swarm = MockSwarm()
 | ||
|         self.setup_complete = False
 | ||
|     
 | ||
|     def setup(self) -> bool:
 | ||
|         """设置演示环境"""
 | ||
|         print("🚀 开始设置MongoDB Swarm集成演示...")
 | ||
|         
 | ||
|         try:
 | ||
|             # 1. 创建MongoDB客户端
 | ||
|             print(f"📊 连接到MongoDB MCP服务器: {self.config.mcp_server_url}")
 | ||
|             self.mongodb_client = SwarmMongoDBClient(
 | ||
|                 mcp_server_url=self.config.mcp_server_url,
 | ||
|                 default_database=self.config.default_database
 | ||
|             )
 | ||
|             
 | ||
|             # 2. 测试连接
 | ||
|             print(f"🔗 连接到数据库: {self.config.default_database}")
 | ||
|             result = self.mongodb_client.connect(self.config.default_database)
 | ||
|             
 | ||
|             if not result.get("success"):
 | ||
|                 print(f"❌ 数据库连接失败: {result.get('error')}")
 | ||
|                 print("💡 请确保MongoDB MCP服务器正在运行")
 | ||
|                 return False
 | ||
|             
 | ||
|             print(f"✅ 数据库连接成功")
 | ||
|             
 | ||
|             # 3. 创建MongoDB函数
 | ||
|             mongodb_functions = create_mongodb_functions(self.mongodb_client)
 | ||
|             print(f"🔧 创建了 {len(mongodb_functions)} 个MongoDB函数")
 | ||
|             
 | ||
|             # 4. 创建Swarm代理
 | ||
|             agent = MockAgent(
 | ||
|                 name="MongoDB助手",
 | ||
|                 instructions="你是一个MongoDB数据库专家,帮助用户管理和查询数据库。",
 | ||
|                 functions=[func["function"] for func in mongodb_functions]
 | ||
|             )
 | ||
|             
 | ||
|             self.swarm.register_agent(agent)
 | ||
|             
 | ||
|             self.setup_complete = True
 | ||
|             print("✅ 设置完成!")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 设置失败: {str(e)}")
 | ||
|             return False
 | ||
|     
 | ||
|     def run_demo_scenarios(self):
 | ||
|         """运行演示场景"""
 | ||
|         if not self.setup_complete:
 | ||
|             print("❌ 请先完成设置")
 | ||
|             return
 | ||
|         
 | ||
|         print("\n" + "="*60)
 | ||
|         print("🎯 开始运行MongoDB Swarm演示场景")
 | ||
|         print("="*60)
 | ||
|         
 | ||
|         scenarios = [
 | ||
|             {
 | ||
|                 "name": "查看数据库状态",
 | ||
|                 "message": "显示数据库连接状态和统计信息",
 | ||
|                 "description": "检查数据库连接和基本信息"
 | ||
|             },
 | ||
|             {
 | ||
|                 "name": "列出所有集合",
 | ||
|                 "message": "显示所有集合",
 | ||
|                 "description": "查看数据库中的所有集合"
 | ||
|             },
 | ||
|             {
 | ||
|                 "name": "插入测试数据",
 | ||
|                 "message": "向test集合插入一些测试数据",
 | ||
|                 "description": "创建示例文档"
 | ||
|             },
 | ||
|             {
 | ||
|                 "name": "查询测试数据",
 | ||
|                 "message": "查询test集合中的数据",
 | ||
|                 "description": "检索刚插入的数据"
 | ||
|             },
 | ||
|             {
 | ||
|                 "name": "获取集合统计",
 | ||
|                 "message": "显示test集合的统计信息",
 | ||
|                 "description": "查看集合的详细统计"
 | ||
|             },
 | ||
|             {
 | ||
|                 "name": "更新数据",
 | ||
|                 "message": "更新test集合中的数据",
 | ||
|                 "description": "修改现有文档"
 | ||
|             }
 | ||
|         ]
 | ||
|         
 | ||
|         for i, scenario in enumerate(scenarios, 1):
 | ||
|             print(f"\n📋 场景 {i}: {scenario['name']}")
 | ||
|             print(f"📝 描述: {scenario['description']}")
 | ||
|             print(f"💬 用户消息: {scenario['message']}")
 | ||
|             print("-" * 40)
 | ||
|             
 | ||
|             try:
 | ||
|                 response = self.swarm.run("MongoDB助手", scenario['message'])
 | ||
|                 print(f"🤖 代理响应:\n{response}")
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 场景执行失败: {str(e)}")
 | ||
|             
 | ||
|             print("-" * 40)
 | ||
|             time.sleep(1)  # 短暂暂停
 | ||
|     
 | ||
|     def interactive_mode(self):
 | ||
|         """交互模式"""
 | ||
|         if not self.setup_complete:
 | ||
|             print("❌ 请先完成设置")
 | ||
|             return
 | ||
|         
 | ||
|         print("\n" + "="*60)
 | ||
|         print("🎮 进入交互模式")
 | ||
|         print("💡 输入 'quit' 或 'exit' 退出")
 | ||
|         print("💡 输入 'help' 查看可用命令")
 | ||
|         print("="*60)
 | ||
|         
 | ||
|         while True:
 | ||
|             try:
 | ||
|                 user_input = input("\n👤 您: ").strip()
 | ||
|                 
 | ||
|                 if user_input.lower() in ['quit', 'exit', '退出']:
 | ||
|                     print("👋 再见!")
 | ||
|                     break
 | ||
|                 
 | ||
|                 if user_input.lower() in ['help', '帮助']:
 | ||
|                     self._show_help()
 | ||
|                     continue
 | ||
|                 
 | ||
|                 if not user_input:
 | ||
|                     continue
 | ||
|                 
 | ||
|                 print("🤖 MongoDB助手: ", end="")
 | ||
|                 response = self.swarm.run("MongoDB助手", user_input)
 | ||
|                 print(response)
 | ||
|                 
 | ||
|             except KeyboardInterrupt:
 | ||
|                 print("\n👋 再见!")
 | ||
|                 break
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 错误: {str(e)}")
 | ||
|     
 | ||
|     def _show_help(self):
 | ||
|         """显示帮助信息"""
 | ||
|         help_text = """
 | ||
| 🔧 可用命令示例:
 | ||
| 
 | ||
| 📊 查询操作:
 | ||
|   - "查询users集合"
 | ||
|   - "显示test集合中的数据"
 | ||
|   - "搜索products集合"
 | ||
| 
 | ||
| ➕ 插入操作:
 | ||
|   - "向users集合插入数据"
 | ||
|   - "添加新记录到test集合"
 | ||
| 
 | ||
| 📈 统计信息:
 | ||
|   - "显示users集合统计信息"
 | ||
|   - "查看数据库状态"
 | ||
| 
 | ||
| 📋 管理操作:
 | ||
|   - "显示所有集合"
 | ||
|   - "列出集合"
 | ||
| 
 | ||
| 🔄 更新操作:
 | ||
|   - "更新test集合中的数据"
 | ||
|   - "修改users集合"
 | ||
| 
 | ||
| 💡 提示: 请在命令中包含集合名称,如 'users', 'test', 'products' 等
 | ||
| """
 | ||
|         print(help_text)
 | ||
|     
 | ||
|     def cleanup(self):
 | ||
|         """清理资源"""
 | ||
|         if self.mongodb_client:
 | ||
|             self.mongodb_client.close()
 | ||
|             print("🧹 已清理MongoDB客户端连接")
 | ||
| 
 | ||
| 
 | ||
| def main():
 | ||
|     """主函数"""
 | ||
|     print("🎯 MongoDB Swarm集成演示")
 | ||
|     print("=" * 50)
 | ||
|     
 | ||
|     demo = MongoDBSwarmDemo()
 | ||
|     
 | ||
|     try:
 | ||
|         # 设置演示环境
 | ||
|         if not demo.setup():
 | ||
|             print("\n❌ 演示设置失败,请检查:")
 | ||
|             print("1. MongoDB服务是否运行")
 | ||
|             print("2. MongoDB MCP服务器是否启动")
 | ||
|             print("3. 网络连接是否正常")
 | ||
|             return
 | ||
|         
 | ||
|         # 选择运行模式
 | ||
|         print("\n🎮 选择运行模式:")
 | ||
|         print("1. 自动演示场景")
 | ||
|         print("2. 交互模式")
 | ||
|         print("3. 两者都运行")
 | ||
|         
 | ||
|         try:
 | ||
|             choice = input("\n请选择 (1/2/3): ").strip()
 | ||
|         except KeyboardInterrupt:
 | ||
|             print("\n👋 再见!")
 | ||
|             return
 | ||
|         
 | ||
|         if choice == "1":
 | ||
|             demo.run_demo_scenarios()
 | ||
|         elif choice == "2":
 | ||
|             demo.interactive_mode()
 | ||
|         elif choice == "3":
 | ||
|             demo.run_demo_scenarios()
 | ||
|             demo.interactive_mode()
 | ||
|         else:
 | ||
|             print("❌ 无效选择,运行自动演示")
 | ||
|             demo.run_demo_scenarios()
 | ||
|     
 | ||
|     except KeyboardInterrupt:
 | ||
|         print("\n👋 用户中断,正在退出...")
 | ||
|     except Exception as e:
 | ||
|         print(f"\n❌ 演示过程中出现错误: {str(e)}")
 | ||
|     finally:
 | ||
|         demo.cleanup()
 | ||
| 
 | ||
| 
 | ||
| if __name__ == "__main__":
 | ||
|     main() |