#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 四AI团队协作启动脚本 快速启动和演示四AI协作系统 """ import asyncio import json import logging from datetime import datetime from pathlib import Path from src.jixia.coordination.ai_team_collaboration import ( AITeamCollaboration, AIRole, MessageType, CollaborationType, WorkPhase ) # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) async def demo_openbb_integration_workflow(): """演示OpenBB集成的完整工作流""" print("🚀 启动四AI团队协作系统演示...") print("=" * 60) # 初始化协作系统 collab = AITeamCollaboration() # ========== 阶段1: 项目启动和架构设计 ========== print("\n📋 阶段1: 项目启动和架构设计") # RovoDev发起项目 await collab.broadcast_message( sender=AIRole.ROVODEV, content="""🎯 **OpenBB集成项目正式启动!** 项目目标:将OpenBB金融数据平台与稷下学宫八仙论道系统深度融合 各AI职责分工: - Qwen: 架构设计师,负责系统架构和技术选型 - Claude: 核心开发工程师,负责代码实现和API集成 - Gemini: 测试验证专家,负责功能测试和质量保证 - RovoDev: 项目整合专家,负责协调管理和文档整合 请大家查看各自的工作说明书,开始工作!""", message_type=MessageType.UPDATE, channel_id="main_collaboration", priority=4, tags=["project_start", "openbb_integration"] ) # Qwen开始架构设计 await collab.send_message( sender=AIRole.QWEN, content="""🏗️ **架构设计开始** 我已经开始OpenBB集成的架构设计工作,主要包括: 1. **数据抽象层设计** - 统一OpenBB与八仙系统的数据接口 2. **八仙数据路由** - 为每个八仙设计专属的数据获取策略 3. **智能降级机制** - OpenBB不可用时的备选方案 4. **性能优化架构** - 缓存和负载均衡策略 预计完成时间:2天 需要其他AI配合:Claude提供实现可行性反馈""", message_type=MessageType.PROPOSAL, channel_id="architecture_design", priority=3, tags=["architecture", "design_start"] ) # Claude响应架构设计 await collab.send_message( sender=AIRole.CLAUDE, content="""💻 **实现可行性评估** Qwen的架构设计方向很好!从实现角度我有几个建议: 1. **延迟导入策略** - 确保OpenBB未安装时系统正常运行 2. **异步数据处理** - 避免阻塞八仙论道的实时性 3. **缓存层设计** - 减少重复的API调用 4. **错误处理机制** - 优雅处理网络和数据异常 我可以在架构确定后立即开始实现工作。""", message_type=MessageType.ANSWER, channel_id="architecture_design", receiver=AIRole.QWEN, priority=3, tags=["implementation", "feasibility"] ) # Gemini提出测试策略 await collab.send_message( sender=AIRole.GEMINI, content="""🧪 **测试策略制定** 基于当前的架构讨论,我已经开始制定测试策略: 1. **单元测试** - 覆盖八仙数据路由和OpenBB集成 2. **集成测试** - 验证数据流和系统交互 3. **性能测试** - 确保响应时间和并发处理能力 4. **文化准确性测试** - 验证八仙特征的准确表达 需要架构文档确定后细化具体测试用例。""", message_type=MessageType.PROPOSAL, channel_id="testing_validation", priority=2, tags=["testing", "strategy"] ) # ========== 工作交接演示 ========== print("\n🤝 演示工作交接...") # 推进到设计完成阶段 await collab.advance_phase(WorkPhase.IMPLEMENTATION) # Qwen向Claude交接 await collab.handoff_work( from_ai=AIRole.QWEN, to_ai=AIRole.CLAUDE, task_description="基于架构设计实现OpenBB核心引擎", deliverables=[ "src/jixia/engines/enhanced_openbb_engine.py", "src/jixia/adapters/immortal_data_processor.py", "app/tabs/enhanced_openbb_tab.py" ], notes="""重点关注: 1. 八仙数据偏好的准确实现 2. 智能降级机制的稳定性 3. 文化特色功能的完整性""" ) # Claude接受任务并开始实现 await collab.send_message( sender=AIRole.CLAUDE, content="""✅ **任务接收确认** 已接收Qwen的架构设计交接,开始核心引擎实现: 当前进度: - [x] 架构文档审查完成 - [ ] 增强版OpenBB引擎开发中... - [ ] 八仙数据适配器实现 - [ ] Streamlit界面优化 预计完成时间:3天 如有技术问题会在代码实现频道讨论。""", message_type=MessageType.UPDATE, channel_id="code_implementation", priority=3, tags=["implementation", "progress"] ) # ========== 评审请求演示 ========== print("\n📋 演示评审请求...") await collab.request_review( sender=AIRole.CLAUDE, content="""📋 **代码评审请求** 已完成增强版OpenBB引擎的核心实现,请求评审: 主要特性: 1. ✅ 八仙专属数据路由器 2. ✅ 智能降级机制 3. ✅ 异步数据处理 4. ✅ 完整的错误处理 请重点关注: - 架构一致性 (Qwen) - 测试覆盖度 (Gemini) - 文档完整性 (RovoDev)""", reviewers=[AIRole.QWEN, AIRole.GEMINI, AIRole.ROVODEV], attachments=["src/jixia/engines/enhanced_openbb_engine.py"] ) # 各AI提供评审反馈 await collab.send_message( sender=AIRole.QWEN, content="""🏗️ **架构评审反馈** Claude的实现与架构设计高度一致!几个优化建议: ✅ 优点: - 八仙数据路由实现准确 - 降级机制设计合理 - 性能考虑周全 🔧 建议优化: - 可以增加更多的数据源provider - 缓存策略可以更加智能化 - 考虑添加实时数据流支持 总体评分:9/10,建议通过!""", message_type=MessageType.REVIEW, channel_id="cross_review", receiver=AIRole.CLAUDE, priority=3, tags=["review", "architecture", "approved"] ) await collab.send_message( sender=AIRole.GEMINI, content="""🧪 **测试评审反馈** 从测试角度评估Claude的实现: ✅ 测试友好性: - 代码结构清晰,易于测试 - 依赖注入设计良好 - 异常处理完善 📋 测试建议: - 需要补充Mock数据用于单元测试 - 建议增加性能基准测试 - 八仙特征测试用例需要完善 开始准备对应的测试套件!""", message_type=MessageType.REVIEW, channel_id="cross_review", receiver=AIRole.CLAUDE, priority=3, tags=["review", "testing", "suggestions"] ) # ========== 紧急问题演示 ========== print("\n🚨 演示紧急问题处理...") await collab.escalate_issue( reporter=AIRole.CLAUDE, issue_description="""OpenBB v4.3.0版本兼容性问题: 发现新版本OpenBB的API接口有重大变更,影响数据获取功能。 当前解决方案: 1. 临时锁定到v4.1.0版本 2. 准备适配新版本的兼容层 需要团队讨论优先级和解决方案。""", severity="high" ) # RovoDev协调解决 await collab.send_message( sender=AIRole.ROVODEV, content="""🎯 **紧急问题协调** 已接收Claude的问题报告,协调解决方案: 📋 行动计划: 1. **短期方案** (Claude负责): 锁定OpenBB v4.1.0版本,确保现有功能稳定 2. **中期方案** (Qwen设计): 设计兼容层架构,支持多版本OpenBB 3. **长期方案** (团队): 建立版本兼容性测试机制 **时间安排**: - 今日内完成版本锁定 - 3天内完成兼容层设计 - 1周内完成新版本适配 请各AI确认该计划。""", message_type=MessageType.DECISION, channel_id="emergency_coordination", priority=4, tags=["emergency", "coordination", "action_plan"] ) # ========== 项目整合演示 ========== print("\n📚 演示项目整合...") # 推进到整合阶段 await collab.advance_phase(WorkPhase.INTEGRATION) await collab.send_message( sender=AIRole.ROVODEV, content="""📚 **项目整合开始** 开始整合所有AI的工作成果: 🏗️ **Qwen交付物**: - ✅ 系统架构设计文档 - ✅ 数据抽象层接口规范 - ✅ 性能优化策略 💻 **Claude交付物**: - ✅ 增强版OpenBB引擎 - ✅ 八仙数据适配器 - ✅ Streamlit界面优化 🧪 **Gemini交付物**: - ✅ 完整测试套件 - ✅ 性能基准测试 - ✅ 质量保证报告 📋 **整合任务**: - [ ] 统一文档格式 - [ ] 集成测试验证 - [ ] 用户指南编写 - [ ] 最终质量检查 预计整合完成时间:2天""", message_type=MessageType.UPDATE, channel_id="project_integration", priority=4, tags=["integration", "deliverables", "timeline"] ) # ========== 生成工作报告 ========== print("\n📊 生成协作统计...") # 获取各AI的工作仪表板 for ai_role in AIRole: dashboard = collab.get_ai_dashboard(ai_role) print(f"\n🤖 {ai_role.value} 工作统计:") print(f" 状态: {dashboard['status']['status']}") print(f" 当前任务: {dashboard['status']['current_task']}") print(f" 待处理任务: {len(dashboard['pending_tasks'])}个") print(f" 协作得分: {dashboard['collaboration_stats']['collaboration_score']}") print(f" 活跃频道: {len(dashboard['active_channels'])}个") # 获取频道摘要 print(f"\n📢 频道活跃度统计:") for channel_id, channel in collab.channels.items(): summary = collab.get_channel_summary(channel_id) print(f" {summary['channel_name']}: {summary['total_messages']}条消息") print("\n🎉 四AI团队协作演示完成!") print("=" * 60) print("系统功能演示:") print("✅ 多频道协作通信") print("✅ 工作流程管理") print("✅ 任务交接机制") print("✅ 评审协作流程") print("✅ 紧急问题处理") print("✅ 项目整合管理") print("✅ 实时状态监控") print("\n🚀 可以启动Web界面进行可视化管理!") async def start_collaboration_system(): """启动协作系统的交互式版本""" import sys collab = AITeamCollaboration() print("🤖 四AI团队协作系统已启动") print("可用命令:") print(" send - 发送消息") print(" status - 查看状态") print(" channels - 查看频道") print(" dashboard - 查看AI仪表板") print(" handoff - 工作交接") print(" quit - 退出") # 检查是否在交互式环境中 if not sys.stdin.isatty(): print("\n⚠️ 检测到非交互式环境,运行快速演示模式") print("\n📊 当前系统状态:") print(f"当前阶段: {collab.current_phase.value}") for ai_role, status in collab.ai_status.items(): print(f"{ai_role.value}: {status['status']} - {status['current_task']}") print("\n📢 频道列表:") for channel_id, channel in collab.channels.items(): print(f"{channel.name} ({channel.channel_type.value}): {len(channel.message_history)}条消息") print("\n💡 要体验完整交互功能,请在真正的终端中运行:") print(" .venv/bin/python3 ai_collaboration_demo.py interactive") return while True: try: command = input("\n> ").strip().lower() if command == "quit": break elif command == "status": print(f"当前阶段: {collab.current_phase.value}") for ai_role, status in collab.ai_status.items(): print(f"{ai_role.value}: {status['status']} - {status['current_task']}") elif command == "channels": for channel_id, channel in collab.channels.items(): print(f"{channel.name} ({channel.channel_type.value}): {len(channel.message_history)}条消息") elif command.startswith("dashboard"): parts = command.split() if len(parts) > 1: try: ai_role = AIRole(parts[1].title()) dashboard = collab.get_ai_dashboard(ai_role) print(json.dumps(dashboard, indent=2, ensure_ascii=False, default=str)) except ValueError: print("无效的AI角色,可选:Qwen, Claude, Gemini, Rovodev") else: print("使用方法: dashboard ") elif command == "send": # 简化的消息发送 try: sender = input("发送者 (Qwen/Claude/Gemini/Rovodev): ") content = input("消息内容: ") channel = input("频道 (main_collaboration/architecture_design/etc): ") await collab.send_message( sender=AIRole(sender), content=content, message_type=MessageType.PROPOSAL, channel_id=channel or "main_collaboration" ) print("消息发送成功!") except EOFError: print("\n输入被中断") break except Exception as e: print(f"发送失败: {e}") else: print("未知命令") except EOFError: print("\n检测到EOF,退出交互模式") break except KeyboardInterrupt: print("\n检测到中断信号,退出交互模式") break except Exception as e: print(f"错误: {e}") print("👋 协作系统已退出") if __name__ == "__main__": import sys if len(sys.argv) > 1 and sys.argv[1] == "demo": # 运行演示 asyncio.run(demo_openbb_integration_workflow()) elif len(sys.argv) > 1 and sys.argv[1] == "interactive": # 交互式模式 asyncio.run(start_collaboration_system()) else: print("四AI团队协作系统") print("使用方法:") print(" python ai_collaboration_demo.py demo - 运行完整演示") print(" python ai_collaboration_demo.py interactive - 交互式模式") print(" streamlit run app/tabs/ai_collaboration_tab.py - 启动Web界面")