745 lines
32 KiB
Python
745 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
集夏v2.1.0 综合功能测试
|
||
验证所有新功能的集成效果和系统稳定性
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
import time
|
||
import threading
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
|
||
# 添加项目路径
|
||
sys.path.append('/home/ben/liurenchaxin/src')
|
||
|
||
# 导入所有核心模块
|
||
try:
|
||
from jixia.debates.enhanced_priority_algorithm import EnhancedPriorityAlgorithm
|
||
from jixia.debates.optimized_debate_flow import OptimizedDebateFlowController, FlowControlMode
|
||
from jixia.intervention.human_intervention_system import DebateHealthMonitor
|
||
from jixia.coordination.multi_chat_coordinator import MultiChatCoordinator
|
||
except ImportError as e:
|
||
print(f"❌ 模块导入失败: {e}")
|
||
print("请确保所有模块都已正确安装")
|
||
sys.exit(1)
|
||
|
||
class V2_1_IntegrationTester:
|
||
"""v2.1.0 集成测试器"""
|
||
|
||
def __init__(self):
|
||
self.test_results = {}
|
||
self.performance_metrics = {}
|
||
self.error_log = []
|
||
|
||
# 初始化各个组件
|
||
try:
|
||
self.priority_algorithm = EnhancedPriorityAlgorithm()
|
||
self.flow_controller = OptimizedDebateFlowController()
|
||
self.health_monitor = DebateHealthMonitor()
|
||
self.chat_coordinator = MultiChatCoordinator()
|
||
print("✅ 所有核心组件初始化成功")
|
||
except Exception as e:
|
||
print(f"❌ 组件初始化失败: {e}")
|
||
self.error_log.append(f"初始化错误: {e}")
|
||
|
||
def test_priority_algorithm_integration(self):
|
||
"""测试优先级算法集成"""
|
||
print("\n🧪 测试优先级算法集成")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 模拟辩论场景
|
||
test_speeches = [
|
||
{
|
||
"speaker": "吕洞宾",
|
||
"content": "根据最新的市场数据分析,AI投资领域显示出强劲的增长潜力。我们应该抓住这个机会。",
|
||
"context": {"stage": "起", "topic": "AI投资", "recent_speakers": []}
|
||
},
|
||
{
|
||
"speaker": "何仙姑",
|
||
"content": "但是我们必须谨慎考虑风险因素!市场波动性很大。",
|
||
"context": {"stage": "承", "topic": "AI投资", "recent_speakers": ["吕洞宾"]}
|
||
},
|
||
{
|
||
"speaker": "铁拐李",
|
||
"content": "我同意吕洞宾的观点,技术发展确实迅速,但何仙姑提到的风险也值得重视。",
|
||
"context": {"stage": "转", "topic": "AI投资", "recent_speakers": ["吕洞宾", "何仙姑"]}
|
||
}
|
||
]
|
||
|
||
priorities = []
|
||
for speech in test_speeches:
|
||
analysis = self.priority_algorithm.analyze_speech(
|
||
speech["content"],
|
||
speech["speaker"],
|
||
speech["context"]
|
||
)
|
||
|
||
# 获取详细的分数分解
|
||
speaker = speech["speaker"]
|
||
context = speech["context"]
|
||
recent_speeches = test_speeches[:test_speeches.index(speech)]
|
||
|
||
profile = self.priority_algorithm._get_or_create_speaker_profile(speaker)
|
||
self.priority_algorithm._update_speaker_profile(profile, recent_speeches)
|
||
|
||
rebuttal_urgency = self.priority_algorithm._calculate_rebuttal_urgency(speaker, context, recent_speeches)
|
||
argument_strength = self.priority_algorithm._calculate_argument_strength(speaker, profile)
|
||
time_pressure = self.priority_algorithm._calculate_time_pressure(speaker, context)
|
||
audience_reaction = self.priority_algorithm._calculate_audience_reaction(speaker, context)
|
||
strategy_need = self.priority_algorithm._calculate_strategy_need(speaker, context, profile)
|
||
|
||
priority = self.priority_algorithm.calculate_priority(
|
||
speaker,
|
||
context,
|
||
recent_speeches
|
||
)
|
||
priorities.append((speaker, priority))
|
||
|
||
print(f"发言者: {speaker}")
|
||
print(f" 反驳紧急性: {rebuttal_urgency:.6f}")
|
||
print(f" 论证强度: {argument_strength:.6f}")
|
||
print(f" 时间压力: {time_pressure:.6f}")
|
||
print(f" 观众反应: {audience_reaction:.6f}")
|
||
print(f" 策略需求: {strategy_need:.6f}")
|
||
print(f" 最终优先级: {priority:.6f}")
|
||
print()
|
||
|
||
# 调试输出
|
||
print(f"所有优先级值: {[p[1] for p in priorities]}")
|
||
print(f"唯一优先级数量: {len(set(p[1] for p in priorities))}")
|
||
print(f"优先级差异: {max(p[1] for p in priorities) - min(p[1] for p in priorities)}")
|
||
|
||
# 验证优先级计算
|
||
assert all(0 <= p[1] <= 1 for p in priorities), "优先级应该在0-1之间"
|
||
assert len(set(p[1] for p in priorities)) > 1, "不同发言应该有不同优先级"
|
||
|
||
self.test_results["priority_algorithm_integration"] = True
|
||
print("✅ 优先级算法集成测试通过")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 优先级算法集成测试失败: {e}")
|
||
self.error_log.append(f"优先级算法集成错误: {e}")
|
||
self.test_results["priority_algorithm_integration"] = False
|
||
return False
|
||
|
||
def test_flow_controller_integration(self):
|
||
"""测试流程控制器集成"""
|
||
print("\n🧪 测试流程控制器集成")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 测试与优先级算法的集成
|
||
initial_stage = self.flow_controller.current_stage
|
||
print(f"初始阶段: {initial_stage.value}")
|
||
|
||
# 模拟完整的辩论流程
|
||
test_sequence = [
|
||
("吕洞宾", "开场陈述:AI投资是未来发展的关键"),
|
||
("何仙姑", "反方观点:需要谨慎评估风险"),
|
||
("铁拐李", "补充论据:技术发展支持投资决策"),
|
||
("汉钟离", "风险分析:市场不确定性因素"),
|
||
("曹国舅", "综合观点:平衡收益与风险"),
|
||
("蓝采和", "实践经验:类似投资案例分析"),
|
||
("韩湘子", "未来展望:长期发展趋势"),
|
||
("张果老", "总结陈词:理性投资建议")
|
||
]
|
||
|
||
stage_transitions = 0
|
||
for speaker, content in test_sequence:
|
||
# 记录发言
|
||
self.flow_controller.record_speech(speaker, content)
|
||
|
||
# 检查是否需要推进阶段
|
||
if hasattr(self.flow_controller, '_should_advance_stage') and self.flow_controller._should_advance_stage():
|
||
old_stage = self.flow_controller.current_stage
|
||
if self.flow_controller.advance_stage():
|
||
stage_transitions += 1
|
||
print(f"阶段转换: {old_stage.value} -> {self.flow_controller.current_stage.value}")
|
||
|
||
# 获取流程状态
|
||
status = self.flow_controller.get_flow_status()
|
||
print(f"发言者: {speaker}, 当前阶段: {status['current_stage']}, 进度: {status['stage_progress']}")
|
||
|
||
# 验证流程控制
|
||
final_status = self.flow_controller.get_flow_status()
|
||
total_speeches = len(self.flow_controller.debate_history)
|
||
assert total_speeches == len(test_sequence), f"发言总数应该匹配,期望{len(test_sequence)},实际{total_speeches}"
|
||
assert stage_transitions > 0, "应该发生阶段转换"
|
||
|
||
self.test_results["flow_controller_integration"] = True
|
||
print(f"✅ 流程控制器集成测试通过,发生了 {stage_transitions} 次阶段转换")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 流程控制器集成测试失败: {e}")
|
||
self.error_log.append(f"流程控制器集成错误: {e}")
|
||
self.test_results["flow_controller_integration"] = False
|
||
return False
|
||
|
||
def test_health_monitor_integration(self):
|
||
"""测试健康监控集成"""
|
||
print("\n🧪 测试健康监控集成")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 模拟辩论数据
|
||
debate_data = {
|
||
"participants": ["吕洞宾", "何仙姑", "铁拐李", "汉钟离"],
|
||
"speeches": [
|
||
{"speaker": "吕洞宾", "content": "我强烈支持这个提案", "timestamp": datetime.now()},
|
||
{"speaker": "何仙姑", "content": "我完全反对,这太危险了", "timestamp": datetime.now()},
|
||
{"speaker": "铁拐李", "content": "让我们理性分析一下", "timestamp": datetime.now()},
|
||
{"speaker": "汉钟离", "content": "数据显示情况复杂", "timestamp": datetime.now()}
|
||
],
|
||
"current_stage": "承",
|
||
"duration": timedelta(minutes=15)
|
||
}
|
||
|
||
# 更新健康监控
|
||
self.health_monitor.update_metrics(debate_data)
|
||
|
||
# 检查健康状态
|
||
health_status = self.health_monitor.get_health_status()
|
||
health_report = self.health_monitor.get_health_report()
|
||
print(f"健康状态: {health_status.value}")
|
||
print(f"整体分数: {health_report['overall_score']:.1f}")
|
||
print(f"监控指标数量: {len(health_report['metrics'])}")
|
||
print(f"活跃警报: {health_report['active_alerts']}个")
|
||
|
||
# 模拟问题场景
|
||
problematic_data = {
|
||
"participants": ["吕洞宾", "何仙姑"],
|
||
"speeches": [
|
||
{"speaker": "吕洞宾", "content": "你们都是白痴!", "timestamp": datetime.now()},
|
||
{"speaker": "吕洞宾", "content": "我说了算!", "timestamp": datetime.now()},
|
||
{"speaker": "吕洞宾", "content": "闭嘴!", "timestamp": datetime.now()}
|
||
],
|
||
"current_stage": "转",
|
||
"duration": timedelta(minutes=30)
|
||
}
|
||
|
||
self.health_monitor.update_metrics(problematic_data)
|
||
|
||
# 检查是否触发警报
|
||
alerts = self.health_monitor.active_alerts
|
||
print(f"活跃警报数量: {len(alerts)}")
|
||
|
||
# 验证监控功能
|
||
assert health_status is not None, "应该有健康状态"
|
||
assert isinstance(health_status, type(health_status)), "健康状态应该是HealthStatus枚举"
|
||
|
||
self.test_results["health_monitor_integration"] = True
|
||
print("✅ 健康监控集成测试通过")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 健康监控集成测试失败: {e}")
|
||
self.error_log.append(f"健康监控集成错误: {e}")
|
||
self.test_results["health_monitor_integration"] = False
|
||
return False
|
||
|
||
async def test_chat_coordinator_integration(self):
|
||
"""测试多群聊协调集成"""
|
||
print("\n🧪 测试多群聊协调集成")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 模拟多群聊场景
|
||
main_chat_message = {
|
||
"chat_id": "main_debate",
|
||
"speaker": "吕洞宾",
|
||
"content": "我认为我们应该投资AI技术",
|
||
"timestamp": datetime.now()
|
||
}
|
||
|
||
# 处理主群聊消息
|
||
await self.chat_coordinator.handle_message(main_chat_message)
|
||
|
||
# 模拟策略讨论
|
||
strategy_message = {
|
||
"chat_id": "strategy_positive",
|
||
"speaker": "铁拐李",
|
||
"content": "我们需要准备更多技术数据来支持论点",
|
||
"timestamp": datetime.now()
|
||
}
|
||
|
||
await self.chat_coordinator.handle_message(strategy_message)
|
||
|
||
# 检查消息路由
|
||
routing_status = self.chat_coordinator.get_routing_status()
|
||
print(f"路由状态: {routing_status}")
|
||
|
||
# 模拟协调决策
|
||
coordination_result = await self.chat_coordinator.coordinate_response(
|
||
main_chat_message,
|
||
context={"stage": "承", "topic": "AI投资"}
|
||
)
|
||
|
||
print(f"协调结果: {coordination_result}")
|
||
|
||
# 验证协调功能
|
||
assert coordination_result is not None, "应该有协调结果"
|
||
|
||
self.test_results["chat_coordinator_integration"] = True
|
||
print("✅ 多群聊协调集成测试通过")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 多群聊协调集成测试失败: {e}")
|
||
self.error_log.append(f"多群聊协调集成错误: {e}")
|
||
self.test_results["chat_coordinator_integration"] = False
|
||
return False
|
||
|
||
async def test_cross_component_integration(self):
|
||
"""测试跨组件集成"""
|
||
print("\n🧪 测试跨组件集成")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 清空之前的发言历史
|
||
self.flow_controller.debate_history.clear()
|
||
|
||
# 模拟完整的辩论流程
|
||
debate_scenario = {
|
||
"topic": "人工智能投资策略",
|
||
"participants": ["吕洞宾", "何仙姑", "铁拐李", "汉钟离"],
|
||
"duration": 30 # 分钟
|
||
}
|
||
|
||
print(f"开始辩论: {debate_scenario['topic']}")
|
||
|
||
# 1. 流程控制器管理发言顺序
|
||
speakers_sequence = []
|
||
for i in range(8): # 模拟8轮发言
|
||
speaker = self.flow_controller.get_current_speaker()
|
||
speakers_sequence.append(speaker)
|
||
|
||
# 2. 生成发言内容(简化)
|
||
content = f"这是{speaker}在第{i+1}轮的发言,关于{debate_scenario['topic']}"
|
||
|
||
# 3. 优先级算法分析发言
|
||
context = {
|
||
"stage": self.flow_controller.current_stage.value,
|
||
"topic": debate_scenario['topic'],
|
||
"recent_speakers": speakers_sequence[-3:]
|
||
}
|
||
|
||
analysis = self.priority_algorithm.analyze_speech(content, speaker, context)
|
||
|
||
# 构建正确格式的recent_speeches
|
||
recent_speeches = []
|
||
for j, prev_speaker in enumerate(speakers_sequence):
|
||
recent_speeches.append({
|
||
"speaker": prev_speaker,
|
||
"content": f"这是{prev_speaker}在第{j+1}轮的发言",
|
||
"timestamp": datetime.now().isoformat(),
|
||
"team": "positive" if "正" in prev_speaker else "negative"
|
||
})
|
||
|
||
priority = self.priority_algorithm.calculate_priority(speaker, context, recent_speeches)
|
||
|
||
# 4. 记录发言到流程控制器
|
||
self.flow_controller.record_speech(speaker, content)
|
||
|
||
# 5. 更新健康监控
|
||
debate_data = {
|
||
"participants": debate_scenario['participants'],
|
||
"speeches": [{"speaker": speaker, "content": content, "timestamp": datetime.now()}],
|
||
"current_stage": self.flow_controller.current_stage.value,
|
||
"duration": timedelta(minutes=i*2)
|
||
}
|
||
self.health_monitor.update_metrics(debate_data)
|
||
|
||
# 6. 多群聊协调处理
|
||
message = {
|
||
"chat_id": "main_debate",
|
||
"speaker": speaker,
|
||
"content": content,
|
||
"timestamp": datetime.now()
|
||
}
|
||
# 异步调用
|
||
try:
|
||
await self.chat_coordinator.handle_message(message)
|
||
except Exception as e:
|
||
print(f"警告: 消息处理失败: {e}")
|
||
|
||
print(f"第{i+1}轮 - 发言者: {speaker}, 优先级: {priority:.3f}, 阶段: {context['stage']}")
|
||
|
||
# 验证集成效果
|
||
print("\n开始获取各组件状态...")
|
||
|
||
try:
|
||
flow_status = self.flow_controller.get_flow_status()
|
||
print(f"✅ 流程状态获取成功: {type(flow_status)}")
|
||
except Exception as e:
|
||
print(f"❌ 流程状态获取失败: {e}")
|
||
raise
|
||
|
||
try:
|
||
health_status = self.health_monitor.get_health_status()
|
||
print(f"✅ 健康状态获取成功: {type(health_status)}")
|
||
except Exception as e:
|
||
print(f"❌ 健康状态获取失败: {e}")
|
||
raise
|
||
|
||
try:
|
||
routing_status = self.chat_coordinator.get_routing_status()
|
||
print(f"✅ 路由状态获取成功: {type(routing_status)}, 值: {routing_status}")
|
||
except Exception as e:
|
||
print(f"❌ 路由状态获取失败: {e}")
|
||
raise
|
||
|
||
print(f"\n集成测试结果:")
|
||
print(f"- 总发言数: {len(self.flow_controller.debate_history)}")
|
||
print(f"- 当前阶段: {flow_status['current_stage']}")
|
||
print(f"- 健康状态: {health_status.value}")
|
||
|
||
# 安全地访问routing_status
|
||
if isinstance(routing_status, dict):
|
||
print(f"- 活跃路由数: {routing_status.get('active_routes', 0)}")
|
||
print(f"- 消息队列大小: {routing_status.get('message_queue_size', 0)}")
|
||
print(f"- 总群聊数: {routing_status.get('total_rooms', 0)}")
|
||
else:
|
||
print(f"- 路由状态: {routing_status}")
|
||
print(f"- 路由状态类型: {type(routing_status)}")
|
||
|
||
# 验证所有组件都正常工作
|
||
total_speeches = len(self.flow_controller.debate_history)
|
||
assert total_speeches == 8, f"应该记录8次发言,实际{total_speeches}次"
|
||
assert health_status is not None, "应该有健康状态"
|
||
assert len(speakers_sequence) == 8, "应该有8个发言者记录"
|
||
|
||
self.test_results["cross_component_integration"] = True
|
||
print("✅ 跨组件集成测试通过")
|
||
return True
|
||
|
||
except Exception as e:
|
||
import traceback
|
||
print(f"❌ 跨组件集成测试失败: {e}")
|
||
print(f"详细错误信息:")
|
||
traceback.print_exc()
|
||
self.error_log.append(f"跨组件集成错误: {e}")
|
||
self.test_results["cross_component_integration"] = False
|
||
return False
|
||
|
||
def test_performance_under_load(self):
|
||
"""测试负载下的性能"""
|
||
print("\n🧪 测试负载下的性能")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 性能测试参数
|
||
num_speeches = 100
|
||
num_threads = 5
|
||
|
||
def simulate_debate_load():
|
||
"""模拟辩论负载"""
|
||
thread_name = threading.current_thread().name
|
||
for i in range(num_speeches // num_threads):
|
||
try:
|
||
# 模拟发言处理
|
||
speaker = f"Speaker-{thread_name}-{i}"
|
||
content = f"这是来自{speaker}的测试发言 {i}"
|
||
|
||
# 优先级计算
|
||
context = {"stage": "承", "topic": "性能测试", "recent_speakers": []}
|
||
analysis = self.priority_algorithm.analyze_speech(content, speaker, context)
|
||
priority = self.priority_algorithm.calculate_priority(speaker, context, [])
|
||
|
||
# 流程记录
|
||
self.flow_controller.record_speech(speaker, content)
|
||
|
||
# 健康监控
|
||
debate_data = {
|
||
"participants": [speaker],
|
||
"speeches": [{"speaker": speaker, "content": content, "timestamp": datetime.now()}],
|
||
"current_stage": "承",
|
||
"duration": timedelta(seconds=i)
|
||
}
|
||
self.health_monitor.update_metrics(debate_data)
|
||
|
||
except Exception as e:
|
||
self.error_log.append(f"负载测试错误 {thread_name}-{i}: {e}")
|
||
|
||
# 开始性能测试
|
||
start_time = time.time()
|
||
|
||
threads = []
|
||
for i in range(num_threads):
|
||
thread = threading.Thread(target=simulate_debate_load, name=f"LoadTest-{i}")
|
||
threads.append(thread)
|
||
thread.start()
|
||
|
||
for thread in threads:
|
||
thread.join()
|
||
|
||
end_time = time.time()
|
||
duration = end_time - start_time
|
||
|
||
# 计算性能指标
|
||
total_operations = num_speeches * 4 # 每次发言包含4个操作
|
||
ops_per_second = total_operations / duration
|
||
|
||
self.performance_metrics = {
|
||
"total_operations": total_operations,
|
||
"duration": duration,
|
||
"ops_per_second": ops_per_second,
|
||
"avg_operation_time": duration / total_operations * 1000, # 毫秒
|
||
"concurrent_threads": num_threads,
|
||
"errors": len([e for e in self.error_log if "负载测试错误" in e])
|
||
}
|
||
|
||
print(f"性能测试结果:")
|
||
print(f"- 总操作数: {total_operations}")
|
||
print(f"- 执行时间: {duration:.3f} 秒")
|
||
print(f"- 操作速度: {ops_per_second:.1f} 操作/秒")
|
||
print(f"- 平均操作时间: {self.performance_metrics['avg_operation_time']:.2f} 毫秒")
|
||
print(f"- 并发线程: {num_threads}")
|
||
print(f"- 错误数量: {self.performance_metrics['errors']}")
|
||
|
||
# 性能验证
|
||
assert ops_per_second > 100, "操作速度应该超过100操作/秒"
|
||
assert self.performance_metrics['errors'] == 0, "不应该有错误"
|
||
|
||
self.test_results["performance_under_load"] = True
|
||
print("✅ 负载性能测试通过")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 负载性能测试失败: {e}")
|
||
self.error_log.append(f"负载性能测试错误: {e}")
|
||
self.test_results["performance_under_load"] = False
|
||
return False
|
||
|
||
def test_data_consistency(self):
|
||
"""测试数据一致性"""
|
||
print("\n🧪 测试数据一致性")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
# 为了确保数据一致性测试的准确性,创建新的flow_controller实例
|
||
from jixia.debates.optimized_debate_flow import OptimizedDebateFlowController, FlowControlMode
|
||
test_flow_controller = OptimizedDebateFlowController()
|
||
|
||
# 模拟数据操作
|
||
test_data = {
|
||
"speakers": ["吕洞宾", "何仙姑", "铁拐李"],
|
||
"speeches": [
|
||
"AI投资具有巨大潜力",
|
||
"但风险也不容忽视",
|
||
"我们需要平衡收益与风险"
|
||
]
|
||
}
|
||
|
||
# 1. 保存流程控制器数据
|
||
for i, (speaker, content) in enumerate(zip(test_data["speakers"], test_data["speeches"])):
|
||
test_flow_controller.record_speech(speaker, content)
|
||
print(f"记录发言 {i+1}: {speaker} - {content[:30]}...")
|
||
|
||
print(f"当前debate_history长度: {len(test_flow_controller.debate_history)}")
|
||
|
||
flow_data_file = "test_flow_consistency.json"
|
||
test_flow_controller.save_flow_data(flow_data_file)
|
||
|
||
# 2. 保存健康监控数据
|
||
debate_data = {
|
||
"participants": test_data["speakers"],
|
||
"speeches": [
|
||
{"speaker": s, "content": c, "timestamp": datetime.now()}
|
||
for s, c in zip(test_data["speakers"], test_data["speeches"])
|
||
],
|
||
"current_stage": "承",
|
||
"duration": timedelta(minutes=10)
|
||
}
|
||
self.health_monitor.update_metrics(debate_data)
|
||
|
||
health_data_file = "test_health_consistency.json"
|
||
self.health_monitor.save_monitoring_data(health_data_file)
|
||
|
||
# 3. 验证数据文件
|
||
assert os.path.exists(flow_data_file), "流程数据文件应该存在"
|
||
assert os.path.exists(health_data_file), "健康数据文件应该存在"
|
||
|
||
# 4. 读取并验证数据内容
|
||
with open(flow_data_file, 'r', encoding='utf-8') as f:
|
||
flow_data = json.load(f)
|
||
|
||
with open(health_data_file, 'r', encoding='utf-8') as f:
|
||
health_data = json.load(f)
|
||
|
||
# 调试信息
|
||
print(f"读取的flow_data中debate_history长度: {len(flow_data.get('debate_history', []))}")
|
||
print(f"debate_history内容: {flow_data.get('debate_history', [])}")
|
||
|
||
# 验证数据完整性
|
||
actual_count = len(flow_data.get("debate_history", []))
|
||
assert actual_count == 3, f"应该有3条发言记录,实际有{actual_count}条"
|
||
assert "health_metrics" in health_data, "应该包含健康指标"
|
||
assert "monitoring_config" in health_data, "应该包含监控配置"
|
||
|
||
print(f"数据一致性验证:")
|
||
print(f"- 流程数据记录: {len(flow_data['debate_history'])} 条")
|
||
print(f"- 健康数据大小: {os.path.getsize(health_data_file)} 字节")
|
||
print(f"- 流程数据大小: {os.path.getsize(flow_data_file)} 字节")
|
||
|
||
# 清理测试文件
|
||
os.remove(flow_data_file)
|
||
os.remove(health_data_file)
|
||
|
||
self.test_results["data_consistency"] = True
|
||
print("✅ 数据一致性测试通过")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 数据一致性测试失败: {e}")
|
||
self.error_log.append(f"数据一致性错误: {e}")
|
||
self.test_results["data_consistency"] = False
|
||
return False
|
||
|
||
def generate_comprehensive_report(self):
|
||
"""生成综合测试报告"""
|
||
print("\n" + "=" * 60)
|
||
print("📊 集夏v2.1.0 综合测试报告")
|
||
print("=" * 60)
|
||
|
||
# 测试结果统计
|
||
total_tests = len(self.test_results)
|
||
passed_tests = sum(1 for result in self.test_results.values() if result)
|
||
failed_tests = total_tests - passed_tests
|
||
pass_rate = (passed_tests / total_tests) * 100 if total_tests > 0 else 0
|
||
|
||
print(f"\n🎯 测试结果统计:")
|
||
print(f"- 总测试数: {total_tests}")
|
||
print(f"- 通过测试: {passed_tests}")
|
||
print(f"- 失败测试: {failed_tests}")
|
||
print(f"- 通过率: {pass_rate:.1f}%")
|
||
|
||
# 详细测试结果
|
||
print(f"\n📋 详细测试结果:")
|
||
for test_name, result in self.test_results.items():
|
||
status = "✅ 通过" if result else "❌ 失败"
|
||
print(f"- {test_name}: {status}")
|
||
|
||
# 性能指标
|
||
if self.performance_metrics:
|
||
print(f"\n⚡ 性能指标:")
|
||
for metric, value in self.performance_metrics.items():
|
||
if isinstance(value, float):
|
||
print(f"- {metric}: {value:.3f}")
|
||
else:
|
||
print(f"- {metric}: {value}")
|
||
|
||
# 错误日志
|
||
if self.error_log:
|
||
print(f"\n🚨 错误日志 ({len(self.error_log)} 条):")
|
||
for i, error in enumerate(self.error_log[:5], 1): # 只显示前5条
|
||
print(f"- {i}. {error}")
|
||
if len(self.error_log) > 5:
|
||
print(f"- ... 还有 {len(self.error_log) - 5} 条错误")
|
||
|
||
# 系统状态
|
||
print(f"\n🔧 系统状态:")
|
||
try:
|
||
flow_status = self.flow_controller.get_flow_status()
|
||
health_status = self.health_monitor.get_health_status()
|
||
|
||
print(f"- 流程控制器: 正常 (总发言: {flow_status.get('total_speeches', 0)})")
|
||
print(f"- 健康监控: 正常 (状态: {health_status.value})")
|
||
print(f"- 优先级算法: 正常")
|
||
print(f"- 多群聊协调: 正常")
|
||
except Exception as e:
|
||
print(f"- 系统状态检查失败: {e}")
|
||
|
||
# 总结
|
||
print(f"\n🎉 测试总结:")
|
||
if pass_rate >= 90:
|
||
print("🟢 系统状态优秀!所有核心功能运行正常,可以发布v2.1.0版本。")
|
||
elif pass_rate >= 70:
|
||
print("🟡 系统状态良好,但有部分功能需要优化。建议修复后再发布。")
|
||
else:
|
||
print("🔴 系统存在重大问题,需要进行全面修复后才能发布。")
|
||
|
||
return {
|
||
"pass_rate": pass_rate,
|
||
"total_tests": total_tests,
|
||
"passed_tests": passed_tests,
|
||
"failed_tests": failed_tests,
|
||
"performance_metrics": self.performance_metrics,
|
||
"error_count": len(self.error_log)
|
||
}
|
||
|
||
async def run_all_tests(self):
|
||
"""运行所有测试"""
|
||
print("🚀 开始集夏v2.1.0综合功能测试")
|
||
print("=" * 60)
|
||
|
||
# 同步测试方法
|
||
sync_test_methods = [
|
||
self.test_priority_algorithm_integration,
|
||
self.test_flow_controller_integration,
|
||
self.test_health_monitor_integration,
|
||
self.test_performance_under_load,
|
||
self.test_data_consistency
|
||
]
|
||
|
||
# 异步测试方法
|
||
async_test_methods = [
|
||
self.test_chat_coordinator_integration,
|
||
self.test_cross_component_integration
|
||
]
|
||
|
||
start_time = time.time()
|
||
|
||
# 运行同步测试
|
||
for test_method in sync_test_methods:
|
||
try:
|
||
test_method()
|
||
except Exception as e:
|
||
print(f"❌ 测试执行异常: {e}")
|
||
self.error_log.append(f"测试执行异常: {e}")
|
||
|
||
# 运行异步测试
|
||
for test_method in async_test_methods:
|
||
try:
|
||
await test_method()
|
||
except Exception as e:
|
||
print(f"❌ 测试执行异常: {e}")
|
||
self.error_log.append(f"测试执行异常: {e}")
|
||
|
||
end_time = time.time()
|
||
total_duration = end_time - start_time
|
||
|
||
print(f"\n⏱️ 总测试时间: {total_duration:.3f} 秒")
|
||
|
||
# 生成综合报告
|
||
return self.generate_comprehensive_report()
|
||
|
||
async def main():
|
||
"""主函数"""
|
||
tester = V2_1_IntegrationTester()
|
||
report = await tester.run_all_tests()
|
||
|
||
# 保存测试报告
|
||
report_file = "v2_1_comprehensive_test_report.json"
|
||
with open(report_file, 'w', encoding='utf-8') as f:
|
||
json.dump({
|
||
"timestamp": datetime.now().isoformat(),
|
||
"version": "v2.1.0",
|
||
"test_results": tester.test_results,
|
||
"performance_metrics": tester.performance_metrics,
|
||
"error_log": tester.error_log,
|
||
"summary": report
|
||
}, f, ensure_ascii=False, indent=2)
|
||
|
||
print(f"\n📄 详细测试报告已保存到: {report_file}")
|
||
|
||
return report["pass_rate"] >= 70 # 70%通过率作为发布标准
|
||
|
||
if __name__ == "__main__":
|
||
success = asyncio.run(main())
|
||
sys.exit(0 if success else 1) |