liurenchaxin/examples/debates/adk_real_debate.py

249 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫 ADK 真实论道系统
实现铁拐李和吕洞宾的实际对话辩论
"""
import os
import asyncio
from google.adk import Agent, Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
import re
import sys
from contextlib import contextmanager
def create_debate_agents():
"""创建论道智能体"""
# 铁拐李 - 逆向思维专家
tie_guai_li = Agent(
name="铁拐李",
model="gemini-2.5-flash",
instruction="你是铁拐李八仙中的逆向思维专家。你善于从批判和质疑的角度看问题总是能发现事物的另一面。你的发言风格直接、犀利但富有智慧。每次发言控制在100字以内。"
)
# 吕洞宾 - 理性分析者
lu_dong_bin = Agent(
name="吕洞宾",
model="gemini-2.5-flash",
instruction="你是吕洞宾八仙中的理性分析者。你善于平衡各方观点用理性和逻辑来分析问题。你的发言风格温和而深刻总是能找到问题的核心。每次发言控制在100字以内。"
)
return tie_guai_li, lu_dong_bin
async def conduct_debate():
"""进行实际辩论"""
print("🎭 稷下学宫论道开始...")
# 创建智能体
tie_guai_li, lu_dong_bin = create_debate_agents()
print("\n📋 论道主题: 雅江水电站对中印关系的影响")
print("\n🎯 八仙论道,智慧交锋...")
print("\n🚀 使用真实ADK调用进行论道...")
await real_adk_debate(tie_guai_li, lu_dong_bin)
@contextmanager
def suppress_stdout():
"""临时抑制stdout输出"""
with open(os.devnull, 'w') as devnull:
old_stdout = sys.stdout
sys.stdout = devnull
try:
yield
finally:
sys.stdout = old_stdout
def clean_debug_output(text):
"""清理ADK输出中的调试信息"""
if not text:
return ""
# 移除API密钥相关信息
text = re.sub(r'Both GOOGLE_API_KEY and GEMINI_API_KEY are set\. Using GOOGLE_API_KEY\.', '', text)
# 移除Event from unknown agent信息
text = re.sub(r'Event from an unknown agent: [^\n]*\n?', '', text)
# 移除多余的空白字符
text = re.sub(r'\n\s*\n', '\n', text)
text = text.strip()
return text
async def real_adk_debate(tie_guai_li, lu_dong_bin):
"""使用真实ADK进行辩论"""
print("\n🔥 真实ADK论道模式")
# 设置环境变量来抑制ADK调试输出
os.environ['GOOGLE_CLOUD_DISABLE_GRPC_LOGS'] = 'true'
os.environ['GRPC_VERBOSITY'] = 'NONE'
os.environ['GRPC_TRACE'] = ''
# 临时抑制警告和调试信息
import warnings
warnings.filterwarnings('ignore')
# 设置日志级别
import logging
logging.getLogger().setLevel(logging.ERROR)
# 创建会话服务
session_service = InMemorySessionService()
# 创建会话
session = await session_service.create_session(
state={},
app_name="稷下学宫论道系统",
user_id="debate_user"
)
# 创建Runner实例
tie_runner = Runner(
app_name="稷下学宫论道系统",
agent=tie_guai_li,
session_service=session_service
)
lu_runner = Runner(
app_name="稷下学宫论道系统",
agent=lu_dong_bin,
session_service=session_service
)
try:
# 第一轮:铁拐李开场
print("\n🗣️ 铁拐李发言:")
tie_prompt = "作为逆向思维专家请从批判角度分析雅江水电站建设对中印关系可能带来的负面影响和潜在风险。请控制在100字以内。"
tie_content = types.Content(role='user', parts=[types.Part(text=tie_prompt)])
with suppress_stdout():
tie_response = tie_runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=tie_content
)
tie_reply = ""
async for event in tie_response:
# 只处理包含实际文本内容的事件,过滤调试信息
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text and part.text.strip():
text_content = str(part.text).strip()
# 过滤掉调试信息和系统消息
if not text_content.startswith('Event from') and not 'API_KEY' in text_content:
tie_reply += text_content
elif hasattr(event, 'text') and event.text:
text_content = str(event.text).strip()
if not text_content.startswith('Event from') and not 'API_KEY' in text_content:
tie_reply += text_content
# 清理并输出铁拐李的回复
clean_tie_reply = clean_debug_output(tie_reply)
if clean_tie_reply:
print(f" {clean_tie_reply}")
# 第二轮:吕洞宾回应
print("\n🗣️ 吕洞宾回应:")
lu_prompt = f"铁拐李提到了雅江水电站的负面影响:'{tie_reply[:50]}...'。作为理性分析者请从平衡角度回应既承认风险又指出雅江水电站对中印关系的积极意义。请控制在100字以内。"
lu_content = types.Content(role='user', parts=[types.Part(text=lu_prompt)])
with suppress_stdout():
lu_response = lu_runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=lu_content
)
lu_reply = ""
async for event in lu_response:
# 只处理包含实际文本内容的事件,过滤调试信息
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text and part.text.strip():
text_content = str(part.text).strip()
# 过滤掉调试信息和系统消息
if not text_content.startswith('Event from') and not 'API_KEY' in text_content:
lu_reply += text_content
elif hasattr(event, 'text') and event.text:
text_content = str(event.text).strip()
if not text_content.startswith('Event from') and not 'API_KEY' in text_content:
lu_reply += text_content
# 清理并输出吕洞宾的回复
clean_lu_reply = clean_debug_output(lu_reply)
if clean_lu_reply:
print(f" {clean_lu_reply}")
# 第三轮:铁拐李再次发言
print("\n🗣️ 铁拐李再次发言:")
tie_prompt2 = f"吕洞宾提到了雅江水电站的积极意义:'{lu_reply[:50]}...'。请从逆向思维角度对这些所谓的积极影响进行质疑和反思。请控制在100字以内。"
tie_content2 = types.Content(role='user', parts=[types.Part(text=tie_prompt2)])
with suppress_stdout():
tie_response2 = tie_runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=tie_content2
)
tie_reply2 = ""
async for event in tie_response2:
# 只处理包含实际文本内容的事件,过滤调试信息
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text and part.text.strip():
text_content = str(part.text).strip()
# 过滤掉调试信息和系统消息
if not text_content.startswith('Event from') and not 'API_KEY' in text_content:
tie_reply2 += text_content
elif hasattr(event, 'text') and event.text:
text_content = str(event.text).strip()
if not text_content.startswith('Event from') and not 'API_KEY' in text_content:
tie_reply2 += text_content
# 清理并输出铁拐李的第二次回复
clean_tie_reply2 = clean_debug_output(tie_reply2)
if clean_tie_reply2:
print(f" {clean_tie_reply2}")
print("\n🎉 真实ADK论道完成")
print("\n📝 智慧交锋,各抒己见,这就是稷下学宫的魅力所在。")
finally:
# 清理资源
await tie_runner.close()
await lu_runner.close()
def main():
"""主函数"""
print("🚀 稷下学宫 ADK 真实论道系统")
# 检查API密钥
api_key = os.getenv('GOOGLE_API_KEY')
if not api_key:
print("❌ 未找到 GOOGLE_API_KEY 环境变量")
print("请使用: doppler run -- python src/jixia/debates/adk_real_debate.py")
return
print(f"✅ API密钥已配置")
# 运行异步辩论
try:
asyncio.run(conduct_debate())
except Exception as e:
print(f"❌ 运行失败: {e}")
if __name__ == "__main__":
main()