#!/usr/bin/env python3 """ Vertex AI Memory Bank 演示脚本 展示稷下学宫记忆增强AI辩论系统 """ import asyncio import sys import os # 添加项目根目录到路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.jixia.agents.memory_enhanced_agent import create_memory_enhanced_council from config.settings import validate_config async def demo_memory_enhanced_debate(): """演示记忆增强的AI辩论""" print("🏛️ 稷下学宫 Vertex AI Memory Bank 演示") print("=" * 60) # 验证配置 print("🔧 验证配置...") if not validate_config("google_adk"): print("❌ 配置验证失败,请检查环境变量") return try: # 创建记忆增强议会 print("\n🎭 创建八仙记忆增强议会...") council = await create_memory_enhanced_council() # 演示主题 topics = [ "特斯拉股票投资价值分析", "人工智能行业投资机会", "加密货币市场前景展望" ] # 选择参与的仙人(为了演示,只选择几位) participants = ["tieguaili", "lvdongbin", "hexiangu", "zhangguolao"] for i, topic in enumerate(topics, 1): print(f"\n{'='*40}") print(f"🎯 第 {i} 场辩论: {topic}") print(f"{'='*40}") # 进行记忆增强辩论 result = await council.conduct_memory_debate( topic=topic, participants=participants, rounds=2 # 每场2轮,保持演示简洁 ) print(f"\n📊 辩论结果:") print(f" 主题: {result['topic']}") print(f" 参与者: {len(result['participants'])} 位仙人") print(f" 总发言: {result['total_exchanges']} 次") # 显示部分对话内容 print(f"\n💬 精彩观点摘录:") for exchange in result['conversation_history'][:4]: # 只显示前4条 content_preview = exchange['content'][:120] + "..." if len(exchange['content']) > 120 else exchange['content'] print(f" 🗣️ {exchange['chinese_name']}: {content_preview}") # 获取集体记忆摘要 print(f"\n📚 获取集体记忆...") summary = await council.get_collective_memory_summary(topic) if "暂无相关集体记忆" not in summary: print(f" ✅ 已生成 {len(summary)} 字符的记忆摘要") else: print(f" ℹ️ 这是新主题,正在建立记忆") # 演示间隔 if i < len(topics): print(f"\n⏳ 准备下一场辩论...") await asyncio.sleep(1) # 最终演示:展示记忆的累积效果 print(f"\n{'='*60}") print("🧠 记忆累积效果演示") print(f"{'='*60}") # 让铁拐李基于所有记忆回答一个综合问题 tieguaili = council.agents.get("tieguaili") if tieguaili: print(f"\n🤔 向铁拐李提问: '基于你的所有记忆,总结一下当前市场的主要风险'") comprehensive_response = await tieguaili.respond_with_memory( message="基于你参与的所有辩论和积累的记忆,总结一下当前市场的主要风险和你的投资建议。", topic="综合市场分析" ) print(f"\n🧙‍♂️ 铁拐李的综合分析:") print(f" {comprehensive_response}") # 展示记忆学习功能 print(f"\n🎓 演示记忆学习功能...") # 让何仙姑学习一个用户偏好 hexiangu = council.agents.get("hexiangu") if hexiangu: await hexiangu.learn_preference( preference="用户偏好ESG投资,关注环境和社会责任", topic="投资偏好" ) print(f" ✅ 何仙姑学习了ESG投资偏好") # 基于新学到的偏好回答问题 esg_response = await hexiangu.respond_with_memory( message="推荐一些符合ESG标准的投资标的", topic="ESG投资" ) print(f"\n👸 何仙姑基于学习的偏好回应:") print(f" {esg_response[:200]}...") print(f"\n🎉 演示完成!") print(f"\n💡 Memory Bank 的优势:") print(f" ✅ 智能体能记住历史对话和分析") print(f" ✅ 学习用户偏好,提供个性化建议") print(f" ✅ 积累投资策略和市场洞察") print(f" ✅ 跨会话保持一致的人格和观点") print(f" ✅ 基于历史经验做出更好的决策") except Exception as e: print(f"❌ 演示过程中出现错误: {e}") print(f"💡 请检查:") print(f" - Google Cloud Project ID 是否正确配置") print(f" - Vertex AI API 是否已启用") print(f" - 网络连接是否正常") async def demo_individual_memory_features(): """演示个体记忆功能""" print(f"\n{'='*60}") print("🔍 个体记忆功能详细演示") print(f"{'='*60}") try: from src.jixia.memory.vertex_memory_bank import VertexMemoryBank from src.jixia.agents.memory_enhanced_agent import MemoryEnhancedAgent # 创建记忆银行 memory_bank = VertexMemoryBank.from_config() # 创建单个智能体进行详细演示 agent = MemoryEnhancedAgent("tieguaili", memory_bank) print(f"\n🧙‍♂️ 与 {agent.personality.chinese_name} 的记忆互动演示") # 1. 添加不同类型的记忆 print(f"\n📝 添加不同类型的记忆...") memories_to_add = [ { "content": "在2008年金融危机中,逆向投资者获得了丰厚回报", "memory_type": "knowledge", "topic": "历史教训" }, { "content": "用户偏好价值投资,不喜欢高风险的成长股", "memory_type": "preference", "topic": "用户偏好" }, { "content": "当市场过度乐观时,应该保持谨慎并寻找反向机会", "memory_type": "strategy", "topic": "投资策略" } ] for memory in memories_to_add: await memory_bank.add_memory( agent_name="tieguaili", content=memory["content"], memory_type=memory["memory_type"], debate_topic=memory["topic"] ) print(f" ✅ 添加{memory['memory_type']}记忆: {memory['content'][:50]}...") # 2. 搜索记忆 print(f"\n🔍 搜索相关记忆...") search_queries = ["金融危机", "价值投资", "投资策略"] for query in search_queries: results = await memory_bank.search_memories( agent_name="tieguaili", query=query, limit=3 ) print(f" 🔎 搜索 '{query}': 找到 {len(results)} 条相关记忆") for result in results: relevance = result.get('relevance_score', 'N/A') print(f" - {result['content'][:60]}... (相关度: {relevance})") # 3. 基于记忆的智能回应 print(f"\n🤖 基于记忆的智能回应演示...") questions = [ "现在市场很乐观,你有什么建议?", "推荐一些适合保守投资者的标的", "历史上有哪些值得借鉴的投资教训?" ] for question in questions: print(f"\n❓ 问题: {question}") response = await agent.respond_with_memory( message=question, topic="投资咨询" ) print(f"🧙‍♂️ 铁拐李: {response[:150]}...") print(f"\n✨ 个体记忆功能演示完成!") except Exception as e: print(f"❌ 个体记忆演示失败: {e}") async def main(): """主演示函数""" print("🚀 启动 Vertex AI Memory Bank 完整演示") # 主要演示:记忆增强辩论 await demo_memory_enhanced_debate() # 详细演示:个体记忆功能 await demo_individual_memory_features() print(f"\n🏛️ 稷下学宫 Memory Bank 演示结束") print(f"📖 更多信息请参考: docs/VERTEX_MEMORY_BANK_SETUP.md") if __name__ == "__main__": # 运行演示 asyncio.run(main())