liurenchaxin/engines/market_fsm.py

109 lines
4.1 KiB
Python

# market_fsm.py
from typing import Dict, Any
from mythology import MythologyEngine, DaoistMythologyEngine
from cycle_models import CycleModel, TwelveStagesOfLifeCycleModel
class MarketFSM:
"""
一个简化的市场分析有限状态机 (FSM)。
本 FSM 演示了如何通过抽象基类来调用外部的“神话引擎”和“周期模型”,
从而实现了核心逻辑与具体实现(如道家神话、十二长生模型)的解耦。
"""
def __init__(self, mythology_engine: MythologyEngine, cycle_model: CycleModel):
"""
FSM 在初始化时,注入所需引擎的抽象实例。
它不关心传入的是哪个具体的神话引擎或周期模型。
"""
if not isinstance(mythology_engine, MythologyEngine):
raise TypeError("mythology_engine must be a subclass of MythologyEngine")
if not isinstance(cycle_model, CycleModel):
raise TypeError("cycle_model must be a subclass of CycleModel")
self.mythology_engine = mythology_engine
self.cycle_model = cycle_model
self.current_state = "Idle"
self.context: Dict[str, Any] = {}
print(f"--- FSM 已启动 ---")
print(f"世界观: {self.mythology_engine.get_system_narrative()}")
print(f"周期模型: {self.cycle_model.__class__.__name__}")
print("-" * 50)
def run_analysis(self, market_data: Dict[str, Any]):
"""
执行一次完整的市场分析流程。
"""
print(f"\n【状态: {self.current_state}】-> [开始分析] -> 【状态: Collecting】")
self.current_state = "Collecting"
self._collect_data(market_data)
print(f"\n【状态: {self.current_state}】-> [分析周期] -> 【状态: CycleAnalysis】")
self.current_state = "CycleAnalysis"
self._analyze_cycle()
print(f"\n【状态: {self.current_state}】-> [生成报告] -> 【状态: Reporting】")
self.current_state = "Reporting"
self._generate_report()
print(f"\n【状态: {self.current_state}】-> [完成] -> 【状态: Idle】")
self.current_state = "Idle"
print("\n--- 分析流程结束 ---")
def _collect_data(self, market_data: Dict[str, Any]):
"""
模拟数据收集阶段。
"""
actor = self.mythology_engine.get_actor_name('collector')
metaphor = self.mythology_engine.get_process_metaphor('multi_agent_debate')
print(f"执行者: [{actor}]")
print(f"动作: [{metaphor}]")
self.context['market_data'] = market_data
print("数据收集完成。")
def _analyze_cycle(self):
"""
分析市场周期阶段。
"""
current_stage = self.cycle_model.get_current_stage(self.context['market_data'])
self.context['cycle_stage'] = current_stage
print(f"周期模型分析完成,当前阶段为: 【{current_stage}")
def _generate_report(self):
"""
生成最终报告。
"""
actor = self.mythology_engine.get_actor_name('synthesizer')
metaphor = self.mythology_engine.get_process_metaphor('final_decision')
stage = self.context['cycle_stage']
characteristics = self.cycle_model.get_stage_characteristics(stage)
print(f"执行者: [{actor}]")
print(f"动作: [{metaphor}]")
print("\n--- 最终分析报告 ---")
print(f"市场周期阶段: {stage}")
print(f"阶段特征: {characteristics.get('description', 'N/A')}")
print(f"建议策略: {characteristics.get('strategy', 'N/A')}")
print("--- 报告结束 ---")
if __name__ == '__main__':
# 1. 在系统启动时,选择并实例化具体的引擎和模型
daoist_mythology = DaoistMythologyEngine()
twelve_stages_cycle = TwelveStagesOfLifeCycleModel()
# 2. 将实例注入到 FSM 中
# FSM 只依赖于抽象,不关心具体实现
fsm = MarketFSM(
mythology_engine=daoist_mythology,
cycle_model=twelve_stages_cycle
)
# 3. 运行 FSM
# 模拟的市场数据
simulated_data = {"mock_score": 4} # 模拟处于“帝旺”阶段
fsm.run_analysis(simulated_data)