384 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			384 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
#!/usr/bin/env python3
 | 
						||
# -*- coding: utf-8 -*-
 | 
						||
"""
 | 
						||
多群聊协调系统测试脚本
 | 
						||
"""
 | 
						||
 | 
						||
import asyncio
 | 
						||
import sys
 | 
						||
import os
 | 
						||
from datetime import datetime
 | 
						||
 | 
						||
# 添加项目路径
 | 
						||
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
 | 
						||
 | 
						||
from jixia.coordination.multi_chat_coordinator import (
 | 
						||
    MultiChatCoordinator, ChatType, MessagePriority, CoordinationAction
 | 
						||
)
 | 
						||
 | 
						||
async def test_basic_messaging():
 | 
						||
    """测试基本消息功能"""
 | 
						||
    print("\n🧪 测试基本消息功能")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 测试正常消息发送
 | 
						||
    message1 = await coordinator.send_message(
 | 
						||
        "main_debate", "正1", 
 | 
						||
        "AI投资是未来科技发展的重要驱动力", 
 | 
						||
        MessagePriority.NORMAL, ["观点", "AI", "投资"]
 | 
						||
    )
 | 
						||
    print(f"✅ 消息1发送成功: {message1.id}")
 | 
						||
    
 | 
						||
    message2 = await coordinator.send_message(
 | 
						||
        "main_debate", "反1", 
 | 
						||
        "AI投资存在泡沫风险,需要谨慎对待", 
 | 
						||
        MessagePriority.NORMAL, ["反驳", "风险", "投资"]
 | 
						||
    )
 | 
						||
    print(f"✅ 消息2发送成功: {message2.id}")
 | 
						||
    
 | 
						||
    # 测试内部讨论
 | 
						||
    message3 = await coordinator.send_message(
 | 
						||
        "positive_internal", "正2", 
 | 
						||
        "我们需要收集更多AI成功案例的数据", 
 | 
						||
        MessagePriority.HIGH, ["策略", "数据"]
 | 
						||
    )
 | 
						||
    print(f"✅ 内部消息发送成功: {message3.id}")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_escalation_rules():
 | 
						||
    """测试升级规则"""
 | 
						||
    print("\n🚨 测试升级规则")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 发送紧急消息,应该触发升级规则
 | 
						||
    urgent_message = await coordinator.send_message(
 | 
						||
        "main_debate", "正3", 
 | 
						||
        "系统检测到异常行为,需要紧急干预", 
 | 
						||
        MessagePriority.URGENT, ["紧急", "系统"]
 | 
						||
    )
 | 
						||
    print(f"🚨 紧急消息发送: {urgent_message.id}")
 | 
						||
    
 | 
						||
    # 等待协调规则处理
 | 
						||
    await asyncio.sleep(0.1)
 | 
						||
    
 | 
						||
    # 检查Human干预群是否收到升级消息
 | 
						||
    human_room = coordinator.chat_rooms["human_intervention"]
 | 
						||
    escalated_messages = [msg for msg in human_room.message_history 
 | 
						||
                         if "升级" in msg.tags]
 | 
						||
    
 | 
						||
    if escalated_messages:
 | 
						||
        print(f"✅ 升级规则生效,Human干预群收到 {len(escalated_messages)} 条升级消息")
 | 
						||
        for msg in escalated_messages:
 | 
						||
            print(f"   📨 {msg.sender}: {msg.content[:100]}...")
 | 
						||
    else:
 | 
						||
        print("❌ 升级规则未生效")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_broadcast_rules():
 | 
						||
    """测试广播规则"""
 | 
						||
    print("\n📢 测试广播规则")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 在策略会议群发送决策消息
 | 
						||
    strategy_message = await coordinator.send_message(
 | 
						||
        "strategy_meeting", "系统", 
 | 
						||
        "决策:采用数据驱动的论证策略", 
 | 
						||
        MessagePriority.HIGH, ["决策", "策略"]
 | 
						||
    )
 | 
						||
    print(f"📋 策略决策发送: {strategy_message.id}")
 | 
						||
    
 | 
						||
    # 等待协调规则处理
 | 
						||
    await asyncio.sleep(0.1)
 | 
						||
    
 | 
						||
    # 检查内部讨论群是否收到广播消息
 | 
						||
    broadcast_count = 0
 | 
						||
    for room_id, room in coordinator.chat_rooms.items():
 | 
						||
        if room.chat_type == ChatType.INTERNAL_DISCUSSION:
 | 
						||
            broadcast_messages = [msg for msg in room.message_history 
 | 
						||
                                if "广播" in msg.tags]
 | 
						||
            if broadcast_messages:
 | 
						||
                broadcast_count += len(broadcast_messages)
 | 
						||
                print(f"✅ {room.name} 收到 {len(broadcast_messages)} 条广播消息")
 | 
						||
    
 | 
						||
    if broadcast_count > 0:
 | 
						||
        print(f"✅ 广播规则生效,总共发送 {broadcast_count} 条广播消息")
 | 
						||
    else:
 | 
						||
        print("❌ 广播规则未生效")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_filter_rules():
 | 
						||
    """测试过滤规则"""
 | 
						||
    print("\n🔍 测试过滤规则")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 发送低质量消息
 | 
						||
    low_quality_message = await coordinator.send_message(
 | 
						||
        "main_debate", "正4", 
 | 
						||
        "好的", 
 | 
						||
        MessagePriority.LOW
 | 
						||
    )
 | 
						||
    print(f"📝 低质量消息发送: {low_quality_message.id}")
 | 
						||
    
 | 
						||
    # 等待协调规则处理
 | 
						||
    await asyncio.sleep(0.1)
 | 
						||
    
 | 
						||
    # 检查消息是否被过滤
 | 
						||
    if low_quality_message.metadata.get("filtered"):
 | 
						||
        print(f"✅ 过滤规则生效,消息被标记为已过滤")
 | 
						||
        print(f"   过滤原因: {low_quality_message.metadata.get('filter_reason')}")
 | 
						||
    else:
 | 
						||
        print("❌ 过滤规则未生效")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_discussion_merging():
 | 
						||
    """测试讨论合并"""
 | 
						||
    print("\n🔗 测试讨论合并")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 发送相关消息
 | 
						||
    messages = [
 | 
						||
        ("main_debate", "正1", "AI技术的发展速度令人惊叹", ["AI", "技术"]),
 | 
						||
        ("positive_internal", "正2", "我们应该强调AI技术的创新价值", ["AI", "创新"]),
 | 
						||
        ("negative_internal", "反1", "AI技术也带来了就业问题", ["AI", "就业"]),
 | 
						||
    ]
 | 
						||
    
 | 
						||
    sent_messages = []
 | 
						||
    for chat_id, sender, content, tags in messages:
 | 
						||
        msg = await coordinator.send_message(chat_id, sender, content, tags=tags)
 | 
						||
        sent_messages.append(msg)
 | 
						||
        print(f"📨 发送消息: {sender} - {content[:30]}...")
 | 
						||
    
 | 
						||
    # 发送触发合并的消息
 | 
						||
    trigger_message = await coordinator.send_message(
 | 
						||
        "main_debate", "系统", 
 | 
						||
        "需要整合关于AI技术的所有讨论", 
 | 
						||
        tags=["AI", "整合"]
 | 
						||
    )
 | 
						||
    print(f"🔗 触发合并消息: {trigger_message.id}")
 | 
						||
    
 | 
						||
    # 等待协调规则处理
 | 
						||
    await asyncio.sleep(0.1)
 | 
						||
    
 | 
						||
    # 检查策略会议群是否收到合并摘要
 | 
						||
    strategy_room = coordinator.chat_rooms["strategy_meeting"]
 | 
						||
    merge_messages = [msg for msg in strategy_room.message_history 
 | 
						||
                     if "合并" in msg.tags or "摘要" in msg.tags]
 | 
						||
    
 | 
						||
    if merge_messages:
 | 
						||
        print(f"✅ 讨论合并生效,策略会议群收到 {len(merge_messages)} 条摘要")
 | 
						||
        for msg in merge_messages:
 | 
						||
            print(f"   📋 摘要: {msg.content[:100]}...")
 | 
						||
    else:
 | 
						||
        print("❌ 讨论合并未生效")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_permission_system():
 | 
						||
    """测试权限系统"""
 | 
						||
    print("\n🔐 测试权限系统")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 测试正常权限
 | 
						||
    try:
 | 
						||
        normal_message = await coordinator.send_message(
 | 
						||
            "main_debate", "正1", "这是一条正常消息"
 | 
						||
        )
 | 
						||
        print(f"✅ 正常权限测试通过: {normal_message.id}")
 | 
						||
    except Exception as e:
 | 
						||
        print(f"❌ 正常权限测试失败: {e}")
 | 
						||
    
 | 
						||
    # 测试无权限用户
 | 
						||
    try:
 | 
						||
        unauthorized_message = await coordinator.send_message(
 | 
						||
            "main_debate", "未授权用户", "这是一条未授权消息"
 | 
						||
        )
 | 
						||
        print(f"❌ 权限系统失效,未授权用户发送成功: {unauthorized_message.id}")
 | 
						||
    except PermissionError as e:
 | 
						||
        print(f"✅ 权限系统正常,拒绝未授权用户: {e}")
 | 
						||
    except Exception as e:
 | 
						||
        print(f"❌ 权限系统异常: {e}")
 | 
						||
    
 | 
						||
    # 测试内部群权限
 | 
						||
    try:
 | 
						||
        internal_message = await coordinator.send_message(
 | 
						||
            "positive_internal", "正2", "内部策略讨论"
 | 
						||
        )
 | 
						||
        print(f"✅ 内部群权限测试通过: {internal_message.id}")
 | 
						||
    except Exception as e:
 | 
						||
        print(f"❌ 内部群权限测试失败: {e}")
 | 
						||
    
 | 
						||
    # 测试跨团队权限
 | 
						||
    try:
 | 
						||
        cross_team_message = await coordinator.send_message(
 | 
						||
            "positive_internal", "反1", "反方试图进入正方内部群"
 | 
						||
        )
 | 
						||
        print(f"❌ 跨团队权限控制失效: {cross_team_message.id}")
 | 
						||
    except PermissionError as e:
 | 
						||
        print(f"✅ 跨团队权限控制正常: {e}")
 | 
						||
    except Exception as e:
 | 
						||
        print(f"❌ 跨团队权限控制异常: {e}")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_system_status():
 | 
						||
    """测试系统状态"""
 | 
						||
    print("\n📊 测试系统状态")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 发送一些测试消息
 | 
						||
    test_messages = [
 | 
						||
        ("main_debate", "正1", "测试消息1"),
 | 
						||
        ("main_debate", "反1", "测试消息2"),
 | 
						||
        ("positive_internal", "正2", "内部消息1"),
 | 
						||
        ("negative_internal", "反2", "内部消息2"),
 | 
						||
        ("strategy_meeting", "系统", "策略消息1"),
 | 
						||
    ]
 | 
						||
    
 | 
						||
    for chat_id, sender, content in test_messages:
 | 
						||
        await coordinator.send_message(chat_id, sender, content)
 | 
						||
    
 | 
						||
    # 获取系统状态
 | 
						||
    status = coordinator.get_chat_status()
 | 
						||
    
 | 
						||
    print(f"📈 系统状态报告:")
 | 
						||
    print(f"  总群聊数: {status['total_rooms']}")
 | 
						||
    print(f"  活跃群聊数: {status['active_rooms']}")
 | 
						||
    print(f"  总消息数: {status['total_messages']}")
 | 
						||
    print(f"  待处理消息: {status['pending_messages']}")
 | 
						||
    print(f"  协调规则数: {status['coordination_rules']}")
 | 
						||
    print(f"  活跃规则数: {status['active_rules']}")
 | 
						||
    
 | 
						||
    print(f"\n📋 群聊详情:")
 | 
						||
    for room_id, room_info in status['rooms'].items():
 | 
						||
        print(f"  🏠 {room_info['name']} ({room_info['type']})")
 | 
						||
        print(f"     参与者: {room_info['participants']} 人")
 | 
						||
        print(f"     消息数: {room_info['messages']}")
 | 
						||
        print(f"     活跃状态: {'✅' if room_info['is_active'] else '❌'}")
 | 
						||
    
 | 
						||
    # 测试数据保存
 | 
						||
    try:
 | 
						||
        coordinator.save_coordination_data("test_coordination_data.json")
 | 
						||
        print(f"\n💾 数据保存测试通过")
 | 
						||
    except Exception as e:
 | 
						||
        print(f"\n❌ 数据保存测试失败: {e}")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def test_performance():
 | 
						||
    """测试性能"""
 | 
						||
    print("\n⚡ 测试性能")
 | 
						||
    print("=" * 50)
 | 
						||
    
 | 
						||
    coordinator = MultiChatCoordinator()
 | 
						||
    
 | 
						||
    # 批量发送消息测试
 | 
						||
    start_time = datetime.now()
 | 
						||
    message_count = 100
 | 
						||
    
 | 
						||
    print(f"📤 发送 {message_count} 条消息...")
 | 
						||
    
 | 
						||
    for i in range(message_count):
 | 
						||
        chat_id = "main_debate" if i % 2 == 0 else "positive_internal"
 | 
						||
        sender = f"测试用户{i % 4 + 1}"
 | 
						||
        if chat_id == "positive_internal":
 | 
						||
            sender = f"正{i % 4 + 1}"
 | 
						||
        
 | 
						||
        content = f"性能测试消息 {i + 1}: 这是一条用于性能测试的消息内容"
 | 
						||
        
 | 
						||
        try:
 | 
						||
            await coordinator.send_message(chat_id, sender, content)
 | 
						||
        except PermissionError:
 | 
						||
            # 忽略权限错误,继续测试
 | 
						||
            pass
 | 
						||
    
 | 
						||
    end_time = datetime.now()
 | 
						||
    duration = (end_time - start_time).total_seconds()
 | 
						||
    
 | 
						||
    print(f"⏱️  性能测试结果:")
 | 
						||
    print(f"   总耗时: {duration:.3f} 秒")
 | 
						||
    print(f"   平均每条消息: {duration/message_count*1000:.2f} 毫秒")
 | 
						||
    print(f"   消息处理速度: {message_count/duration:.1f} 条/秒")
 | 
						||
    
 | 
						||
    # 获取最终状态
 | 
						||
    final_status = coordinator.get_chat_status()
 | 
						||
    print(f"   最终消息总数: {final_status['total_messages']}")
 | 
						||
    
 | 
						||
    return coordinator
 | 
						||
 | 
						||
async def run_all_tests():
 | 
						||
    """运行所有测试"""
 | 
						||
    print("🚀 多群聊协调系统测试开始")
 | 
						||
    print("=" * 60)
 | 
						||
    
 | 
						||
    tests = [
 | 
						||
        ("基本消息功能", test_basic_messaging),
 | 
						||
        ("升级规则", test_escalation_rules),
 | 
						||
        ("广播规则", test_broadcast_rules),
 | 
						||
        ("过滤规则", test_filter_rules),
 | 
						||
        ("讨论合并", test_discussion_merging),
 | 
						||
        ("权限系统", test_permission_system),
 | 
						||
        ("系统状态", test_system_status),
 | 
						||
        ("性能测试", test_performance),
 | 
						||
    ]
 | 
						||
    
 | 
						||
    results = []
 | 
						||
    
 | 
						||
    for test_name, test_func in tests:
 | 
						||
        try:
 | 
						||
            print(f"\n🧪 开始测试: {test_name}")
 | 
						||
            coordinator = await test_func()
 | 
						||
            results.append((test_name, "✅ 通过", None))
 | 
						||
            print(f"✅ {test_name} 测试完成")
 | 
						||
        except Exception as e:
 | 
						||
            results.append((test_name, "❌ 失败", str(e)))
 | 
						||
            print(f"❌ {test_name} 测试失败: {e}")
 | 
						||
    
 | 
						||
    # 测试结果总结
 | 
						||
    print("\n" + "=" * 60)
 | 
						||
    print("📊 测试结果总结")
 | 
						||
    print("=" * 60)
 | 
						||
    
 | 
						||
    passed = 0
 | 
						||
    failed = 0
 | 
						||
    
 | 
						||
    for test_name, status, error in results:
 | 
						||
        print(f"{status} {test_name}")
 | 
						||
        if error:
 | 
						||
            print(f"   错误: {error}")
 | 
						||
        
 | 
						||
        if "✅" in status:
 | 
						||
            passed += 1
 | 
						||
        else:
 | 
						||
            failed += 1
 | 
						||
    
 | 
						||
    print(f"\n📈 测试统计:")
 | 
						||
    print(f"   通过: {passed}")
 | 
						||
    print(f"   失败: {failed}")
 | 
						||
    print(f"   总计: {passed + failed}")
 | 
						||
    print(f"   成功率: {passed/(passed+failed)*100:.1f}%")
 | 
						||
    
 | 
						||
    if failed == 0:
 | 
						||
        print("\n🎉 所有测试通过!多群聊协调系统运行正常。")
 | 
						||
    else:
 | 
						||
        print(f"\n⚠️  有 {failed} 个测试失败,需要检查相关功能。")
 | 
						||
 | 
						||
if __name__ == "__main__":
 | 
						||
    asyncio.run(run_all_tests()) |