# liurenchaxin/tests/test_human_intervention.py
#
# NOTE: this file contains Unicode characters that might be confused with
# other characters (for example full-width punctuation inside the Chinese
# test strings). They are presumably intentional test data.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Human干预系统测试脚本
"""
import asyncio
import json
import time
from datetime import datetime, timedelta
from src.jixia.intervention.human_intervention_system import (
DebateHealthMonitor, HealthStatus, InterventionLevel, AlertType
)
class TestHumanInterventionSystem:
    """Script-style async test harness for ``DebateHealthMonitor``.

    Each ``test_*`` coroutine feeds a hand-built debate snapshot to the
    monitor, appends a ``(name, success, details)`` tuple to
    ``self.test_results`` and returns the success flag.  Events emitted by
    the monitor are captured in the ``received_*`` lists through the
    handlers registered in ``__init__``.  ``run_all_tests`` drives the
    whole suite and prints a summary.
    """

    def __init__(self):
        """Create the monitor under test and subscribe to its events."""
        self.monitor = DebateHealthMonitor()
        self.test_results = []
        # Create the capture lists before subscribing so a handler can never
        # run against a partially initialised instance.
        self.received_alerts = []
        self.received_interventions = []
        self.received_notifications = []
        # Event handlers used by the tests to observe the monitor.
        self.monitor.add_event_handler("alert_created", self._handle_alert_created)
        self.monitor.add_event_handler("intervention_executed", self._handle_intervention_executed)
        self.monitor.add_event_handler("human_notification", self._handle_human_notification)

    async def _handle_alert_created(self, alert):
        """Record an ``alert_created`` event and echo it to stdout."""
        self.received_alerts.append(alert)
        print(f"🚨 收到警报: {alert.alert_type.value} - {alert.message}")

    async def _handle_intervention_executed(self, action):
        """Record an ``intervention_executed`` event and echo it to stdout."""
        self.received_interventions.append(action)
        print(f"🛠️ 执行干预: {action.action_type} - {action.description}")

    async def _handle_human_notification(self, notification):
        """Record a ``human_notification`` event and echo its message."""
        self.received_notifications.append(notification)
        print(f"👤 Human通知: {notification['message']}")

    async def test_basic_health_monitoring(self):
        """Check that a balanced, on-topic debate is rated healthy.

        Returns:
            True when the score is at least 70 and the status is
            ``HealthStatus.EXCELLENT`` or ``HealthStatus.GOOD``.
        """
        print("\n🧪 测试基本健康监控功能...")
        # A normal, well-behaved debate.
        normal_debate_data = {
            "recent_messages": [
                {"sender": "正1", "content": "我认为人工智能投资具有巨大潜力因为技术发展迅速市场需求不断增长。首先AI技术在各行各业都有广泛应用前景。"},
                {"sender": "反1", "content": "虽然AI投资有潜力但我们也要考虑风险。技术泡沫、监管不确定性等因素都可能影响投资回报。"},
                {"sender": "正2", "content": "反方提到的风险确实存在,但是通过合理的投资策略和风险管理,我们可以最大化收益同时控制风险。"},
                {"sender": "反2", "content": "正方的观点有道理,不过我想补充一点:投资时机也很重要,现在可能不是最佳入场时机。"},
            ],
            "topic_keywords": ["人工智能", "AI", "投资", "风险", "收益", "技术", "市场"],
            "system_status": {
                "error_rate": 0.01,
                "avg_response_time": 1.2,
                "system_load": 0.5,
            },
        }
        score, status = await self.monitor.analyze_debate_health(normal_debate_data)
        success = score >= 70 and status in (HealthStatus.EXCELLENT, HealthStatus.GOOD)
        self.test_results.append(("基本健康监控", success, f"得分: {score:.1f}, 状态: {status.value}"))
        print(f"✅ 正常辩论健康度: {score:.1f}分 ({status.value})")
        return success

    async def test_quality_decline_detection(self):
        """Check that empty / one-word messages raise a quality-decline alert.

        Returns:
            True when at least one new ``AlertType.QUALITY_DECLINE`` alert
            was received and the score is below 50.
        """
        print("\n🧪 测试质量下降检测...")
        # A low-quality debate: mostly empty or trivial messages.
        low_quality_data = {
            "recent_messages": [
                {"sender": "正1", "content": ""},
                {"sender": "反1", "content": "不好"},
                {"sender": "正2", "content": "是的"},
                {"sender": "反2", "content": "不是"},
                {"sender": "正1", "content": ""},
                {"sender": "反1", "content": ""},
            ],
            "topic_keywords": ["人工智能", "AI", "投资"],
            "system_status": {
                "error_rate": 0.01,
                "avg_response_time": 1.0,
                "system_load": 0.4,
            },
        }
        initial_alert_count = len(self.received_alerts)
        score, _status = await self.monitor.analyze_debate_health(low_quality_data)
        # Only alerts raised by this analysis count.
        quality_alerts = [
            alert
            for alert in self.received_alerts[initial_alert_count:]
            if alert.alert_type == AlertType.QUALITY_DECLINE
        ]
        success = bool(quality_alerts) and score < 50
        self.test_results.append(("质量下降检测", success, f"得分: {score:.1f}, 警报数: {len(quality_alerts)}"))
        print(f"✅ 低质量辩论检测: {score:.1f}分, 触发警报: {len(quality_alerts)}")
        return success

    async def test_toxic_behavior_detection(self):
        """Check that insulting messages raise a toxic-behaviour alert.

        Returns:
            True when at least one new ``AlertType.TOXIC_BEHAVIOR`` alert
            was received.
        """
        print("\n🧪 测试有害行为检测...")
        # A debate that contains personal attacks.
        toxic_data = {
            "recent_messages": [
                {"sender": "正1", "content": "我认为这个观点是正确的,有充分的理由支持。"},
                {"sender": "反1", "content": "你这个观点太愚蠢了!完全没有逻辑!"},
                {"sender": "正2", "content": "请保持理性讨论,不要进行人身攻击。"},
                {"sender": "反2", "content": "闭嘴!你们这些白痴根本不懂!"},
                {"sender": "正1", "content": "让我们回到正题,理性分析这个问题。"},
            ],
            "topic_keywords": ["观点", "逻辑", "分析"],
            "system_status": {
                "error_rate": 0.02,
                "avg_response_time": 1.5,
                "system_load": 0.6,
            },
        }
        initial_alert_count = len(self.received_alerts)
        await self.monitor.analyze_debate_health(toxic_data)
        # Only alerts raised by this analysis count.
        toxic_alerts = [
            alert
            for alert in self.received_alerts[initial_alert_count:]
            if alert.alert_type == AlertType.TOXIC_BEHAVIOR
        ]
        success = bool(toxic_alerts)
        self.test_results.append(("有害行为检测", success, f"警报数: {len(toxic_alerts)}, 文明度分数: {self.monitor.health_metrics['interaction_civility'].value:.1f}"))
        print(f"✅ 有害行为检测: 触发警报: {len(toxic_alerts)}")
        return success

    async def test_emotional_escalation_detection(self):
        """Check that heated messages raise an emotional-escalation alert.

        Returns:
            True when at least one new ``AlertType.EMOTIONAL_ESCALATION``
            alert was received.
        """
        print("\n🧪 测试情绪升级检测...")
        # An emotionally charged debate.
        emotional_data = {
            "recent_messages": [
                {"sender": "正1", "content": "我强烈反对这个观点!!!"},
                {"sender": "反1", "content": "你们完全错了!!!这太愤怒了!!!"},
                {"sender": "正2", "content": "我非常生气!!!这个讨论让我很讨厌!!!"},
                {"sender": "反2", "content": "大家都冷静一下!!!不要这么激动!!!"},
            ],
            "topic_keywords": ["观点", "讨论"],
            "system_status": {
                "error_rate": 0.01,
                "avg_response_time": 1.0,
                "system_load": 0.5,
            },
        }
        initial_alert_count = len(self.received_alerts)
        await self.monitor.analyze_debate_health(emotional_data)
        # Only alerts raised by this analysis count.
        emotion_alerts = [
            alert
            for alert in self.received_alerts[initial_alert_count:]
            if alert.alert_type == AlertType.EMOTIONAL_ESCALATION
        ]
        success = bool(emotion_alerts)
        self.test_results.append(("情绪升级检测", success, f"警报数: {len(emotion_alerts)}, 情绪稳定性: {self.monitor.health_metrics['emotional_stability'].value:.1f}"))
        print(f"✅ 情绪升级检测: 触发警报: {len(emotion_alerts)}")
        return success

    async def test_participation_imbalance_detection(self):
        """Check that one dominant speaker raises a participation alert.

        Returns:
            True when at least one new ``AlertType.PARTICIPATION_IMBALANCE``
            alert was received.
        """
        print("\n🧪 测试参与不平衡检测...")
        # Seven messages from one speaker, one from another.
        imbalanced_data = {
            "recent_messages": [
                {"sender": "正1", "content": "我有很多观点要分享..."},
                {"sender": "正1", "content": "首先,我认为..."},
                {"sender": "正1", "content": "其次,我们应该..."},
                {"sender": "正1", "content": "最后,我建议..."},
                {"sender": "正1", "content": "总结一下..."},
                {"sender": "正1", "content": "补充一点..."},
                {"sender": "正1", "content": "再说一遍..."},
                {"sender": "反1", "content": "好的"},
            ],
            "topic_keywords": ["观点", "建议"],
            "system_status": {
                "error_rate": 0.01,
                "avg_response_time": 1.0,
                "system_load": 0.5,
            },
        }
        initial_alert_count = len(self.received_alerts)
        await self.monitor.analyze_debate_health(imbalanced_data)
        # Only alerts raised by this analysis count.
        balance_alerts = [
            alert
            for alert in self.received_alerts[initial_alert_count:]
            if alert.alert_type == AlertType.PARTICIPATION_IMBALANCE
        ]
        success = bool(balance_alerts)
        self.test_results.append(("参与不平衡检测", success, f"警报数: {len(balance_alerts)}, 平衡度: {self.monitor.health_metrics['participation_balance'].value:.1f}"))
        print(f"✅ 参与不平衡检测: 触发警报: {len(balance_alerts)}")
        return success

    async def test_auto_intervention(self):
        """Check that a badly degraded debate triggers an intervention.

        Returns:
            True when at least one new ``intervention_executed`` event was
            received during the analysis.
        """
        print("\n🧪 测试自动干预功能...")
        # Data that combines several problems at once.
        problematic_data = {
            "recent_messages": [
                {"sender": "正1", "content": "你们都是白痴!!!"},
                {"sender": "反1", "content": "愚蠢!!!"},
                {"sender": "正2", "content": "垃圾观点!!!"},
                {"sender": "反2", "content": "讨厌!!!"},
            ],
            "topic_keywords": ["观点"],
            "system_status": {
                "error_rate": 0.05,
                "avg_response_time": 3.0,
                "system_load": 0.9,
            },
        }
        initial_intervention_count = len(self.received_interventions)
        await self.monitor.analyze_debate_health(problematic_data)
        # Only interventions executed by this analysis count.
        new_interventions = self.received_interventions[initial_intervention_count:]
        success = bool(new_interventions)
        self.test_results.append(("自动干预功能", success, f"执行干预: {len(new_interventions)}"))
        print(f"✅ 自动干预: 执行了 {len(new_interventions)} 次干预")
        for intervention in new_interventions:
            print(f" - {intervention.action_type}: {intervention.description}")
        return success

    async def test_human_notification(self):
        """Check that a critical debate produces a human notification.

        The notification threshold is temporarily lowered to
        ``InterventionLevel.MODERATE_GUIDANCE`` and is restored even when
        the analysis raises, so later tests see the original configuration.

        Returns:
            True when at least one new ``human_notification`` event was
            received during the analysis.
        """
        print("\n🧪 测试Human通知功能...")
        # Severely degraded data: empty messages and poor system status.
        critical_data = {
            "recent_messages": [
                {"sender": "正1", "content": ""},
                {"sender": "反1", "content": ""},
                {"sender": "正2", "content": ""},
                {"sender": "反2", "content": ""},
            ],
            "topic_keywords": ["重要话题"],
            "system_status": {
                "error_rate": 0.1,
                "avg_response_time": 5.0,
                "system_load": 0.95,
            },
        }
        # Lower the notification threshold so the test can trigger it.
        original_threshold = self.monitor.monitoring_config["human_notification_threshold"]
        self.monitor.monitoring_config["human_notification_threshold"] = InterventionLevel.MODERATE_GUIDANCE
        initial_notification_count = len(self.received_notifications)
        try:
            await self.monitor.analyze_debate_health(critical_data)
        finally:
            # Always restore the original threshold.
            self.monitor.monitoring_config["human_notification_threshold"] = original_threshold
        # Only notifications sent by this analysis count.
        new_notifications = self.received_notifications[initial_notification_count:]
        success = bool(new_notifications)
        self.test_results.append(("Human通知功能", success, f"发送通知: {len(new_notifications)}"))
        print(f"✅ Human通知: 发送了 {len(new_notifications)} 次通知")
        return success

    async def test_health_report_generation(self):
        """Check the shape of the monitor's health report.

        Returns:
            True when every required field is present and the report
            lists exactly six metrics.
        """
        print("\n🧪 测试健康报告生成...")
        report = self.monitor.get_health_report()
        required_fields = (
            "overall_score",
            "health_status",
            "metrics",
            "active_alerts",
            "recent_interventions",
            "monitoring_enabled",
            "last_check",
        )
        # The report is expected to expose six health metrics.
        success = all(field in report for field in required_fields) and len(report["metrics"]) == 6
        self.test_results.append(("健康报告生成", success, f"包含字段: {len(report)}"))
        print(f"✅ 健康报告生成: 包含 {len(report)} 个字段")
        print(f" 整体得分: {report['overall_score']}")
        print(f" 健康状态: {report['health_status']}")
        print(f" 活跃警报: {report['active_alerts']}")
        return success

    async def test_alert_resolution(self):
        """Check that an alert can be resolved and then cleared.

        If the monitor has no active alert, a synthetic one is appended
        first.

        Returns:
            True when ``resolve_alert`` reports success and
            ``clear_resolved_alerts`` shrinks the active-alert list.
        """
        print("\n🧪 测试警报解决功能...")
        # Make sure there is at least one alert to work with.
        if not self.monitor.active_alerts:
            from src.jixia.intervention.human_intervention_system import InterventionAlert

            test_alert = InterventionAlert(
                id="test_alert_123",
                alert_type=AlertType.QUALITY_DECLINE,
                severity=InterventionLevel.GENTLE_REMINDER,
                message="测试警报",
                affected_participants=[],
                metrics={"test": 50},
                timestamp=datetime.now(),
            )
            self.monitor.active_alerts.append(test_alert)
        if self.monitor.active_alerts:
            # Resolve the first alert, then purge resolved alerts.
            alert_id = self.monitor.active_alerts[0].id
            resolved = self.monitor.resolve_alert(alert_id, "测试解决")
            initial_count = len(self.monitor.active_alerts)
            self.monitor.clear_resolved_alerts()
            final_count = len(self.monitor.active_alerts)
            success = bool(resolved) and final_count < initial_count
        else:
            success = True  # Having no alerts at all also counts as success.
        self.test_results.append(("警报解决功能", success, "解决并清理警报"))
        # Report the real outcome instead of unconditionally claiming success.
        print("✅ 警报解决: 功能正常" if success else "❌ 警报解决: 功能异常")
        return success

    async def test_monitoring_control(self):
        """Check that monitoring can be disabled and re-enabled.

        Returns:
            True when ``monitoring_enabled`` is falsy after
            ``disable_monitoring`` and truthy after ``enable_monitoring``.
        """
        print("\n🧪 测试监控控制功能...")
        self.monitor.disable_monitoring()
        disabled_state = not self.monitor.monitoring_enabled
        self.monitor.enable_monitoring()
        enabled_state = self.monitor.monitoring_enabled
        success = disabled_state and enabled_state
        # Keep the recorded detail and the console line consistent with the result.
        outcome = "启用/禁用功能正常" if success else "启用/禁用功能异常"
        self.test_results.append(("监控控制功能", success, outcome))
        print(f"{'✅' if success else '❌'} 监控控制: {outcome}")
        return success

    async def test_data_persistence(self):
        """Check that the monitor can export its state as JSON.

        The export is written to ``test_monitoring_data.json`` in the
        current directory and is removed afterwards, including when
        reading or validating it fails.

        Returns:
            True when the file exists and contains every required section.
        """
        import os

        print("\n🧪 测试数据持久化...")
        test_filename = "test_monitoring_data.json"
        required_sections = (
            "health_metrics",
            "active_alerts",
            "intervention_history",
            "monitoring_config",
            "monitoring_enabled",
            "export_time",
        )
        success = False
        try:
            self.monitor.save_monitoring_data(test_filename)
            # A missing file leaves ``success`` False.
            if os.path.exists(test_filename):
                with open(test_filename, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                success = all(section in data for section in required_sections)
        except Exception as e:
            print(f" 数据持久化错误: {e}")
            success = False
        finally:
            # Remove the export on every path so a failed run leaves no stray file.
            try:
                if os.path.exists(test_filename):
                    os.remove(test_filename)
            except OSError as e:
                print(f" 数据持久化错误: {e}")
        # Keep the recorded detail and the console line consistent with the result.
        self.test_results.append(("数据持久化", success, "保存/加载功能正常" if success else "保存/加载功能异常"))
        print("✅ 数据持久化: 功能正常" if success else "❌ 数据持久化: 功能异常")
        return success

    async def test_performance(self):
        """Measure the average latency of ``analyze_debate_health``.

        Runs 100 analyses of a 20-message snapshot.

        Returns:
            True when the average time per analysis is below 100 ms.
        """
        print("\n🧪 测试性能...")
        test_data = {
            "recent_messages": [
                {"sender": f"用户{i%4}", "content": f"这是第{i}条测试消息,包含一些内容用于分析。"}
                for i in range(20)
            ],
            "topic_keywords": ["测试", "性能", "分析", "消息"],
            "system_status": {
                "error_rate": 0.01,
                "avg_response_time": 1.0,
                "system_load": 0.5,
            },
        }
        iterations = 100
        # perf_counter is monotonic and high-resolution, unlike wall-clock time.time().
        start_time = time.perf_counter()
        for _ in range(iterations):
            await self.monitor.analyze_debate_health(test_data)
        total_time = time.perf_counter() - start_time
        avg_time = total_time / iterations
        # Guard against a zero reading on coarse clocks.
        analyses_per_second = iterations / total_time if total_time > 0 else float("inf")
        # Requirement: average processing time below 100 ms.
        success = avg_time < 0.1
        self.test_results.append(("性能测试", success, f"平均处理时间: {avg_time*1000:.2f}ms, 处理速度: {analyses_per_second:.1f}次/秒"))
        print(f"✅ 性能测试: 平均处理时间 {avg_time*1000:.2f}ms, 处理速度 {analyses_per_second:.1f}次/秒")
        return success

    async def run_all_tests(self):
        """Run every test in order and print a summary.

        A test that raises is reported as failed under its function name
        and does not stop the run.

        Returns:
            True when at least 80% of the tests passed.
        """
        print("🚀 开始Human干预系统测试...")
        print("=" * 60)
        test_functions = [
            self.test_basic_health_monitoring,
            self.test_quality_decline_detection,
            self.test_toxic_behavior_detection,
            self.test_emotional_escalation_detection,
            self.test_participation_imbalance_detection,
            self.test_auto_intervention,
            self.test_human_notification,
            self.test_health_report_generation,
            self.test_alert_resolution,
            self.test_monitoring_control,
            self.test_data_persistence,
            self.test_performance,
        ]
        passed_tests = 0
        total_tests = len(test_functions)
        for test_func in test_functions:
            try:
                if await test_func():
                    passed_tests += 1
            except Exception as e:
                print(f"❌ 测试失败: {test_func.__name__} - {e}")
                self.test_results.append((test_func.__name__, False, f"异常: {e}"))
        # Per-test summary.
        print("\n" + "=" * 60)
        print("📊 测试结果汇总:")
        print("=" * 60)
        for test_name, success, details in self.test_results:
            status = "✅ 通过" if success else "❌ 失败"
            print(f"{status} {test_name}: {details}")
        success_rate = (passed_tests / total_tests) * 100
        print(f"\n🎯 总体测试结果: {passed_tests}/{total_tests} 通过 ({success_rate:.1f}%)")
        if success_rate >= 90:
            print("🎉 Human干预系统测试优秀")
        elif success_rate >= 80:
            print("👍 Human干预系统测试良好")
        elif success_rate >= 70:
            print("⚠️ Human干预系统测试一般需要改进。")
        else:
            print("❌ Human干预系统测试较差需要重大改进。")
        # Final system status.
        print("\n📋 系统状态报告:")
        report = self.monitor.get_health_report()
        print(f"监控状态: {'启用' if report['monitoring_enabled'] else '禁用'}")
        print(f"活跃警报: {report['active_alerts']}")
        print(f"近期干预: {report['recent_interventions']}")
        print(f"收到警报: {len(self.received_alerts)}")
        print(f"执行干预: {len(self.received_interventions)}")
        print(f"Human通知: {len(self.received_notifications)}")
        return success_rate >= 80
async def main():
    """Run the whole suite once.

    Returns:
        The value returned by ``run_all_tests`` (True when the suite met
        its pass threshold).
    """
    tester = TestHumanInterventionSystem()
    return await tester.run_all_tests()


if __name__ == "__main__":
    import sys

    # Map the suite outcome to the process exit status so shells and CI
    # can detect a failing run instead of always seeing exit code 0.
    sys.exit(0 if asyncio.run(main()) else 1)