109 lines
4.1 KiB
Python
109 lines
4.1 KiB
Python
# market_fsm.py
|
|
|
|
from typing import Dict, Any
|
|
from mythology import MythologyEngine, DaoistMythologyEngine
|
|
from cycle_models import CycleModel, TwelveStagesOfLifeCycleModel
|
|
|
|
class MarketFSM:
|
|
"""
|
|
一个简化的市场分析有限状态机 (FSM)。
|
|
|
|
本 FSM 演示了如何通过抽象基类来调用外部的“神话引擎”和“周期模型”,
|
|
从而实现了核心逻辑与具体实现(如道家神话、十二长生模型)的解耦。
|
|
"""
|
|
|
|
def __init__(self, mythology_engine: MythologyEngine, cycle_model: CycleModel):
|
|
"""
|
|
FSM 在初始化时,注入所需引擎的抽象实例。
|
|
它不关心传入的是哪个具体的神话引擎或周期模型。
|
|
"""
|
|
if not isinstance(mythology_engine, MythologyEngine):
|
|
raise TypeError("mythology_engine must be a subclass of MythologyEngine")
|
|
if not isinstance(cycle_model, CycleModel):
|
|
raise TypeError("cycle_model must be a subclass of CycleModel")
|
|
|
|
self.mythology_engine = mythology_engine
|
|
self.cycle_model = cycle_model
|
|
self.current_state = "Idle"
|
|
self.context: Dict[str, Any] = {}
|
|
|
|
print(f"--- FSM 已启动 ---")
|
|
print(f"世界观: {self.mythology_engine.get_system_narrative()}")
|
|
print(f"周期模型: {self.cycle_model.__class__.__name__}")
|
|
print("-" * 50)
|
|
|
|
|
|
def run_analysis(self, market_data: Dict[str, Any]):
|
|
"""
|
|
执行一次完整的市场分析流程。
|
|
"""
|
|
print(f"\n【状态: {self.current_state}】-> [开始分析] -> 【状态: Collecting】")
|
|
self.current_state = "Collecting"
|
|
self._collect_data(market_data)
|
|
|
|
print(f"\n【状态: {self.current_state}】-> [分析周期] -> 【状态: CycleAnalysis】")
|
|
self.current_state = "CycleAnalysis"
|
|
self._analyze_cycle()
|
|
|
|
print(f"\n【状态: {self.current_state}】-> [生成报告] -> 【状态: Reporting】")
|
|
self.current_state = "Reporting"
|
|
self._generate_report()
|
|
|
|
print(f"\n【状态: {self.current_state}】-> [完成] -> 【状态: Idle】")
|
|
self.current_state = "Idle"
|
|
print("\n--- 分析流程结束 ---")
|
|
|
|
def _collect_data(self, market_data: Dict[str, Any]):
|
|
"""
|
|
模拟数据收集阶段。
|
|
"""
|
|
actor = self.mythology_engine.get_actor_name('collector')
|
|
metaphor = self.mythology_engine.get_process_metaphor('multi_agent_debate')
|
|
print(f"执行者: [{actor}]")
|
|
print(f"动作: [{metaphor}]")
|
|
self.context['market_data'] = market_data
|
|
print("数据收集完成。")
|
|
|
|
def _analyze_cycle(self):
|
|
"""
|
|
分析市场周期阶段。
|
|
"""
|
|
current_stage = self.cycle_model.get_current_stage(self.context['market_data'])
|
|
self.context['cycle_stage'] = current_stage
|
|
print(f"周期模型分析完成,当前阶段为: 【{current_stage}】")
|
|
|
|
def _generate_report(self):
|
|
"""
|
|
生成最终报告。
|
|
"""
|
|
actor = self.mythology_engine.get_actor_name('synthesizer')
|
|
metaphor = self.mythology_engine.get_process_metaphor('final_decision')
|
|
stage = self.context['cycle_stage']
|
|
characteristics = self.cycle_model.get_stage_characteristics(stage)
|
|
|
|
print(f"执行者: [{actor}]")
|
|
print(f"动作: [{metaphor}]")
|
|
print("\n--- 最终分析报告 ---")
|
|
print(f"市场周期阶段: {stage}")
|
|
print(f"阶段特征: {characteristics.get('description', 'N/A')}")
|
|
print(f"建议策略: {characteristics.get('strategy', 'N/A')}")
|
|
print("--- 报告结束 ---")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# 1. 在系统启动时,选择并实例化具体的引擎和模型
|
|
daoist_mythology = DaoistMythologyEngine()
|
|
twelve_stages_cycle = TwelveStagesOfLifeCycleModel()
|
|
|
|
# 2. 将实例注入到 FSM 中
|
|
# FSM 只依赖于抽象,不关心具体实现
|
|
fsm = MarketFSM(
|
|
mythology_engine=daoist_mythology,
|
|
cycle_model=twelve_stages_cycle
|
|
)
|
|
|
|
# 3. 运行 FSM
|
|
# 模拟的市场数据
|
|
simulated_data = {"mock_score": 4} # 模拟处于“帝旺”阶段
|
|
fsm.run_analysis(simulated_data)
|