#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多群聊协调系统测试脚本 """ import asyncio import sys import os from datetime import datetime # 添加项目路径 sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) from jixia.coordination.multi_chat_coordinator import ( MultiChatCoordinator, ChatType, MessagePriority, CoordinationAction ) async def test_basic_messaging(): """测试基本消息功能""" print("\n🧪 测试基本消息功能") print("=" * 50) coordinator = MultiChatCoordinator() # 测试正常消息发送 message1 = await coordinator.send_message( "main_debate", "正1", "AI投资是未来科技发展的重要驱动力", MessagePriority.NORMAL, ["观点", "AI", "投资"] ) print(f"✅ 消息1发送成功: {message1.id}") message2 = await coordinator.send_message( "main_debate", "反1", "AI投资存在泡沫风险,需要谨慎对待", MessagePriority.NORMAL, ["反驳", "风险", "投资"] ) print(f"✅ 消息2发送成功: {message2.id}") # 测试内部讨论 message3 = await coordinator.send_message( "positive_internal", "正2", "我们需要收集更多AI成功案例的数据", MessagePriority.HIGH, ["策略", "数据"] ) print(f"✅ 内部消息发送成功: {message3.id}") return coordinator async def test_escalation_rules(): """测试升级规则""" print("\n🚨 测试升级规则") print("=" * 50) coordinator = MultiChatCoordinator() # 发送紧急消息,应该触发升级规则 urgent_message = await coordinator.send_message( "main_debate", "正3", "系统检测到异常行为,需要紧急干预", MessagePriority.URGENT, ["紧急", "系统"] ) print(f"🚨 紧急消息发送: {urgent_message.id}") # 等待协调规则处理 await asyncio.sleep(0.1) # 检查Human干预群是否收到升级消息 human_room = coordinator.chat_rooms["human_intervention"] escalated_messages = [msg for msg in human_room.message_history if "升级" in msg.tags] if escalated_messages: print(f"✅ 升级规则生效,Human干预群收到 {len(escalated_messages)} 条升级消息") for msg in escalated_messages: print(f" 📨 {msg.sender}: {msg.content[:100]}...") else: print("❌ 升级规则未生效") return coordinator async def test_broadcast_rules(): """测试广播规则""" print("\n📢 测试广播规则") print("=" * 50) coordinator = MultiChatCoordinator() # 在策略会议群发送决策消息 strategy_message = await coordinator.send_message( "strategy_meeting", "系统", "决策:采用数据驱动的论证策略", MessagePriority.HIGH, ["决策", "策略"] ) print(f"📋 策略决策发送: {strategy_message.id}") # 等待协调规则处理 await asyncio.sleep(0.1) # 检查内部讨论群是否收到广播消息 broadcast_count = 0 for room_id, room in coordinator.chat_rooms.items(): if room.chat_type == ChatType.INTERNAL_DISCUSSION: broadcast_messages = [msg for msg in room.message_history if "广播" in msg.tags] if broadcast_messages: broadcast_count += len(broadcast_messages) print(f"✅ {room.name} 收到 {len(broadcast_messages)} 条广播消息") if broadcast_count > 0: print(f"✅ 广播规则生效,总共发送 {broadcast_count} 条广播消息") else: print("❌ 广播规则未生效") return coordinator async def test_filter_rules(): """测试过滤规则""" print("\n🔍 测试过滤规则") print("=" * 50) coordinator = MultiChatCoordinator() # 发送低质量消息 low_quality_message = await coordinator.send_message( "main_debate", "正4", "好的", MessagePriority.LOW ) print(f"📝 低质量消息发送: {low_quality_message.id}") # 等待协调规则处理 await asyncio.sleep(0.1) # 检查消息是否被过滤 if low_quality_message.metadata.get("filtered"): print(f"✅ 过滤规则生效,消息被标记为已过滤") print(f" 过滤原因: {low_quality_message.metadata.get('filter_reason')}") else: print("❌ 过滤规则未生效") return coordinator async def test_discussion_merging(): """测试讨论合并""" print("\n🔗 测试讨论合并") print("=" * 50) coordinator = MultiChatCoordinator() # 发送相关消息 messages = [ ("main_debate", "正1", "AI技术的发展速度令人惊叹", ["AI", "技术"]), ("positive_internal", "正2", "我们应该强调AI技术的创新价值", ["AI", "创新"]), ("negative_internal", "反1", "AI技术也带来了就业问题", ["AI", "就业"]), ] sent_messages = [] for chat_id, sender, content, tags in messages: msg = await coordinator.send_message(chat_id, sender, content, tags=tags) sent_messages.append(msg) print(f"📨 发送消息: {sender} - {content[:30]}...") # 发送触发合并的消息 trigger_message = await coordinator.send_message( "main_debate", "系统", "需要整合关于AI技术的所有讨论", tags=["AI", "整合"] ) print(f"🔗 触发合并消息: {trigger_message.id}") # 等待协调规则处理 await asyncio.sleep(0.1) # 检查策略会议群是否收到合并摘要 strategy_room = coordinator.chat_rooms["strategy_meeting"] merge_messages = [msg for msg in strategy_room.message_history if "合并" in msg.tags or "摘要" in msg.tags] if merge_messages: print(f"✅ 讨论合并生效,策略会议群收到 {len(merge_messages)} 条摘要") for msg in merge_messages: print(f" 📋 摘要: {msg.content[:100]}...") else: print("❌ 讨论合并未生效") return coordinator async def test_permission_system(): """测试权限系统""" print("\n🔐 测试权限系统") print("=" * 50) coordinator = MultiChatCoordinator() # 测试正常权限 try: normal_message = await coordinator.send_message( "main_debate", "正1", "这是一条正常消息" ) print(f"✅ 正常权限测试通过: {normal_message.id}") except Exception as e: print(f"❌ 正常权限测试失败: {e}") # 测试无权限用户 try: unauthorized_message = await coordinator.send_message( "main_debate", "未授权用户", "这是一条未授权消息" ) print(f"❌ 权限系统失效,未授权用户发送成功: {unauthorized_message.id}") except PermissionError as e: print(f"✅ 权限系统正常,拒绝未授权用户: {e}") except Exception as e: print(f"❌ 权限系统异常: {e}") # 测试内部群权限 try: internal_message = await coordinator.send_message( "positive_internal", "正2", "内部策略讨论" ) print(f"✅ 内部群权限测试通过: {internal_message.id}") except Exception as e: print(f"❌ 内部群权限测试失败: {e}") # 测试跨团队权限 try: cross_team_message = await coordinator.send_message( "positive_internal", "反1", "反方试图进入正方内部群" ) print(f"❌ 跨团队权限控制失效: {cross_team_message.id}") except PermissionError as e: print(f"✅ 跨团队权限控制正常: {e}") except Exception as e: print(f"❌ 跨团队权限控制异常: {e}") return coordinator async def test_system_status(): """测试系统状态""" print("\n📊 测试系统状态") print("=" * 50) coordinator = MultiChatCoordinator() # 发送一些测试消息 test_messages = [ ("main_debate", "正1", "测试消息1"), ("main_debate", "反1", "测试消息2"), ("positive_internal", "正2", "内部消息1"), ("negative_internal", "反2", "内部消息2"), ("strategy_meeting", "系统", "策略消息1"), ] for chat_id, sender, content in test_messages: await coordinator.send_message(chat_id, sender, content) # 获取系统状态 status = coordinator.get_chat_status() print(f"📈 系统状态报告:") print(f" 总群聊数: {status['total_rooms']}") print(f" 活跃群聊数: {status['active_rooms']}") print(f" 总消息数: {status['total_messages']}") print(f" 待处理消息: {status['pending_messages']}") print(f" 协调规则数: {status['coordination_rules']}") print(f" 活跃规则数: {status['active_rules']}") print(f"\n📋 群聊详情:") for room_id, room_info in status['rooms'].items(): print(f" 🏠 {room_info['name']} ({room_info['type']})") print(f" 参与者: {room_info['participants']} 人") print(f" 消息数: {room_info['messages']}") print(f" 活跃状态: {'✅' if room_info['is_active'] else '❌'}") # 测试数据保存 try: coordinator.save_coordination_data("test_coordination_data.json") print(f"\n💾 数据保存测试通过") except Exception as e: print(f"\n❌ 数据保存测试失败: {e}") return coordinator async def test_performance(): """测试性能""" print("\n⚡ 测试性能") print("=" * 50) coordinator = MultiChatCoordinator() # 批量发送消息测试 start_time = datetime.now() message_count = 100 print(f"📤 发送 {message_count} 条消息...") for i in range(message_count): chat_id = "main_debate" if i % 2 == 0 else "positive_internal" sender = f"测试用户{i % 4 + 1}" if chat_id == "positive_internal": sender = f"正{i % 4 + 1}" content = f"性能测试消息 {i + 1}: 这是一条用于性能测试的消息内容" try: await coordinator.send_message(chat_id, sender, content) except PermissionError: # 忽略权限错误,继续测试 pass end_time = datetime.now() duration = (end_time - start_time).total_seconds() print(f"⏱️ 性能测试结果:") print(f" 总耗时: {duration:.3f} 秒") print(f" 平均每条消息: {duration/message_count*1000:.2f} 毫秒") print(f" 消息处理速度: {message_count/duration:.1f} 条/秒") # 获取最终状态 final_status = coordinator.get_chat_status() print(f" 最终消息总数: {final_status['total_messages']}") return coordinator async def run_all_tests(): """运行所有测试""" print("🚀 多群聊协调系统测试开始") print("=" * 60) tests = [ ("基本消息功能", test_basic_messaging), ("升级规则", test_escalation_rules), ("广播规则", test_broadcast_rules), ("过滤规则", test_filter_rules), ("讨论合并", test_discussion_merging), ("权限系统", test_permission_system), ("系统状态", test_system_status), ("性能测试", test_performance), ] results = [] for test_name, test_func in tests: try: print(f"\n🧪 开始测试: {test_name}") coordinator = await test_func() results.append((test_name, "✅ 通过", None)) print(f"✅ {test_name} 测试完成") except Exception as e: results.append((test_name, "❌ 失败", str(e))) print(f"❌ {test_name} 测试失败: {e}") # 测试结果总结 print("\n" + "=" * 60) print("📊 测试结果总结") print("=" * 60) passed = 0 failed = 0 for test_name, status, error in results: print(f"{status} {test_name}") if error: print(f" 错误: {error}") if "✅" in status: passed += 1 else: failed += 1 print(f"\n📈 测试统计:") print(f" 通过: {passed}") print(f" 失败: {failed}") print(f" 总计: {passed + failed}") print(f" 成功率: {passed/(passed+failed)*100:.1f}%") if failed == 0: print("\n🎉 所有测试通过!多群聊协调系统运行正常。") else: print(f"\n⚠️ 有 {failed} 个测试失败,需要检查相关功能。") if __name__ == "__main__": asyncio.run(run_all_tests())