""" 数据质量控制系统 实现史料来源验证、可靠性评分和多重史料交叉验证 """ import logging from typing import List, Dict, Any, Tuple, Optional from dataclasses import dataclass from enum import Enum import re from collections import Counter import statistics from analysis.models import ReliabilityLevel, Emperor, ReligiousBuilding, FolkCustom, CulturalTransmission logger = logging.getLogger(__name__) class ValidationResult(Enum): """验证结果""" PASS = "pass" WARNING = "warning" FAIL = "fail" @dataclass class QualityReport: """质量报告""" data_id: str data_type: str overall_score: float reliability_level: ReliabilityLevel validation_results: List[Dict[str, Any]] recommendations: List[str] cross_validation_status: bool class SourceValidator: """史料来源验证器""" # 可信史料来源等级 TRUSTED_SOURCES = { "high": [ "魏书", "北史", "资治通鉴", "竹书纪年", "山海经", "日本书纪", "古事记", "续日本纪", "元史", "明史" ], "medium": [ "太平御览", "册府元龟", "文献通考", "通典", "三国志", "晋书", "宋书", "南齐书" ], "low": [ "野史", "传说", "民间故事", "口传资料" ] } # 现代学术来源 ACADEMIC_SOURCES = { "high": [ "中国社会科学院", "北京大学", "清华大学", "复旦大学", "东京大学", "京都大学", "哈佛大学", "剑桥大学" ], "medium": [ "省级社科院", "重点大学", "专业研究机构" ] } def __init__(self): self.source_patterns = self._compile_source_patterns() def _compile_source_patterns(self) -> Dict[str, re.Pattern]: """编译史料来源识别模式""" patterns = {} # 古代史料模式 ancient_sources = [] for level_sources in self.TRUSTED_SOURCES.values(): ancient_sources.extend(level_sources) patterns['ancient'] = re.compile(f"({'|'.join(ancient_sources)})") # 现代学术模式 academic_sources = [] for level_sources in self.ACADEMIC_SOURCES.values(): academic_sources.extend(level_sources) patterns['academic'] = re.compile(f"({'|'.join(academic_sources)})") # 考古发现模式 patterns['archaeological'] = re.compile(r"考古|出土|发掘|遗址|文物") return patterns def validate_sources(self, sources: List[str]) -> Dict[str, Any]: """验证史料来源""" if not sources: return { "status": ValidationResult.FAIL, "score": 0.0, "message": "缺少史料来源", "source_analysis": {} } source_analysis = { "total_count": len(sources), "ancient_sources": 0, "academic_sources": 0, "archaeological_sources": 0, "reliability_distribution": Counter() } total_score = 0.0 for source in sources: source_score = self._evaluate_single_source(source) total_score += source_score['score'] # 统计来源类型 if source_score['type'] == 'ancient': source_analysis['ancient_sources'] += 1 elif source_score['type'] == 'academic': source_analysis['academic_sources'] += 1 elif source_score['type'] == 'archaeological': source_analysis['archaeological_sources'] += 1 source_analysis['reliability_distribution'][source_score['reliability']] += 1 average_score = total_score / len(sources) # 确定验证状态 if average_score >= 0.8: status = ValidationResult.PASS elif average_score >= 0.5: status = ValidationResult.WARNING else: status = ValidationResult.FAIL return { "status": status, "score": average_score, "message": f"平均史料可靠性评分: {average_score:.2f}", "source_analysis": source_analysis } def _evaluate_single_source(self, source: str) -> Dict[str, Any]: """评估单个史料来源""" source_lower = source.lower() # 检查古代史料 for reliability, source_list in self.TRUSTED_SOURCES.items(): if any(s in source for s in source_list): score_map = {"high": 1.0, "medium": 0.7, "low": 0.4} return { "score": score_map[reliability], "type": "ancient", "reliability": reliability } # 检查现代学术来源 for reliability, source_list in self.ACADEMIC_SOURCES.items(): if any(s in source for s in source_list): score_map = {"high": 0.9, "medium": 0.6} return { "score": score_map[reliability], "type": "academic", "reliability": reliability } # 检查考古来源 if self.source_patterns['archaeological'].search(source): return { "score": 0.8, "type": "archaeological", "reliability": "high" } # 未知来源 return { "score": 0.2, "type": "unknown", "reliability": "low" } class DataIntegrityChecker: """数据完整性检查器""" def __init__(self): self.required_fields = { "Emperor": ["name", "reign_period"], "ReligiousBuilding": ["name", "location", "construction_period"], "FolkCustom": ["name", "region", "historical_period"], "CulturalTransmission": ["source_region", "target_region", "transmission_period"] } def check_completeness(self, data: Any) -> Dict[str, Any]: """检查数据完整性""" data_type = type(data).__name__ required = self.required_fields.get(data_type, []) missing_fields = [] empty_fields = [] for field in required: if not hasattr(data, field): missing_fields.append(field) else: value = getattr(data, field) if value is None or (isinstance(value, (str, list)) and len(value) == 0): empty_fields.append(field) completeness_score = 1.0 - (len(missing_fields) + len(empty_fields)) / len(required) if completeness_score >= 0.9: status = ValidationResult.PASS elif completeness_score >= 0.7: status = ValidationResult.WARNING else: status = ValidationResult.FAIL return { "status": status, "score": completeness_score, "missing_fields": missing_fields, "empty_fields": empty_fields, "message": f"数据完整性: {completeness_score:.2%}" } def detect_anomalies(self, data: Any) -> Dict[str, Any]: """检测异常值""" anomalies = [] if isinstance(data, Emperor): # 检查皇帝寿命异常 if data.lifespan and (data.lifespan < 0 or data.lifespan > 120): anomalies.append(f"异常寿命: {data.lifespan}") # 检查在位时长异常 if data.reign_duration and (data.reign_duration < 0 or data.reign_duration > 80): anomalies.append(f"异常在位时长: {data.reign_duration}") # 检查子嗣数量异常 if data.offspring_count and (data.offspring_count < 0 or data.offspring_count > 50): anomalies.append(f"异常子嗣数量: {data.offspring_count}") elif isinstance(data, CulturalTransmission): # 检查传播时间跨度异常 if data.time_span and (data.time_span < 0 or data.time_span > 1000): anomalies.append(f"异常传播时间跨度: {data.time_span}") status = ValidationResult.FAIL if anomalies else ValidationResult.PASS return { "status": status, "anomalies": anomalies, "message": f"发现 {len(anomalies)} 个异常值" if anomalies else "未发现异常值" } class CrossValidator: """交叉验证器""" def __init__(self): self.validation_rules = self._load_validation_rules() def _load_validation_rules(self) -> Dict[str, List[str]]: """加载验证规则""" return { "emperor_lifespan": [ "北魏前期皇帝平均寿命应在25-30岁之间", "短寿现象应与史料记录一致", "生育焦虑评分应与子嗣数量负相关" ], "cultural_transmission": [ "传播时期应与历史事件时间线一致", "传播路径应符合地理逻辑", "文化载体应有史料支撑" ], "religious_building": [ "建造时期应与政治背景一致", "建筑功能应与文化需求匹配", "地理位置应符合选址逻辑" ] } def cross_validate_emperors(self, emperors: List[Emperor]) -> Dict[str, Any]: """交叉验证皇帝数据""" if len(emperors) < 3: return { "status": ValidationResult.WARNING, "message": "样本数量不足,无法进行有效交叉验证" } # 计算统计指标 lifespans = [emp.lifespan for emp in emperors if emp.lifespan] if not lifespans: return { "status": ValidationResult.FAIL, "message": "缺少寿命数据,无法进行交叉验证" } avg_lifespan = statistics.mean(lifespans) median_lifespan = statistics.median(lifespans) # 验证平均寿命是否符合预期 expected_range = (25, 30) lifespan_valid = expected_range[0] <= avg_lifespan <= expected_range[1] # 验证生育焦虑与子嗣数量的关系 fertility_correlation = self._calculate_fertility_correlation(emperors) validation_results = [] if lifespan_valid: validation_results.append({ "rule": "平均寿命范围验证", "status": ValidationResult.PASS, "message": f"平均寿命 {avg_lifespan:.1f} 岁符合预期范围" }) else: validation_results.append({ "rule": "平均寿命范围验证", "status": ValidationResult.WARNING, "message": f"平均寿命 {avg_lifespan:.1f} 岁超出预期范围 {expected_range}" }) if fertility_correlation < -0.3: validation_results.append({ "rule": "生育焦虑相关性验证", "status": ValidationResult.PASS, "message": f"生育焦虑与子嗣数量呈负相关 (r={fertility_correlation:.3f})" }) else: validation_results.append({ "rule": "生育焦虑相关性验证", "status": ValidationResult.WARNING, "message": f"生育焦虑与子嗣数量相关性不明显 (r={fertility_correlation:.3f})" }) overall_status = ValidationResult.PASS if any(result["status"] == ValidationResult.FAIL for result in validation_results): overall_status = ValidationResult.FAIL elif any(result["status"] == ValidationResult.WARNING for result in validation_results): overall_status = ValidationResult.WARNING return { "status": overall_status, "statistics": { "sample_size": len(emperors), "avg_lifespan": avg_lifespan, "median_lifespan": median_lifespan, "fertility_correlation": fertility_correlation }, "validation_results": validation_results } def _calculate_fertility_correlation(self, emperors: List[Emperor]) -> float: """计算生育焦虑与子嗣数量的相关性""" anxiety_scores = [] offspring_counts = [] for emp in emperors: if emp.fertility_anxiety_score is not None and emp.offspring_count is not None: anxiety_scores.append(emp.fertility_anxiety_score) offspring_counts.append(emp.offspring_count) if len(anxiety_scores) < 3: return 0.0 # 计算皮尔逊相关系数 n = len(anxiety_scores) sum_x = sum(anxiety_scores) sum_y = sum(offspring_counts) sum_xy = sum(x * y for x, y in zip(anxiety_scores, offspring_counts)) sum_x2 = sum(x * x for x in anxiety_scores) sum_y2 = sum(y * y for y in offspring_counts) numerator = n * sum_xy - sum_x * sum_y denominator = ((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y)) ** 0.5 if denominator == 0: return 0.0 return numerator / denominator class QualityControlManager: """质量控制管理器""" def __init__(self): self.source_validator = SourceValidator() self.integrity_checker = DataIntegrityChecker() self.cross_validator = CrossValidator() def comprehensive_quality_check(self, data: Any, data_id: str = None) -> QualityReport: """综合质量检查""" data_type = type(data).__name__ data_id = data_id or f"{data_type}_{id(data)}" validation_results = [] recommendations = [] # 1. 史料来源验证 if hasattr(data, 'sources'): source_result = self.source_validator.validate_sources(data.sources) validation_results.append({ "category": "史料来源验证", "result": source_result }) if source_result["status"] != ValidationResult.PASS: recommendations.append("增加更多可靠的史料来源") # 2. 数据完整性检查 completeness_result = self.integrity_checker.check_completeness(data) validation_results.append({ "category": "数据完整性检查", "result": completeness_result }) if completeness_result["status"] != ValidationResult.PASS: recommendations.append("补充缺失的必要字段") # 3. 异常值检测 anomaly_result = self.integrity_checker.detect_anomalies(data) validation_results.append({ "category": "异常值检测", "result": anomaly_result }) if anomaly_result["status"] != ValidationResult.PASS: recommendations.append("检查并修正异常数据") # 4. 计算总体评分 scores = [] for result in validation_results: if 'score' in result['result']: scores.append(result['result']['score']) overall_score = statistics.mean(scores) if scores else 0.0 # 5. 确定可靠性等级 if overall_score >= 0.8: reliability_level = ReliabilityLevel.HIGH elif overall_score >= 0.6: reliability_level = ReliabilityLevel.MEDIUM elif overall_score >= 0.4: reliability_level = ReliabilityLevel.LOW else: reliability_level = ReliabilityLevel.UNCERTAIN # 6. 交叉验证状态 cross_validation_status = overall_score >= 0.6 return QualityReport( data_id=data_id, data_type=data_type, overall_score=overall_score, reliability_level=reliability_level, validation_results=validation_results, recommendations=recommendations, cross_validation_status=cross_validation_status ) def batch_quality_check(self, data_list: List[Any]) -> List[QualityReport]: """批量质量检查""" reports = [] for i, data in enumerate(data_list): report = self.comprehensive_quality_check(data, f"batch_{i}") reports.append(report) return reports def generate_quality_summary(self, reports: List[QualityReport]) -> Dict[str, Any]: """生成质量摘要报告""" if not reports: return {"message": "无数据报告"} reliability_distribution = Counter(report.reliability_level for report in reports) avg_score = statistics.mean(report.overall_score for report in reports) high_quality_count = sum(1 for report in reports if report.overall_score >= 0.8) low_quality_count = sum(1 for report in reports if report.overall_score < 0.4) return { "total_records": len(reports), "average_quality_score": avg_score, "reliability_distribution": dict(reliability_distribution), "high_quality_records": high_quality_count, "low_quality_records": low_quality_count, "quality_pass_rate": high_quality_count / len(reports), "recommendations": self._generate_batch_recommendations(reports) } def _generate_batch_recommendations(self, reports: List[QualityReport]) -> List[str]: """生成批量改进建议""" all_recommendations = [] for report in reports: all_recommendations.extend(report.recommendations) recommendation_counts = Counter(all_recommendations) # 返回最常见的建议 return [rec for rec, count in recommendation_counts.most_common(5)] # 全局质量控制管理器实例 quality_manager = QualityControlManager()