#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Quick document-merging tool (simplified edition).

Collects markdown documents scattered across the "胡汉三千年" project tree,
rewrites each one into a category folder under ``core-docs/`` with a YAML
front-matter header, and generates a README index.
"""

import os
import re
import shutil
from datetime import datetime
from pathlib import Path

import yaml  # third-party (PyYAML); used for front-matter dump/load


class QuickDocMerger:
    """Merge project markdown files into a categorized ``core-docs`` tree.

    Side effects on construction: creates one sub-directory per category
    under the target directory (see :meth:`setup_target_structure`).
    """

    def __init__(self, project_root="/home/ben/code/huhan3000"):
        """Remember the key paths and build the category -> sources map.

        :param project_root: root of the project to scan (default is the
            author's local checkout path).
        """
        self.project_root = Path(project_root)
        self.target_dir = self.project_root / "core-docs"
        self.backup_dir = self.project_root / "core-docs_backup"

        # Target category -> list of source directories/files (relative to
        # project_root). A single entry may be a directory (walked
        # recursively for *.md) or an individual markdown file.
        self.source_mapping = {
            "理论框架": [
                "core-theory",
                "爹学整理中心",
                "KYM三链理论与符号传承整合.md"
            ],
            "孔子研究": [
                "专题研究-孔子研究",
                "核心研究文档"
            ],
            "文化研究": [
                "专题研究-文化研究",
                "专题研究-历史文化",
                "昆仑正音.md"
            ],
            "社会制度": [
                "专题研究-社会制度",
                "专题研究-哲学思想",
                "专题研究-经济理论"
            ],
            "汇票研究": [
                "汇票"
            ],
            "应用实践": [
                "胡汉三千年项目",
                "丝路数字手语共识验证项目",
                "practical-applications"
            ],
            "学术成果": [
                "academic-papers",
                "research",
                "thematic-research"
            ]
        }

        # Create the destination tree up front so later writes cannot fail
        # on a missing directory.
        self.setup_target_structure()

    def setup_target_structure(self):
        """Create the target directory tree: one folder per category."""
        for category in self.source_mapping:
            (self.target_dir / category).mkdir(parents=True, exist_ok=True)
        print(f"✅ 目标目录结构已创建: {self.target_dir}")

    def extract_title_from_content(self, content):
        """Return the first Markdown H1 title in *content*, or ``None``."""
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if title_match:
            return title_match.group(1).strip()
        # No heading found; caller falls back to the file name.
        return None

    def generate_metadata(self, file_path, content):
        """Build the YAML front-matter dict for one source document.

        :param file_path: ``Path`` of the source file (used for the title
            fallback and category detection).
        :param content: full text of the document.
        :returns: dict with title, source path, dates, word count,
            category and tags.
        """
        title = self.extract_title_from_content(content)
        if not title:
            # Fall back to a cleaned-up file stem.
            title = file_path.stem.replace('_', ' ').replace('-', ' ')

        # NOTE(review): \b\w+\b counts runs of word characters, so CJK text
        # is undercounted (a whole Chinese sentence is one "word").
        word_count = len(re.findall(r'\b\w+\b', content))

        # One timestamp for both fields so created == updated even if the
        # call straddles midnight (the original called now() twice).
        today = datetime.now().strftime('%Y-%m-%d')

        return {
            'title': title,
            'source_path': str(file_path),
            'created': today,
            'updated': today,
            'word_count': word_count,
            'category': self.determine_category(file_path),
            'tags': self.extract_tags(content)
        }

    def determine_category(self, file_path):
        """Map a file path to a category via substring match on the sources.

        Falls back to "其他" when no mapped source name appears in the path.
        """
        path_str = str(file_path)
        for category, sources in self.source_mapping.items():
            for source in sources:
                if source in path_str:
                    return category
        return "其他"

    def extract_tags(self, content):
        """Return up to five known keywords that occur in *content*."""
        # Fixed keyword vocabulary; order determines tag priority.
        keywords = [
            '孔子', '商', '周', '音韵', '考古', '理论', '研究', '分析',
            '汇票', '阴间', '金融', '经济学', '文化', '社会', '制度',
            '爹学', 'KYM', '三链', '传播', '策略', '应用'
        ]
        found_tags = [kw for kw in keywords if kw in content]
        return found_tags[:5]  # at most five tags

    def process_file(self, file_path):
        """Copy one markdown file into the target tree with front-matter.

        :returns: ``(True, metadata_dict)`` on success,
            ``(False, error_string)`` on any failure.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            metadata = self.generate_metadata(file_path, content)
            category = metadata['category']

            # Sanitize the title for use in a file name: keep word chars,
            # CJK (U+4E00-U+9FFF) and hyphens; everything else becomes '_'.
            # NOTE(review): identical sanitized titles silently overwrite.
            safe_title = re.sub(r'[^\w\u4e00-\u9fff-]', '_', metadata['title'])
            target_filename = f"{category}-{safe_title}.md"
            target_path = self.target_dir / category / target_filename

            with open(target_path, 'w', encoding='utf-8') as f:
                # YAML front-matter block, then the untouched body.
                f.write("---\n")
                yaml.dump(metadata, f, allow_unicode=True,
                          default_flow_style=False)
                f.write("---\n\n")
                f.write(content)

            return True, metadata
        except Exception as e:
            # Best-effort tool: report the failure to the caller's stats
            # rather than aborting the whole merge run.
            return False, str(e)

    def _merge_file(self, file_path, category, stats):
        """Process one source file and record the outcome in *stats*."""
        stats['total_files'] += 1
        success, result = self.process_file(file_path)
        label = file_path.relative_to(self.project_root)
        if success:
            stats['success'] += 1
            stats['categories'][category] += 1
            print(f"✅ 处理: {label}")
        else:
            stats['errors'] += 1
            print(f"❌ 错误: {label} - {result}")

    def scan_and_merge(self):
        """Walk every mapped source and merge its markdown files.

        :returns: stats dict with totals, per-category counts and errors.
        """
        print("🔍 开始扫描文档...")

        stats = {
            'total_files': 0,
            'processed': 0,
            'success': 0,
            'errors': 0,
            'categories': {}
        }

        for category, sources in self.source_mapping.items():
            stats['categories'][category] = 0
            for source in sources:
                source_path = self.project_root / source
                if source_path.is_file():
                    # A single mapped markdown file.
                    self._merge_file(source_path, category, stats)
                elif source_path.is_dir():
                    # Recursively pick up every *.md under the directory.
                    for root, dirs, files in os.walk(source_path):
                        for file in files:
                            if file.endswith('.md'):
                                self._merge_file(Path(root) / file,
                                                 category, stats)
                else:
                    # Previously silently ignored; surface it for the user.
                    print(f"⚠️ 源不存在: {source}")

        stats['processed'] = stats['success'] + stats['errors']
        return stats

    def generate_index(self):
        """Write ``core-docs/README.md`` listing every merged document.

        :returns: ``Path`` of the generated index file.
        """
        print("📚 生成索引文件...")

        index_content = "# 胡汉三千年项目文档索引\n\n"
        index_content += (
            f"> 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        )

        for category in self.source_mapping.keys():
            category_dir = self.target_dir / category
            if not category_dir.exists():
                continue
            md_files = list(category_dir.glob("*.md"))
            if not md_files:
                continue

            index_content += f"## {category} ({len(md_files)}篇)\n\n"
            for md_file in sorted(md_files):
                with open(md_file, 'r', encoding='utf-8') as f:
                    content = f.read()

                # Front-matter sits between the leading '---' pair.
                metadata_match = re.search(r'^---\s*(.*?)\s*---',
                                           content, re.DOTALL)
                relative_path = md_file.relative_to(self.target_dir)
                if metadata_match:
                    try:
                        metadata = yaml.safe_load(metadata_match.group(1))
                        title = metadata.get('title', md_file.stem)
                        word_count = metadata.get('word_count', 0)
                        index_content += (
                            f"- [{title}]({relative_path}) - {word_count}字\n"
                        )
                    except Exception:
                        # Unparseable front-matter (was a bare except):
                        # fall back to a plain file-name entry.
                        index_content += (
                            f"- [{md_file.stem}]({relative_path})\n"
                        )
            index_content += "\n"

        index_file = self.target_dir / "README.md"
        with open(index_file, 'w', encoding='utf-8') as f:
            f.write(index_content)

        print(f"✅ 索引文件已生成: {index_file}")
        return index_file

    def create_backup(self):
        """Snapshot the current target tree into a timestamped backup."""
        if self.target_dir.exists():
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = self.backup_dir / f"backup_{timestamp}"
            # copytree creates backup_path (and missing parents) itself.
            shutil.copytree(self.target_dir, backup_path)
            print(f"✅ 备份已创建: {backup_path}")

    def run(self):
        """Execute the full pipeline: backup, merge, report, index."""
        print("🚀 开始文档归并流程...")
        print("=" * 50)

        # Preserve whatever is already in core-docs before overwriting.
        self.create_backup()

        stats = self.scan_and_merge()

        print("=" * 50)
        print("📊 归并统计:")
        print(f" 总文件数: {stats['total_files']}")
        print(f" 成功处理: {stats['success']}")
        print(f" 处理错误: {stats['errors']}")

        print("\n📂 分类统计:")
        for category, count in stats['categories'].items():
            if count > 0:
                print(f" {category}: {count}篇")

        self.generate_index()

        print("=" * 50)
        print("🎉 文档归并完成!")
        print(f" 目标目录: {self.target_dir}")
        print(f" 索引文件: {self.target_dir / 'README.md'}")


if __name__ == "__main__":
    merger = QuickDocMerger()
    merger.run()