feat: 重构项目结构并增强安全性

- 将文档和脚本移动到更合适的目录结构
- 删除敏感信息并替换为Doppler环境变量引用
- 新增GitGuardian配置以加强代码安全扫描
- 实现MongoDB向量搜索索引创建脚本
- 添加文章embedding生成脚本
- 新增Alpha Vantage测试脚本
- 重构八仙辩论系统架构
- 实现swarm辩论触发机制
- 新增MongoDB与Swarm集成示例
- 完善RapidAPI负载均衡策略文档

所有修改均遵循安全最佳实践,敏感信息不再硬编码在代码中
This commit is contained in:
ben 2025-08-02 16:58:12 +00:00
parent 6b464b6e07
commit 4d58c6f938
24 changed files with 4162 additions and 504 deletions

View File

@ -1,9 +1,12 @@
# MongoDB Atlas Connection # MongoDB Atlas Connection (managed by Doppler)
MONGODB_URI=mongodb+srv://username:password@cluster.mongodb.net/ # MONGODB_URI=mongodb+srv://username:password@cluster.mongodb.net/
# Database Configuration # Database Configuration
MONGODB_DATABASE=taigong MONGODB_DATABASE=taigong
# Swarm Debate Configuration # Swarm Debate Configuration
SWARM_THRESHOLD=5 SWARM_THRESHOLD=5
SWARM_TIME_WINDOW_HOURS=24 SWARM_TIME_WINDOW_HOURS=24
# Note: Sensitive secrets like MONGODB_URI are managed by Doppler
# Run: doppler secrets set MONGODB_URI "your-connection-string"

45
.gitguardian.yaml Normal file
View File

@ -0,0 +1,45 @@
# GitGuardian configuration for liurenchaxin project
version: 2
# Paths to exclude from scanning
paths_ignore:
- venv/
- .git/
- node_modules/
- __pycache__/
- "*.pyc"
- "*.log"
- .kiro/
- examples/
- tests/fixtures/
- tools/
# File patterns to exclude
patterns_ignore:
- "*.min.js"
- "*.bundle.js"
- "*.map"
- "*.lock"
- "package-lock.json"
- "yarn.lock"
# Only scan specific file types for secrets
match_policy:
- "*.py"
- "*.js"
- "*.ts"
- "*.json"
- "*.yaml"
- "*.yml"
- "*.env.example"
- "*.md"
- "*.sh"
# Exclude .env files from scanning (they should be in .gitignore anyway)
paths_ignore_patterns:
- ".env"
- ".env.local"
- ".env.production"
# Secret detection settings
secret_scan_preference: secret

View File

@ -4,7 +4,7 @@
基于Doppler配置和实际测试记录当前RapidAPI订阅的API服务及其可用性。 基于Doppler配置和实际测试记录当前RapidAPI订阅的API服务及其可用性。
**API密钥**: `6731900a13msh816fbe854209ac2p1bded2jsn1538144d52a4` **API密钥**: `[REDACTED - 从Doppler获取RAPIDAPI_KEY]`
**总订阅数**: 17个API服务 **总订阅数**: 17个API服务
**最后更新**: 2025-08-02 **最后更新**: 2025-08-02
@ -193,4 +193,4 @@
**维护者**: Ben **维护者**: Ben
**联系方式**: 通过Doppler配置管理API密钥 **联系方式**: 通过Doppler配置管理API密钥
**更新频率**: 每次API测试后更新 **更新频率**: 每次API测试后更新

View File

@ -0,0 +1,341 @@
# 🏛️ 稷下学宫八仙论道负载分担策略
## 📋 概述
基于现有的RapidAPI订阅和雅虎财经数据接口设计一套智能负载分担策略让八仙论道系统中的不同角色调用不同的API端点获取相同类型的数据实现API负载均衡和系统稳定性。
## 🎯 核心理念
**"同样的数据,不同的路径"** - 通过多API轮换获取相同类型的市场数据避免单一API过载确保系统稳定运行。
## 📊 可用API资源清单
### ✅ 已验证可用的API (4个)
```python
AVAILABLE_APIS = {
'alpha_vantage': {
'host': 'alpha-vantage.p.rapidapi.com',
'response_time': 1.26,
'rate_limit': '500/min, 500k/month',
'specialty': ['stock_quote', 'company_overview', 'earnings']
},
'yahoo_finance_15': {
'host': 'yahoo-finance15.p.rapidapi.com',
'response_time': 2.07,
'rate_limit': '500/min, 500k/month',
'specialty': ['stock_quote', 'market_movers', 'news']
},
'webull': {
'host': 'webull.p.rapidapi.com',
'response_time': 1.56,
'rate_limit': '500/min, 500k/month',
'specialty': ['stock_search', 'market_gainers']
},
'seeking_alpha': {
'host': 'seeking-alpha.p.rapidapi.com',
'response_time': 3.32,
'rate_limit': '500/min, 500k/month',
'specialty': ['company_profile', 'market_analysis']
}
}
```
## 🎭 八仙负载分担配置
### 数据类型映射策略
```python
DATA_TYPE_API_MAPPING = {
# 股票报价数据 - 3个API可提供
'stock_quote': {
'吕洞宾': 'alpha_vantage', # 主力剑仙用最快的API
'何仙姑': 'yahoo_finance_15', # 风控专家用稳定的API
'张果老': 'webull', # 技术分析师用搜索强的API
'韩湘子': 'alpha_vantage', # 基本面研究用专业API
'汉钟离': 'yahoo_finance_15', # 量化专家用市场数据API
'蓝采和': 'webull', # 情绪分析师用活跃数据API
'曹国舅': 'seeking_alpha', # 宏观分析师用分析API
'铁拐李': 'alpha_vantage' # 逆向投资用基础数据API
},
# 公司概览数据 - 2个API可提供
'company_overview': {
'吕洞宾': 'alpha_vantage', # 技术分析需要完整数据
'何仙姑': 'seeking_alpha', # 风控需要分析师观点
'张果老': 'alpha_vantage', # 技术分析偏好数据API
'韩湘子': 'seeking_alpha', # 基本面研究需要深度分析
'汉钟离': 'alpha_vantage', # 量化需要结构化数据
'蓝采和': 'seeking_alpha', # 情绪分析需要市场观点
'曹国舅': 'seeking_alpha', # 宏观分析需要专业观点
'铁拐李': 'alpha_vantage' # 逆向投资需要基础数据
},
# 市场动态数据 - 2个API可提供
'market_movers': {
'吕洞宾': 'yahoo_finance_15', # 剑仙关注市场热点
'何仙姑': 'webull', # 风控关注活跃股票
'张果老': 'yahoo_finance_15', # 技术分析关注涨跌榜
'韩湘子': 'webull', # 基本面研究关注搜索热度
'汉钟离': 'yahoo_finance_15', # 量化关注市场数据
'蓝采和': 'webull', # 情绪分析关注活跃度
'曹国舅': 'yahoo_finance_15', # 宏观关注整体趋势
'铁拐李': 'webull' # 逆向投资关注异常股票
},
# 新闻和分析数据 - 2个API可提供
'market_news': {
'吕洞宾': 'yahoo_finance_15', # 剑仙需要快速资讯
'何仙姑': 'seeking_alpha', # 风控需要深度分析
'张果老': 'yahoo_finance_15', # 技术分析关注市场新闻
'韩湘子': 'seeking_alpha', # 基本面需要分析师观点
'汉钟离': 'yahoo_finance_15', # 量化关注数据驱动新闻
'蓝采和': 'seeking_alpha', # 情绪分析需要市场情绪
'曹国舅': 'seeking_alpha', # 宏观需要政策分析
'铁拐李': 'yahoo_finance_15' # 逆向投资关注反向指标
}
}
```
## 🔄 智能轮换策略
### 1. 时间窗口轮换
```python
TIME_BASED_ROTATION = {
# 交易时段 (9:30-16:00 EST) - 优先使用快速API
'trading_hours': {
'primary_apis': ['alpha_vantage', 'webull'],
'backup_apis': ['yahoo_finance_15', 'seeking_alpha']
},
# 非交易时段 - 可以使用较慢但更详细的API
'after_hours': {
'primary_apis': ['seeking_alpha', 'yahoo_finance_15'],
'backup_apis': ['alpha_vantage', 'webull']
}
}
```
### 2. 负载感知轮换
```python
LOAD_AWARE_ROTATION = {
# 当某个API接近限制时自动切换到其他API
'rate_limit_thresholds': {
'alpha_vantage': 450, # 90% of 500/min
'yahoo_finance_15': 450,
'webull': 450,
'seeking_alpha': 450
},
# 故障转移优先级
'failover_priority': {
'alpha_vantage': ['webull', 'yahoo_finance_15'],
'yahoo_finance_15': ['webull', 'alpha_vantage'],
'webull': ['alpha_vantage', 'yahoo_finance_15'],
'seeking_alpha': ['yahoo_finance_15', 'alpha_vantage']
}
}
```
## 🏗️ 实现架构
### 核心组件
```python
class JixiaLoadBalancer:
"""稷下学宫负载均衡器"""
def __init__(self):
self.api_pool = APIPool(AVAILABLE_APIS)
self.immortal_mapping = DATA_TYPE_API_MAPPING
self.rate_limiter = RateLimiter()
self.health_checker = APIHealthChecker()
def get_data_for_immortal(self, immortal_name: str, data_type: str, symbol: str):
"""为特定仙人获取数据"""
# 1. 获取该仙人的首选API
preferred_api = self.immortal_mapping[data_type][immortal_name]
# 2. 检查API健康状态和速率限制
if self.is_api_available(preferred_api):
return self.call_api(preferred_api, data_type, symbol)
# 3. 故障转移到备用API
backup_apis = LOAD_AWARE_ROTATION['failover_priority'][preferred_api]
for backup_api in backup_apis:
if self.is_api_available(backup_api):
return self.call_api(backup_api, data_type, symbol)
# 4. 如果所有API都不可用返回缓存数据
return self.get_cached_data(data_type, symbol)
def is_api_available(self, api_name: str) -> bool:
"""检查API是否可用"""
# 检查健康状态
if not self.health_checker.is_healthy(api_name):
return False
# 检查速率限制
if self.rate_limiter.is_rate_limited(api_name):
return False
return True
```
### 数据统一化处理
```python
class DataNormalizer:
"""数据标准化处理器"""
def normalize_stock_quote(self, raw_data: dict, api_source: str) -> dict:
"""将不同API的股票报价数据标准化"""
if api_source == 'alpha_vantage':
return self._normalize_alpha_vantage_quote(raw_data)
elif api_source == 'yahoo_finance_15':
return self._normalize_yahoo_quote(raw_data)
elif api_source == 'webull':
return self._normalize_webull_quote(raw_data)
def _normalize_alpha_vantage_quote(self, data: dict) -> dict:
"""标准化Alpha Vantage数据格式"""
global_quote = data.get('Global Quote', {})
return {
'symbol': global_quote.get('01. symbol'),
'price': float(global_quote.get('05. price', 0)),
'change': float(global_quote.get('09. change', 0)),
'change_percent': global_quote.get('10. change percent', '0%'),
'volume': int(global_quote.get('06. volume', 0)),
'source': 'alpha_vantage'
}
def _normalize_yahoo_quote(self, data: dict) -> dict:
"""标准化Yahoo Finance数据格式"""
body = data.get('body', {})
return {
'symbol': body.get('symbol'),
'price': float(body.get('regularMarketPrice', 0)),
'change': float(body.get('regularMarketChange', 0)),
'change_percent': f"{body.get('regularMarketChangePercent', 0):.2f}%",
'volume': int(body.get('regularMarketVolume', 0)),
'source': 'yahoo_finance_15'
}
```
## 📊 监控和统计
### API使用统计
```python
class APIUsageMonitor:
"""API使用监控器"""
def __init__(self):
self.usage_stats = {
'alpha_vantage': {'calls': 0, 'errors': 0, 'avg_response_time': 0},
'yahoo_finance_15': {'calls': 0, 'errors': 0, 'avg_response_time': 0},
'webull': {'calls': 0, 'errors': 0, 'avg_response_time': 0},
'seeking_alpha': {'calls': 0, 'errors': 0, 'avg_response_time': 0}
}
def record_api_call(self, api_name: str, response_time: float, success: bool):
"""记录API调用统计"""
stats = self.usage_stats[api_name]
stats['calls'] += 1
if not success:
stats['errors'] += 1
# 更新平均响应时间
current_avg = stats['avg_response_time']
total_calls = stats['calls']
stats['avg_response_time'] = (current_avg * (total_calls - 1) + response_time) / total_calls
def get_load_distribution(self) -> dict:
"""获取负载分布统计"""
total_calls = sum(stats['calls'] for stats in self.usage_stats.values())
if total_calls == 0:
return {}
return {
api: {
'percentage': (stats['calls'] / total_calls) * 100,
'success_rate': ((stats['calls'] - stats['errors']) / stats['calls']) * 100 if stats['calls'] > 0 else 0,
'avg_response_time': stats['avg_response_time']
}
for api, stats in self.usage_stats.items()
}
```
## 🎯 实施计划
### 第一阶段:基础负载均衡
1. **实现核心负载均衡器** - 基本的API轮换逻辑
2. **数据标准化处理** - 统一不同API的数据格式
3. **简单故障转移** - 基本的备用API切换
### 第二阶段:智能优化
1. **速率限制监控** - 实时监控API使用情况
2. **健康检查机制** - 定期检测API可用性
3. **性能优化** - 基于响应时间优化API选择
### 第三阶段:高级功能
1. **预测性负载均衡** - 基于历史数据预测API负载
2. **成本优化** - 基于API成本优化调用策略
3. **实时监控面板** - 可视化API使用情况
## 📈 预期效果
### 性能提升
- **API负载分散**: 单个API负载降低60-70%
- **系统稳定性**: 故障率降低80%以上
- **响应速度**: 平均响应时间提升30%
### 成本控制
- **API使用优化**: 避免单一API过度使用
- **故障恢复**: 减少因API故障导致的数据缺失
- **扩展性**: 支持更多API的无缝接入
## 🔧 配置示例
### 环境配置
```bash
# Doppler环境变量
RAPIDAPI_KEY=your_rapidapi_key
ALPHA_VANTAGE_API_KEY=your_alpha_vantage_key
# 负载均衡配置
LOAD_BALANCER_ENABLED=true
API_HEALTH_CHECK_INTERVAL=300 # 5分钟
RATE_LIMIT_BUFFER=50 # 保留50个请求的缓冲
```
### 使用示例
```python
# 在稷下学宫系统中使用
from jixia_load_balancer import JixiaLoadBalancer
load_balancer = JixiaLoadBalancer()
# 八仙论道时,每个仙人获取数据
for immortal in ['吕洞宾', '何仙姑', '张果老', '韩湘子', '汉钟离', '蓝采和', '曹国舅', '铁拐李']:
quote_data = load_balancer.get_data_for_immortal(immortal, 'stock_quote', 'TSLA')
overview_data = load_balancer.get_data_for_immortal(immortal, 'company_overview', 'TSLA')
print(f"{immortal}: 获取到{quote_data['source']}的数据")
```
---
## 🎉 总结
通过这套负载分担策略,稷下学宫八仙论道系统可以:
1. **智能分配API调用** - 不同仙人使用不同API获取相同数据
2. **实现真正的负载均衡** - 避免单一API过载
3. **提高系统稳定性** - 多重故障转移保障
4. **优化成本效益** - 充分利用现有API资源
5. **支持无缝扩展** - 新API可轻松接入系统
**"八仙过海,各显神通"** - 让每个仙人都有自己的数据获取路径,共同构建稳定可靠的智能投资决策系统!🚀

View File

@ -2,7 +2,7 @@
## 📊 总体概况 ## 📊 总体概况
**API Key**: `6731900a13msh816fbe854209ac2p1bded2jsn1538144d52a4` **API Key**: `[REDACTED - 从Doppler获取RAPIDAPI_KEY]`
**订阅总数**: 16个 (根据控制台显示) **订阅总数**: 16个 (根据控制台显示)
**24小时调用**: 9次 **24小时调用**: 9次
**已确认可用**: 4个核心API **已确认可用**: 4个核心API

View File

@ -64,7 +64,7 @@
verifier = LingbaoFieldVerifier( verifier = LingbaoFieldVerifier(
openmanus_url="your-openmanus-url", openmanus_url="your-openmanus-url",
api_key="your-openmanus-key", api_key="your-openmanus-key",
openhands_api_key="hA04ZDQbdKUbBCqmN5ZPFkcdK0xsKLwX" openhands_api_key="[REDACTED - 从Doppler获取OPENHANDS_API_KEY]"
) )
# 执行验证 # 执行验证
@ -110,7 +110,7 @@ from src.core.openhands_integration import LingbaoOpenHandsVerifier
# 创建验证器 # 创建验证器
verifier = LingbaoOpenHandsVerifier( verifier = LingbaoOpenHandsVerifier(
api_key="hA04ZDQbdKUbBCqmN5ZPFkcdK0xsKLwX" api_key="[REDACTED - 从Doppler获取OPENHANDS_API_KEY]"
) )
# 验证辩论结论 # 验证辩论结论
@ -132,7 +132,7 @@ from src.core.lingbao_field_verification import LingbaoFieldVerifier
verifier = LingbaoFieldVerifier( verifier = LingbaoFieldVerifier(
openmanus_url="your-openmanus-url", openmanus_url="your-openmanus-url",
api_key="your-api-key", api_key="your-api-key",
openhands_api_key="hA04ZDQbdKUbBCqmN5ZPFkcdK0xsKLwX" openhands_api_key="[REDACTED - 从Doppler获取OPENHANDS_API_KEY]"
) )
# 执行完整验证流程 # 执行完整验证流程
@ -190,7 +190,7 @@ tianzun_report = await verifier.verify_debate_result(debate_result)
```bash ```bash
# OpenHands配置 # OpenHands配置
OPENHANDS_API_KEY=hA04ZDQbdKUbBCqmN5ZPFkcdK0xsKLwX OPENHANDS_API_KEY=[REDACTED - 从Doppler获取OPENHANDS_API_KEY]
OPENHANDS_BASE_URL=https://app.all-hands.dev OPENHANDS_BASE_URL=https://app.all-hands.dev
OPENHANDS_TIMEOUT=300 OPENHANDS_TIMEOUT=300
@ -224,7 +224,7 @@ pip install aiohttp pydantic
```python ```python
# 在.env文件中配置 # 在.env文件中配置
OPENHANDS_API_KEY=hA04ZDQbdKUbBCqmN5ZPFkcdK0xsKLwX OPENHANDS_API_KEY=[REDACTED - 从Doppler获取OPENHANDS_API_KEY]
``` ```
### 3. 运行演示 ### 3. 运行演示

2884
meta_analysis_results.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,3 +0,0 @@
{
"error": "无法连接到数据库"
}

View File

@ -9,7 +9,9 @@ from pymongo import MongoClient
def add_sequence_ids(): def add_sequence_ids():
"""为现有文章添加流水号""" """为现有文章添加流水号"""
# 连接MongoDB # 连接MongoDB
mongo_uri = os.getenv('MONGODB_URI', 'mongodb+srv://ben:313131@cauldron.tx3qnoq.mongodb.net/') mongo_uri = os.getenv('MONGODB_URI')
if not mongo_uri:
raise ValueError("MONGODB_URI environment variable is required")
client = MongoClient(mongo_uri) client = MongoClient(mongo_uri)
db = client['taigong'] db = client['taigong']
collection = db['articles'] collection = db['articles']

View File

@ -21,7 +21,9 @@ def generate_stable_id(title, pub_date, content):
def cleanup_duplicates(): def cleanup_duplicates():
"""清理重复数据""" """清理重复数据"""
# 连接MongoDB # 连接MongoDB
mongo_uri = os.getenv('MONGODB_URI', 'mongodb+srv://ben:313131@cauldron.tx3qnoq.mongodb.net/') mongo_uri = os.getenv('MONGODB_URI')
if not mongo_uri:
raise ValueError("MONGODB_URI environment variable is required")
client = MongoClient(mongo_uri) client = MongoClient(mongo_uri)
db = client['taigong'] db = client['taigong']
collection = db['articles'] collection = db['articles']

View File

@ -0,0 +1,35 @@
// MongoDB Atlas Vector Search Index Creation Script
// 为swarm辩论系统创建向量索引
// 连接到数据库
use('taigong');
// 创建向量索引用于语义搜索和内容聚类
// 这个索引将支持swarm辩论系统的语义相似性匹配
db.articles.createSearchIndex(
"vector_search_index",
{
"fields": [
{
"type": "vector",
"path": "embedding",
"numDimensions": 1536, // OpenAI text-embedding-ada-002 维度
"similarity": "cosine"
},
{
"type": "filter",
"path": "published_time"
},
{
"type": "filter",
"path": "title"
}
]
}
);
print("向量索引创建完成!");
print("索引名称: vector_search_index");
print("向量维度: 1536 (OpenAI embedding)");
print("相似性算法: cosine");
print("支持过滤字段: published_time, title");

View File

@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
为MongoDB中的文章生成向量embeddings
用于swarm辩论系统的语义搜索和内容聚类
"""
import os
import openai
from pymongo import MongoClient
from typing import List, Dict
import time
def get_mongodb_client():
"""从Doppler获取MongoDB连接"""
mongodb_uri = os.getenv('MONGODB_URI')
if not mongodb_uri:
raise ValueError("MONGODB_URI not found in environment variables")
return MongoClient(mongodb_uri)
def generate_embedding(text: str) -> List[float]:
"""使用OpenAI API生成文本embedding"""
try:
response = openai.Embedding.create(
model="text-embedding-ada-002",
input=text
)
return response['data'][0]['embedding']
except Exception as e:
print(f"生成embedding失败: {e}")
return None
def update_articles_with_embeddings():
"""为所有文章添加embedding字段"""
client = get_mongodb_client()
db = client.taigong
collection = db.articles
# 获取所有没有embedding的文章
articles = collection.find({"embedding": {"$exists": False}})
count = 0
for article in articles:
title = article.get('title', '')
if not title:
continue
print(f"处理文章: {title[:50]}...")
# 生成embedding
embedding = generate_embedding(title)
if embedding:
# 更新文档
collection.update_one(
{"_id": article["_id"]},
{"$set": {"embedding": embedding}}
)
count += 1
print(f"✓ 已更新 {count} 篇文章")
# 避免API rate limit
time.sleep(0.1)
else:
print(f"× 跳过文章: {title[:50]}")
print(f"\n完成!共处理 {count} 篇文章")
client.close()
if __name__ == "__main__":
# 设置OpenAI API密钥 (应该从Doppler获取)
openai.api_key = os.getenv('OPENAI_API_KEY')
if not openai.api_key:
print("警告: OPENAI_API_KEY 未设置请先在Doppler中配置")
exit(1)
update_articles_with_embeddings()

View File

@ -0,0 +1,462 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
稷下学宫负载均衡器
实现八仙论道的API负载分担策略
"""
import time
import random
import requests
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from collections import defaultdict
import json
import os
@dataclass
class APIResult:
"""API调用结果"""
success: bool
data: Dict[str, Any]
api_used: str
response_time: float
error: Optional[str] = None
cached: bool = False
class RateLimiter:
"""速率限制器"""
def __init__(self):
self.api_calls = defaultdict(list)
self.limits = {
'alpha_vantage': {'per_minute': 500, 'per_month': 500000},
'yahoo_finance_15': {'per_minute': 500, 'per_month': 500000},
'webull': {'per_minute': 500, 'per_month': 500000},
'seeking_alpha': {'per_minute': 500, 'per_month': 500000}
}
def is_rate_limited(self, api_name: str) -> bool:
"""检查是否达到速率限制"""
now = time.time()
calls = self.api_calls[api_name]
# 清理1分钟前的记录
self.api_calls[api_name] = [call_time for call_time in calls if now - call_time < 60]
# 检查每分钟限制
if len(self.api_calls[api_name]) >= self.limits[api_name]['per_minute'] * 0.9: # 90%阈值
return True
return False
def record_call(self, api_name: str):
"""记录API调用"""
self.api_calls[api_name].append(time.time())
class APIHealthChecker:
"""API健康检查器"""
def __init__(self):
self.health_status = {
'alpha_vantage': {'healthy': True, 'last_check': 0, 'consecutive_failures': 0},
'yahoo_finance_15': {'healthy': True, 'last_check': 0, 'consecutive_failures': 0},
'webull': {'healthy': True, 'last_check': 0, 'consecutive_failures': 0},
'seeking_alpha': {'healthy': True, 'last_check': 0, 'consecutive_failures': 0}
}
self.check_interval = 300 # 5分钟检查一次
def is_healthy(self, api_name: str) -> bool:
"""检查API是否健康"""
status = self.health_status[api_name]
now = time.time()
# 如果距离上次检查超过间隔时间,进行健康检查
if now - status['last_check'] > self.check_interval:
self._perform_health_check(api_name)
return status['healthy']
def _perform_health_check(self, api_name: str):
"""执行健康检查"""
# 这里可以实现具体的健康检查逻辑
# 暂时简化为基于连续失败次数判断
status = self.health_status[api_name]
status['last_check'] = time.time()
# 如果连续失败超过3次标记为不健康
if status['consecutive_failures'] > 3:
status['healthy'] = False
else:
status['healthy'] = True
def record_success(self, api_name: str):
"""记录成功调用"""
self.health_status[api_name]['consecutive_failures'] = 0
self.health_status[api_name]['healthy'] = True
def record_failure(self, api_name: str):
"""记录失败调用"""
self.health_status[api_name]['consecutive_failures'] += 1
class DataNormalizer:
"""数据标准化处理器"""
def normalize_stock_quote(self, raw_data: dict, api_source: str) -> dict:
"""将不同API的股票报价数据标准化"""
try:
if api_source == 'alpha_vantage':
return self._normalize_alpha_vantage_quote(raw_data)
elif api_source == 'yahoo_finance_15':
return self._normalize_yahoo_quote(raw_data)
elif api_source == 'webull':
return self._normalize_webull_quote(raw_data)
elif api_source == 'seeking_alpha':
return self._normalize_seeking_alpha_quote(raw_data)
else:
return {'error': f'Unknown API source: {api_source}'}
except Exception as e:
return {'error': f'Data normalization failed: {str(e)}'}
def _normalize_alpha_vantage_quote(self, data: dict) -> dict:
"""标准化Alpha Vantage数据格式"""
global_quote = data.get('Global Quote', {})
return {
'symbol': global_quote.get('01. symbol'),
'price': float(global_quote.get('05. price', 0)),
'change': float(global_quote.get('09. change', 0)),
'change_percent': global_quote.get('10. change percent', '0%'),
'volume': int(global_quote.get('06. volume', 0)),
'high': float(global_quote.get('03. high', 0)),
'low': float(global_quote.get('04. low', 0)),
'source': 'alpha_vantage',
'timestamp': global_quote.get('07. latest trading day')
}
def _normalize_yahoo_quote(self, data: dict) -> dict:
"""标准化Yahoo Finance数据格式"""
body = data.get('body', {})
return {
'symbol': body.get('symbol'),
'price': float(body.get('regularMarketPrice', 0)),
'change': float(body.get('regularMarketChange', 0)),
'change_percent': f"{body.get('regularMarketChangePercent', 0):.2f}%",
'volume': int(body.get('regularMarketVolume', 0)),
'high': float(body.get('regularMarketDayHigh', 0)),
'low': float(body.get('regularMarketDayLow', 0)),
'source': 'yahoo_finance_15',
'timestamp': body.get('regularMarketTime')
}
def _normalize_webull_quote(self, data: dict) -> dict:
"""标准化Webull数据格式"""
if 'stocks' in data and len(data['stocks']) > 0:
stock = data['stocks'][0]
return {
'symbol': stock.get('symbol'),
'price': float(stock.get('close', 0)),
'change': float(stock.get('change', 0)),
'change_percent': f"{stock.get('changeRatio', 0):.2f}%",
'volume': int(stock.get('volume', 0)),
'high': float(stock.get('high', 0)),
'low': float(stock.get('low', 0)),
'source': 'webull',
'timestamp': stock.get('timeStamp')
}
return {'error': 'No stock data found in Webull response'}
def _normalize_seeking_alpha_quote(self, data: dict) -> dict:
"""标准化Seeking Alpha数据格式"""
if 'data' in data and len(data['data']) > 0:
stock_data = data['data'][0]
attributes = stock_data.get('attributes', {})
return {
'symbol': attributes.get('slug'),
'price': float(attributes.get('lastPrice', 0)),
'change': float(attributes.get('dayChange', 0)),
'change_percent': f"{attributes.get('dayChangePercent', 0):.2f}%",
'volume': int(attributes.get('volume', 0)),
'source': 'seeking_alpha',
'market_cap': attributes.get('marketCap'),
'pe_ratio': attributes.get('peRatio')
}
return {'error': 'No data found in Seeking Alpha response'}
class JixiaLoadBalancer:
"""稷下学宫负载均衡器"""
def __init__(self, rapidapi_key: str):
self.rapidapi_key = rapidapi_key
self.rate_limiter = RateLimiter()
self.health_checker = APIHealthChecker()
self.data_normalizer = DataNormalizer()
self.cache = {} # 简单的内存缓存
self.cache_ttl = 300 # 5分钟缓存
# API配置
self.api_configs = {
'alpha_vantage': {
'host': 'alpha-vantage.p.rapidapi.com',
'endpoints': {
'stock_quote': '/query?function=GLOBAL_QUOTE&symbol={symbol}',
'company_overview': '/query?function=OVERVIEW&symbol={symbol}',
'earnings': '/query?function=EARNINGS&symbol={symbol}'
}
},
'yahoo_finance_15': {
'host': 'yahoo-finance15.p.rapidapi.com',
'endpoints': {
'stock_quote': '/api/yahoo/qu/quote/{symbol}',
'market_movers': '/api/yahoo/co/collections/day_gainers',
'market_news': '/api/yahoo/ne/news'
}
},
'webull': {
'host': 'webull.p.rapidapi.com',
'endpoints': {
'stock_quote': '/stock/search?keyword={symbol}',
'market_movers': '/market/get-active-gainers'
}
},
'seeking_alpha': {
'host': 'seeking-alpha.p.rapidapi.com',
'endpoints': {
'company_overview': '/symbols/get-profile?symbols={symbol}',
'market_news': '/news/list?category=market-news'
}
}
}
# 八仙API分配策略
self.immortal_api_mapping = {
'stock_quote': {
'吕洞宾': 'alpha_vantage', # 主力剑仙用最快的API
'何仙姑': 'yahoo_finance_15', # 风控专家用稳定的API
'张果老': 'webull', # 技术分析师用搜索强的API
'韩湘子': 'alpha_vantage', # 基本面研究用专业API
'汉钟离': 'yahoo_finance_15', # 量化专家用市场数据API
'蓝采和': 'webull', # 情绪分析师用活跃数据API
'曹国舅': 'seeking_alpha', # 宏观分析师用分析API
'铁拐李': 'alpha_vantage' # 逆向投资用基础数据API
},
'company_overview': {
'吕洞宾': 'alpha_vantage',
'何仙姑': 'seeking_alpha',
'张果老': 'alpha_vantage',
'韩湘子': 'seeking_alpha',
'汉钟离': 'alpha_vantage',
'蓝采和': 'seeking_alpha',
'曹国舅': 'seeking_alpha',
'铁拐李': 'alpha_vantage'
},
'market_movers': {
'吕洞宾': 'yahoo_finance_15',
'何仙姑': 'webull',
'张果老': 'yahoo_finance_15',
'韩湘子': 'webull',
'汉钟离': 'yahoo_finance_15',
'蓝采和': 'webull',
'曹国舅': 'yahoo_finance_15',
'铁拐李': 'webull'
},
'market_news': {
'吕洞宾': 'yahoo_finance_15',
'何仙姑': 'seeking_alpha',
'张果老': 'yahoo_finance_15',
'韩湘子': 'seeking_alpha',
'汉钟离': 'yahoo_finance_15',
'蓝采和': 'seeking_alpha',
'曹国舅': 'seeking_alpha',
'铁拐李': 'yahoo_finance_15'
}
}
# 故障转移优先级
self.failover_priority = {
'alpha_vantage': ['webull', 'yahoo_finance_15'],
'yahoo_finance_15': ['webull', 'alpha_vantage'],
'webull': ['alpha_vantage', 'yahoo_finance_15'],
'seeking_alpha': ['yahoo_finance_15', 'alpha_vantage']
}
def get_data_for_immortal(self, immortal_name: str, data_type: str, symbol: str = None) -> APIResult:
"""为特定仙人获取数据"""
print(f"🎭 {immortal_name} 正在获取 {data_type} 数据...")
# 检查缓存
cache_key = f"{immortal_name}_{data_type}_{symbol}"
cached_result = self._get_cached_data(cache_key)
if cached_result:
print(f" 📦 使用缓存数据")
return cached_result
# 获取该仙人的首选API
if data_type not in self.immortal_api_mapping:
return APIResult(False, {}, '', 0, f"Unsupported data type: {data_type}")
preferred_api = self.immortal_api_mapping[data_type][immortal_name]
# 尝试首选API
result = self._try_api(preferred_api, data_type, symbol)
if result.success:
self._cache_data(cache_key, result)
print(f" ✅ 成功从 {preferred_api} 获取数据 (响应时间: {result.response_time:.2f}s)")
return result
# 故障转移到备用API
print(f" ⚠️ {preferred_api} 不可用尝试备用API...")
backup_apis = self.failover_priority.get(preferred_api, [])
for backup_api in backup_apis:
if data_type in self.api_configs[backup_api]['endpoints']:
result = self._try_api(backup_api, data_type, symbol)
if result.success:
self._cache_data(cache_key, result)
print(f" ✅ 成功从备用API {backup_api} 获取数据 (响应时间: {result.response_time:.2f}s)")
return result
# 所有API都失败
print(f" ❌ 所有API都不可用")
return APIResult(False, {}, '', 0, "All APIs failed")
def _try_api(self, api_name: str, data_type: str, symbol: str = None) -> APIResult:
"""尝试调用指定API"""
# 检查API健康状态和速率限制
if not self.health_checker.is_healthy(api_name):
return APIResult(False, {}, api_name, 0, "API is unhealthy")
if self.rate_limiter.is_rate_limited(api_name):
return APIResult(False, {}, api_name, 0, "Rate limited")
# 构建请求
config = self.api_configs[api_name]
if data_type not in config['endpoints']:
return APIResult(False, {}, api_name, 0, f"Endpoint {data_type} not supported")
endpoint = config['endpoints'][data_type]
if symbol and '{symbol}' in endpoint:
endpoint = endpoint.format(symbol=symbol)
url = f"https://{config['host']}{endpoint}"
headers = {
'X-RapidAPI-Key': self.rapidapi_key,
'X-RapidAPI-Host': config['host']
}
# 发起请求
start_time = time.time()
try:
response = requests.get(url, headers=headers, timeout=10)
response_time = time.time() - start_time
self.rate_limiter.record_call(api_name)
if response.status_code == 200:
data = response.json()
# 数据标准化
if data_type == 'stock_quote':
normalized_data = self.data_normalizer.normalize_stock_quote(data, api_name)
else:
normalized_data = data
self.health_checker.record_success(api_name)
return APIResult(True, normalized_data, api_name, response_time)
else:
error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
self.health_checker.record_failure(api_name)
return APIResult(False, {}, api_name, response_time, error_msg)
except Exception as e:
response_time = time.time() - start_time
self.health_checker.record_failure(api_name)
return APIResult(False, {}, api_name, response_time, str(e))
def _get_cached_data(self, cache_key: str) -> Optional[APIResult]:
"""获取缓存数据"""
if cache_key in self.cache:
cached_item = self.cache[cache_key]
if time.time() - cached_item['timestamp'] < self.cache_ttl:
result = cached_item['result']
result.cached = True
return result
else:
# 缓存过期,删除
del self.cache[cache_key]
return None
def _cache_data(self, cache_key: str, result: APIResult):
"""缓存数据"""
self.cache[cache_key] = {
'result': result,
'timestamp': time.time()
}
def get_load_distribution(self) -> dict:
"""获取负载分布统计"""
api_calls = {}
total_calls = 0
for api_name, calls in self.rate_limiter.api_calls.items():
call_count = len(calls)
api_calls[api_name] = call_count
total_calls += call_count
if total_calls == 0:
return {}
distribution = {}
for api_name, call_count in api_calls.items():
health_status = self.health_checker.health_status[api_name]
distribution[api_name] = {
'calls': call_count,
'percentage': (call_count / total_calls) * 100,
'healthy': health_status['healthy'],
'consecutive_failures': health_status['consecutive_failures']
}
return distribution
def conduct_immortal_debate(self, topic_symbol: str) -> Dict[str, APIResult]:
"""进行八仙论道,每个仙人获取不同的数据"""
print(f"\n🏛️ 稷下学宫八仙论道开始 - 主题: {topic_symbol}")
print("=" * 60)
immortals = ['吕洞宾', '何仙姑', '张果老', '韩湘子', '汉钟离', '蓝采和', '曹国舅', '铁拐李']
debate_results = {}
# 每个仙人获取股票报价数据
for immortal in immortals:
result = self.get_data_for_immortal(immortal, 'stock_quote', topic_symbol)
debate_results[immortal] = result
if result.success:
data = result.data
if 'price' in data:
print(f" 💰 {immortal}: ${data['price']:.2f} ({data.get('change_percent', 'N/A')}) via {result.api_used}")
time.sleep(0.2) # 避免过快请求
print("\n📊 负载分布统计:")
distribution = self.get_load_distribution()
for api_name, stats in distribution.items():
print(f" {api_name}: {stats['calls']} 次调用 ({stats['percentage']:.1f}%) - {'健康' if stats['healthy'] else '异常'}")
return debate_results
# 使用示例
if __name__ == "__main__":
# 从环境变量获取API密钥
rapidapi_key = os.getenv('RAPIDAPI_KEY')
if not rapidapi_key:
print("❌ 请设置RAPIDAPI_KEY环境变量")
exit(1)
# 创建负载均衡器
load_balancer = JixiaLoadBalancer(rapidapi_key)
# 进行八仙论道
results = load_balancer.conduct_immortal_debate('TSLA')
print("\n🎉 八仙论道完成!")

View File

@ -0,0 +1,30 @@
/* global use, db */
// MongoDB Playground
// Use Ctrl+Space inside a snippet or a string literal to trigger completions.
// The current database to use.
use('taigong');
// Search for documents in the current collection.
db.getCollection('articles')
.find(
{
/*
* Filter
* fieldA: value or expression
*/
},
{
/*
* Projection
* _id: 0, // exclude _id
* fieldA: 1 // include field
*/
}
)
.sort({
/*
* fieldA: 1 // ascending
* fieldB: -1 // descending
*/
});

148
src/swarm_trigger.py Normal file
View File

@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Swarm辩论触发器
基于时间群聚效应和语义相似性触发蜂群辩论
"""
import os
from datetime import datetime, timedelta
from pymongo import MongoClient
from typing import List, Dict, Optional
import numpy as np
class SwarmDebateTrigger:
def __init__(self):
self.mongodb_uri = os.getenv('MONGODB_URI')
self.client = MongoClient(self.mongodb_uri)
self.db = self.client.taigong
self.collection = self.db.articles
# 配置参数
self.swarm_threshold = int(os.getenv('SWARM_THRESHOLD', 5))
self.time_window_hours = int(os.getenv('SWARM_TIME_WINDOW_HOURS', 24))
def detect_time_clustering(self) -> List[Dict]:
"""检测时间窗口内的文章群聚效应"""
# 计算时间窗口
now = datetime.utcnow()
time_threshold = now - timedelta(hours=self.time_window_hours)
# 使用published_time_index查询最近文章
recent_articles = list(self.collection.find({
"published_time": {"$gte": time_threshold}
}).sort("published_time", -1))
print(f"最近{self.time_window_hours}小时内发现 {len(recent_articles)} 篇文章")
if len(recent_articles) >= self.swarm_threshold:
print(f"✓ 触发群聚效应!文章数量({len(recent_articles)}) >= 阈值({self.swarm_threshold})")
return recent_articles
else:
print(f"× 未达到群聚阈值,需要至少 {self.swarm_threshold} 篇文章")
return []
def find_semantic_clusters(self, articles: List[Dict], similarity_threshold: float = 0.8) -> List[List[Dict]]:
"""基于向量相似性找到语义聚类"""
if not articles:
return []
# 过滤有embedding的文章
articles_with_embeddings = [
article for article in articles
if 'embedding' in article and article['embedding']
]
if len(articles_with_embeddings) < 2:
print("× 没有足够的embedding数据进行语义聚类")
return [articles_with_embeddings] if articles_with_embeddings else []
print(f"{len(articles_with_embeddings)} 篇文章进行语义聚类分析...")
# 简单的相似性聚类算法
clusters = []
used_indices = set()
for i, article1 in enumerate(articles_with_embeddings):
if i in used_indices:
continue
cluster = [article1]
used_indices.add(i)
for j, article2 in enumerate(articles_with_embeddings):
if j in used_indices or i == j:
continue
# 计算余弦相似度
similarity = self.cosine_similarity(
article1['embedding'],
article2['embedding']
)
if similarity >= similarity_threshold:
cluster.append(article2)
used_indices.add(j)
if len(cluster) >= 2: # 至少2篇文章才算一个有效聚类
clusters.append(cluster)
print(f"✓ 发现语义聚类,包含 {len(cluster)} 篇相关文章")
return clusters
def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""计算两个向量的余弦相似度"""
try:
vec1 = np.array(vec1)
vec2 = np.array(vec2)
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0
return dot_product / (norm1 * norm2)
except Exception as e:
print(f"计算相似度失败: {e}")
return 0
def trigger_swarm_debate(self, clusters: List[List[Dict]]) -> bool:
"""触发swarm蜂群辩论"""
if not clusters:
print("× 没有发现有效的语义聚类,不触发辩论")
return False
print(f"\n🔥 触发Swarm蜂群辩论")
print(f"发现 {len(clusters)} 个语义聚类")
for i, cluster in enumerate(clusters):
print(f"\n聚类 {i+1}: {len(cluster)} 篇文章")
for article in cluster:
title = article.get('title', '无标题')[:50]
time_str = article.get('published_time', '').strftime('%Y-%m-%d %H:%M') if article.get('published_time') else '未知时间'
print(f" - {title}... ({time_str})")
# TODO: 在这里调用实际的辩论系统
# 例如: jixia_swarm_debate(clusters)
return True
def run(self) -> bool:
"""运行swarm辩论触发检测"""
print("🔍 开始检测swarm辩论触发条件...")
# 1. 检测时间群聚效应
recent_articles = self.detect_time_clustering()
if not recent_articles:
return False
# 2. 进行语义聚类分析
semantic_clusters = self.find_semantic_clusters(recent_articles)
# 3. 触发辩论
return self.trigger_swarm_debate(semantic_clusters)
if __name__ == "__main__":
trigger = SwarmDebateTrigger()
trigger.run()

122
test_alpha_vantage_meta.py Normal file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Alpha Vantage API 测试脚本 - Meta (META) 财报和分析师评级
"""
import os
import requests
import json
from datetime import datetime
def get_alpha_vantage_key():
"""从环境变量获取 Alpha Vantage API Key"""
api_key = os.getenv('ALPHA_VANTAGE_API_KEY')
if not api_key:
raise ValueError("未找到 ALPHA_VANTAGE_API_KEY 环境变量")
return api_key
def get_company_overview(symbol, api_key):
"""获取公司基本信息和财务概览"""
url = f"https://www.alphavantage.co/query"
params = {
'function': 'OVERVIEW',
'symbol': symbol,
'apikey': api_key
}
response = requests.get(url, params=params)
return response.json()
def get_earnings_data(symbol, api_key):
"""获取财报数据"""
url = f"https://www.alphavantage.co/query"
params = {
'function': 'EARNINGS',
'symbol': symbol,
'apikey': api_key
}
response = requests.get(url, params=params)
return response.json()
def get_analyst_ratings(symbol, api_key):
"""获取分析师评级(需要付费版本,这里尝试调用看是否有数据)"""
url = f"https://www.alphavantage.co/query"
params = {
'function': 'ANALYST_RECOMMENDATIONS',
'symbol': symbol,
'apikey': api_key
}
response = requests.get(url, params=params)
return response.json()
def get_income_statement(symbol, api_key):
"""获取损益表"""
url = f"https://www.alphavantage.co/query"
params = {
'function': 'INCOME_STATEMENT',
'symbol': symbol,
'apikey': api_key
}
response = requests.get(url, params=params)
return response.json()
def format_financial_data(data, title):
"""格式化财务数据输出"""
print(f"\n{'='*60}")
print(f"{title}")
print(f"{'='*60}")
if isinstance(data, dict):
if 'Error Message' in data:
print(f"❌ 错误: {data['Error Message']}")
elif 'Note' in data:
print(f"⚠️ 注意: {data['Note']}")
else:
print(json.dumps(data, indent=2, ensure_ascii=False))
else:
print(data)
def main():
"""主函数"""
try:
# 获取 API Key
api_key = get_alpha_vantage_key()
print(f"✅ 成功获取 Alpha Vantage API Key: {api_key[:8]}...")
symbol = "META" # Meta Platforms Inc.
print(f"\n🔍 正在获取 {symbol} 的财务数据...")
# 1. 公司概览
print("\n📊 获取公司概览...")
overview = get_company_overview(symbol, api_key)
format_financial_data(overview, f"{symbol} - 公司概览")
# 2. 财报数据
print("\n📈 获取财报数据...")
earnings = get_earnings_data(symbol, api_key)
format_financial_data(earnings, f"{symbol} - 财报数据")
# 3. 分析师评级
print("\n⭐ 获取分析师评级...")
ratings = get_analyst_ratings(symbol, api_key)
format_financial_data(ratings, f"{symbol} - 分析师评级")
# 4. 损益表
print("\n💰 获取损益表...")
income_statement = get_income_statement(symbol, api_key)
format_financial_data(income_statement, f"{symbol} - 损益表")
print(f"\n{symbol} 数据获取完成!")
print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
except Exception as e:
print(f"❌ 错误: {str(e)}")
return 1
return 0
if __name__ == "__main__":
exit(main())

View File

@ -1,488 +0,0 @@
#!/usr/bin/env python3
"""
RSS数据读取测试器
测试从MongoDB读取RSS新闻数据并分析索引需求
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional
from src.mcp.swarm_mongodb_client import SwarmMongoDBClient
class RSSDataReader:
"""RSS数据读取器和分析器"""
def __init__(self, mongodb_client: SwarmMongoDBClient, database_name: str = "news_debate_db"):
self.mongodb_client = mongodb_client
self.database_name = database_name
self.collection_name = "news_articles"
self.logger = logging.getLogger(__name__)
async def connect_to_database(self) -> bool:
"""连接到数据库"""
try:
result = self.mongodb_client.connect(self.database_name)
if result.get('success'):
self.logger.info(f"成功连接到数据库: {self.database_name}")
return True
else:
self.logger.error(f"数据库连接失败: {result}")
return False
except Exception as e:
self.logger.error(f"数据库连接异常: {e}")
return False
async def get_collection_stats(self) -> Dict[str, Any]:
"""获取集合统计信息"""
try:
# 获取文档总数
count_result = self.mongodb_client.count_documents(self.collection_name)
total_count = count_result.get('count', 0) if count_result.get('success') else 0
# 获取最新的几条记录来分析数据结构
latest_docs = self.mongodb_client.find_documents(
self.collection_name,
query={},
sort={'collected_at': -1},
limit=5
)
# 获取最早的记录
earliest_docs = self.mongodb_client.find_documents(
self.collection_name,
query={},
sort={'collected_at': 1},
limit=1
)
stats = {
'total_documents': total_count,
'latest_documents': latest_docs.get('documents', []) if latest_docs.get('success') else [],
'earliest_document': earliest_docs.get('documents', []) if earliest_docs.get('success') else [],
'collection_exists': total_count > 0
}
return stats
except Exception as e:
self.logger.error(f"获取集合统计信息失败: {e}")
return {'error': str(e)}
async def analyze_data_structure(self, sample_size: int = 10) -> Dict[str, Any]:
"""分析数据结构"""
try:
# 获取样本数据
sample_result = self.mongodb_client.find_documents(
self.collection_name,
query={},
limit=sample_size
)
if not sample_result.get('success'):
return {'error': '无法获取样本数据'}
documents = sample_result.get('documents', [])
if not documents:
return {'error': '没有找到任何文档'}
# 分析字段结构
field_analysis = {}
for doc in documents:
for field, value in doc.items():
if field not in field_analysis:
field_analysis[field] = {
'type': type(value).__name__,
'sample_values': [],
'count': 0
}
field_analysis[field]['count'] += 1
if len(field_analysis[field]['sample_values']) < 3:
field_analysis[field]['sample_values'].append(str(value)[:100]) # 限制长度
# 分析常见查询字段
query_fields = {
'title': '标题搜索',
'category': '分类筛选',
'published': '时间范围查询',
'collected_at': '收集时间排序',
'tags': '标签搜索',
'source_title': '来源筛选'
}
return {
'sample_count': len(documents),
'field_analysis': field_analysis,
'recommended_query_fields': query_fields,
'sample_document': documents[0] if documents else None
}
except Exception as e:
self.logger.error(f"数据结构分析失败: {e}")
return {'error': str(e)}
async def test_query_performance(self) -> Dict[str, Any]:
"""测试查询性能"""
performance_results = {}
# 测试不同类型的查询
test_queries = [
{
'name': '全表扫描',
'query': {},
'sort': None,
'limit': 10
},
{
'name': '按时间排序',
'query': {},
'sort': {'collected_at': -1},
'limit': 10
},
{
'name': '标题文本搜索',
'query': {'title': {'$regex': '市场', '$options': 'i'}},
'sort': None,
'limit': 10
},
{
'name': '分类筛选',
'query': {'category': '财经新闻'},
'sort': None,
'limit': 10
},
{
'name': '时间范围查询',
'query': {
'collected_at': {
'$gte': datetime.now(timezone.utc) - timedelta(days=7)
}
},
'sort': {'collected_at': -1},
'limit': 10
}
]
for test in test_queries:
try:
start_time = time.time()
result = self.mongodb_client.find_documents(
self.collection_name,
query=test['query'],
sort=test.get('sort'),
limit=test['limit']
)
end_time = time.time()
query_time = (end_time - start_time) * 1000 # 转换为毫秒
performance_results[test['name']] = {
'query_time_ms': round(query_time, 2),
'success': result.get('success', False),
'document_count': len(result.get('documents', [])),
'query': test['query']
}
except Exception as e:
performance_results[test['name']] = {
'error': str(e),
'query': test['query']
}
return performance_results
async def check_existing_indexes(self) -> Dict[str, Any]:
"""检查现有索引"""
try:
# 注意这里需要使用MongoDB的原生命令来获取索引信息
# 由于SwarmMongoDBClient可能没有直接的索引查询方法我们尝试其他方式
# 尝试通过聚合管道获取索引信息
pipeline = [
{"$indexStats": {}}
]
# 如果客户端支持聚合查询
if hasattr(self.mongodb_client, 'aggregate_documents'):
result = self.mongodb_client.aggregate_documents(
self.collection_name,
pipeline=pipeline
)
if result.get('success'):
return {
'indexes': result.get('documents', []),
'method': 'aggregation'
}
# 如果无法直接获取索引信息,返回建议
return {
'message': '无法直接查询索引信息,建议手动检查',
'method': 'manual_check_needed'
}
except Exception as e:
return {
'error': str(e),
'message': '索引查询失败'
}
def generate_index_recommendations(self, performance_results: Dict[str, Any],
data_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""生成索引建议"""
recommendations = {
'basic_indexes': [],
'compound_indexes': [],
'text_indexes': [],
'vector_indexes': [],
'reasoning': []
}
# 基础索引建议
slow_queries = [name for name, result in performance_results.items()
if isinstance(result, dict) and result.get('query_time_ms', 0) > 100]
if slow_queries:
recommendations['reasoning'].append(f"发现慢查询: {', '.join(slow_queries)}")
# 基于数据结构的索引建议
field_analysis = data_analysis.get('field_analysis', {})
# 时间字段索引(用于排序和范围查询)
if 'collected_at' in field_analysis:
recommendations['basic_indexes'].append({
'field': 'collected_at',
'type': 'descending',
'reason': '用于时间排序和范围查询'
})
if 'published' in field_analysis:
recommendations['basic_indexes'].append({
'field': 'published',
'type': 'descending',
'reason': '用于发布时间查询'
})
# 分类字段索引
if 'category' in field_analysis:
recommendations['basic_indexes'].append({
'field': 'category',
'type': 'ascending',
'reason': '用于分类筛选'
})
# 唯一标识符索引
if 'article_id' in field_analysis:
recommendations['basic_indexes'].append({
'field': 'article_id',
'type': 'ascending',
'unique': True,
'reason': '唯一标识符,防止重复'
})
# 复合索引建议
recommendations['compound_indexes'].append({
'fields': ['category', 'collected_at'],
'reason': '支持按分类筛选并按时间排序'
})
# 文本搜索索引
text_fields = []
for field in ['title', 'description', 'summary']:
if field in field_analysis:
text_fields.append(field)
if text_fields:
recommendations['text_indexes'].append({
'fields': text_fields,
'type': 'text',
'reason': '支持全文搜索'
})
# 向量索引建议
recommendations['vector_indexes'].append({
'consideration': '如果需要语义搜索',
'fields': ['title', 'description'],
'method': 'embedding + vector_search',
'reason': '用于基于内容相似性的智能搜索和推荐'
})
return recommendations
async def test_sample_queries(self) -> Dict[str, Any]:
"""测试一些示例查询"""
sample_queries = {}
try:
# 1. 获取最新10条新闻
latest_news = self.mongodb_client.find_documents(
self.collection_name,
query={},
sort={'collected_at': -1},
limit=10
)
sample_queries['latest_news'] = {
'success': latest_news.get('success'),
'count': len(latest_news.get('documents', [])),
'sample_titles': [doc.get('title', 'N/A')[:50] + '...'
for doc in latest_news.get('documents', [])[:3]]
}
# 2. 按分类查询
category_news = self.mongodb_client.find_documents(
self.collection_name,
query={'category': '财经新闻'},
limit=5
)
sample_queries['category_news'] = {
'success': category_news.get('success'),
'count': len(category_news.get('documents', [])),
'category': '财经新闻'
}
# 3. 关键词搜索
keyword_search = self.mongodb_client.find_documents(
self.collection_name,
query={'title': {'$regex': '投资|股票|市场', '$options': 'i'}},
limit=5
)
sample_queries['keyword_search'] = {
'success': keyword_search.get('success'),
'count': len(keyword_search.get('documents', [])),
'keywords': '投资|股票|市场'
}
except Exception as e:
sample_queries['error'] = str(e)
return sample_queries
async def run_comprehensive_analysis(self) -> Dict[str, Any]:
"""运行完整的数据分析"""
self.logger.info("开始RSS数据分析...")
# 连接数据库
if not await self.connect_to_database():
return {'error': '无法连接到数据库'}
analysis_results = {}
# 1. 获取集合统计信息
self.logger.info("获取集合统计信息...")
analysis_results['collection_stats'] = await self.get_collection_stats()
# 2. 分析数据结构
self.logger.info("分析数据结构...")
analysis_results['data_structure'] = await self.analyze_data_structure()
# 3. 测试查询性能
self.logger.info("测试查询性能...")
analysis_results['query_performance'] = await self.test_query_performance()
# 4. 检查现有索引
self.logger.info("检查现有索引...")
analysis_results['existing_indexes'] = await self.check_existing_indexes()
# 5. 生成索引建议
self.logger.info("生成索引建议...")
analysis_results['index_recommendations'] = self.generate_index_recommendations(
analysis_results['query_performance'],
analysis_results['data_structure']
)
# 6. 测试示例查询
self.logger.info("测试示例查询...")
analysis_results['sample_queries'] = await self.test_sample_queries()
return analysis_results
async def main():
"""主函数"""
# 初始化MongoDB客户端
mongodb_client = SwarmMongoDBClient(
mcp_server_url="http://localhost:8080",
default_database="news_debate_db"
)
# 创建数据读取器
reader = RSSDataReader(mongodb_client)
# 运行分析
results = await reader.run_comprehensive_analysis()
# 输出结果
print("\n" + "="*60)
print("RSS数据分析报告")
print("="*60)
# 集合统计
stats = results.get('collection_stats', {})
print(f"\n📊 集合统计:")
print(f" 总文档数: {stats.get('total_documents', 0)}")
print(f" 集合存在: {stats.get('collection_exists', False)}")
# 数据结构
structure = results.get('data_structure', {})
if 'field_analysis' in structure:
print(f"\n🏗️ 数据结构:")
for field, info in structure['field_analysis'].items():
print(f" {field}: {info['type']} (出现{info['count']}次)")
# 查询性能
performance = results.get('query_performance', {})
print(f"\n⚡ 查询性能:")
for query_name, result in performance.items():
if isinstance(result, dict) and 'query_time_ms' in result:
print(f" {query_name}: {result['query_time_ms']}ms ({result['document_count']}条结果)")
# 索引建议
recommendations = results.get('index_recommendations', {})
print(f"\n💡 索引建议:")
basic_indexes = recommendations.get('basic_indexes', [])
if basic_indexes:
print(f" 基础索引:")
for idx in basic_indexes:
print(f" - {idx['field']} ({idx.get('type', 'ascending')}): {idx['reason']}")
compound_indexes = recommendations.get('compound_indexes', [])
if compound_indexes:
print(f" 复合索引:")
for idx in compound_indexes:
print(f" - {', '.join(idx['fields'])}: {idx['reason']}")
text_indexes = recommendations.get('text_indexes', [])
if text_indexes:
print(f" 文本索引:")
for idx in text_indexes:
print(f" - {', '.join(idx['fields'])}: {idx['reason']}")
vector_indexes = recommendations.get('vector_indexes', [])
if vector_indexes:
print(f" 向量索引建议:")
for idx in vector_indexes:
print(f" - {idx['consideration']}: {idx['reason']}")
# 示例查询结果
samples = results.get('sample_queries', {})
print(f"\n🔍 示例查询:")
for query_name, result in samples.items():
if isinstance(result, dict) and 'count' in result:
print(f" {query_name}: {result['count']}条结果")
print(f"\n" + "="*60)
print("分析完成!")
print("="*60)
# 保存详细结果到文件
with open('/home/ben/liurenchaxin/rss_analysis_report.json', 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2, default=str)
print("\n详细报告已保存到: rss_analysis_report.json")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
asyncio.run(main())