huhan3000/phallic-worship-analysis/analysis/statistics/fertility_anxiety_analyzer.py

463 lines
19 KiB
Python

"""
生育焦虑量化分析系统
分析拓跋鲜卑皇室的生育焦虑程度及其与政治政策、宗教活动的关联
"""
import statistics
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Dict, Any, Tuple
import pandas as pd
from collections import Counter
from dataclasses import asdict
from analysis.models import Emperor, ReliabilityLevel
from data.emperors.northern_wei_emperors import NORTHERN_WEI_EMPERORS
class FertilityAnxietyAnalyzer:
"""生育焦虑量化分析器"""
def __init__(self, emperors: List[Emperor] = None):
self.emperors = emperors or NORTHERN_WEI_EMPERORS
# 生育焦虑评估标准
self.anxiety_indicators = {
"low_offspring": 0.3, # 子嗣少于平均值
"short_lifespan": 0.2, # 短寿
"religious_activity": 0.2, # 频繁宗教活动
"violent_death": 0.2, # 非自然死亡
"early_succession": 0.1 # 早期传位
}
# 宗教活动关键词
self.religious_keywords = [
"祭祀", "祈福", "天师", "道教", "佛教", "寺庙",
"昊天", "上帝", "", "祖先", "宗庙", "太庙"
]
# 生育相关宗教活动关键词
self.fertility_religious_keywords = [
"祈子", "求嗣", "生育", "多子", "繁衍", "子孙", "后代"
]
def calculate_fertility_anxiety_score(self, emperor: Emperor) -> float:
"""计算单个皇帝的生育焦虑评分"""
if emperor.fertility_anxiety_score is not None:
return emperor.fertility_anxiety_score
score = 0.0
# 1. 子嗣数量因子
if emperor.offspring_count is not None:
avg_offspring = self._get_average_offspring_count()
if emperor.offspring_count < avg_offspring:
score += self.anxiety_indicators["low_offspring"]
# 2. 寿命因子
if emperor.lifespan is not None and emperor.lifespan < 30:
score += self.anxiety_indicators["short_lifespan"]
# 3. 宗教活动因子
religious_score = self._analyze_religious_activities(emperor.religious_activities)
score += religious_score * self.anxiety_indicators["religious_activity"]
# 4. 死因因子
if emperor.death_cause and any(keyword in emperor.death_cause
for keyword in ["", "", "", "暗杀"]):
score += self.anxiety_indicators["violent_death"]
# 5. 在位时长因子(早期传位可能表示焦虑)
if emperor.reign_duration is not None and emperor.reign_duration < 10:
score += self.anxiety_indicators["early_succession"]
return min(score, 1.0) # 限制在0-1之间
def _get_average_offspring_count(self) -> float:
"""获取平均子嗣数量"""
offspring_counts = [emp.offspring_count for emp in self.emperors
if emp.offspring_count is not None]
return statistics.mean(offspring_counts) if offspring_counts else 5.0
def _analyze_religious_activities(self, activities: List[str]) -> float:
"""分析宗教活动的生育焦虑相关性"""
if not activities:
return 0.0
total_score = 0.0
activity_text = " ".join(activities)
# 检查一般宗教活动
religious_count = sum(1 for keyword in self.religious_keywords
if keyword in activity_text)
# 检查生育相关宗教活动(权重更高)
fertility_religious_count = sum(1 for keyword in self.fertility_religious_keywords
if keyword in activity_text)
# 计算评分
total_score = (religious_count * 0.1 + fertility_religious_count * 0.3) / len(activities)
return min(total_score, 1.0)
def analyze_anxiety_distribution(self) -> Dict[str, Any]:
"""分析生育焦虑分布"""
anxiety_scores = []
emperors_with_scores = []
for emperor in self.emperors:
score = self.calculate_fertility_anxiety_score(emperor)
anxiety_scores.append(score)
emperors_with_scores.append((emperor, score))
if not anxiety_scores:
return {"error": "无法计算生育焦虑评分"}
# 按焦虑程度分类
high_anxiety = [emp for emp, score in emperors_with_scores if score >= 0.7]
medium_anxiety = [emp for emp, score in emperors_with_scores if 0.4 <= score < 0.7]
low_anxiety = [emp for emp, score in emperors_with_scores if score < 0.4]
return {
"total_emperors": len(self.emperors),
"mean_anxiety": statistics.mean(anxiety_scores),
"median_anxiety": statistics.median(anxiety_scores),
"std_anxiety": statistics.stdev(anxiety_scores) if len(anxiety_scores) > 1 else 0,
"high_anxiety_count": len(high_anxiety),
"medium_anxiety_count": len(medium_anxiety),
"low_anxiety_count": len(low_anxiety),
"high_anxiety_emperors": [emp.name for emp in high_anxiety],
"anxiety_scores": dict(zip([emp.name for emp in self.emperors], anxiety_scores))
}
def analyze_anxiety_policy_correlation(self) -> Dict[str, Any]:
"""分析生育焦虑与政策变化的相关性"""
policy_changes = []
anxiety_levels = []
for emperor in self.emperors:
anxiety_score = self.calculate_fertility_anxiety_score(emperor)
# 分析政策变化指标
policy_score = self._calculate_policy_change_score(emperor)
if policy_score is not None:
anxiety_levels.append(anxiety_score)
policy_changes.append(policy_score)
if len(anxiety_levels) < 3:
return {"error": "数据不足,无法进行相关性分析"}
correlation = self._calculate_correlation(anxiety_levels, policy_changes)
return {
"sample_size": len(anxiety_levels),
"correlation": correlation,
"interpretation": self._interpret_policy_correlation(correlation)
}
def _calculate_policy_change_score(self, emperor: Emperor) -> float:
"""计算政策变化评分"""
score = 0.0
# 宗教政策变化
religious_activities = emperor.religious_activities or []
activity_text = " ".join(religious_activities)
# 重大宗教政策变化关键词
major_changes = ["改革", "迁都", "灭佛", "复兴", "建立", "废除"]
change_count = sum(1 for keyword in major_changes if keyword in activity_text)
score += change_count * 0.2
# 在位时长(可能反映政策稳定性)
if emperor.reign_duration is not None:
if emperor.reign_duration > 20:
score += 0.3 # 长期在位,政策相对稳定
elif emperor.reign_duration < 5:
score += 0.1 # 短期在位,政策变化有限
return min(score, 1.0)
def analyze_anxiety_religious_correlation(self) -> Dict[str, Any]:
"""分析生育焦虑与宗教活动频率的关联"""
anxiety_scores = []
religious_frequencies = []
for emperor in self.emperors:
anxiety_score = self.calculate_fertility_anxiety_score(emperor)
religious_freq = len(emperor.religious_activities or [])
anxiety_scores.append(anxiety_score)
religious_frequencies.append(religious_freq)
if len(anxiety_scores) < 3:
return {"error": "数据不足"}
correlation = self._calculate_correlation(anxiety_scores, religious_frequencies)
# 分析特定类型的宗教活动
fertility_religious_analysis = self._analyze_fertility_religious_activities()
return {
"sample_size": len(anxiety_scores),
"anxiety_religious_correlation": correlation,
"fertility_religious_analysis": fertility_religious_analysis,
"interpretation": self._interpret_religious_correlation(correlation)
}
def _analyze_fertility_religious_activities(self) -> Dict[str, Any]:
"""分析生育相关宗教活动"""
fertility_activities = []
for emperor in self.emperors:
activities = emperor.religious_activities or []
activity_text = " ".join(activities)
fertility_count = sum(1 for keyword in self.fertility_religious_keywords
if keyword in activity_text)
if fertility_count > 0:
fertility_activities.append({
"emperor": emperor.name,
"anxiety_score": self.calculate_fertility_anxiety_score(emperor),
"fertility_activities": fertility_count,
"total_activities": len(activities)
})
return {
"emperors_with_fertility_activities": len(fertility_activities),
"fertility_activities_details": fertility_activities
}
def _calculate_correlation(self, x: List[float], y: List[float]) -> Dict[str, float]:
"""计算皮尔逊相关系数"""
if len(x) != len(y) or len(x) < 2:
return {"correlation": 0.0, "p_value": 1.0}
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(xi * yi for xi, yi in zip(x, y))
sum_x2 = sum(xi * xi for xi in x)
sum_y2 = sum(yi * yi for yi in y)
numerator = n * sum_xy - sum_x * sum_y
denominator = ((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y)) ** 0.5
if denominator == 0:
correlation = 0.0
else:
correlation = numerator / denominator
# 简化的p值估算
t_stat = correlation * ((n - 2) / (1 - correlation**2)) ** 0.5 if abs(correlation) != 1 else float('inf')
p_value = 2 * (1 - abs(t_stat) / (abs(t_stat) + n - 2)) if t_stat != float('inf') else 0.0
return {
"correlation": correlation,
"p_value": p_value,
"sample_size": n
}
def _interpret_policy_correlation(self, correlation: Dict[str, float]) -> str:
"""解释政策相关性"""
corr = correlation["correlation"]
p_val = correlation["p_value"]
if abs(corr) >= 0.5 and p_val < 0.05:
direction = "" if corr > 0 else ""
return f"生育焦虑与政策变化呈{direction}相关,相关性较强且统计显著"
elif abs(corr) >= 0.3:
direction = "" if corr > 0 else ""
return f"生育焦虑与政策变化呈{direction}相关,相关性中等"
else:
return "生育焦虑与政策变化相关性较弱"
def _interpret_religious_correlation(self, correlation: Dict[str, float]) -> str:
"""解释宗教相关性"""
corr = correlation["correlation"]
p_val = correlation["p_value"]
if corr >= 0.3 and p_val < 0.05:
return "生育焦虑与宗教活动频率呈正相关,支持'焦虑驱动宗教活动'假说"
elif corr >= 0.1:
return "生育焦虑与宗教活动频率呈弱正相关"
else:
return "生育焦虑与宗教活动频率相关性不明显"
def generate_anxiety_visualization(self, save_path: str = None) -> str:
"""生成生育焦虑可视化图表"""
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# 计算所有皇帝的焦虑评分
emperors_scores = [(emp, self.calculate_fertility_anxiety_score(emp))
for emp in self.emperors]
names = [emp.name.split('拓跋')[0] for emp, _ in emperors_scores]
anxiety_scores = [score for _, score in emperors_scores]
# 1. 生育焦虑评分条形图
colors = ['red' if score >= 0.7 else 'orange' if score >= 0.4 else 'green'
for score in anxiety_scores]
bars = ax1.bar(range(len(names)), anxiety_scores, color=colors, alpha=0.7)
ax1.set_xlabel('皇帝')
ax1.set_ylabel('生育焦虑评分')
ax1.set_title('北魏皇帝生育焦虑评分')
ax1.set_xticks(range(len(names)))
ax1.set_xticklabels(names, rotation=45, ha='right')
ax1.axhline(0.7, color='red', linestyle='--', alpha=0.5, label='高焦虑阈值')
ax1.axhline(0.4, color='orange', linestyle='--', alpha=0.5, label='中焦虑阈值')
ax1.legend()
# 添加数值标签
for bar, score in zip(bars, anxiety_scores):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.2f}', ha='center', va='bottom', fontsize=8)
# 2. 焦虑评分分布直方图
ax2.hist(anxiety_scores, bins=10, alpha=0.7, color='lightblue', edgecolor='black')
ax2.axvline(statistics.mean(anxiety_scores), color='red', linestyle='--',
label=f'平均焦虑: {statistics.mean(anxiety_scores):.2f}')
ax2.set_xlabel('生育焦虑评分')
ax2.set_ylabel('频数')
ax2.set_title('生育焦虑评分分布')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. 焦虑与子嗣数量关系
offspring_data = [(self.calculate_fertility_anxiety_score(emp), emp.offspring_count)
for emp in self.emperors if emp.offspring_count is not None]
if offspring_data:
anxiety_vals, offspring_vals = zip(*offspring_data)
ax3.scatter(anxiety_vals, offspring_vals, alpha=0.7, s=60)
# 添加趋势线
z = np.polyfit(anxiety_vals, offspring_vals, 1)
p = np.poly1d(z)
ax3.plot(anxiety_vals, p(anxiety_vals), "r--", alpha=0.8)
ax3.set_xlabel('生育焦虑评分')
ax3.set_ylabel('子嗣数量')
ax3.set_title('生育焦虑与子嗣数量关系')
ax3.grid(True, alpha=0.3)
# 4. 焦虑与宗教活动关系
religious_data = [(self.calculate_fertility_anxiety_score(emp),
len(emp.religious_activities or []))
for emp in self.emperors]
if religious_data:
anxiety_vals, religious_vals = zip(*religious_data)
ax4.scatter(anxiety_vals, religious_vals, alpha=0.7, s=60, color='purple')
# 添加趋势线
z = np.polyfit(anxiety_vals, religious_vals, 1)
p = np.poly1d(z)
ax4.plot(anxiety_vals, p(anxiety_vals), "r--", alpha=0.8)
ax4.set_xlabel('生育焦虑评分')
ax4.set_ylabel('宗教活动数量')
ax4.set_title('生育焦虑与宗教活动关系')
ax4.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
return f"图表已保存到: {save_path}"
else:
plt.show()
return "图表已显示"
def generate_comprehensive_anxiety_report(self) -> Dict[str, Any]:
"""生成综合生育焦虑分析报告"""
report = {
"analysis_date": pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S"),
"methodology": {
"anxiety_indicators": self.anxiety_indicators,
"religious_keywords": len(self.religious_keywords),
"fertility_keywords": len(self.fertility_religious_keywords)
}
}
# 焦虑分布分析
report["anxiety_distribution"] = self.analyze_anxiety_distribution()
# 政策相关性分析
report["policy_correlation"] = self.analyze_anxiety_policy_correlation()
# 宗教活动相关性分析
report["religious_correlation"] = self.analyze_anxiety_religious_correlation()
# 关键发现
report["key_findings"] = self._extract_anxiety_findings(report)
return report
def _extract_anxiety_findings(self, report: Dict[str, Any]) -> List[str]:
"""提取生育焦虑关键发现"""
findings = []
# 焦虑水平发现
if "mean_anxiety" in report["anxiety_distribution"]:
mean_anxiety = report["anxiety_distribution"]["mean_anxiety"]
findings.append(f"北魏皇室平均生育焦虑评分为 {mean_anxiety:.2f},显示中等偏高的焦虑水平")
# 高焦虑皇帝发现
if "high_anxiety_count" in report["anxiety_distribution"]:
high_count = report["anxiety_distribution"]["high_anxiety_count"]
total_count = report["anxiety_distribution"]["total_emperors"]
findings.append(f"{high_count}/{total_count} 位皇帝表现出高度生育焦虑")
# 宗教活动相关性发现
if "anxiety_religious_correlation" in report["religious_correlation"]:
corr_data = report["religious_correlation"]["anxiety_religious_correlation"]
if corr_data["correlation"] > 0.3:
findings.append(f"生育焦虑与宗教活动呈正相关 (r={corr_data['correlation']:.3f}),支持宗教缓解焦虑假说")
return findings
# 创建分析器实例
fertility_analyzer = FertilityAnxietyAnalyzer()
def run_fertility_anxiety_analysis():
"""运行生育焦虑分析"""
print("开始北魏皇室生育焦虑量化分析...")
# 生成综合报告
report = fertility_analyzer.generate_comprehensive_anxiety_report()
print("\n=== 北魏皇室生育焦虑分析报告 ===")
print(f"分析时间: {report['analysis_date']}")
# 焦虑分布
dist = report['anxiety_distribution']
if 'error' not in dist:
print(f"\n平均生育焦虑评分: {dist['mean_anxiety']:.3f}")
print(f"高焦虑皇帝: {dist['high_anxiety_count']}/{dist['total_emperors']}")
print(f"高焦虑皇帝名单: {', '.join(dist['high_anxiety_emperors'])}")
# 相关性分析
if 'error' not in report['religious_correlation']:
rel_corr = report['religious_correlation']['anxiety_religious_correlation']
print(f"\n生育焦虑与宗教活动相关性: {rel_corr['correlation']:.3f}")
print(f"解释: {report['religious_correlation']['interpretation']}")
# 关键发现
print("\n=== 关键发现 ===")
for i, finding in enumerate(report['key_findings'], 1):
print(f"{i}. {finding}")
return report
if __name__ == "__main__":
report = run_fertility_anxiety_analysis()
# 生成可视化图表
chart_result = fertility_analyzer.generate_anxiety_visualization("fertility_anxiety_analysis.png")
print(f"\n{chart_result}")