283 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			283 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
| #!/usr/bin/env python3
 | ||
| # -*- coding: utf-8 -*-
 | ||
| """
 | ||
| 稷下学宫负载均衡演示脚本
 | ||
| 展示八仙论道的API负载分担策略
 | ||
| """
 | ||
| 
 | ||
| import os
 | ||
| import sys
 | ||
| import time
 | ||
| import json
 | ||
| from datetime import datetime
 | ||
| 
 | ||
| # 添加项目路径
 | ||
| sys.path.append('/home/ben/liurenchaxin/src')
 | ||
| 
 | ||
| from jixia.engines.jixia_load_balancer import JixiaLoadBalancer, APIResult
 | ||
| 
 | ||
| def print_banner():
 | ||
|     """打印横幅"""
 | ||
|     print("\n" + "="*80)
 | ||
|     print("🏛️  稷下学宫八仙论道 - API负载均衡演示系统")
 | ||
|     print("📊  RapidAPI多源数据整合与负载分担策略")
 | ||
|     print("="*80)
 | ||
|     print(f"⏰ 演示时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
 | ||
|     print()
 | ||
| 
 | ||
| def print_immortal_intro():
 | ||
|     """介绍八仙角色"""
 | ||
|     immortals_info = {
 | ||
|         '吕洞宾': '主力剑仙 - 综合分析与决策 (Alpha Vantage)',
 | ||
|         '何仙姑': '风控专家 - 风险管理与合规 (Yahoo Finance)',
 | ||
|         '张果老': '技术分析师 - 技术指标与图表分析 (Webull)',
 | ||
|         '韩湘子': '基本面研究员 - 财务分析与估值 (Seeking Alpha)',
 | ||
|         '汉钟离': '量化专家 - 数据挖掘与算法交易 (Yahoo Finance)',
 | ||
|         '蓝采和': '情绪分析师 - 市场情绪与舆情监控 (Webull)',
 | ||
|         '曹国舅': '宏观分析师 - 宏观经济与政策分析 (Seeking Alpha)',
 | ||
|         '铁拐李': '逆向投资专家 - 价值发现与逆向思维 (Alpha Vantage)'
 | ||
|     }
 | ||
|     
 | ||
|     print("👥 八仙角色与API分配:")
 | ||
|     print("-" * 60)
 | ||
|     for immortal, description in immortals_info.items():
 | ||
|         print(f"   {immortal}: {description}")
 | ||
|     print()
 | ||
| 
 | ||
| def demonstrate_single_immortal(load_balancer, immortal_name, symbol):
 | ||
|     """演示单个仙人的数据获取"""
 | ||
|     print(f"\n🎭 {immortal_name} 单独获取数据演示:")
 | ||
|     print("-" * 40)
 | ||
|     
 | ||
|     # 获取股票报价
 | ||
|     result = load_balancer.get_data_for_immortal(immortal_name, 'stock_quote', symbol)
 | ||
|     
 | ||
|     if result.success:
 | ||
|         data = result.data
 | ||
|         print(f"   ✅ 成功获取 {symbol} 数据:")
 | ||
|         print(f"      💰 价格: ${data.get('price', 'N/A')}")
 | ||
|         print(f"      📈 涨跌: {data.get('change_percent', 'N/A')}")
 | ||
|         volume = data.get('volume', 'N/A')
 | ||
|         if isinstance(volume, (int, float)):
 | ||
|             print(f"      📊 成交量: {volume:,}")
 | ||
|         else:
 | ||
|             print(f"      📊 成交量: {volume}")
 | ||
|         print(f"      🔗 数据源: {result.api_used}")
 | ||
|         print(f"      ⏱️ 响应时间: {result.response_time:.2f}秒")
 | ||
|         print(f"      💾 缓存状态: {'是' if result.cached else '否'}")
 | ||
|     else:
 | ||
|         print(f"   ❌ 获取失败: {result.error}")
 | ||
| 
 | ||
| def demonstrate_load_distribution(load_balancer):
 | ||
|     """演示负载分布"""
 | ||
|     print("\n📊 API负载分布统计:")
 | ||
|     print("-" * 40)
 | ||
|     
 | ||
|     distribution = load_balancer.get_load_distribution()
 | ||
|     
 | ||
|     if not distribution:
 | ||
|         print("   📝 暂无API调用记录")
 | ||
|         return
 | ||
|     
 | ||
|     total_calls = sum(stats['calls'] for stats in distribution.values())
 | ||
|     
 | ||
|     for api_name, stats in distribution.items():
 | ||
|         status_icon = "🟢" if stats['healthy'] else "🔴"
 | ||
|         print(f"   {status_icon} {api_name}:")
 | ||
|         print(f"      📞 调用次数: {stats['calls']}")
 | ||
|         print(f"      📊 负载占比: {stats['percentage']:.1f}%")
 | ||
|         print(f"      ❌ 连续失败: {stats['consecutive_failures']}次")
 | ||
|         print()
 | ||
| 
 | ||
| def demonstrate_api_comparison(load_balancer, symbol):
 | ||
|     """演示不同API的数据对比"""
 | ||
|     print(f"\n🔍 {symbol} 多API数据对比:")
 | ||
|     print("-" * 50)
 | ||
|     
 | ||
|     apis = ['alpha_vantage', 'yahoo_finance_15', 'webull']
 | ||
|     results = {}
 | ||
|     
 | ||
|     for api in apis:
 | ||
|         # 临时修改API分配来测试不同数据源
 | ||
|         original_mapping = load_balancer.immortal_api_mapping['stock_quote']['吕洞宾']
 | ||
|         load_balancer.immortal_api_mapping['stock_quote']['吕洞宾'] = api
 | ||
|         
 | ||
|         result = load_balancer.get_data_for_immortal('吕洞宾', 'stock_quote', symbol)
 | ||
|         results[api] = result
 | ||
|         
 | ||
|         # 恢复原始配置
 | ||
|         load_balancer.immortal_api_mapping['stock_quote']['吕洞宾'] = original_mapping
 | ||
|         
 | ||
|         time.sleep(0.5)  # 避免过快请求
 | ||
|     
 | ||
|     # 显示对比结果
 | ||
|     print("   API数据源对比:")
 | ||
|     for api, result in results.items():
 | ||
|         if result.success:
 | ||
|             data = result.data
 | ||
|             print(f"   📡 {api}:")
 | ||
|             print(f"      💰 ${data.get('price', 'N/A')} ({data.get('change_percent', 'N/A')})")
 | ||
|             print(f"      ⏱️ {result.response_time:.2f}s")
 | ||
|         else:
 | ||
|             print(f"   📡 {api}: ❌ {result.error}")
 | ||
|         print()
 | ||
| 
 | ||
| def demonstrate_cache_effectiveness(load_balancer, symbol):
 | ||
|     """演示缓存效果"""
 | ||
|     print(f"\n💾 缓存效果演示 - {symbol}:")
 | ||
|     print("-" * 40)
 | ||
|     
 | ||
|     # 第一次调用(无缓存)
 | ||
|     print("   🔄 第一次调用(无缓存):")
 | ||
|     start_time = time.time()
 | ||
|     result1 = load_balancer.get_data_for_immortal('吕洞宾', 'stock_quote', symbol)
 | ||
|     first_call_time = time.time() - start_time
 | ||
|     
 | ||
|     if result1.success:
 | ||
|         print(f"      ⏱️ 响应时间: {result1.response_time:.2f}秒")
 | ||
|         print(f"      💾 缓存状态: {'命中' if result1.cached else '未命中'}")
 | ||
|     
 | ||
|     time.sleep(1)
 | ||
|     
 | ||
|     # 第二次调用(有缓存)
 | ||
|     print("   🔄 第二次调用(有缓存):")
 | ||
|     start_time = time.time()
 | ||
|     result2 = load_balancer.get_data_for_immortal('吕洞宾', 'stock_quote', symbol)
 | ||
|     second_call_time = time.time() - start_time
 | ||
|     
 | ||
|     if result2.success:
 | ||
|         print(f"      ⏱️ 响应时间: {result2.response_time:.2f}秒")
 | ||
|         print(f"      💾 缓存状态: {'命中' if result2.cached else '未命中'}")
 | ||
|         
 | ||
|         if result2.cached:
 | ||
|             speedup = (first_call_time / second_call_time) if second_call_time > 0 else float('inf')
 | ||
|             print(f"      🚀 性能提升: {speedup:.1f}倍")
 | ||
| 
 | ||
| def demonstrate_failover(load_balancer, symbol):
 | ||
|     """演示故障转移"""
 | ||
|     print(f"\n🔄 故障转移演示 - {symbol}:")
 | ||
|     print("-" * 40)
 | ||
|     
 | ||
|     # 模拟API故障
 | ||
|     print("   ⚠️ 模拟主API故障...")
 | ||
|     
 | ||
|     # 临时标记API为不健康
 | ||
|     original_health = load_balancer.health_checker.health_status['alpha_vantage']['healthy']
 | ||
|     load_balancer.health_checker.health_status['alpha_vantage']['healthy'] = False
 | ||
|     load_balancer.health_checker.health_status['alpha_vantage']['consecutive_failures'] = 5
 | ||
|     
 | ||
|     # 尝试获取数据(应该自动故障转移)
 | ||
|     result = load_balancer.get_data_for_immortal('吕洞宾', 'stock_quote', symbol)
 | ||
|     
 | ||
|     if result.success:
 | ||
|         print(f"   ✅ 故障转移成功,使用备用API: {result.api_used}")
 | ||
|         print(f"   💰 获取到价格: ${result.data.get('price', 'N/A')}")
 | ||
|     else:
 | ||
|         print(f"   ❌ 故障转移失败: {result.error}")
 | ||
|     
 | ||
|     # 恢复API健康状态
 | ||
|     load_balancer.health_checker.health_status['alpha_vantage']['healthy'] = original_health
 | ||
|     load_balancer.health_checker.health_status['alpha_vantage']['consecutive_failures'] = 0
 | ||
|     
 | ||
|     print("   🔧 API健康状态已恢复")
 | ||
| 
 | ||
| def save_demo_results(results, filename='demo_results.json'):
 | ||
|     """保存演示结果"""
 | ||
|     demo_data = {
 | ||
|         'timestamp': datetime.now().isoformat(),
 | ||
|         'results': {},
 | ||
|         'summary': {
 | ||
|             'total_immortals': len(results),
 | ||
|             'successful_calls': sum(1 for r in results.values() if r.success),
 | ||
|             'failed_calls': sum(1 for r in results.values() if not r.success)
 | ||
|         }
 | ||
|     }
 | ||
|     
 | ||
|     for immortal, result in results.items():
 | ||
|         demo_data['results'][immortal] = {
 | ||
|             'success': result.success,
 | ||
|             'api_used': result.api_used,
 | ||
|             'response_time': result.response_time,
 | ||
|             'cached': result.cached,
 | ||
|             'error': result.error,
 | ||
|             'data_summary': {
 | ||
|                 'symbol': result.data.get('symbol') if result.success else None,
 | ||
|                 'price': result.data.get('price') if result.success else None,
 | ||
|                 'change_percent': result.data.get('change_percent') if result.success else None
 | ||
|             }
 | ||
|         }
 | ||
|     
 | ||
|     with open(filename, 'w', encoding='utf-8') as f:
 | ||
|         json.dump(demo_data, f, indent=2, ensure_ascii=False)
 | ||
|     
 | ||
|     print(f"\n💾 演示结果已保存到: {filename}")
 | ||
| 
 | ||
| def main():
 | ||
|     """主演示函数"""
 | ||
|     # 检查API密钥
 | ||
|     rapidapi_key = os.getenv('RAPIDAPI_KEY')
 | ||
|     if not rapidapi_key:
 | ||
|         print("❌ 错误: 请设置RAPIDAPI_KEY环境变量")
 | ||
|         print("   提示: 使用 'doppler run python demo_jixia_load_balancing.py' 运行")
 | ||
|         return
 | ||
|     
 | ||
|     print_banner()
 | ||
|     print_immortal_intro()
 | ||
|     
 | ||
|     # 创建负载均衡器
 | ||
|     print("🔧 初始化稷下学宫负载均衡器...")
 | ||
|     load_balancer = JixiaLoadBalancer(rapidapi_key)
 | ||
|     print("✅ 负载均衡器初始化完成\n")
 | ||
|     
 | ||
|     # 演示股票代码
 | ||
|     demo_symbols = ['AAPL', 'TSLA', 'MSFT']
 | ||
|     
 | ||
|     for i, symbol in enumerate(demo_symbols, 1):
 | ||
|         print(f"\n{'='*20} 演示 {i}: {symbol} {'='*20}")
 | ||
|         
 | ||
|         # 1. 单个仙人演示
 | ||
|         demonstrate_single_immortal(load_balancer, '吕洞宾', symbol)
 | ||
|         
 | ||
|         # 2. 八仙论道演示
 | ||
|         print(f"\n🏛️ 八仙论道完整演示 - {symbol}:")
 | ||
|         debate_results = load_balancer.conduct_immortal_debate(symbol)
 | ||
|         
 | ||
|         # 3. 负载分布演示
 | ||
|         demonstrate_load_distribution(load_balancer)
 | ||
|         
 | ||
|         # 只在第一个股票上演示高级功能
 | ||
|         if i == 1:
 | ||
|             # 4. API对比演示
 | ||
|             demonstrate_api_comparison(load_balancer, symbol)
 | ||
|             
 | ||
|             # 5. 缓存效果演示
 | ||
|             demonstrate_cache_effectiveness(load_balancer, symbol)
 | ||
|             
 | ||
|             # 6. 故障转移演示
 | ||
|             demonstrate_failover(load_balancer, symbol)
 | ||
|         
 | ||
|         # 保存结果
 | ||
|         save_demo_results(debate_results, f'demo_results_{symbol.lower()}.json')
 | ||
|         
 | ||
|         if i < len(demo_symbols):
 | ||
|             print("\n⏳ 等待3秒后继续下一个演示...")
 | ||
|             time.sleep(3)
 | ||
|     
 | ||
|     # 最终统计
 | ||
|     print("\n" + "="*80)
 | ||
|     print("📈 演示完成 - 最终负载分布统计:")
 | ||
|     demonstrate_load_distribution(load_balancer)
 | ||
|     
 | ||
|     print("\n🎉 稷下学宫API负载均衡演示完成!")
 | ||
|     print("\n💡 关键特性:")
 | ||
|     print("   ✅ 智能负载分担 - 八仙各司其职,分散API压力")
 | ||
|     print("   ✅ 自动故障转移 - API异常时自动切换备用源")
 | ||
|     print("   ✅ 数据标准化 - 统一不同API的数据格式")
 | ||
|     print("   ✅ 智能缓存 - 减少重复调用,提升响应速度")
 | ||
|     print("   ✅ 实时监控 - 跟踪API健康状态和负载分布")
 | ||
|     print("\n📚 查看详细配置: /home/ben/liurenchaxin/src/jixia/config/immortal_api_config.json")
 | ||
|     print("🔧 核心引擎: /home/ben/liurenchaxin/src/jixia/engines/jixia_load_balancer.py")
 | ||
|     print("="*80)
 | ||
| 
 | ||
| if __name__ == "__main__":
 | ||
|     main() |