liurenchaxin/src/jixia/main.py

454 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫AI辩论系统主入口
提供命令行界面来运行不同的辩论模式
"""
import argparse
import asyncio
import sys
import os
import warnings
# 将项目根目录添加到 Python 路径,以便能正确导入模块
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, project_root)
# 抑制 google-adk 的调试日志和警告
import logging
logging.getLogger('google.adk').setLevel(logging.ERROR)
logging.getLogger('google.genai').setLevel(logging.ERROR)
# 设置环境变量来抑制ADK调试输出
os.environ['GOOGLE_CLOUD_DISABLE_GRPC_LOGS'] = 'true'
os.environ['GRPC_VERBOSITY'] = 'ERROR'
os.environ['GRPC_TRACE'] = ''
# 抑制 warnings
warnings.filterwarnings('ignore')
from config.settings import validate_config
def check_environment():
"""检查并验证运行环境"""
print("🔧 检查运行环境...")
# 验证基础配置
if not validate_config():
print("❌ 环境配置验证失败")
return False
print("✅ 环境检查通过")
return True
async def run_adk_memory_debate(topic: str, participants: list = None):
"""运行ADK记忆增强辩论"""
print("⚠️ ADK记忆增强辩论功能正在适配新版本的 google-adk 库...")
print("💡 请先使用 'adk_simple' 模式进行测试。")
return False
# 以下代码暂时保留,待适配完成后再启用
"""
try:
from src.jixia.debates.adk_memory_debate import MemoryEnhancedDebate
print(f"🚀 启动ADK记忆增强辩论...")
print(f"📋 辩论主题: {topic}")
# 创建并初始化辩论系统
debate_system = MemoryEnhancedDebate()
await debate_system.initialize()
# 进行辩论
await debate_system.conduct_memory_debate(
topic=topic,
participants=participants
)
# 关闭资源
await debate_system.close()
print("\n🎉 ADK记忆增强辩论完成!")
return True
except ImportError as e:
print(f"❌ 导入模块失败: {e}")
print("请确保已安装Google ADK: pip install google-adk")
return False
except Exception as e:
print(f"❌ 运行ADK记忆增强辩论失败: {e}")
import traceback
traceback.print_exc()
return False
"""
async def run_adk_turn_based_debate(topic: str, participants: list = None, rounds: int = 3):
"""运行ADK八仙轮流辩论"""
try:
from google.adk import Agent, Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
import asyncio
print(f"🚀 启动ADK八仙轮流辩论...")
print(f"📋 辩论主题: {topic}")
print(f"🔄 辩论轮数: {rounds}")
# 默认参与者为八仙
if not participants or participants == ["铁拐李", "吕洞宾"]:
participants = ["铁拐李", "吕洞宾", "何仙姑", "张果老", "蓝采和", "汉钟离", "韩湘子", "曹国舅"]
# 定义主持人和八仙角色配置
roles_config = {
# 主持人
"太上老君": {
"name": "太上老君",
"model": "gemini-2.5-flash",
"instruction": "你是太上老君本次论道的主持人。你负责引导辩论的流程确保每位仙人都有机会发言并在每一轮结束后进行简要总结。你的发言风格庄重、睿智能够调和不同观点之间的矛盾。每次发言控制在100字以内。"
},
# 八仙
"铁拐李": {
"name": "铁拐李",
"model": "gemini-2.5-flash",
"instruction": "你是铁拐李八仙中的逆向思维专家。你善于从批判和质疑的角度看问题总是能发现事物的另一面。你的发言风格直接、犀利但富有智慧。每次发言控制在100字以内。"
},
"吕洞宾": {
"name": "吕洞宾",
"model": "gemini-2.5-flash",
"instruction": "你是吕洞宾八仙中的理性分析者。你善于平衡各方观点用理性和逻辑来分析问题。你的发言风格温和而深刻总是能找到问题的核心。每次发言控制在100字以内。"
},
"何仙姑": {
"name": "何仙姑",
"model": "gemini-2.5-flash",
"instruction": "你是何仙姑八仙中的风险控制专家。你总是从风险管理的角度思考问题善于发现潜在危险。你的发言风格谨慎、细致总是能提出需要警惕的问题。每次发言控制在100字以内。"
},
"张果老": {
"name": "张果老",
"model": "gemini-2.5-flash",
"instruction": "你是张果老八仙中的历史智慧者。你善于从历史数据中寻找规律和智慧总是能提供长期视角。你的发言风格沉稳、博学总是能引经据典。每次发言控制在100字以内。"
},
"蓝采和": {
"name": "蓝采和",
"model": "gemini-2.5-flash",
"instruction": "你是蓝采和八仙中的创新思维者。你善于从新兴视角和非传统方法来看待问题总能提出独特的见解。你的发言风格活泼、新颖总是能带来意想不到的观点。每次发言控制在100字以内。"
},
"汉钟离": {
"name": "汉钟离",
"model": "gemini-2.5-flash",
"instruction": "你是汉钟离八仙中的平衡协调者。你善于综合各方观点寻求和谐统一的解决方案。你的发言风格平和、包容总是能化解矛盾。每次发言控制在100字以内。"
},
"韩湘子": {
"name": "韩湘子",
"model": "gemini-2.5-flash",
"instruction": "你是韩湘子八仙中的艺术感知者。你善于从美学和感性的角度分析问题总能发现事物背后的深层含义。你的发言风格优雅、感性总是能触动人心。每次发言控制在100字以内。"
},
"曹国舅": {
"name": "曹国舅",
"model": "gemini-2.5-flash",
"instruction": "你是曹国舅八仙中的实务执行者。你关注实际操作和具体细节善于将理论转化为可行的方案。你的发言风格务实、严谨总是能提出建设性意见。每次发言控制在100字以内。"
}
}
# 创建会话服务和会话
session_service = InMemorySessionService()
session = await session_service.create_session(
state={},
app_name="稷下学宫轮流辩论系统",
user_id="debate_user"
)
# 创建主持人和八仙智能体及Runner
host_agent = None
host_runner = None
baxian_agents = {}
baxian_runners = {}
# 创建主持人
host_config = roles_config["太上老君"]
host_agent = Agent(
name=host_config["name"],
model=host_config["model"],
instruction=host_config["instruction"]
)
host_runner = Runner(
app_name="稷下学宫轮流辩论系统",
agent=host_agent,
session_service=session_service
)
# 创建八仙
for name in participants:
if name in roles_config:
config = roles_config[name]
agent = Agent(
name=config["name"],
model=config["model"],
instruction=config["instruction"]
)
baxian_agents[name] = agent
runner = Runner(
app_name="稷下学宫轮流辩论系统",
agent=agent,
session_service=session_service
)
baxian_runners[name] = runner
else:
print(f"⚠️ 未知的参与者: {name},将被跳过。")
if not baxian_agents:
print("❌ 没有有效的参与者,请检查参与者列表。")
return False
print(f"🎯 主持人: 太上老君")
print(f"👥 参与仙人: {', '.join(baxian_agents.keys())}")
# 初始化辩论历史
debate_history = []
# 开场白
print(f"\n📢 太上老君开场:")
opening_prompt = f"各位仙友,欢迎来到本次论道。今天的主题是:{topic}。请各位依次发表高见。"
content = types.Content(role='user', parts=[types.Part(text=opening_prompt)])
response = host_runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content
)
reply = ""
async for event in response:
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
reply += str(part.text)
elif hasattr(event, 'text') and event.text:
reply += str(event.text)
if reply.strip():
clean_reply = reply.strip()
print(f" {clean_reply}")
debate_history.append(f"太上老君: {clean_reply}")
await asyncio.sleep(1)
# 进行辩论
for round_num in range(rounds):
print(f"\n🌀 第 {round_num + 1} 轮辩论:")
# 主持人引导本轮辩论
print(f"\n📢 太上老君引导:")
guide_prompt = f"现在进入第 {round_num + 1} 轮辩论,请各位仙友围绕主题发表看法。"
content = types.Content(role='user', parts=[types.Part(text=guide_prompt)])
response = host_runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content
)
reply = ""
async for event in response:
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
reply += str(part.text)
elif hasattr(event, 'text') and event.text:
reply += str(event.text)
if reply.strip():
clean_reply = reply.strip()
print(f" {clean_reply}")
debate_history.append(f"太上老君: {clean_reply}")
await asyncio.sleep(1)
# 八仙轮流发言
for name in participants:
if name not in baxian_runners:
continue
print(f"\n🗣️ {name} 发言:")
# 构建提示
history_context = ""
if debate_history:
recent_history = debate_history[-5:] # 最近5条发言
history_context = f"\n最近的论道内容:\n" + "\n".join([f"- {h}" for h in recent_history])
prompt = f"论道主题: {topic}{history_context}\n\n请从你的角色特点出发发表观点。请控制在100字以内。"
# 发送消息并获取回复
content = types.Content(role='user', parts=[types.Part(text=prompt)])
response = baxian_runners[name].run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content
)
# 收集回复
reply = ""
async for event in response:
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
reply += str(part.text)
elif hasattr(event, 'text') and event.text:
reply += str(event.text)
if reply.strip():
clean_reply = reply.strip()
print(f" {clean_reply}")
# 记录到辩论历史
debate_entry = f"{name}: {clean_reply}"
debate_history.append(debate_entry)
await asyncio.sleep(1) # 避免API调用过快
# 结束语
print(f"\n📢 太上老君总结:")
closing_prompt = f"各位仙友的高见令我受益匪浅。本次论道到此结束,希望各位能从不同观点中获得启发。"
content = types.Content(role='user', parts=[types.Part(text=closing_prompt)])
response = host_runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content
)
reply = ""
async for event in response:
if hasattr(event, 'content') and event.content:
if hasattr(event.content, 'parts') and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
reply += str(part.text)
elif hasattr(event, 'text') and event.text:
reply += str(event.text)
if reply.strip():
clean_reply = reply.strip()
print(f" {clean_reply}")
debate_history.append(f"太上老君: {clean_reply}")
await asyncio.sleep(1)
# 关闭资源
await host_runner.close()
for runner in baxian_runners.values():
await runner.close()
print(f"\n🎉 ADK八仙轮流辩论完成!")
print(f"📝 本次论道共产生 {len(debate_history)} 条发言。")
return True
except ImportError as e:
print(f"❌ 导入模块失败: {e}")
print("请确保已安装Google ADK: pip install google-adk")
return False
except Exception as e:
print(f"❌ 运行ADK八仙轮流辩论失败: {e}")
import traceback
traceback.print_exc()
return False
async def run_swarm_debate(topic: str, participants: list = None):
"""运行Swarm辩论 (示例)"""
try:
print(f"🚀 启动Swarm辩论...")
print(f"📋 辩论主题: {topic}")
print(f"👥 参与者: {participants}")
# TODO: 实现调用 Swarm 辩论逻辑
# 这里需要根据实际的 swarm_debate.py 接口来实现
print("⚠️ Swarm辩论功能待实现")
print("\n🎉 Swarm辩论完成!")
return True
except Exception as e:
print(f"❌ 运行Swarm辩论失败: {e}")
import traceback
traceback.print_exc()
return False
async def main_async(args):
"""异步主函数"""
# 检查环境
if not check_environment():
return 1
# 根据模式运行不同的辩论
if args.mode == "adk_memory":
participants = args.participants.split(",") if args.participants else None
success = await run_adk_memory_debate(args.topic, participants)
return 0 if success else 1
elif args.mode == "adk_turn_based":
participants = args.participants.split(",") if args.participants else None
success = await run_adk_turn_based_debate(args.topic, participants, args.rounds)
return 0 if success else 1
elif args.mode == "adk_simple":
# 简单辩论模式暂时使用原来的方式
try:
from src.jixia.debates.adk_simple_debate import simple_debate_test
result = simple_debate_test()
return 0 if result else 1
except Exception as e:
print(f"❌ 运行ADK简单辩论失败: {e}")
return 1
elif args.mode == "swarm":
participants = args.participants.split(",") if args.participants else None
success = await run_swarm_debate(args.topic, participants)
return 0 if success else 1
else:
print(f"❌ 不支持的模式: {args.mode}")
return 1
def main():
"""主入口函数"""
parser = argparse.ArgumentParser(description="稷下学宫AI辩论系统")
parser.add_argument(
"mode",
choices=["adk_memory", "adk_turn_based", "adk_simple", "swarm"],
help="辩论模式"
)
parser.add_argument(
"--topic",
"-t",
default="人工智能对未来社会的影响",
help="辩论主题"
)
parser.add_argument(
"--participants",
"-p",
help="参与者列表(逗号分隔),例如: 铁拐李,吕洞宾,何仙姑"
)
parser.add_argument(
"--rounds",
"-r",
type=int,
default=3,
help="辩论轮数 (仅适用于 adk_turn_based 模式)"
)
args = parser.parse_args()
# 运行异步主函数
try:
exit_code = asyncio.run(main_async(args))
sys.exit(exit_code)
except KeyboardInterrupt:
print("\n\n👋 用户中断,退出程序")
sys.exit(0)
except Exception as e:
print(f"\n\n💥 程序运行出错: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()