#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced multi-chat coordination system v2.1.0.

Builds on the original system and adds smart routing, load balancing and
real-time message analysis.
"""

import asyncio
import json
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import logging
import statistics
from collections import defaultdict


class RoutingStrategy(Enum):
    """Available routing strategies."""

    SMART_ROUTING = "智能路由"
    LOAD_BALANCED = "负载均衡"
    PRIORITY_BASED = "优先级"


class MessageAnalysisLevel(Enum):
    """Depth of analysis requested for a message."""

    BASIC = "基础"
    STANDARD = "标准"
    ADVANCED = "高级"


@dataclass
class ChatAnalytics:
    """Per-chat analytics counters maintained by the coordinator."""

    chat_id: str
    # Number of messages stored in this chat.
    message_count: int = 0
    # Distinct senders seen in this chat.
    active_participants: set = field(default_factory=set)
    # Score in [0, 1] computed by the coordinator after every message.
    engagement_score: float = 0.0
    # Tag -> number of messages carrying that tag.
    topic_distribution: Dict[str, int] = field(default_factory=dict)
    # Sentiment of the most recent messages (trimmed to 50 by the coordinator).
    sentiment_trends: List[float] = field(default_factory=list)


@dataclass
class LoadMetrics:
    """Per-chat load figures maintained by the coordinator."""

    chat_id: str
    # Load derived from the most recently stored message.
    current_load: float = 0.0
    # Highest current_load observed so far.
    peak_load: float = 0.0
    # Declared but never written anywhere in this file.
    message_rate: float = 0.0
    # Recent current_load values (trimmed to 100 by the coordinator).
    _load_history: List[float] = field(default_factory=list)


class EnhancedMultiChatCoordinator:
    """Enhanced multi-chat coordinator v2.1.0.

    Owns a fixed set of chat rooms, analyses every message, reroutes it when
    the target room is over capacity, stores it, updates analytics and then
    runs follow-up coordination (escalation, strategy distribution,
    archiving).
    """

    def __init__(self):
        # Core state, all keyed by chat id.
        self.chat_rooms: Dict[str, Dict] = {}
        self.chat_analytics: Dict[str, ChatAnalytics] = {}
        self.load_metrics: Dict[str, LoadMetrics] = {}

        self.message_analyzer = MessageAnalyzer()
        self.load_balancer = LoadBalancer()
        self.performance_tracker = PerformanceTracker()

        self.logger = logging.getLogger(__name__)

        self._initialize_enhanced_system()
        self.logger.info("增强版多群聊协调系统 v2.1.0 初始化完成")

    def _initialize_enhanced_system(self):
        """Create the built-in chat rooms and their analytics/load records."""
        rooms = {
            "main_debate": {
                "name": "主辩论群",
                "type": "主辩论",
                "participants": ["正1", "正2", "正3", "正4", "反1", "反2", "反3", "反4"],
            },
            "positive_internal": {
                "name": "正方内部讨论群",
                "type": "内部讨论",
                "participants": ["正1", "正2", "正3", "正4"],
            },
            "negative_internal": {
                "name": "反方内部讨论群",
                "type": "内部讨论",
                "participants": ["反1", "反2", "反3", "反4"],
            },
            "strategy_meeting": {
                "name": "策略会议群",
                "type": "策略会议",
                "participants": ["正1", "反1", "系统"],
            },
            "human_intervention": {
                "name": "Human干预群",
                "type": "人工干预",
                "participants": ["Human", "系统"],
            },
            "observation": {
                "name": "观察群",
                "type": "观察记录",
                "participants": ["观察者", "记录员"],
            },
        }

        for room_id, room_config in rooms.items():
            self.chat_rooms[room_id] = {
                **room_config,
                "id": room_id,
                "is_active": True,
                "message_history": [],
            }
            self.chat_analytics[room_id] = ChatAnalytics(chat_id=room_id)
            self.load_metrics[room_id] = LoadMetrics(chat_id=room_id)

    async def send_enhanced_message(
        self,
        chat_id: str,
        sender: str,
        content: str,
        priority: int = 2,
        tags: Optional[List[str]] = None,
        analysis_level: MessageAnalysisLevel = MessageAnalysisLevel.STANDARD,
    ) -> Dict[str, Any]:
        """Analyse, route, store and coordinate a message.

        Args:
            chat_id: Target chat room id.
            sender: Name of the sender.
            content: Message text.
            priority: Priority value stored on the message.
            tags: Optional tags; ``None`` is treated as no tags.
            analysis_level: Analysis level passed to the message analyzer.

        Returns:
            The stored message dict. Its ``chat_id`` may differ from the
            requested one when the load balancer rerouted the message.

        Raises:
            ValueError: If ``chat_id`` is not a known chat room.
        """
        return await self._deliver_message(
            chat_id, sender, content, priority, tags, analysis_level,
            coordinate=True,
        )

    async def _deliver_message(
        self,
        chat_id: str,
        sender: str,
        content: str,
        priority: int,
        tags: Optional[List[str]],
        analysis_level: MessageAnalysisLevel,
        *,
        coordinate: bool,
    ) -> Dict[str, Any]:
        """Run the delivery pipeline, optionally followed by coordination.

        ``coordinate=False`` is used for messages generated by the
        coordination step itself. Those follow-ups carry the very tags and
        keywords that trigger coordination (e.g. the "策略" tag), so running
        coordination on them again would recurse without bound.
        """
        start_time = datetime.now()
        tag_list = tags or []

        try:
            # 1. The chat must exist.
            if chat_id not in self.chat_rooms:
                raise ValueError(f"群聊 {chat_id} 不存在")

            # 2. Message analysis.
            analysis = await self.message_analyzer.analyze_message(
                content, sender, tag_list, analysis_level
            )

            # 3. Capacity check; reroute only when an alternative exists.
            load_check = self.load_balancer.check_capacity(chat_id, self.load_metrics)
            if not load_check["can_handle"] and load_check["alternative"]:
                original_chat = chat_id
                chat_id = load_check["alternative"]
                self.logger.info("消息从 %s 路由到 %s", original_chat, chat_id)

            # 4. Build the message record.
            message = {
                "id": f"{chat_id}_{datetime.now().timestamp()}",
                "chat_id": chat_id,
                "sender": sender,
                "content": content,
                "priority": priority,
                "tags": tag_list,
                "timestamp": datetime.now(),
                "analysis": analysis,
            }

            # 5. Store it.
            self.chat_rooms[chat_id]["message_history"].append(message)

            # 6. Update analytics and load figures.
            await self._update_analytics(message, analysis)

            # 7. Follow-up coordination (skipped for system follow-ups).
            if coordinate:
                await self._execute_smart_coordination(message, analysis)

            # 8. Record timing for the successful operation.
            processing_time = (datetime.now() - start_time).total_seconds() * 1000
            self.performance_tracker.record_operation("send_message", processing_time, True)

            return message

        except Exception as e:
            # Boundary handler: record the failure, log it, let the caller decide.
            processing_time = (datetime.now() - start_time).total_seconds() * 1000
            self.performance_tracker.record_operation("send_message", processing_time, False)
            self.logger.error("发送消息失败: %s", e)
            raise

    async def _send_system_message(
        self, chat_id: str, content: str, priority: int, tags: List[str]
    ) -> Dict[str, Any]:
        """Deliver a coordinator-generated message without re-coordinating it."""
        return await self._deliver_message(
            chat_id, "系统", content, priority, tags,
            MessageAnalysisLevel.STANDARD,
            coordinate=False,
        )

    async def _update_analytics(self, message: Dict[str, Any], analysis: Dict[str, Any]):
        """Update analytics and load metrics of the chat that stored ``message``."""
        chat_id = message["chat_id"]
        analytics = self.chat_analytics[chat_id]

        # Basic counters.
        analytics.message_count += 1
        analytics.active_participants.add(message["sender"])

        # Topic distribution is counted per tag.
        for tag in message.get("tags", []):
            analytics.topic_distribution[tag] = analytics.topic_distribution.get(tag, 0) + 1

        # Sentiment trend, limited to the 50 most recent values.
        sentiment = analysis.get("sentiment", 0.0)
        analytics.sentiment_trends.append(sentiment)
        if len(analytics.sentiment_trends) > 50:
            analytics.sentiment_trends.pop(0)

        analytics.engagement_score = self._calculate_engagement_score(analytics)

        await self._update_load_metrics(chat_id, message)

    def _calculate_engagement_score(self, analytics: ChatAnalytics) -> float:
        """Return an engagement score capped at 1.0.

        Weighted sum of participants per message (0.4), message volume
        relative to 100 messages (0.4) and the standard deviation of the
        recorded sentiment values (0.2).
        """
        if analytics.message_count == 0:
            return 0.0

        participant_ratio = len(analytics.active_participants) / max(1, analytics.message_count)
        frequency_score = min(analytics.message_count / 100, 1.0)

        # stdev needs at least two data points.
        sentiment_variance = 0.0
        if len(analytics.sentiment_trends) > 1:
            sentiment_variance = statistics.stdev(analytics.sentiment_trends)

        engagement = participant_ratio * 0.4 + frequency_score * 0.4 + sentiment_variance * 0.2
        return min(engagement, 1.0)

    async def _update_load_metrics(self, chat_id: str, message: Dict[str, Any]):
        """Set the chat's load from the length of ``message`` and record it."""
        metrics = self.load_metrics[chat_id]

        # Load is the content length scaled so that 1000 characters == 1.0.
        content_length = len(message.get("content", ""))
        base_load = min(content_length / 1000, 1.0)

        metrics.current_load = base_load
        metrics.peak_load = max(metrics.peak_load, metrics.current_load)

        # Keep the 100 most recent load values.
        metrics._load_history.append(metrics.current_load)
        if len(metrics._load_history) > 100:
            metrics._load_history.pop(0)

    async def _execute_smart_coordination(self, message: Dict[str, Any], analysis: Dict[str, Any]):
        """Trigger follow-up actions based on the analysis of ``message``."""
        urgency = analysis.get("urgency", 0.0)
        topics = analysis.get("topics", [])

        # Urgent messages are escalated to the human-intervention chat.
        if urgency > 0.7:
            await self._escalate_to_human(message)

        # Strategy/decision messages are distributed to both internal chats.
        if "策略" in topics or "决策" in topics:
            await self._distribute_strategy_message(message)

        # Messages from highly engaged chats are copied to the observation chat.
        chat_analytics = self.chat_analytics[message["chat_id"]]
        if chat_analytics.engagement_score > 0.8:
            await self._archive_to_observation(message)

    async def _escalate_to_human(self, message: Dict[str, Any]):
        """Post an escalation notice for ``message`` to the human-intervention chat."""
        escalated_content = (
            f"🚨 [紧急升级] 来自 {message['chat_id']}\n"
            f"发送者: {message['sender']}\n"
            f"内容: {message['content']}"
        )
        await self._send_system_message(
            "human_intervention", escalated_content, priority=5, tags=["升级", "紧急"]
        )

    async def _distribute_strategy_message(self, message: Dict[str, Any]):
        """Forward ``message`` to both internal discussion chats."""
        strategy_content = f"📢 [策略分发] {message['content']}"
        for target in ["positive_internal", "negative_internal"]:
            await self._send_system_message(
                target, strategy_content, priority=3, tags=["策略", "分发"]
            )

    async def _archive_to_observation(self, message: Dict[str, Any]):
        """Copy the first 100 characters of ``message`` to the observation chat."""
        archive_content = f"📁 [高活跃归档] 来自 {message['chat_id']}: {message['content'][:100]}..."
        await self._send_system_message(
            "observation", archive_content, priority=1, tags=["归档", "高活跃"]
        )

    def get_enhanced_status(self) -> Dict[str, Any]:
        """Return a snapshot of rooms, per-chat analytics and performance data."""
        return {
            "version": "v2.1.0",
            "total_rooms": len(self.chat_rooms),
            "active_rooms": sum(1 for r in self.chat_rooms.values() if r["is_active"]),
            "total_messages": sum(len(r["message_history"]) for r in self.chat_rooms.values()),
            "analytics": {
                chat_id: {
                    "message_count": analytics.message_count,
                    "active_participants": len(analytics.active_participants),
                    "engagement_score": analytics.engagement_score,
                    "top_topics": sorted(
                        analytics.topic_distribution.items(),
                        key=lambda x: x[1],
                        reverse=True,
                    )[:3],
                    "current_load": self.load_metrics[chat_id].current_load,
                }
                for chat_id, analytics in self.chat_analytics.items()
            },
            "performance": self.performance_tracker.get_summary(),
        }

    # Compatibility method.
    async def handle_message(self, message_data: Dict[str, Any]) -> Dict[str, Any]:
        """Send a message described by a dict and report the outcome as a dict.

        Reads ``chat_id`` (default "main_debate"), ``speaker`` or ``sender``
        (default "未知") and ``content`` (default ""). Never raises: any
        failure is reported as ``{"success": False, "error": ...}``.
        """
        try:
            chat_id = message_data.get("chat_id", "main_debate")
            sender = message_data.get("speaker", message_data.get("sender", "未知"))
            content = message_data.get("content", "")

            message = await self.send_enhanced_message(chat_id, sender, content)

            return {
                "success": True,
                "message_id": message["id"],
                "processed_at": datetime.now().isoformat(),
            }
        except Exception as e:
            # Deliberate best-effort boundary for legacy callers.
            return {
                "success": False,
                "error": str(e),
                "processed_at": datetime.now().isoformat(),
            }


class MessageAnalyzer:
    """Keyword-based message analyzer."""

    def __init__(self):
        self.sentiment_keywords = {
            "positive": ["好", "棒", "优秀", "支持", "赞同"],
            "negative": ["坏", "差", "错误", "反对", "质疑"],
        }

    async def analyze_message(
        self, content: str, sender: str, tags: List[str], level: MessageAnalysisLevel
    ) -> Dict[str, Any]:
        """Return the analysis dict for a message.

        ``sender`` and ``level`` are accepted but not used by the current
        implementation.
        """
        return {
            "content_length": len(content),
            "word_count": len(content.split()),
            "sentiment": self._analyze_sentiment(content),
            "urgency": self._analyze_urgency(content, tags),
            "topics": self._extract_topics(content, tags),
            "intent": self._analyze_intent(content),
            "timestamp": datetime.now().isoformat(),
        }

    def _analyze_sentiment(self, content: str) -> float:
        """Return a sentiment value clamped to [-1, 1].

        Counts how many positive/negative keywords occur in ``content`` and
        normalises by the number of whitespace-separated tokens.
        """
        positive_count = sum(1 for word in self.sentiment_keywords["positive"] if word in content)
        negative_count = sum(1 for word in self.sentiment_keywords["negative"] if word in content)

        total_words = len(content.split())
        if total_words == 0:
            return 0.0

        sentiment = (positive_count - negative_count) / total_words
        return max(-1.0, min(1.0, sentiment * 10))

    def _analyze_urgency(self, content: str, tags: List[str]) -> float:
        """Return an urgency value in [0, 1].

        Each urgent keyword found in ``content`` adds 0.2 and each urgent tag
        adds 0.3; the total is capped at 1.0.
        """
        urgent_keywords = ["紧急", "立即", "错误", "异常", "危险"]
        urgent_tags = ["紧急", "错误"]

        urgency = 0.0
        for keyword in urgent_keywords:
            if keyword in content:
                urgency += 0.2

        for tag in urgent_tags:
            if tag in tags:
                urgency += 0.3

        return min(urgency, 1.0)

    def _extract_topics(self, content: str, tags: List[str]) -> List[str]:
        """Return the tags plus any keyword-detected topics, without duplicates."""
        topics = list(tags)

        topic_keywords = {
            "AI": ["AI", "人工智能", "算法"],
            "投资": ["投资", "收益", "风险"],
            "策略": ["策略", "计划", "方案"],
        }

        for topic, keywords in topic_keywords.items():
            if topic not in topics and any(keyword in content for keyword in keywords):
                topics.append(topic)

        return topics

    def _analyze_intent(self, content: str) -> str:
        """Classify ``content`` as question, objection, agreement or statement."""
        # Both the ASCII and the full-width question mark count: Chinese text
        # normally uses the full-width form.
        if any(marker in content for marker in ["?", "?", "什么", "如何"]):
            return "question"
        if any(marker in content for marker in ["反对", "质疑", "不同意"]):
            return "objection"
        if any(marker in content for marker in ["支持", "赞同", "同意"]):
            return "agreement"
        return "statement"


class LoadBalancer:
    """Threshold-based load balancer."""

    def __init__(self):
        # A chat whose current_load exceeds this value is considered full.
        self.capacity_threshold = 0.8

    def check_capacity(self, chat_id: str, load_metrics: Dict[str, LoadMetrics]) -> Dict[str, Any]:
        """Report whether ``chat_id`` can take a message.

        Returns:
            ``{"can_handle": bool, "alternative": Optional[str]}``.
            ``alternative`` is only set when the chat is over the threshold
            and another chat below the threshold exists.
        """
        # NOTE(review): current_load is only refreshed for the chat that
        # actually stores a message, so a chat that went over the threshold
        # appears to stay "full" for as long as messages are rerouted away
        # from it — confirm whether that is intended.
        if chat_id not in load_metrics:
            return {"can_handle": True, "alternative": None}

        metrics = load_metrics[chat_id]
        if metrics.current_load > self.capacity_threshold:
            alternative = self._find_alternative(chat_id, load_metrics)
            return {"can_handle": False, "alternative": alternative}

        return {"can_handle": True, "alternative": None}

    def _find_alternative(self, original: str, load_metrics: Dict[str, LoadMetrics]) -> Optional[str]:
        """Return the first other chat below the threshold, or ``None``."""
        for chat_id, metrics in load_metrics.items():
            if chat_id != original and metrics.current_load < self.capacity_threshold:
                return chat_id
        return None


class PerformanceTracker:
    """Keeps the most recent timing records per operation name."""

    def __init__(self):
        self.operation_history = defaultdict(list)

    def record_operation(self, operation: str, duration_ms: float, success: bool):
        """Append one record for ``operation``, keeping at most 100 per operation."""
        self.operation_history[operation].append({
            "duration_ms": duration_ms,
            "success": success,
            "timestamp": datetime.now(),
        })

        # Keep only the latest 100 records.
        if len(self.operation_history[operation]) > 100:
            self.operation_history[operation].pop(0)

    def get_summary(self) -> Dict[str, Any]:
        """Return call count, success rate and mean duration per operation."""
        summary = {}

        for operation, records in self.operation_history.items():
            if records:
                durations = [r["duration_ms"] for r in records]
                success_count = sum(1 for r in records if r["success"])

                summary[operation] = {
                    "total_calls": len(records),
                    "success_rate": success_count / len(records),
                    "avg_duration_ms": statistics.mean(durations),
                }

        return summary


# Demo / smoke test.
async def test_enhanced_system():
    """Send a normal, an urgent and a strategy message and print the status."""
    print("🚀 测试增强版多群聊协调系统 v2.1.0")
    print("=" * 50)

    coordinator = EnhancedMultiChatCoordinator()

    # Normal message.
    message1 = await coordinator.send_enhanced_message(
        "main_debate", "正1", "我认为AI投资具有巨大潜力", tags=["AI", "投资"]
    )
    print(f"✅ 正常消息: {message1['id']}")

    # Urgent message (gets escalated to the human-intervention chat).
    message2 = await coordinator.send_enhanced_message(
        "main_debate", "反1", "系统出现紧急错误,需要立即处理!", tags=["紧急", "错误"]
    )
    print(f"🚨 紧急消息: {message2['id']}")

    # Strategy message (gets distributed to both internal chats).
    message3 = await coordinator.send_enhanced_message(
        "strategy_meeting", "系统", "新的辩论策略已制定,请各队参考", tags=["策略", "决策"]
    )
    print(f"📋 策略消息: {message3['id']}")

    # Status report.
    status = coordinator.get_enhanced_status()
    print("\n📊 系统状态:")
    print(f"  版本: {status['version']}")
    print(f"  总群聊: {status['total_rooms']}")
    print(f"  总消息: {status['total_messages']}")
    print(f"  性能指标: {status['performance']}")

    print("\n🎉 增强版多群聊协调系统测试完成!")

    return coordinator


if __name__ == "__main__":
    asyncio.run(test_enhanced_system())