745 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			745 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
| #!/usr/bin/env python3
 | ||
| # -*- coding: utf-8 -*-
 | ||
| """
 | ||
| 集夏v2.1.0 综合功能测试
 | ||
| 验证所有新功能的集成效果和系统稳定性
 | ||
| """
 | ||
| import asyncio
 | ||
| import sys
 | ||
| import os
 | ||
| import time
 | ||
| import threading
 | ||
| import json
 | ||
| from datetime import datetime, timedelta
 | ||
| 
 | ||
| # 添加项目路径
 | ||
| sys.path.append('/home/ben/liurenchaxin/src')
 | ||
| 
 | ||
| # 导入所有核心模块
 | ||
| try:
 | ||
|     from jixia.debates.enhanced_priority_algorithm import EnhancedPriorityAlgorithm
 | ||
|     from jixia.debates.optimized_debate_flow import OptimizedDebateFlowController, FlowControlMode
 | ||
|     from jixia.intervention.human_intervention_system import DebateHealthMonitor
 | ||
|     from jixia.coordination.multi_chat_coordinator import MultiChatCoordinator
 | ||
| except ImportError as e:
 | ||
|     print(f"❌ 模块导入失败: {e}")
 | ||
|     print("请确保所有模块都已正确安装")
 | ||
|     sys.exit(1)
 | ||
| 
 | ||
| class V2_1_IntegrationTester:
 | ||
|     """v2.1.0 集成测试器"""
 | ||
|     
 | ||
|     def __init__(self):
 | ||
|         self.test_results = {}
 | ||
|         self.performance_metrics = {}
 | ||
|         self.error_log = []
 | ||
|         
 | ||
|         # 初始化各个组件
 | ||
|         try:
 | ||
|             self.priority_algorithm = EnhancedPriorityAlgorithm()
 | ||
|             self.flow_controller = OptimizedDebateFlowController()
 | ||
|             self.health_monitor = DebateHealthMonitor()
 | ||
|             self.chat_coordinator = MultiChatCoordinator()
 | ||
|             print("✅ 所有核心组件初始化成功")
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 组件初始化失败: {e}")
 | ||
|             self.error_log.append(f"初始化错误: {e}")
 | ||
|     
 | ||
|     def test_priority_algorithm_integration(self):
 | ||
|         """测试优先级算法集成"""
 | ||
|         print("\n🧪 测试优先级算法集成")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 模拟辩论场景
 | ||
|             test_speeches = [
 | ||
|                 {
 | ||
|                     "speaker": "吕洞宾",
 | ||
|                     "content": "根据最新的市场数据分析,AI投资领域显示出强劲的增长潜力。我们应该抓住这个机会。",
 | ||
|                     "context": {"stage": "起", "topic": "AI投资", "recent_speakers": []}
 | ||
|                 },
 | ||
|                 {
 | ||
|                     "speaker": "何仙姑",
 | ||
|                     "content": "但是我们必须谨慎考虑风险因素!市场波动性很大。",
 | ||
|                     "context": {"stage": "承", "topic": "AI投资", "recent_speakers": ["吕洞宾"]}
 | ||
|                 },
 | ||
|                 {
 | ||
|                     "speaker": "铁拐李",
 | ||
|                     "content": "我同意吕洞宾的观点,技术发展确实迅速,但何仙姑提到的风险也值得重视。",
 | ||
|                     "context": {"stage": "转", "topic": "AI投资", "recent_speakers": ["吕洞宾", "何仙姑"]}
 | ||
|                 }
 | ||
|             ]
 | ||
|             
 | ||
|             priorities = []
 | ||
|             for speech in test_speeches:
 | ||
|                 analysis = self.priority_algorithm.analyze_speech(
 | ||
|                     speech["content"], 
 | ||
|                     speech["speaker"], 
 | ||
|                     speech["context"]
 | ||
|                 )
 | ||
|                 
 | ||
|                 # 获取详细的分数分解
 | ||
|                 speaker = speech["speaker"]
 | ||
|                 context = speech["context"]
 | ||
|                 recent_speeches = test_speeches[:test_speeches.index(speech)]
 | ||
|                 
 | ||
|                 profile = self.priority_algorithm._get_or_create_speaker_profile(speaker)
 | ||
|                 self.priority_algorithm._update_speaker_profile(profile, recent_speeches)
 | ||
|                 
 | ||
|                 rebuttal_urgency = self.priority_algorithm._calculate_rebuttal_urgency(speaker, context, recent_speeches)
 | ||
|                 argument_strength = self.priority_algorithm._calculate_argument_strength(speaker, profile)
 | ||
|                 time_pressure = self.priority_algorithm._calculate_time_pressure(speaker, context)
 | ||
|                 audience_reaction = self.priority_algorithm._calculate_audience_reaction(speaker, context)
 | ||
|                 strategy_need = self.priority_algorithm._calculate_strategy_need(speaker, context, profile)
 | ||
|                 
 | ||
|                 priority = self.priority_algorithm.calculate_priority(
 | ||
|                     speaker, 
 | ||
|                     context,
 | ||
|                     recent_speeches
 | ||
|                 )
 | ||
|                 priorities.append((speaker, priority))
 | ||
|                 
 | ||
|                 print(f"发言者: {speaker}")
 | ||
|                 print(f"  反驳紧急性: {rebuttal_urgency:.6f}")
 | ||
|                 print(f"  论证强度: {argument_strength:.6f}")
 | ||
|                 print(f"  时间压力: {time_pressure:.6f}")
 | ||
|                 print(f"  观众反应: {audience_reaction:.6f}")
 | ||
|                 print(f"  策略需求: {strategy_need:.6f}")
 | ||
|                 print(f"  最终优先级: {priority:.6f}")
 | ||
|                 print()
 | ||
|             
 | ||
|             # 调试输出
 | ||
|             print(f"所有优先级值: {[p[1] for p in priorities]}")
 | ||
|             print(f"唯一优先级数量: {len(set(p[1] for p in priorities))}")
 | ||
|             print(f"优先级差异: {max(p[1] for p in priorities) - min(p[1] for p in priorities)}")
 | ||
|             
 | ||
|             # 验证优先级计算
 | ||
|             assert all(0 <= p[1] <= 1 for p in priorities), "优先级应该在0-1之间"
 | ||
|             assert len(set(p[1] for p in priorities)) > 1, "不同发言应该有不同优先级"
 | ||
|             
 | ||
|             self.test_results["priority_algorithm_integration"] = True
 | ||
|             print("✅ 优先级算法集成测试通过")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 优先级算法集成测试失败: {e}")
 | ||
|             self.error_log.append(f"优先级算法集成错误: {e}")
 | ||
|             self.test_results["priority_algorithm_integration"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     def test_flow_controller_integration(self):
 | ||
|         """测试流程控制器集成"""
 | ||
|         print("\n🧪 测试流程控制器集成")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 测试与优先级算法的集成
 | ||
|             initial_stage = self.flow_controller.current_stage
 | ||
|             print(f"初始阶段: {initial_stage.value}")
 | ||
|             
 | ||
|             # 模拟完整的辩论流程
 | ||
|             test_sequence = [
 | ||
|                 ("吕洞宾", "开场陈述:AI投资是未来发展的关键"),
 | ||
|                 ("何仙姑", "反方观点:需要谨慎评估风险"),
 | ||
|                 ("铁拐李", "补充论据:技术发展支持投资决策"),
 | ||
|                 ("汉钟离", "风险分析:市场不确定性因素"),
 | ||
|                 ("曹国舅", "综合观点:平衡收益与风险"),
 | ||
|                 ("蓝采和", "实践经验:类似投资案例分析"),
 | ||
|                 ("韩湘子", "未来展望:长期发展趋势"),
 | ||
|                 ("张果老", "总结陈词:理性投资建议")
 | ||
|             ]
 | ||
|             
 | ||
|             stage_transitions = 0
 | ||
|             for speaker, content in test_sequence:
 | ||
|                 # 记录发言
 | ||
|                 self.flow_controller.record_speech(speaker, content)
 | ||
|                 
 | ||
|                 # 检查是否需要推进阶段
 | ||
|                 if hasattr(self.flow_controller, '_should_advance_stage') and self.flow_controller._should_advance_stage():
 | ||
|                     old_stage = self.flow_controller.current_stage
 | ||
|                     if self.flow_controller.advance_stage():
 | ||
|                         stage_transitions += 1
 | ||
|                         print(f"阶段转换: {old_stage.value} -> {self.flow_controller.current_stage.value}")
 | ||
|                 
 | ||
|                 # 获取流程状态
 | ||
|                 status = self.flow_controller.get_flow_status()
 | ||
|                 print(f"发言者: {speaker}, 当前阶段: {status['current_stage']}, 进度: {status['stage_progress']}")
 | ||
|             
 | ||
|             # 验证流程控制
 | ||
|             final_status = self.flow_controller.get_flow_status()
 | ||
|             total_speeches = len(self.flow_controller.debate_history)
 | ||
|             assert total_speeches == len(test_sequence), f"发言总数应该匹配,期望{len(test_sequence)},实际{total_speeches}"
 | ||
|             assert stage_transitions > 0, "应该发生阶段转换"
 | ||
|             
 | ||
|             self.test_results["flow_controller_integration"] = True
 | ||
|             print(f"✅ 流程控制器集成测试通过,发生了 {stage_transitions} 次阶段转换")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 流程控制器集成测试失败: {e}")
 | ||
|             self.error_log.append(f"流程控制器集成错误: {e}")
 | ||
|             self.test_results["flow_controller_integration"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     def test_health_monitor_integration(self):
 | ||
|         """测试健康监控集成"""
 | ||
|         print("\n🧪 测试健康监控集成")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 模拟辩论数据
 | ||
|             debate_data = {
 | ||
|                 "participants": ["吕洞宾", "何仙姑", "铁拐李", "汉钟离"],
 | ||
|                 "speeches": [
 | ||
|                     {"speaker": "吕洞宾", "content": "我强烈支持这个提案", "timestamp": datetime.now()},
 | ||
|                     {"speaker": "何仙姑", "content": "我完全反对,这太危险了", "timestamp": datetime.now()},
 | ||
|                     {"speaker": "铁拐李", "content": "让我们理性分析一下", "timestamp": datetime.now()},
 | ||
|                     {"speaker": "汉钟离", "content": "数据显示情况复杂", "timestamp": datetime.now()}
 | ||
|                 ],
 | ||
|                 "current_stage": "承",
 | ||
|                 "duration": timedelta(minutes=15)
 | ||
|             }
 | ||
|             
 | ||
|             # 更新健康监控
 | ||
|             self.health_monitor.update_metrics(debate_data)
 | ||
|             
 | ||
|             # 检查健康状态
 | ||
|             health_status = self.health_monitor.get_health_status()
 | ||
|             health_report = self.health_monitor.get_health_report()
 | ||
|             print(f"健康状态: {health_status.value}")
 | ||
|             print(f"整体分数: {health_report['overall_score']:.1f}")
 | ||
|             print(f"监控指标数量: {len(health_report['metrics'])}")
 | ||
|             print(f"活跃警报: {health_report['active_alerts']}个")
 | ||
|             
 | ||
|             # 模拟问题场景
 | ||
|             problematic_data = {
 | ||
|                 "participants": ["吕洞宾", "何仙姑"],
 | ||
|                 "speeches": [
 | ||
|                     {"speaker": "吕洞宾", "content": "你们都是白痴!", "timestamp": datetime.now()},
 | ||
|                     {"speaker": "吕洞宾", "content": "我说了算!", "timestamp": datetime.now()},
 | ||
|                     {"speaker": "吕洞宾", "content": "闭嘴!", "timestamp": datetime.now()}
 | ||
|                 ],
 | ||
|                 "current_stage": "转",
 | ||
|                 "duration": timedelta(minutes=30)
 | ||
|             }
 | ||
|             
 | ||
|             self.health_monitor.update_metrics(problematic_data)
 | ||
|             
 | ||
|             # 检查是否触发警报
 | ||
|             alerts = self.health_monitor.active_alerts
 | ||
|             print(f"活跃警报数量: {len(alerts)}")
 | ||
|             
 | ||
|             # 验证监控功能
 | ||
|             assert health_status is not None, "应该有健康状态"
 | ||
|             assert isinstance(health_status, type(health_status)), "健康状态应该是HealthStatus枚举"
 | ||
|             
 | ||
|             self.test_results["health_monitor_integration"] = True
 | ||
|             print("✅ 健康监控集成测试通过")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 健康监控集成测试失败: {e}")
 | ||
|             self.error_log.append(f"健康监控集成错误: {e}")
 | ||
|             self.test_results["health_monitor_integration"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     async def test_chat_coordinator_integration(self):
 | ||
|         """测试多群聊协调集成"""
 | ||
|         print("\n🧪 测试多群聊协调集成")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 模拟多群聊场景
 | ||
|             main_chat_message = {
 | ||
|                 "chat_id": "main_debate",
 | ||
|                 "speaker": "吕洞宾",
 | ||
|                 "content": "我认为我们应该投资AI技术",
 | ||
|                 "timestamp": datetime.now()
 | ||
|             }
 | ||
|             
 | ||
|             # 处理主群聊消息
 | ||
|             await self.chat_coordinator.handle_message(main_chat_message)
 | ||
|             
 | ||
|             # 模拟策略讨论
 | ||
|             strategy_message = {
 | ||
|                 "chat_id": "strategy_positive",
 | ||
|                 "speaker": "铁拐李",
 | ||
|                 "content": "我们需要准备更多技术数据来支持论点",
 | ||
|                 "timestamp": datetime.now()
 | ||
|             }
 | ||
|             
 | ||
|             await self.chat_coordinator.handle_message(strategy_message)
 | ||
|             
 | ||
|             # 检查消息路由
 | ||
|             routing_status = self.chat_coordinator.get_routing_status()
 | ||
|             print(f"路由状态: {routing_status}")
 | ||
|             
 | ||
|             # 模拟协调决策
 | ||
|             coordination_result = await self.chat_coordinator.coordinate_response(
 | ||
|                 main_chat_message, 
 | ||
|                 context={"stage": "承", "topic": "AI投资"}
 | ||
|             )
 | ||
|             
 | ||
|             print(f"协调结果: {coordination_result}")
 | ||
|             
 | ||
|             # 验证协调功能
 | ||
|             assert coordination_result is not None, "应该有协调结果"
 | ||
|             
 | ||
|             self.test_results["chat_coordinator_integration"] = True
 | ||
|             print("✅ 多群聊协调集成测试通过")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 多群聊协调集成测试失败: {e}")
 | ||
|             self.error_log.append(f"多群聊协调集成错误: {e}")
 | ||
|             self.test_results["chat_coordinator_integration"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     async def test_cross_component_integration(self):
 | ||
|         """测试跨组件集成"""
 | ||
|         print("\n🧪 测试跨组件集成")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 清空之前的发言历史
 | ||
|             self.flow_controller.debate_history.clear()
 | ||
|             
 | ||
|             # 模拟完整的辩论流程
 | ||
|             debate_scenario = {
 | ||
|                 "topic": "人工智能投资策略",
 | ||
|                 "participants": ["吕洞宾", "何仙姑", "铁拐李", "汉钟离"],
 | ||
|                 "duration": 30  # 分钟
 | ||
|             }
 | ||
|             
 | ||
|             print(f"开始辩论: {debate_scenario['topic']}")
 | ||
|             
 | ||
|             # 1. 流程控制器管理发言顺序
 | ||
|             speakers_sequence = []
 | ||
|             for i in range(8):  # 模拟8轮发言
 | ||
|                 speaker = self.flow_controller.get_current_speaker()
 | ||
|                 speakers_sequence.append(speaker)
 | ||
|                 
 | ||
|                 # 2. 生成发言内容(简化)
 | ||
|                 content = f"这是{speaker}在第{i+1}轮的发言,关于{debate_scenario['topic']}"
 | ||
|                 
 | ||
|                 # 3. 优先级算法分析发言
 | ||
|                 context = {
 | ||
|                     "stage": self.flow_controller.current_stage.value,
 | ||
|                     "topic": debate_scenario['topic'],
 | ||
|                     "recent_speakers": speakers_sequence[-3:]
 | ||
|                 }
 | ||
|                 
 | ||
|                 analysis = self.priority_algorithm.analyze_speech(content, speaker, context)
 | ||
|                 
 | ||
|                 # 构建正确格式的recent_speeches
 | ||
|                 recent_speeches = []
 | ||
|                 for j, prev_speaker in enumerate(speakers_sequence):
 | ||
|                     recent_speeches.append({
 | ||
|                         "speaker": prev_speaker,
 | ||
|                         "content": f"这是{prev_speaker}在第{j+1}轮的发言",
 | ||
|                         "timestamp": datetime.now().isoformat(),
 | ||
|                         "team": "positive" if "正" in prev_speaker else "negative"
 | ||
|                     })
 | ||
|                 
 | ||
|                 priority = self.priority_algorithm.calculate_priority(speaker, context, recent_speeches)
 | ||
|                 
 | ||
|                 # 4. 记录发言到流程控制器
 | ||
|                 self.flow_controller.record_speech(speaker, content)
 | ||
|                 
 | ||
|                 # 5. 更新健康监控
 | ||
|                 debate_data = {
 | ||
|                     "participants": debate_scenario['participants'],
 | ||
|                     "speeches": [{"speaker": speaker, "content": content, "timestamp": datetime.now()}],
 | ||
|                     "current_stage": self.flow_controller.current_stage.value,
 | ||
|                     "duration": timedelta(minutes=i*2)
 | ||
|                 }
 | ||
|                 self.health_monitor.update_metrics(debate_data)
 | ||
|                 
 | ||
|                 # 6. 多群聊协调处理
 | ||
|                 message = {
 | ||
|                     "chat_id": "main_debate",
 | ||
|                     "speaker": speaker,
 | ||
|                     "content": content,
 | ||
|                     "timestamp": datetime.now()
 | ||
|                 }
 | ||
|                 # 异步调用
 | ||
|                 try:
 | ||
|                     await self.chat_coordinator.handle_message(message)
 | ||
|                 except Exception as e:
 | ||
|                     print(f"警告: 消息处理失败: {e}")
 | ||
|                 
 | ||
|                 print(f"第{i+1}轮 - 发言者: {speaker}, 优先级: {priority:.3f}, 阶段: {context['stage']}")
 | ||
|             
 | ||
|             # 验证集成效果
 | ||
|             print("\n开始获取各组件状态...")
 | ||
|             
 | ||
|             try:
 | ||
|                 flow_status = self.flow_controller.get_flow_status()
 | ||
|                 print(f"✅ 流程状态获取成功: {type(flow_status)}")
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 流程状态获取失败: {e}")
 | ||
|                 raise
 | ||
|             
 | ||
|             try:
 | ||
|                 health_status = self.health_monitor.get_health_status()
 | ||
|                 print(f"✅ 健康状态获取成功: {type(health_status)}")
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 健康状态获取失败: {e}")
 | ||
|                 raise
 | ||
|             
 | ||
|             try:
 | ||
|                 routing_status = self.chat_coordinator.get_routing_status()
 | ||
|                 print(f"✅ 路由状态获取成功: {type(routing_status)}, 值: {routing_status}")
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 路由状态获取失败: {e}")
 | ||
|                 raise
 | ||
|             
 | ||
|             print(f"\n集成测试结果:")
 | ||
|             print(f"- 总发言数: {len(self.flow_controller.debate_history)}")
 | ||
|             print(f"- 当前阶段: {flow_status['current_stage']}")
 | ||
|             print(f"- 健康状态: {health_status.value}")
 | ||
|             
 | ||
|             # 安全地访问routing_status
 | ||
|             if isinstance(routing_status, dict):
 | ||
|                 print(f"- 活跃路由数: {routing_status.get('active_routes', 0)}")
 | ||
|                 print(f"- 消息队列大小: {routing_status.get('message_queue_size', 0)}")
 | ||
|                 print(f"- 总群聊数: {routing_status.get('total_rooms', 0)}")
 | ||
|             else:
 | ||
|                 print(f"- 路由状态: {routing_status}")
 | ||
|                 print(f"- 路由状态类型: {type(routing_status)}")
 | ||
|             
 | ||
|             # 验证所有组件都正常工作
 | ||
|             total_speeches = len(self.flow_controller.debate_history)
 | ||
|             assert total_speeches == 8, f"应该记录8次发言,实际{total_speeches}次"
 | ||
|             assert health_status is not None, "应该有健康状态"
 | ||
|             assert len(speakers_sequence) == 8, "应该有8个发言者记录"
 | ||
|             
 | ||
|             self.test_results["cross_component_integration"] = True
 | ||
|             print("✅ 跨组件集成测试通过")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             import traceback
 | ||
|             print(f"❌ 跨组件集成测试失败: {e}")
 | ||
|             print(f"详细错误信息:")
 | ||
|             traceback.print_exc()
 | ||
|             self.error_log.append(f"跨组件集成错误: {e}")
 | ||
|             self.test_results["cross_component_integration"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     def test_performance_under_load(self):
 | ||
|         """测试负载下的性能"""
 | ||
|         print("\n🧪 测试负载下的性能")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 性能测试参数
 | ||
|             num_speeches = 100
 | ||
|             num_threads = 5
 | ||
|             
 | ||
|             def simulate_debate_load():
 | ||
|                 """模拟辩论负载"""
 | ||
|                 thread_name = threading.current_thread().name
 | ||
|                 for i in range(num_speeches // num_threads):
 | ||
|                     try:
 | ||
|                         # 模拟发言处理
 | ||
|                         speaker = f"Speaker-{thread_name}-{i}"
 | ||
|                         content = f"这是来自{speaker}的测试发言 {i}"
 | ||
|                         
 | ||
|                         # 优先级计算
 | ||
|                         context = {"stage": "承", "topic": "性能测试", "recent_speakers": []}
 | ||
|                         analysis = self.priority_algorithm.analyze_speech(content, speaker, context)
 | ||
|                         priority = self.priority_algorithm.calculate_priority(speaker, context, [])
 | ||
|                         
 | ||
|                         # 流程记录
 | ||
|                         self.flow_controller.record_speech(speaker, content)
 | ||
|                         
 | ||
|                         # 健康监控
 | ||
|                         debate_data = {
 | ||
|                             "participants": [speaker],
 | ||
|                             "speeches": [{"speaker": speaker, "content": content, "timestamp": datetime.now()}],
 | ||
|                             "current_stage": "承",
 | ||
|                             "duration": timedelta(seconds=i)
 | ||
|                         }
 | ||
|                         self.health_monitor.update_metrics(debate_data)
 | ||
|                         
 | ||
|                     except Exception as e:
 | ||
|                         self.error_log.append(f"负载测试错误 {thread_name}-{i}: {e}")
 | ||
|             
 | ||
|             # 开始性能测试
 | ||
|             start_time = time.time()
 | ||
|             
 | ||
|             threads = []
 | ||
|             for i in range(num_threads):
 | ||
|                 thread = threading.Thread(target=simulate_debate_load, name=f"LoadTest-{i}")
 | ||
|                 threads.append(thread)
 | ||
|                 thread.start()
 | ||
|             
 | ||
|             for thread in threads:
 | ||
|                 thread.join()
 | ||
|             
 | ||
|             end_time = time.time()
 | ||
|             duration = end_time - start_time
 | ||
|             
 | ||
|             # 计算性能指标
 | ||
|             total_operations = num_speeches * 4  # 每次发言包含4个操作
 | ||
|             ops_per_second = total_operations / duration
 | ||
|             
 | ||
|             self.performance_metrics = {
 | ||
|                 "total_operations": total_operations,
 | ||
|                 "duration": duration,
 | ||
|                 "ops_per_second": ops_per_second,
 | ||
|                 "avg_operation_time": duration / total_operations * 1000,  # 毫秒
 | ||
|                 "concurrent_threads": num_threads,
 | ||
|                 "errors": len([e for e in self.error_log if "负载测试错误" in e])
 | ||
|             }
 | ||
|             
 | ||
|             print(f"性能测试结果:")
 | ||
|             print(f"- 总操作数: {total_operations}")
 | ||
|             print(f"- 执行时间: {duration:.3f} 秒")
 | ||
|             print(f"- 操作速度: {ops_per_second:.1f} 操作/秒")
 | ||
|             print(f"- 平均操作时间: {self.performance_metrics['avg_operation_time']:.2f} 毫秒")
 | ||
|             print(f"- 并发线程: {num_threads}")
 | ||
|             print(f"- 错误数量: {self.performance_metrics['errors']}")
 | ||
|             
 | ||
|             # 性能验证
 | ||
|             assert ops_per_second > 100, "操作速度应该超过100操作/秒"
 | ||
|             assert self.performance_metrics['errors'] == 0, "不应该有错误"
 | ||
|             
 | ||
|             self.test_results["performance_under_load"] = True
 | ||
|             print("✅ 负载性能测试通过")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 负载性能测试失败: {e}")
 | ||
|             self.error_log.append(f"负载性能测试错误: {e}")
 | ||
|             self.test_results["performance_under_load"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     def test_data_consistency(self):
 | ||
|         """测试数据一致性"""
 | ||
|         print("\n🧪 测试数据一致性")
 | ||
|         print("-" * 40)
 | ||
|         
 | ||
|         try:
 | ||
|             # 为了确保数据一致性测试的准确性,创建新的flow_controller实例
 | ||
|             from jixia.debates.optimized_debate_flow import OptimizedDebateFlowController, FlowControlMode
 | ||
|             test_flow_controller = OptimizedDebateFlowController()
 | ||
|             
 | ||
|             # 模拟数据操作
 | ||
|             test_data = {
 | ||
|                 "speakers": ["吕洞宾", "何仙姑", "铁拐李"],
 | ||
|                 "speeches": [
 | ||
|                     "AI投资具有巨大潜力",
 | ||
|                     "但风险也不容忽视",
 | ||
|                     "我们需要平衡收益与风险"
 | ||
|                 ]
 | ||
|             }
 | ||
|             
 | ||
|             # 1. 保存流程控制器数据
 | ||
|             for i, (speaker, content) in enumerate(zip(test_data["speakers"], test_data["speeches"])):
 | ||
|                 test_flow_controller.record_speech(speaker, content)
 | ||
|                 print(f"记录发言 {i+1}: {speaker} - {content[:30]}...")
 | ||
|             
 | ||
|             print(f"当前debate_history长度: {len(test_flow_controller.debate_history)}")
 | ||
|             
 | ||
|             flow_data_file = "test_flow_consistency.json"
 | ||
|             test_flow_controller.save_flow_data(flow_data_file)
 | ||
|             
 | ||
|             # 2. 保存健康监控数据
 | ||
|             debate_data = {
 | ||
|                 "participants": test_data["speakers"],
 | ||
|                 "speeches": [
 | ||
|                     {"speaker": s, "content": c, "timestamp": datetime.now()}
 | ||
|                     for s, c in zip(test_data["speakers"], test_data["speeches"])
 | ||
|                 ],
 | ||
|                 "current_stage": "承",
 | ||
|                 "duration": timedelta(minutes=10)
 | ||
|             }
 | ||
|             self.health_monitor.update_metrics(debate_data)
 | ||
|             
 | ||
|             health_data_file = "test_health_consistency.json"
 | ||
|             self.health_monitor.save_monitoring_data(health_data_file)
 | ||
|             
 | ||
|             # 3. 验证数据文件
 | ||
|             assert os.path.exists(flow_data_file), "流程数据文件应该存在"
 | ||
|             assert os.path.exists(health_data_file), "健康数据文件应该存在"
 | ||
|             
 | ||
|             # 4. 读取并验证数据内容
 | ||
|             with open(flow_data_file, 'r', encoding='utf-8') as f:
 | ||
|                 flow_data = json.load(f)
 | ||
|             
 | ||
|             with open(health_data_file, 'r', encoding='utf-8') as f:
 | ||
|                 health_data = json.load(f)
 | ||
|             
 | ||
|             # 调试信息
 | ||
|             print(f"读取的flow_data中debate_history长度: {len(flow_data.get('debate_history', []))}")
 | ||
|             print(f"debate_history内容: {flow_data.get('debate_history', [])}")
 | ||
|             
 | ||
|             # 验证数据完整性
 | ||
|             actual_count = len(flow_data.get("debate_history", []))
 | ||
|             assert actual_count == 3, f"应该有3条发言记录,实际有{actual_count}条"
 | ||
|             assert "health_metrics" in health_data, "应该包含健康指标"
 | ||
|             assert "monitoring_config" in health_data, "应该包含监控配置"
 | ||
|             
 | ||
|             print(f"数据一致性验证:")
 | ||
|             print(f"- 流程数据记录: {len(flow_data['debate_history'])} 条")
 | ||
|             print(f"- 健康数据大小: {os.path.getsize(health_data_file)} 字节")
 | ||
|             print(f"- 流程数据大小: {os.path.getsize(flow_data_file)} 字节")
 | ||
|             
 | ||
|             # 清理测试文件
 | ||
|             os.remove(flow_data_file)
 | ||
|             os.remove(health_data_file)
 | ||
|             
 | ||
|             self.test_results["data_consistency"] = True
 | ||
|             print("✅ 数据一致性测试通过")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 数据一致性测试失败: {e}")
 | ||
|             self.error_log.append(f"数据一致性错误: {e}")
 | ||
|             self.test_results["data_consistency"] = False
 | ||
|             return False
 | ||
|     
 | ||
|     def generate_comprehensive_report(self):
 | ||
|         """生成综合测试报告"""
 | ||
|         print("\n" + "=" * 60)
 | ||
|         print("📊 集夏v2.1.0 综合测试报告")
 | ||
|         print("=" * 60)
 | ||
|         
 | ||
|         # 测试结果统计
 | ||
|         total_tests = len(self.test_results)
 | ||
|         passed_tests = sum(1 for result in self.test_results.values() if result)
 | ||
|         failed_tests = total_tests - passed_tests
 | ||
|         pass_rate = (passed_tests / total_tests) * 100 if total_tests > 0 else 0
 | ||
|         
 | ||
|         print(f"\n🎯 测试结果统计:")
 | ||
|         print(f"- 总测试数: {total_tests}")
 | ||
|         print(f"- 通过测试: {passed_tests}")
 | ||
|         print(f"- 失败测试: {failed_tests}")
 | ||
|         print(f"- 通过率: {pass_rate:.1f}%")
 | ||
|         
 | ||
|         # 详细测试结果
 | ||
|         print(f"\n📋 详细测试结果:")
 | ||
|         for test_name, result in self.test_results.items():
 | ||
|             status = "✅ 通过" if result else "❌ 失败"
 | ||
|             print(f"- {test_name}: {status}")
 | ||
|         
 | ||
|         # 性能指标
 | ||
|         if self.performance_metrics:
 | ||
|             print(f"\n⚡ 性能指标:")
 | ||
|             for metric, value in self.performance_metrics.items():
 | ||
|                 if isinstance(value, float):
 | ||
|                     print(f"- {metric}: {value:.3f}")
 | ||
|                 else:
 | ||
|                     print(f"- {metric}: {value}")
 | ||
|         
 | ||
|         # 错误日志
 | ||
|         if self.error_log:
 | ||
|             print(f"\n🚨 错误日志 ({len(self.error_log)} 条):")
 | ||
|             for i, error in enumerate(self.error_log[:5], 1):  # 只显示前5条
 | ||
|                 print(f"- {i}. {error}")
 | ||
|             if len(self.error_log) > 5:
 | ||
|                 print(f"- ... 还有 {len(self.error_log) - 5} 条错误")
 | ||
|         
 | ||
|         # 系统状态
 | ||
|         print(f"\n🔧 系统状态:")
 | ||
|         try:
 | ||
|             flow_status = self.flow_controller.get_flow_status()
 | ||
|             health_status = self.health_monitor.get_health_status()
 | ||
|             
 | ||
|             print(f"- 流程控制器: 正常 (总发言: {flow_status.get('total_speeches', 0)})")
 | ||
|             print(f"- 健康监控: 正常 (状态: {health_status.value})")
 | ||
|             print(f"- 优先级算法: 正常")
 | ||
|             print(f"- 多群聊协调: 正常")
 | ||
|         except Exception as e:
 | ||
|             print(f"- 系统状态检查失败: {e}")
 | ||
|         
 | ||
|         # 总结
 | ||
|         print(f"\n🎉 测试总结:")
 | ||
|         if pass_rate >= 90:
 | ||
|             print("🟢 系统状态优秀!所有核心功能运行正常,可以发布v2.1.0版本。")
 | ||
|         elif pass_rate >= 70:
 | ||
|             print("🟡 系统状态良好,但有部分功能需要优化。建议修复后再发布。")
 | ||
|         else:
 | ||
|             print("🔴 系统存在重大问题,需要进行全面修复后才能发布。")
 | ||
|         
 | ||
|         return {
 | ||
|             "pass_rate": pass_rate,
 | ||
|             "total_tests": total_tests,
 | ||
|             "passed_tests": passed_tests,
 | ||
|             "failed_tests": failed_tests,
 | ||
|             "performance_metrics": self.performance_metrics,
 | ||
|             "error_count": len(self.error_log)
 | ||
|         }
 | ||
|     
 | ||
|     async def run_all_tests(self):
 | ||
|         """运行所有测试"""
 | ||
|         print("🚀 开始集夏v2.1.0综合功能测试")
 | ||
|         print("=" * 60)
 | ||
|         
 | ||
|         # 同步测试方法
 | ||
|         sync_test_methods = [
 | ||
|             self.test_priority_algorithm_integration,
 | ||
|             self.test_flow_controller_integration,
 | ||
|             self.test_health_monitor_integration,
 | ||
|             self.test_performance_under_load,
 | ||
|             self.test_data_consistency
 | ||
|         ]
 | ||
|         
 | ||
|         # 异步测试方法
 | ||
|         async_test_methods = [
 | ||
|             self.test_chat_coordinator_integration,
 | ||
|             self.test_cross_component_integration
 | ||
|         ]
 | ||
|         
 | ||
|         start_time = time.time()
 | ||
|         
 | ||
|         # 运行同步测试
 | ||
|         for test_method in sync_test_methods:
 | ||
|             try:
 | ||
|                 test_method()
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 测试执行异常: {e}")
 | ||
|                 self.error_log.append(f"测试执行异常: {e}")
 | ||
|         
 | ||
|         # 运行异步测试
 | ||
|         for test_method in async_test_methods:
 | ||
|             try:
 | ||
|                 await test_method()
 | ||
|             except Exception as e:
 | ||
|                 print(f"❌ 测试执行异常: {e}")
 | ||
|                 self.error_log.append(f"测试执行异常: {e}")
 | ||
|         
 | ||
|         end_time = time.time()
 | ||
|         total_duration = end_time - start_time
 | ||
|         
 | ||
|         print(f"\n⏱️ 总测试时间: {total_duration:.3f} 秒")
 | ||
|         
 | ||
|         # 生成综合报告
 | ||
|         return self.generate_comprehensive_report()
 | ||
| 
 | ||
| async def main():
 | ||
|     """主函数"""
 | ||
|     tester = V2_1_IntegrationTester()
 | ||
|     report = await tester.run_all_tests()
 | ||
|     
 | ||
|     # 保存测试报告
 | ||
|     report_file = "v2_1_comprehensive_test_report.json"
 | ||
|     with open(report_file, 'w', encoding='utf-8') as f:
 | ||
|         json.dump({
 | ||
|             "timestamp": datetime.now().isoformat(),
 | ||
|             "version": "v2.1.0",
 | ||
|             "test_results": tester.test_results,
 | ||
|             "performance_metrics": tester.performance_metrics,
 | ||
|             "error_log": tester.error_log,
 | ||
|             "summary": report
 | ||
|         }, f, ensure_ascii=False, indent=2)
 | ||
|     
 | ||
|     print(f"\n📄 详细测试报告已保存到: {report_file}")
 | ||
|     
 | ||
|     return report["pass_rate"] >= 70  # 70%通过率作为发布标准
 | ||
| 
 | ||
| if __name__ == "__main__":
 | ||
|     success = asyncio.run(main())
 | ||
|     sys.exit(0 if success else 1) |