liurenchaxin/main.py

249 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫AI辩论系统主入口
提供命令行界面来运行不同的辩论模式
"""
import argparse
import asyncio
import sys
import os
from typing import Dict, Any, List, Tuple
# 将 src 目录添加到 Python 路径,以便能正确导入模块
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(project_root, 'src'))
from config.settings import validate_config, get_database_config
from google.adk import Agent, Runner
from google.adk.sessions import InMemorySessionService, Session
from google.genai import types
import pymongo
from datetime import datetime
def check_environment(mode: str = "hybrid"):
"""检查并验证运行环境"""
print("🔧 检查运行环境...")
if not validate_config(mode=mode):
print("❌ 环境配置验证失败")
return False
print("✅ 环境检查通过")
return True
async def _get_llm_reply(runner: Runner, prompt: str) -> str:
"""一个辅助函数用于调用Runner并获取纯文本回复同时流式输出到控制台"""
# 每个调用创建一个新的会话
session = await runner.session_service.create_session(state={}, app_name=runner.app_name, user_id="debate_user")
content = types.Content(role='user', parts=[types.Part(text=prompt)])
response = runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content
)
reply = ""
async for event in response:
chunk = ""
if hasattr(event, 'content') and event.content and hasattr(event.content, 'parts'):
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
chunk = str(part.text)
elif hasattr(event, 'text') and event.text:
chunk = str(event.text)
if chunk:
print(chunk, end="", flush=True)
reply += chunk
return reply.strip()
async def run_adk_turn_based_debate(topic: str, rounds: int = 2):
"""运行由太上老君主持的,基于八卦对立和顺序的辩论"""
try:
print(f"🚀 启动ADK八仙论道 (太上老君主持)...")
print(f"📋 辩论主题: {topic}")
print(f"🔄 辩论总轮数: {rounds}")
# 1. 初始化记忆银行
print("🧠 初始化记忆银行...")
from src.jixia.memory.factory import get_memory_backend
memory_bank = get_memory_backend()
print("✅ 记忆银行准备就绪。")
character_configs = {
"太上老君": {"name": "太上老君", "model": "gemini-2.5-flash", "instruction": "你是太上老君天道化身辩论的主持人。你的言辞沉稳、公正、充满智慧。你的任务是1. 对辩论主题进行开场介绍。2. 在每轮或每场对决前进行引导。3. 在辩论结束后,对所有观点进行全面、客观的总结。保持中立,不偏袒任何一方。"},
"吕洞宾": {"name": "吕洞宾", "model": "gemini-2.5-flash", "instruction": "你是吕洞宾(乾卦),男性代表,善于理性分析,逻辑性强,推理严密。"},
"何仙姑": {"name": "何仙姑", "model": "gemini-2.5-flash", "instruction": "你是何仙姑(坤卦),女性代表,注重平衡与和谐,善于创新思维。"},
"张果老": {"name": "张果老", "model": "gemini-2.5-flash", "instruction": "你是张果老(兑卦),老者代表,具传统智慧,发言厚重沉稳,经验导向。"},
"韩湘子": {"name": "韩湘子", "model": "gemini-2.5-flash", "instruction": "你是韩湘子(艮卦),少年代表,具创新思维,发言活泼灵动,具前瞻性。"},
"汉钟离": {"name": "汉钟离", "model": "gemini-2.5-flash", "instruction": "你是汉钟离(离卦),富者代表,有权威意识,发言威严庄重,逻辑清晰。"},
"蓝采和": {"name": "蓝采和", "model": "gemini-2.5-flash", "instruction": "你是蓝采和(坎卦),贫者代表,关注公平,发言平易近人。"},
"曹国舅": {"name": "曹国舅", "model": "gemini-2.5-flash", "instruction": "你是曹国舅(震卦),贵者代表,具商业思维,发言精明务实,效率优先。"},
"铁拐李": {"name": "铁拐李", "model": "gemini-2.5-flash", "instruction": "你是铁拐李(巽卦),贱者代表,具草根智慧,发言朴实直接,实用至上。"}
}
# 为每个Runner创建独立的SessionService
runners: Dict[str, Runner] = {
name: Runner(
app_name="稷下学宫八仙论道系统",
agent=Agent(name=config["name"], model=config["model"], instruction=config["instruction"]),
session_service=InMemorySessionService()
) for name, config in character_configs.items()
}
host_runner = runners["太上老君"]
debate_history = []
print("\n" + "="*20 + " 辩论开始 " + "="*20)
print(f"\n👑 太上老君: ", end="", flush=True)
opening_prompt = f"请为本次关于“{topic}”的辩论,发表一段公正、深刻的开场白,并宣布辩论开始。"
opening_statement = await _get_llm_reply(host_runner, opening_prompt)
print() # Newline after streaming
# --- 第一轮:核心对立辩论 ---
if rounds >= 1:
print(f"\n👑 太上老君: ", end="", flush=True)
round1_intro = await _get_llm_reply(host_runner, "请为第一轮核心对立辩论进行引导。")
print() # Newline after streaming
duel_pairs: List[Tuple[str, str, str]] = [
("乾坤对立 (男女)", "吕洞宾", "何仙姑"),
("兑艮对立 (老少)", "张果老", "韩湘子"),
("离坎对立 (富贫)", "汉钟离", "蓝采和"),
("震巽对立 (贵贱)", "曹国舅", "铁拐李")
]
for title, p1, p2 in duel_pairs:
print(f"\n--- {title} ---")
print(f"👑 太上老君: ", end="", flush=True)
duel_intro = await _get_llm_reply(host_runner, f"现在开始“{title}”的对决,请{p1}{p2}准备。")
print() # Newline after streaming
print(f"🗣️ {p1}: ", end="", flush=True)
s1 = await _get_llm_reply(runners[p1], f"主题:{topic}。作为开场,请从你的角度阐述观点。")
print(); debate_history.append(f"{p1}: {s1}")
await memory_bank.add_memory(agent_name=p1, content=s1, memory_type="statement", debate_topic=topic)
print(f"🗣️ {p2}: ", end="", flush=True)
s2 = await _get_llm_reply(runners[p2], f"主题:{topic}。对于刚才{p1}的观点“{s1[:50]}...”,请进行回应。")
print(); debate_history.append(f"{p2}: {s2}")
await memory_bank.add_memory(agent_name=p2, content=s2, memory_type="statement", debate_topic=topic)
print(f"🗣️ {p1}: ", end="", flush=True)
s3 = await _get_llm_reply(runners[p1], f"主题:{topic}。对于{p2}的回应“{s2[:50]}...”,请进行反驳。")
print(); debate_history.append(f"{p1}: {s3}")
await memory_bank.add_memory(agent_name=p1, content=s3, memory_type="statement", debate_topic=topic)
print(f"🗣️ {p2}: ", end="", flush=True)
s4 = await _get_llm_reply(runners[p2], f"主题:{topic}。针对{p1}的反驳“{s3[:50]}...”,请为本场对决做总结。")
print(); debate_history.append(f"{p2}: {s4}")
await memory_bank.add_memory(agent_name=p2, content=s4, memory_type="statement", debate_topic=topic)
await asyncio.sleep(1)
# --- 第二轮:先天八卦顺序发言 (集成记忆银行) ---
if rounds >= 2:
print(f"\n👑 太上老君: ", end="", flush=True)
round2_intro = await _get_llm_reply(host_runner, "请为第二轮,也就是结合场上观点的综合发言,进行引导。")
print() # Newline after streaming
baxi_sequence = ["吕洞宾", "张果老", "汉钟离", "曹国舅", "铁拐李", "蓝采和", "韩湘子", "何仙姑"]
for name in baxi_sequence:
print(f"\n--- {name}的回合 ---")
context = await memory_bank.get_agent_context(name, topic)
prompt = f"这是你关于“{topic}”的记忆上下文,请参考并对其他人的观点进行回应:\n{context}\n\n现在请从你的角色特点出发,继续发表你的看法。"
print(f"🗣️ {name}: ", end="", flush=True)
reply = await _get_llm_reply(runners[name], prompt)
print(); debate_history.append(f"{name}: {reply}")
await memory_bank.add_memory(agent_name=name, content=reply, memory_type="statement", debate_topic=topic)
await asyncio.sleep(1)
print("\n" + "="*20 + " 辩论结束 " + "="*20)
# 4. 保存辩论会话到记忆银行
print("\n💾 正在保存辩论会话记录到记忆银行...")
await memory_bank.save_debate_session(
debate_topic=topic,
participants=[name for name in character_configs.keys() if name != "太上老君"],
conversation_history=[{"agent": h.split(": ")[0], "content": ": ".join(h.split(": ")[1:])} for h in debate_history if ": " in h],
outcomes={}
)
print("✅ 辩论会话已保存到记忆银行。")
# 5. 保存辩论过程资产到MongoDB
db_config = get_database_config()
if db_config.get("mongodb_url"):
print("\n💾 正在保存辩论过程资产到 MongoDB...")
try:
client = pymongo.MongoClient(db_config["mongodb_url"])
db = client.get_database("jixia_academy")
collection = db.get_collection("debates")
summary_prompt = f"辩论已结束。以下是完整的辩论记录:\n\n{' '.join(debate_history)}\n\n请对本次辩论进行全面、公正、深刻的总结。"
print(f"\n👑 太上老君: ", end="", flush=True)
summary = await _get_llm_reply(host_runner, summary_prompt)
print() # Newline after streaming
debate_document = {
"topic": topic,
"rounds": rounds,
"timestamp": datetime.utcnow(),
"participants": [name for name in character_configs.keys() if name != "太上老君"],
"conversation": [{"agent": h.split(": ")[0], "content": ": ".join(h.split(": ")[1:])} for h in debate_history if ": " in h],
"summary": summary
}
collection.insert_one(debate_document)
print("✅ 辩论过程资产已成功保存到 MongoDB。")
client.close()
except Exception as e:
print(f"❌ 保存到 MongoDB 失败: {e}")
else:
print("⚠️ 未配置 MONGODB_URL跳过保存到 MongoDB。")
print(f"\n👑 太上老君: ", end="", flush=True)
summary_prompt = f"辩论已结束。以下是完整的辩论记录:\n\n{' '.join(debate_history)}\n\n请对本次辩论进行全面、公正、深刻的总结。"
summary = await _get_llm_reply(host_runner, summary_prompt)
print() # Newline after streaming
for runner in runners.values(): await runner.close()
print(f"\n🎉 ADK八仙轮流辩论完成!")
return True
except Exception as e:
print(f"❌ 运行ADK八仙轮流辩论失败: {e}")
import traceback
traceback.print_exc()
return False
async def main_async(args):
if not check_environment(mode="google_adk"): return 1
await run_adk_turn_based_debate(args.topic, args.rounds)
return 0
def main():
parser = argparse.ArgumentParser(description="稷下学宫AI辩论系统 (ADK版)")
parser.add_argument("--topic", "-t", default="AI是否应该拥有创造力", help="辩论主题")
parser.add_argument("--rounds", "-r", type=int, default=2, choices=[1, 2], help="辩论轮数 (1: 核心对立, 2: 对立+顺序发言)")
args = parser.parse_args()
try:
sys.exit(asyncio.run(main_async(args)))
except KeyboardInterrupt:
print("\n\n👋 用户中断,退出程序")
sys.exit(0)
except Exception as e:
print(f"\n\n💥 程序运行出错: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()