feat(README): 添加 OpenBB 市场数据集成

- 在 README.md 中添加了 OpenBB 市场数据集成的说明
- 更新了 doppler_config.py 中 Google GenAI 配置逻辑
- 移除了与 ADK 和 Ollama 相关的示例代码文件
This commit is contained in:
ben 2025-08-21 04:50:29 +00:00
parent 51576ebb6f
commit ed49ef2833
14 changed files with 270 additions and 960 deletions

View File

@ -11,6 +11,7 @@
- **🌍 天下体系分析**: 基于儒门天下观的资本生态"天命树"分析模型
- **🔒 安全配置管理**: 使用Doppler进行统一的密钥和配置管理
- **📊 智能数据源**: 基于17个RapidAPI订阅的永动机数据引擎
- **📈 市场数据 (可选)**: 集成 OpenBB v4统一路由多数据提供商详见 docs/openbb_integration.md
- **🎨 现代化界面**: 基于Streamlit的响应式Web界面
## 🏗️ 项目结构

View File

@ -95,9 +95,12 @@ def get_google_genai_config() -> Dict[str, str]:
Returns:
Google GenAI配置字典
"""
use_vertex_ai = get_secret('GOOGLE_GENAI_USE_VERTEXAI', 'FALSE').upper() == 'TRUE'
api_key = '' if use_vertex_ai else get_secret('GOOGLE_API_KEY', '')
return {
'api_key': get_secret('GOOGLE_API_KEY', ''),
'use_vertex_ai': get_secret('GOOGLE_GENAI_USE_VERTEXAI', 'FALSE'),
'api_key': api_key,
'use_vertex_ai': str(use_vertex_ai).upper(),
'project_id': get_secret('GOOGLE_CLOUD_PROJECT_ID', ''),
'location': get_secret('GOOGLE_CLOUD_LOCATION', 'us-central1'),
'memory_bank_enabled': get_secret('VERTEX_MEMORY_BANK_ENABLED', 'TRUE'),

View File

@ -0,0 +1,56 @@
# OpenBB 集成指南
本指南帮助你在本项目中启用并使用 OpenBB v4 作为市场数据源,同时保证在未安装 OpenBB 的情况下,应用可平稳回退到演示/合成数据。
## 1. 为什么选择 OpenBB v4
- 统一的路由接口:`from openbb import obb`
- 多数据提供商聚合(如 yfinance、polygon、fmp 等)
- 返回对象支持 `.results``.to_df()`,便于统一处理
## 2. 安装与环境准备
默认未安装 OpenBBrequirements.txt 中为可选依赖)。如需启用:
```bash
pip install "openbb>=4.1.0"
```
若你使用的是国内网络,建议配置合适的 PyPI 镜像或使用代理。
## 3. 配置说明
无需额外配置即可使用 `provider='yfinance'` 的公共数据。若你有付费数据源(如 polygon可通过环境变量或 OpenBB 的 provider 配置进行设置。
## 4. 代码结构与调用方式
- Streamlit UI: `app/tabs/openbb_tab.py`
- 自动检测 OpenBB 是否可用;若不可用则使用演示数据或合成数据
- 优先路由:`obb.equity.price.historical`ETF 回退至 `obb.etf.price.historical`
- 引擎模块: `src/jixia/engines/openbb_engine.py`
- 延迟导入 OpenBB首次调用时 `from openbb import obb`
- 对 `.results` / `.to_df()` / `.to_dataframe()` 做兼容处理
- 辅助脚本: `src/jixia/engines/openbb_stock_data.py`
- 延迟导入 `obb`
- ETF 历史数据路径更新为 `obb.etf.price.historical`
## 5. 回退机制
- UI 层OpenBB Tab
- 未安装 OpenBB 或请求失败:读取 `examples/data/*.json` 的演示数据;仍失败则生成合成数据
- 引擎层
- 若未安装 OpenBB返回 `success=False`,带错误消息,不影响其他功能
## 6. 开发与测试
- 单元测试建议:
- 未安装 OpenBB 时,`_load_price_data` 能返回演示/合成数据
- 已安装 OpenBB 时,能通过 `obb.equity.price.historical` 获取 DataFrame
- 在本仓库中新增了占位测试:`tests/test_openbb_fallback.py`
## 7. 典型问题排查
- ImportError: No module named 'openbb'
- 未安装 OpenBB按第2节安装。
- 返回空数据
- 检查 symbol 是否正确;尝试更换 provider 或缩短时间窗口。
- 列名/索引不匹配
- UI 中已对常见列/索引做了规范化处理;如仍异常,可打印原始 DataFrame 排查。
## 8. 后续计划
- 接入更多 OpenBB 路由(基本面、新闻、财报、因子)
- 与辩论系统结果联动,生成投资洞察卡片
- 支持用户自定义 provider 优先级与兜底策略

View File

@ -1,130 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫 Google ADK 论道系统测试
基于 Google ADK 的八仙论道原型
"""
import os
import asyncio
from google.adk import Agent
from google.adk.tools import FunctionTool
# 八仙智能体定义
def create_baxian_agents():
"""创建八仙智能体"""
# 铁拐李 - 逆向思维专家
tie_guai_li = Agent(
name="铁拐李",
model="gemini-2.5-flash"
)
# 汉钟离 - 平衡协调者
han_zhong_li = Agent(
name="汉钟离",
model="gemini-2.5-flash"
)
# 张果老 - 历史智慧者
zhang_guo_lao = Agent(
name="张果老",
model="gemini-2.5-flash"
)
# 蓝采和 - 创新思维者
lan_cai_he = Agent(
name="蓝采和",
model="gemini-2.5-flash"
)
# 何仙姑 - 直觉洞察者
he_xian_gu = Agent(
name="何仙姑",
model="gemini-2.5-flash"
)
# 吕洞宾 - 理性分析者
lu_dong_bin = Agent(
name="吕洞宾",
model="gemini-2.5-flash"
)
# 韩湘子 - 艺术感知者
han_xiang_zi = Agent(
name="韩湘子",
model="gemini-2.5-flash"
)
# 曹国舅 - 实务执行者
cao_guo_jiu = Agent(
name="曹国舅",
model="gemini-2.5-flash"
)
return {
"铁拐李": tie_guai_li,
"汉钟离": han_zhong_li,
"张果老": zhang_guo_lao,
"蓝采和": lan_cai_he,
"何仙姑": he_xian_gu,
"吕洞宾": lu_dong_bin,
"韩湘子": han_xiang_zi,
"曹国舅": cao_guo_jiu
}
def test_single_agent():
"""测试单个智能体"""
print("🧪 测试单个智能体...")
# 创建铁拐李智能体
tie_guai_li = Agent(
name="铁拐李",
model="gemini-2.5-flash"
)
print(f"✅ 智能体 '{tie_guai_li.name}' 创建成功")
print(f"📱 使用模型: {tie_guai_li.model}")
return tie_guai_li
def test_baxian_creation():
"""测试八仙智能体创建"""
print("\n🎭 创建八仙智能体...")
baxian = create_baxian_agents()
print(f"✅ 成功创建 {len(baxian)} 个智能体:")
for name, agent in baxian.items():
print(f" - {name}: {agent.model}")
return baxian
def main():
"""主测试函数"""
print("🚀 开始稷下学宫 ADK 论道系统测试...")
# 检查API密钥
api_key = os.getenv('GOOGLE_API_KEY')
if not api_key:
print("❌ 未找到 GOOGLE_API_KEY 环境变量")
print("请使用: doppler run -- python src/jixia/debates/adk_debate_test.py")
return
print(f"✅ API密钥已配置 (长度: {len(api_key)} 字符)")
# 测试单个智能体
single_agent = test_single_agent()
# 测试八仙智能体创建
baxian = test_baxian_creation()
print("\n🎉 ADK 论道系统基础测试完成!")
print("\n📝 下一步:")
print(" 1. 实现智能体间的对话逻辑")
print(" 2. 集成 RapidAPI 数据源")
print(" 3. 创建论道主题和流程")
print(" 4. 连接 Streamlit 界面")
if __name__ == "__main__":
main()

View File

@ -1,82 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫 ADK 简单论道测试
实现智能体间的基本对话功能
"""
import os
from google.adk import Agent
def create_debate_agents():
"""创建论道智能体"""
# 铁拐李 - 逆向思维专家
tie_guai_li = Agent(
name="铁拐李",
model="gemini-2.0-flash-exp"
)
# 吕洞宾 - 理性分析者
lu_dong_bin = Agent(
name="吕洞宾",
model="gemini-2.0-flash-exp"
)
return tie_guai_li, lu_dong_bin
def simple_debate_test():
"""简单论道测试"""
print("🎭 开始简单论道测试...")
# 创建智能体
tie_guai_li, lu_dong_bin = create_debate_agents()
print("\n📋 论道主题: 人工智能对未来社会的影响")
print("\n🎯 开始论道...")
try:
# 测试智能体创建
print("\n✅ 智能体创建成功:")
print(f" - {tie_guai_li.name}: {tie_guai_li.model}")
print(f" - {lu_dong_bin.name}: {lu_dong_bin.model}")
print("\n🎉 简单论道测试完成!")
print("\n📝 智能体基础功能验证成功")
except Exception as e:
print(f"❌ 论道测试失败: {e}")
return False
return True
def main():
"""主函数"""
print("🚀 稷下学宫 ADK 简单论道系统")
# 检查API密钥
api_key = os.getenv('GOOGLE_API_KEY')
if not api_key:
print("❌ 未找到 GOOGLE_API_KEY 环境变量")
print("请使用: doppler run -- python src/jixia/debates/adk_simple_debate.py")
return
print(f"✅ API密钥已配置")
# 运行测试
try:
result = simple_debate_test()
if result:
print("\n📝 测试结果: 成功")
print("\n🎯 下一步开发计划:")
print(" 1. 学习ADK的正确调用方式")
print(" 2. 实现智能体对话功能")
print(" 3. 扩展到八仙全员论道")
print(" 4. 集成实时数据源")
else:
print("\n❌ 测试失败")
except Exception as e:
print(f"❌ 运行失败: {e}")
if __name__ == "__main__":
main()

View File

@ -1,355 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫本地版 - 基于Ollama的四仙辩论系统
使用本地Ollama服务无需API密钥
"""
import asyncio
import json
from datetime import datetime
from swarm import Swarm, Agent
from typing import Dict, List, Any, Optional
import random
class JixiaOllamaSwarm:
"""稷下学宫本地版 - 使用Ollama的四仙辩论系统"""
def __init__(self):
# Ollama配置
self.ollama_base_url = "http://100.99.183.38:11434"
self.model_name = "gemma3n:e4b" # 使用你指定的模型
# 初始化Swarm客户端使用Ollama
from openai import OpenAI
openai_client = OpenAI(
api_key="ollama", # Ollama不需要真实的API密钥
base_url=f"{self.ollama_base_url}/v1"
)
self.client = Swarm(client=openai_client)
print(f"🦙 使用本地Ollama服务: {self.ollama_base_url}")
print(f"🤖 使用模型: {self.model_name}")
# 四仙配置
self.immortals = {
'吕洞宾': {
'role': '技术分析专家',
'stance': 'positive',
'specialty': '技术分析和图表解读',
'style': '犀利直接,一剑封喉'
},
'何仙姑': {
'role': '风险控制专家',
'stance': 'negative',
'specialty': '风险评估和资金管理',
'style': '温和坚定,关注风险'
},
'张果老': {
'role': '历史数据分析师',
'stance': 'positive',
'specialty': '历史回测和趋势分析',
'style': '博古通今,从历史找规律'
},
'铁拐李': {
'role': '逆向投资大师',
'stance': 'negative',
'specialty': '逆向思维和危机发现',
'style': '不拘一格,挑战共识'
}
}
# 创建智能体
self.agents = self.create_agents()
def create_agents(self) -> Dict[str, Agent]:
"""创建四仙智能体"""
agents = {}
# 吕洞宾 - 技术分析专家
agents['吕洞宾'] = Agent(
name="LuDongbin",
instructions="""
你是吕洞宾八仙之首技术分析专家
你的特点
- 擅长技术分析和图表解读
- 立场看涨派善于发现投资机会
- 风格犀利直接一剑封喉
在辩论中
1. 从技术分析角度分析市场
2. 使用具体的技术指标支撑观点如RSIMACD均线等
3. 保持看涨的乐观态度
4. 发言以"吕洞宾曰:"开头
5. 发言控制在100字以内简洁有力
6. 发言完毕后说"请何仙姑继续论道"
请用古雅但现代的语言风格结合专业的技术分析
""",
functions=[self.to_hexiangu]
)
# 何仙姑 - 风险控制专家
agents['何仙姑'] = Agent(
name="HeXiangu",
instructions="""
你是何仙姑八仙中唯一的女仙风险控制专家
你的特点
- 擅长风险评估和资金管理
- 立场看跌派关注投资风险
- 风格温和坚定关注风险控制
在辩论中
1. 从风险控制角度分析市场
2. 指出潜在的投资风险和危险信号
3. 保持谨慎的态度强调风险管理
4. 发言以"何仙姑曰:"开头
5. 发言控制在100字以内温和但坚定
6. 发言完毕后说"请张果老继续论道"
请用温和但专业的语调体现女性的细致和关怀
""",
functions=[self.to_zhangguolao]
)
# 张果老 - 历史数据分析师
agents['张果老'] = Agent(
name="ZhangGuoLao",
instructions="""
你是张果老历史数据分析师
你的特点
- 擅长历史回测和趋势分析
- 立场看涨派从历史中寻找机会
- 风格博古通今从历史中找规律
在辩论中
1. 从历史数据角度分析市场
2. 引用具体的历史案例和数据
3. 保持乐观的投资态度
4. 发言以"张果老曰:"开头
5. 发言控制在100字以内引经据典
6. 发言完毕后说"请铁拐李继续论道"
请用博学的语调多引用历史数据和案例
""",
functions=[self.to_tieguaili]
)
# 铁拐李 - 逆向投资大师
agents['铁拐李'] = Agent(
name="TieGuaiLi",
instructions="""
你是铁拐李逆向投资大师
你的特点
- 擅长逆向思维和危机发现
- 立场看跌派挑战主流观点
- 风格不拘一格敢于质疑
在辩论中
1. 从逆向投资角度分析市场
2. 挑战前面三位仙人的观点
3. 寻找市场的潜在危机和泡沫
4. 发言以"铁拐李曰:"开头
5. 作为最后发言者要总结四仙观点并给出结论
6. 发言控制在150字以内包含总结
请用直率犀利的语言体现逆向思维的独特视角
""",
functions=[] # 最后一个,不需要转换
)
return agents
def to_hexiangu(self):
"""转到何仙姑"""
return self.agents['何仙姑']
def to_zhangguolao(self):
"""转到张果老"""
return self.agents['张果老']
def to_tieguaili(self):
"""转到铁拐李"""
return self.agents['铁拐李']
async def conduct_debate(self, topic: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""进行四仙辩论"""
print("🏛️ 稷下学宫四仙论道开始!")
print("=" * 60)
print(f"🎯 论道主题: {topic}")
print(f"⏰ 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🦙 使用本地Ollama: {self.ollama_base_url}")
print()
# 构建初始提示
prompt = self.build_prompt(topic, context)
try:
print("⚔️ 吕洞宾仙长请先发言...")
print("-" * 40)
# 开始辩论
response = self.client.run(
agent=self.agents['吕洞宾'],
messages=[{"role": "user", "content": prompt}],
max_turns=8, # 四仙各发言一次,加上可能的交互
model_override=self.model_name
)
print("\n" + "=" * 60)
print("🎊 四仙论道圆满结束!")
# 处理结果
result = self.process_result(response, topic, context)
self.display_summary(result)
return result
except Exception as e:
print(f"❌ 论道过程中出错: {e}")
import traceback
traceback.print_exc()
return None
def build_prompt(self, topic: str, context: Dict[str, Any] = None) -> str:
"""构建辩论提示"""
context_str = ""
if context:
context_str = f"\n📊 市场背景:\n{json.dumps(context, indent=2, ensure_ascii=False)}\n"
prompt = f"""
🏛 稷下学宫四仙论道正式开始
📜 论道主题: {topic}
{context_str}
🎭 论道规则:
1. 四仙按序发言吕洞宾 何仙姑 张果老 铁拐李
2. 正反方交替吕洞宾(看涨) 何仙姑(看跌) 张果老(看涨) 铁拐李(看跌)
3. 每位仙人从专业角度分析提供具体数据支撑
4. 可以质疑前面仙人的观点但要有理有据
5. 保持仙风道骨的表达风格但要专业
6. 每次发言简洁有力控制在100字以内
7. 铁拐李作为最后发言者要总结观点
🗡 请吕洞宾仙长首先发言
记住你是技术分析专家要从技术面找到投资机会
发言要简洁有力一剑封喉
"""
return prompt
def process_result(self, response, topic: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""处理辩论结果"""
messages = response.messages if hasattr(response, 'messages') else []
debate_messages = []
for msg in messages:
if msg.get('role') == 'assistant' and msg.get('content'):
content = msg['content']
speaker = self.extract_speaker(content)
debate_messages.append({
'speaker': speaker,
'content': content,
'timestamp': datetime.now().isoformat(),
'stance': self.immortals.get(speaker, {}).get('stance', 'unknown')
})
return {
"debate_id": f"jixia_ollama_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"topic": topic,
"context": context,
"messages": debate_messages,
"final_output": debate_messages[-1]['content'] if debate_messages else "",
"timestamp": datetime.now().isoformat(),
"framework": "OpenAI Swarm + Ollama",
"model": self.model_name,
"ollama_url": self.ollama_base_url
}
def extract_speaker(self, content: str) -> str:
"""从内容中提取发言者"""
for name in self.immortals.keys():
if f"{name}" in content:
return name
return "未知仙人"
def display_summary(self, result: Dict[str, Any]):
"""显示辩论总结"""
print("\n🌟 四仙论道总结")
print("=" * 60)
print(f"📜 主题: {result['topic']}")
print(f"⏰ 时间: {result['timestamp']}")
print(f"🔧 框架: {result['framework']}")
print(f"🤖 模型: {result['model']}")
print(f"💬 发言数: {len(result['messages'])}")
# 统计正反方观点
positive_count = len([m for m in result['messages'] if m.get('stance') == 'positive'])
negative_count = len([m for m in result['messages'] if m.get('stance') == 'negative'])
print(f"📊 观点分布: 看涨{positive_count}条, 看跌{negative_count}")
print("\n🏆 最终总结:")
print("-" * 40)
if result['messages']:
print(result['final_output'])
print("\n✨ 本地辩论特色:")
print("🦙 使用本地Ollama无需API密钥")
print("🗡️ 四仙各展所长,观点多元")
print("⚖️ 正反方交替,辩论激烈")
print("🚀 基于Swarm性能优越")
print("🔒 完全本地运行,数据安全")
# 主函数
async def main():
"""主函数"""
print("🏛️ 稷下学宫本地版 - Ollama + Swarm")
print("🦙 使用本地Ollama服务无需API密钥")
print("🚀 四仙论道,完全本地运行")
print()
# 创建辩论系统
academy = JixiaOllamaSwarm()
# 辩论主题
topics = [
"英伟达股价走势AI泡沫还是技术革命",
"美联储2024年货币政策加息还是降息",
"比特币vs黄金谁是更好的避险资产",
"中国房地产市场:触底反弹还是继续下行?",
"特斯拉股价:马斯克效应还是基本面支撑?"
]
# 随机选择主题
topic = random.choice(topics)
# 市场背景
context = {
"market_sentiment": "谨慎乐观",
"volatility": "中等",
"key_events": ["财报季", "央行会议", "地缘政治"],
"technical_indicators": {
"RSI": 65,
"MACD": "金叉",
"MA20": "上穿"
}
}
# 开始辩论
result = await academy.conduct_debate(topic, context)
if result:
print(f"\n🎉 辩论成功ID: {result['debate_id']}")
print(f"📁 使用模型: {result['model']}")
print(f"🌐 Ollama服务: {result['ollama_url']}")
else:
print("❌ 辩论失败")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,361 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫简化版 - 基于OpenAI Swarm的四仙辩论系统
避免复杂的函数名称问题专注于辩论效果
"""
import os
import asyncio
import json
from datetime import datetime
from swarm import Swarm, Agent
from typing import Dict, List, Any, Optional
import random
class JixiaSimpleSwarm:
"""稷下学宫简化版 - 四仙辩论系统"""
def __init__(self):
# 使用Doppler配置
try:
from config.doppler_config import get_doppler_manager
manager = get_doppler_manager()
manager.load_config(force_doppler=True)
print("🔐 使用Doppler配置")
except Exception as e:
print(f"❌ Doppler配置失败: {e}")
raise
# 获取API密钥
self.api_key = self.get_api_key()
if self.api_key:
# 初始化Swarm客户端
from openai import OpenAI
openai_client = OpenAI(
api_key=self.api_key,
base_url="https://openrouter.ai/api/v1",
default_headers={
"HTTP-Referer": "https://github.com/ben/cauldron",
"X-Title": "Jixia Academy"
}
)
self.client = Swarm(client=openai_client)
else:
self.client = None
# 四仙配置
self.immortals = {
'吕洞宾': {
'role': '技术分析专家',
'stance': 'positive',
'specialty': '技术分析和图表解读',
'style': '犀利直接,一剑封喉'
},
'何仙姑': {
'role': '风险控制专家',
'stance': 'negative',
'specialty': '风险评估和资金管理',
'style': '温和坚定,关注风险'
},
'张果老': {
'role': '历史数据分析师',
'stance': 'positive',
'specialty': '历史回测和趋势分析',
'style': '博古通今,从历史找规律'
},
'铁拐李': {
'role': '逆向投资大师',
'stance': 'negative',
'specialty': '逆向思维和危机发现',
'style': '不拘一格,挑战共识'
}
}
# 创建智能体
self.agents = self.create_agents()
def get_api_key(self):
"""获取API密钥"""
api_keys = [
os.getenv('OPENROUTER_API_KEY_1'),
os.getenv('OPENROUTER_API_KEY_2'),
os.getenv('OPENROUTER_API_KEY_3'),
os.getenv('OPENROUTER_API_KEY_4')
]
for key in api_keys:
if key and key.startswith('sk-'):
print(f"✅ 找到API密钥: {key[:20]}...")
return key
print("❌ 未找到有效的API密钥")
return None
def create_agents(self) -> Dict[str, Agent]:
"""创建四仙智能体"""
if not self.client:
return {}
agents = {}
# 吕洞宾 - 技术分析专家
agents['吕洞宾'] = Agent(
name="LuDongbin",
instructions="""
你是吕洞宾八仙之首技术分析专家
你的特点
- 擅长技术分析和图表解读
- 立场看涨派善于发现投资机会
- 风格犀利直接一剑封喉
在辩论中
1. 从技术分析角度分析市场
2. 使用具体的技术指标支撑观点
3. 保持看涨的乐观态度
4. 发言以"吕洞宾曰:"开头
5. 发言完毕后说"请何仙姑继续论道"
""",
functions=[self.to_hexiangu]
)
# 何仙姑 - 风险控制专家
agents['何仙姑'] = Agent(
name="HeXiangu",
instructions="""
你是何仙姑八仙中唯一的女仙风险控制专家
你的特点
- 擅长风险评估和资金管理
- 立场看跌派关注投资风险
- 风格温和坚定关注风险控制
在辩论中
1. 从风险控制角度分析市场
2. 指出潜在的投资风险
3. 保持谨慎的态度
4. 发言以"何仙姑曰:"开头
5. 发言完毕后说"请张果老继续论道"
""",
functions=[self.to_zhangguolao]
)
# 张果老 - 历史数据分析师
agents['张果老'] = Agent(
name="ZhangGuoLao",
instructions="""
你是张果老历史数据分析师
你的特点
- 擅长历史回测和趋势分析
- 立场看涨派从历史中寻找机会
- 风格博古通今从历史中找规律
在辩论中
1. 从历史数据角度分析市场
2. 引用历史案例和数据
3. 保持乐观的投资态度
4. 发言以"张果老曰:"开头
5. 发言完毕后说"请铁拐李继续论道"
""",
functions=[self.to_tieguaili]
)
# 铁拐李 - 逆向投资大师
agents['铁拐李'] = Agent(
name="TieGuaiLi",
instructions="""
你是铁拐李逆向投资大师
你的特点
- 擅长逆向思维和危机发现
- 立场看跌派挑战主流观点
- 风格不拘一格敢于质疑
在辩论中
1. 从逆向投资角度分析市场
2. 挑战前面仙人的观点
3. 寻找市场的潜在危机
4. 发言以"铁拐李曰:"开头
5. 作为最后发言者要总结四仙观点并给出结论
""",
functions=[] # 最后一个,不需要转换
)
return agents
def to_hexiangu(self):
"""转到何仙姑"""
return self.agents['何仙姑']
def to_zhangguolao(self):
"""转到张果老"""
return self.agents['张果老']
def to_tieguaili(self):
"""转到铁拐李"""
return self.agents['铁拐李']
async def conduct_debate(self, topic: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""进行四仙辩论"""
if not self.client:
print("❌ 客户端未初始化,无法进行辩论")
return None
print("🏛️ 稷下学宫四仙论道开始!")
print("=" * 60)
print(f"🎯 论道主题: {topic}")
print(f"⏰ 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# 构建初始提示
prompt = self.build_prompt(topic, context)
try:
print("⚔️ 吕洞宾仙长请先发言...")
print("-" * 40)
# 开始辩论
response = self.client.run(
agent=self.agents['吕洞宾'],
messages=[{"role": "user", "content": prompt}],
max_turns=10,
model_override="openai/gpt-3.5-turbo" # 使用稳定的模型
)
print("\n" + "=" * 60)
print("🎊 四仙论道圆满结束!")
# 处理结果
result = self.process_result(response, topic, context)
self.display_summary(result)
return result
except Exception as e:
print(f"❌ 论道过程中出错: {e}")
import traceback
traceback.print_exc()
return None
def build_prompt(self, topic: str, context: Dict[str, Any] = None) -> str:
"""构建辩论提示"""
context_str = ""
if context:
context_str = f"\n📊 市场背景:\n{json.dumps(context, indent=2, ensure_ascii=False)}\n"
prompt = f"""
🏛 稷下学宫四仙论道正式开始
📜 论道主题: {topic}
{context_str}
🎭 论道规则:
1. 四仙按序发言吕洞宾 何仙姑 张果老 铁拐李
2. 正反方交替吕洞宾(看涨) 何仙姑(看跌) 张果老(看涨) 铁拐李(看跌)
3. 每位仙人从专业角度分析提供具体数据支撑
4. 可以质疑前面仙人的观点
5. 保持仙风道骨的表达风格
6. 铁拐李作为最后发言者要总结观点
🗡 请吕洞宾仙长首先发言
记住你是技术分析专家要从技术面找到投资机会
"""
return prompt
def process_result(self, response, topic: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""处理辩论结果"""
messages = response.messages if hasattr(response, 'messages') else []
debate_messages = []
for msg in messages:
if msg.get('role') == 'assistant' and msg.get('content'):
content = msg['content']
speaker = self.extract_speaker(content)
debate_messages.append({
'speaker': speaker,
'content': content,
'timestamp': datetime.now().isoformat()
})
return {
"debate_id": f"jixia_simple_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"topic": topic,
"context": context,
"messages": debate_messages,
"final_output": debate_messages[-1]['content'] if debate_messages else "",
"timestamp": datetime.now().isoformat(),
"framework": "OpenAI Swarm (Simplified)"
}
def extract_speaker(self, content: str) -> str:
"""从内容中提取发言者"""
for name in self.immortals.keys():
if f"{name}" in content:
return name
return "未知仙人"
def display_summary(self, result: Dict[str, Any]):
"""显示辩论总结"""
print("\n🌟 四仙论道总结")
print("=" * 60)
print(f"📜 主题: {result['topic']}")
print(f"⏰ 时间: {result['timestamp']}")
print(f"🔧 框架: {result['framework']}")
print(f"💬 发言数: {len(result['messages'])}")
print("\n🏆 最终总结:")
print("-" * 40)
if result['messages']:
print(result['final_output'])
print("\n✨ 辩论特色:")
print("🗡️ 四仙各展所长,观点多元")
print("⚖️ 正反方交替,辩论激烈")
print("🚀 基于Swarm性能优越")
# 主函数
async def main():
"""主函数"""
print("🏛️ 稷下学宫简化版 - OpenAI Swarm")
print("🚀 四仙论道,简洁高效")
print()
# 创建辩论系统
academy = JixiaSimpleSwarm()
if not academy.client:
print("❌ 系统初始化失败")
return
# 辩论主题
topics = [
"英伟达股价走势AI泡沫还是技术革命",
"美联储2024年货币政策加息还是降息",
"比特币vs黄金谁是更好的避险资产",
"中国房地产市场:触底反弹还是继续下行?"
]
# 随机选择主题
topic = random.choice(topics)
# 市场背景
context = {
"market_sentiment": "谨慎乐观",
"volatility": "中等",
"key_events": ["财报季", "央行会议", "地缘政治"]
}
# 开始辩论
result = await academy.conduct_debate(topic, context)
if result:
print(f"\n🎉 辩论成功ID: {result['debate_id']}")
else:
print("❌ 辩论失败")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -52,8 +52,8 @@ google-cloud-aiplatform>=1.38.0
PyYAML>=6.0
python-frontmatter>=1.0.0
# 市场数据 - OpenBB可选安装
openbb>=4.1.0
# 市场数据 - OpenBB可选安装,未默认安装。若需启用,请取消下一行注释
# openbb>=4.1.0
# 新增:从 .env 加载本地环境变量
python-dotenv>=1.0.1

View File

@ -6,7 +6,7 @@ OpenBB 集成引擎
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import openbb
@dataclass
class ImmortalConfig:
@ -18,7 +18,7 @@ class ImmortalConfig:
class APIResult:
"""API调用结果数据类"""
success: bool
data: Optional[Dict[str, Any]] = None
data: Optional[Any] = None
provider_used: Optional[str] = None
error: Optional[str] = None
@ -29,6 +29,9 @@ class OpenBBEngine:
"""
初始化 OpenBB 引擎
"""
# 延迟导入 OpenBB避免未安装时报错
self._obb = None
# 八仙专属数据源分配
self.immortal_sources: Dict[str, ImmortalConfig] = {
'吕洞宾': ImmortalConfig( # 乾-技术分析专家
@ -67,6 +70,18 @@ class OpenBBEngine:
print("✅ OpenBB 引擎初始化完成")
def _ensure_openbb(self):
"""Lazy import OpenBB v4 obb router."""
if self._obb is not None:
return True
try:
from openbb import obb # type: ignore
self._obb = obb
return True
except Exception:
self._obb = None
return False
def get_immortal_data(self, immortal_name: str, data_type: str, symbol: str = 'AAPL') -> APIResult:
"""
为特定八仙获取专属数据
@ -88,56 +103,59 @@ class OpenBBEngine:
# 根据数据类型调用不同的 OpenBB 函数
try:
if not self._ensure_openbb():
return APIResult(success=False, error='OpenBB 未安装,请先安装 openbb>=4 并在 requirements.txt 启用')
obb = self._obb
if data_type == 'price':
result = openbb.obb.equity.price.quote(symbol=symbol, provider=immortal_config.primary)
result = obb.equity.price.quote(symbol=symbol, provider=immortal_config.primary)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used=immortal_config.primary
)
elif data_type == 'historical':
result = openbb.obb.equity.price.historical(symbol=symbol, provider=immortal_config.primary)
result = obb.equity.price.historical(symbol=symbol, provider=immortal_config.primary)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used=immortal_config.primary
)
elif data_type == 'profile':
result = openbb.obb.equity.profile(symbol=symbol, provider=immortal_config.primary)
result = obb.equity.profile(symbol=symbol, provider=immortal_config.primary)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used=immortal_config.primary
)
elif data_type == 'news':
result = openbb.obb.news.company(symbol=symbol)
result = obb.news.company(symbol=symbol)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used='news_api'
)
elif data_type == 'earnings':
result = openbb.obb.equity.earnings.earnings_historical(symbol=symbol, provider=immortal_config.primary)
result = obb.equity.earnings.earnings_historical(symbol=symbol, provider=immortal_config.primary)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used=immortal_config.primary
)
elif data_type == 'dividends':
result = openbb.obb.equity.fundamental.dividend(symbol=symbol, provider=immortal_config.primary)
result = obb.equity.fundamental.dividend(symbol=symbol, provider=immortal_config.primary)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used=immortal_config.primary
)
elif data_type == 'screener':
# 使用简单的筛选器作为替代
result = openbb.obb.equity.screener.etf(
result = obb.equity.screener.etf(
provider=immortal_config.primary
)
return APIResult(
success=True,
data=result.results,
data=getattr(result, 'results', getattr(result, 'to_dict', lambda: None)()),
provider_used=immortal_config.primary
)
else:

View File

@ -3,7 +3,6 @@
OpenBB 股票数据获取模块
"""
import openbb
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
@ -26,17 +25,24 @@ def get_stock_data(symbol: str, days: int = 90) -> Optional[List[Dict[str, Any]]
print(f"🔍 正在获取 {symbol}{days} 天的数据...")
print(f" 时间范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
# 使用OpenBB获取数据
result = openbb.obb.equity.price.historical(
# 使用OpenBB获取数据延迟导入
try:
from openbb import obb # type: ignore
except Exception as e:
print(f"⚠️ OpenBB 未安装或导入失败: {e}")
return None
result = obb.equity.price.historical(
symbol=symbol,
provider='yfinance',
start_date=start_date.strftime('%Y-%m-%d'),
end_date=end_date.strftime('%Y-%m-%d')
)
if result and result.results:
print(f"✅ 成功获取 {len(result.results)} 条记录")
return result.results
results = getattr(result, 'results', None)
if results:
print(f"✅ 成功获取 {len(results)} 条记录")
return results
else:
print("❌ 未获取到数据")
return None
@ -64,17 +70,24 @@ def get_etf_data(symbol: str, days: int = 90) -> Optional[List[Dict[str, Any]]]:
print(f"🔍 正在获取 {symbol}{days} 天的数据...")
print(f" 时间范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
# 使用OpenBB获取数据
result = openbb.obb.etf.historical(
# 使用OpenBB获取数据延迟导入
try:
from openbb import obb # type: ignore
except Exception as e:
print(f"⚠️ OpenBB 未安装或导入失败: {e}")
return None
result = obb.etf.price.historical(
symbol=symbol,
provider='yfinance',
start_date=start_date.strftime('%Y-%m-%d'),
end_date=end_date.strftime('%Y-%m-%d')
)
if result and result.results:
print(f"✅ 成功获取 {len(result.results)} 条记录")
return result.results
results = getattr(result, 'results', None)
if results:
print(f"✅ 成功获取 {len(results)} 条记录")
return results
else:
print("❌ 未获取到数据")
return None

View File

@ -0,0 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
验证在未安装 OpenBB OpenBB Tab 的数据加载回退行为
该测试不强制要求安装 OpenBB因此仅检查函数能返回非空 DataFrame
"""
import importlib
import types
import pandas as pd
from app.tabs.openbb_tab import _load_price_data
def test_openbb_fallback_without_openbb():
# 尝试卸载 openbb 以模拟未安装环境(若本地未安装会抛错,忽略)
try:
if 'openbb' in list(importlib.sys.modules.keys()):
del importlib.sys.modules['openbb']
except Exception:
pass
df = _load_price_data('AAPL', 180)
assert isinstance(df, pd.DataFrame)
assert not df.empty
assert 'Date' in df.columns and 'Close' in df.columns

121
tmp_rovodev_vertex_check.py Normal file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
临时健康检查脚本验证本机 Vertex AI 基础配置是否完好
使用说明
python3 tmp_rovodev_vertex_check.py
运行完成后请将输出粘贴给我我会据此给出修复建议
完成诊断后可删除该文件
"""
import os
import json
import sys
from pathlib import Path
STATUS = []
def ok(msg):
STATUS.append((True, msg))
print(f"{msg}")
def warn(msg):
STATUS.append((None, msg))
print(f"⚠️ {msg}")
def fail(msg):
STATUS.append((False, msg))
print(f"{msg}")
def has(var):
return bool(os.getenv(var))
print("🧪 Vertex AI 本地配置健康检查\n")
# 1) 关键环境变量与文件
adc_path = Path.home() / ".config/gcloud/application_default_credentials.json"
if adc_path.exists():
ok(f"找到 ADC (Application Default Credentials): {adc_path}")
else:
warn("未找到 ADC 文件 (~/.config/gcloud/application_default_credentials.json)。如果你改用服务账号密钥也可忽略此项")
vars_to_check = [
"GOOGLE_GENAI_USE_VERTEXAI",
"GOOGLE_CLOUD_PROJECT_ID",
"GOOGLE_CLOUD_LOCATION",
"VERTEX_MEMORY_BANK_ENABLED",
"GOOGLE_API_KEY",
"GOOGLE_SERVICE_ACCOUNT_KEY",
]
print("\n🔍 环境变量:")
for v in vars_to_check:
val = os.getenv(v)
if not val:
if v in ("GOOGLE_SERVICE_ACCOUNT_KEY", "GOOGLE_API_KEY"):
# 可选
warn(f"{v}: 未设置 (可选)")
else:
fail(f"{v}: 未设置")
else:
if v.endswith("KEY"):
ok(f"{v}: 已设置 (不显示具体值)")
else:
ok(f"{v}: {val}")
# 2) 依赖与版本
print("\n🔍 依赖google-cloud-aiplatform")
try:
import google
from google.cloud import aiplatform
ok(f"google-cloud-aiplatform 可导入,版本: {aiplatform.__version__}")
except Exception as e:
fail(f"无法导入 google-cloud-aiplatform: {e}")
# 3) 验证默认凭据可用性(若存在)
print("\n🔍 默认凭据 (ADC) 可用性:")
try:
import google.auth
from google.auth.transport.requests import Request
creds, proj = google.auth.default(scopes=[
"https://www.googleapis.com/auth/cloud-platform",
])
# 刷新一次,验证有效性
try:
creds.refresh(Request())
ok("默认凭据可用并成功刷新访问令牌")
except Exception as e:
warn(f"找到默认凭据,但刷新失败(可能未登录或网络受限):{e}")
if proj:
ok(f"默认凭据解析到的 Project: {proj}")
else:
warn("默认凭据未解析出 Project ID")
except Exception as e:
warn(f"未能通过 google.auth.default() 获取默认凭据:{e}")
# 4) 初始化 Vertex AI使用环境中的 Project/Location
print("\n🔍 Vertex AI 初始化:")
project = os.getenv("GOOGLE_CLOUD_PROJECT_ID")
location = os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1")
if project:
try:
from google.cloud import aiplatform
aiplatform.init(project=project, location=location)
ok(f"Vertex AI 初始化成功: project={project}, location={location}")
except Exception as e:
fail(f"Vertex AI 初始化失败: {e}")
else:
fail("缺少 GOOGLE_CLOUD_PROJECT_ID无法初始化 Vertex AI")
# 5) 汇总
print("\n📋 检查摘要:")
all_good = True
for s, msg in STATUS:
if s is False:
all_good = False
if all_good:
print("\n🎉 结论:看起来你的 Vertex AI 本地配置是可用的。")
else:
print("\n🚧 结论:存在未通过项。请根据上面的 ❌/⚠️ 提示逐项修复后重试。")
print("\n提示:\n- 启用 Vertex 模式export GOOGLE_GENAI_USE_VERTEXAI=TRUE\n- 必填export GOOGLE_CLOUD_PROJECT_ID=你的项目ID\n- 可选export GOOGLE_CLOUD_LOCATION=us-central1\n- 建议使用 ADCgcloud auth application-default login\n- 或设置服务账号密钥export GOOGLE_SERVICE_ACCOUNT_KEY='(JSON 字符串或路径)'\n")