ben
2025-11-09 08:57:31 +00:00
parent 8c9cc2660a
commit bcec078c2c
249 changed files with 34877 additions and 0 deletions


@@ -0,0 +1,288 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
胡汉三千年项目文档索引工具
功能:
1. 自动扫描文档目录
2. 生成文档索引
3. 更新统一索引文件
4. 检测文档变更
作者:胡汉三千年项目团队
版本1.0.0
"""
import os
import json
import hashlib
import datetime
from pathlib import Path
class DocumentIndexer:
def __init__(self, base_path="/home/ben/code/huhan3000/unified-docs"):
self.base_path = Path(base_path)
self.index_file = self.base_path / "unified-index.json"
self.categories = [
"01-core-theory",
"02-thematic-research",
"03-historical-analysis",
"04-methodology",
"05-applications",
"06-resources"
]
def scan_documents(self):
"""扫描所有文档目录,收集文档信息"""
documents = {}
for category in self.categories:
category_path = self.base_path / category
if not category_path.exists():
continue
documents[category] = []
            # Scan Markdown files
for md_file in category_path.rglob("*.md"):
if md_file.name == "README.md":
continue
doc_info = self._get_document_info(md_file, category)
documents[category].append(doc_info)
return documents
def _get_document_info(self, file_path, category):
"""获取单个文档的详细信息"""
stat = file_path.stat()
# 计算文件哈希
file_hash = self._calculate_file_hash(file_path)
# 读取文件内容获取基本信息
title = file_path.stem
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 尝试从内容中提取标题
lines = content.split('\n')
for line in lines:
if line.startswith('# '):
title = line[2:].strip()
break
return {
"title": title,
"filename": file_path.name,
"path": str(file_path.relative_to(self.base_path)),
"category": category,
"size": stat.st_size,
"modified": datetime.datetime.fromtimestamp(stat.st_mtime).isoformat(),
"hash": file_hash,
"word_count": len(content.split())
}
def _calculate_file_hash(self, file_path):
"""计算文件内容的哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def generate_index(self, documents):
"""生成索引文件"""
index_data = {
"metadata": {
"generated_at": datetime.datetime.now().isoformat(),
"total_documents": sum(len(docs) for docs in documents.values()),
"tool_version": "1.0.0"
},
"categories": {},
"documents": documents,
"statistics": self._calculate_statistics(documents)
}
        # Per-category statistics
for category, docs in documents.items():
index_data["categories"][category] = {
"count": len(docs),
"total_size": sum(doc["size"] for doc in docs),
"last_modified": max(doc["modified"] for doc in docs) if docs else None
}
return index_data
def _calculate_statistics(self, documents):
"""计算文档统计信息"""
all_docs = []
for docs in documents.values():
all_docs.extend(docs)
if not all_docs:
return {
"total_documents": 0,
"total_size_bytes": 0,
"total_size_mb": 0,
"total_words": 0,
"average_words_per_doc": 0,
"last_modified": None
}
total_size = sum(doc["size"] for doc in all_docs)
total_words = sum(doc["word_count"] for doc in all_docs)
return {
"total_documents": len(all_docs),
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 2),
"total_words": total_words,
"average_words_per_doc": round(total_words / len(all_docs), 2),
"last_modified": max(doc["modified"] for doc in all_docs)
}
def save_index(self, index_data):
"""保存索引到文件"""
with open(self.index_file, 'w', encoding='utf-8') as f:
json.dump(index_data, f, ensure_ascii=False, indent=2)
def update_markdown_index(self, index_data):
"""更新Markdown格式的索引文件"""
md_index_file = self.base_path / "unified-index.md"
# 读取现有的Markdown索引
if md_index_file.exists():
with open(md_index_file, 'r', encoding='utf-8') as f:
content = f.read()
else:
content = ""
# 生成新的索引内容
new_content = self._generate_markdown_index(index_data)
# 更新文档迁移状态部分
updated_content = self._update_migration_status(content, new_content, index_data)
with open(md_index_file, 'w', encoding='utf-8') as f:
f.write(updated_content)
def _generate_markdown_index(self, index_data):
"""生成Markdown格式的索引内容"""
lines = []
# 统计信息
stats = index_data["statistics"]
lines.append("## 文档统计信息\n")
lines.append(f"- **总文档数**: {stats['total_documents']}")
lines.append(f"- **总大小**: {stats['total_size_mb']} MB")
lines.append(f"- **总字数**: {stats['total_words']:,}")
lines.append(f"- **平均每文档字数**: {stats['average_words_per_doc']}")
lines.append(f"- **最后更新时间**: {stats['last_modified']}\n")
        # List documents by category
for category, docs in index_data["documents"].items():
if docs:
lines.append(f"\n### {category.replace('-', ' ').title()}\n")
for doc in sorted(docs, key=lambda x: x["title"]):
lines.append(f"- **{doc['title']}** - `{doc['filename']}` ")
lines.append(f" - 大小: {round(doc['size']/1024, 1)} KB")
lines.append(f" - 字数: {doc['word_count']}")
lines.append(f" - 修改: {doc['modified'][:10]}")
return '\n'.join(lines)
def _update_migration_status(self, old_content, new_index_content, index_data):
"""更新文档迁移状态部分"""
# 查找文档迁移状态部分
migration_start = old_content.find("## 文档迁移状态")
if migration_start == -1:
            # Not found: insert it at a suitable position
insert_pos = old_content.find("## 更新日志")
if insert_pos == -1:
insert_pos = len(old_content)
migration_content = self._generate_migration_status(index_data)
updated_content = old_content[:insert_pos] + "\n" + migration_content + "\n" + old_content[insert_pos:]
else:
            # Replace the existing migration-status section; search for the next
            # top-level heading so "###" subsections inside it are not matched
            migration_end = old_content.find("\n## ", migration_start + 1)
            if migration_end == -1:
                migration_end = len(old_content)
            migration_content = self._generate_migration_status(index_data)
            updated_content = old_content[:migration_start] + migration_content + old_content[migration_end:]
        # Update the statistics section of the index
        index_start = updated_content.find("## 文档统计信息")
        if index_start != -1:
            index_end = updated_content.find("\n## ", index_start + 1)
            if index_end == -1:
                index_end = len(updated_content)
            updated_content = updated_content[:index_start] + new_index_content + updated_content[index_end:]
        else:
            # Statistics section absent: append the freshly generated index
            updated_content += "\n" + new_index_content + "\n"
        return updated_content
def _generate_migration_status(self, index_data):
"""生成文档迁移状态内容"""
lines = []
lines.append("## 文档迁移状态\n")
stats = index_data["statistics"]
total_migrated = stats["total_documents"]
        # Estimated document counts in core-docs and thematic-research
        estimated_core_docs = 399  # from an earlier count
        estimated_thematic = 142  # from an earlier count
total_estimated = estimated_core_docs + estimated_thematic
migration_percentage = (total_migrated / total_estimated * 100) if total_estimated > 0 else 0
lines.append(f"### 迁移进度: {migration_percentage:.1f}%\n")
lines.append(f"- **已迁移文档**: {total_migrated}")
lines.append(f"- **预计总文档**: {total_estimated}")
lines.append(f"- **剩余文档**: {total_estimated - total_migrated}\n")
lines.append("### 按类别迁移情况\n")
for category, info in index_data["categories"].items():
lines.append(f"- **{category.replace('-', ' ').title()}**: {info['count']} 个文档")
return '\n'.join(lines)
def run(self):
"""运行索引工具"""
print("=== 胡汉三千年项目文档索引工具 ===")
print(f"扫描目录: {self.base_path}")
# 扫描文档
print("正在扫描文档...")
documents = self.scan_documents()
# 生成索引
print("正在生成索引...")
index_data = self.generate_index(documents)
# 保存JSON索引
print("正在保存索引文件...")
self.save_index(index_data)
# 更新Markdown索引
print("正在更新Markdown索引...")
self.update_markdown_index(index_data)
# 输出统计信息
stats = index_data["statistics"]
print(f"\n=== 索引完成 ===")
print(f"处理文档数: {stats['total_documents']}")
print(f"总大小: {stats['total_size_mb']} MB")
print(f"索引文件: {self.index_file}")
print(f"生成时间: {index_data['metadata']['generated_at']}")
def main():
"""主函数"""
indexer = DocumentIndexer()
indexer.run()
if __name__ == "__main__":
main()
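
The unified-index.json written by this tool can be consumed directly by other scripts. A minimal sketch of reading it back (assuming the index has already been generated at the default base path; the key layout follows generate_index above):

#!/usr/bin/env python3
# Minimal sketch: read unified-index.json and print per-category counts.
# Assumes doc-indexer has already run against the default base path.
import json
from pathlib import Path

index_file = Path("/home/ben/code/huhan3000/unified-docs/unified-index.json")
with open(index_file, "r", encoding="utf-8") as f:
    index_data = json.load(f)

print(f"Generated at: {index_data['metadata']['generated_at']}")
for category, info in index_data["categories"].items():
    print(f"{category}: {info['count']} documents, {info['total_size']} bytes")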


@@ -0,0 +1,373 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
胡汉三千年项目文档迁移工具
功能:
1. 从core-docs和thematic-research迁移文档到统一文档系统
2. 自动分类和组织文档
3. 保持文档结构和元数据
4. 生成迁移报告
作者:胡汉三千年项目团队
版本1.0.0
"""
import os
import json
import shutil
import hashlib
from pathlib import Path
import datetime
class DocumentMigrator:
def __init__(self):
self.base_path = Path("/home/ben/code/huhan3000")
self.unified_docs_path = self.base_path / "unified-docs"
self.core_docs_path = self.base_path / "core-docs"
self.thematic_research_path = self.base_path / "thematic-research"
        # Category mapping rules (keys are keywords matched against file
        # paths; Chinese keys mirror the source directory names)
        self.category_mapping = {
            # core-docs classification rules
            "core-docs": {
"音韵考古学": "01-core-theory/01-phonological-archaeology",
"文明传播模型": "01-core-theory/02-civilization-diffusion",
"方法论体系": "01-core-theory/03-methodology",
"学术成果": "01-core-theory/04-academic-achievements",
"理论框架": "01-core-theory/05-theoretical-framework",
"实证研究": "02-thematic-research/01-empirical-studies",
"历史分析": "03-historical-analysis/01-historical-events",
"文化比较": "04-cultural-comparison/01-cross-cultural",
"技术实现": "05-technical-implementation/01-tools",
"项目文档": "06-project-docs/01-management"
},
            # thematic-research classification rules
            "thematic-research": {
"civilization-studies": "02-thematic-research/02-civilization-studies",
"phonological-studies": "02-thematic-research/03-phonological-studies",
"commercial-studies": "02-thematic-research/04-commercial-studies",
"historical-studies": "03-historical-analysis/02-historical-studies",
"cultural-studies": "04-cultural-comparison/02-cultural-studies",
"theory-studies": "01-core-theory/06-theory-studies",
"methodology-studies": "01-core-theory/03-methodology",
"empirical-studies": "02-thematic-research/01-empirical-studies",
"comparative-studies": "04-cultural-comparison/03-comparative-studies"
}
}
        # File-extension map
self.file_extensions = {
".md": "markdown",
".txt": "text",
".py": "python",
".json": "json",
".yaml": "yaml",
".yml": "yaml"
}
self.migration_report = {
"metadata": {
"migration_date": datetime.datetime.now().isoformat(),
"tool_version": "1.0.0"
},
"statistics": {
"total_files_scanned": 0,
"total_files_migrated": 0,
"total_files_skipped": 0,
"total_errors": 0
},
"migration_details": {
"core-docs": {"scanned": 0, "migrated": 0, "skipped": 0},
"thematic-research": {"scanned": 0, "migrated": 0, "skipped": 0}
},
"errors": [],
"migrated_files": []
}
def _calculate_file_hash(self, file_path):
"""计算文件内容的哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def _get_file_category(self, source_type, file_path, content=None):
"""根据文件路径和内容确定分类"""
file_path_str = str(file_path)
# 首先尝试基于路径的分类
for keyword, target_category in self.category_mapping[source_type].items():
if keyword.lower() in file_path_str.lower():
return target_category
# 如果基于路径无法分类,尝试基于内容(如果提供了内容)
if content:
content_lower = content.lower()
# 关键词匹配
keyword_categories = {
"音韵": "01-core-theory/01-phonological-archaeology",
"文明": "01-core-theory/02-civilization-diffusion",
"方法": "01-core-theory/03-methodology",
"理论": "01-core-theory/05-theoretical-framework",
"实证": "02-thematic-research/01-empirical-studies",
"历史": "03-historical-analysis/01-historical-events",
"文化": "04-cultural-comparison/01-cross-cultural",
"技术": "05-technical-implementation/01-tools",
"项目": "06-project-docs/01-management"
}
for keyword, category in keyword_categories.items():
if keyword in content_lower:
return category
        # Default category
if source_type == "core-docs":
return "01-core-theory/99-uncategorized"
else:
return "02-thematic-research/99-uncategorized"
def _ensure_directory(self, dir_path):
"""确保目录存在"""
dir_path.mkdir(parents=True, exist_ok=True)
def _copy_file_with_metadata(self, source_path, target_path):
"""复制文件并保持元数据"""
try:
# 复制文件
shutil.copy2(source_path, target_path)
# 获取文件信息
stat = source_path.stat()
file_info = {
"source_path": str(source_path),
"target_path": str(target_path),
"size": stat.st_size,
"modified_time": datetime.datetime.fromtimestamp(stat.st_mtime).isoformat(),
"hash": self._calculate_file_hash(source_path),
"file_type": self.file_extensions.get(source_path.suffix, "unknown")
}
return file_info
except Exception as e:
raise Exception(f"文件复制失败: {e}")
def _create_migration_metadata(self, source_path, target_path, category):
"""创建迁移元数据文件"""
metadata_path = target_path.with_suffix(target_path.suffix + ".metadata.json")
metadata = {
"original_source": str(source_path),
"migration_date": datetime.datetime.now().isoformat(),
"category": category,
"tool_version": "1.0.0"
}
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
def migrate_core_docs(self, dry_run=False):
"""迁移core-docs文档"""
print("开始迁移 core-docs 文档...")
migrated_files = []
        # Scan the core-docs tree
for file_path in self.core_docs_path.rglob("*"):
if file_path.is_file() and file_path.suffix in [".md", ".txt", ".py", ".json"]:
self.migration_report["statistics"]["total_files_scanned"] += 1
self.migration_report["migration_details"]["core-docs"]["scanned"] += 1
try:
                    # Read file content for classification
                    content = None
                    if file_path.suffix in [".md", ".txt"]:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    # Determine the target category
                    category = self._get_file_category("core-docs", file_path, content)
                    # Build the target path
relative_path = file_path.relative_to(self.core_docs_path)
target_dir = self.unified_docs_path / category
target_path = target_dir / relative_path.name
                    # Make sure the target directory exists
self._ensure_directory(target_dir)
if not dry_run:
                        # Copy the file
                        file_info = self._copy_file_with_metadata(file_path, target_path)
                        # Write the metadata file
self._create_migration_metadata(file_path, target_path, category)
file_info["category"] = category
migrated_files.append(file_info)
self.migration_report["statistics"]["total_files_migrated"] += 1
self.migration_report["migration_details"]["core-docs"]["migrated"] += 1
print(f"✓ 已迁移: {file_path.name} -> {category}")
else:
print(f"[模拟] 将迁移: {file_path.name} -> {category}")
except Exception as e:
error_msg = f"迁移失败 {file_path}: {e}"
self.migration_report["errors"].append(error_msg)
self.migration_report["statistics"]["total_errors"] += 1
self.migration_report["migration_details"]["core-docs"]["skipped"] += 1
print(f"{error_msg}")
return migrated_files
def migrate_thematic_research(self, dry_run=False):
"""迁移thematic-research文档"""
print("开始迁移 thematic-research 文档...")
migrated_files = []
        # Scan the thematic-research tree
for file_path in self.thematic_research_path.rglob("*"):
if file_path.is_file() and file_path.suffix in [".md", ".txt", ".py", ".json"]:
self.migration_report["statistics"]["total_files_scanned"] += 1
self.migration_report["migration_details"]["thematic-research"]["scanned"] += 1
try:
                    # Read file content for classification
                    content = None
                    if file_path.suffix in [".md", ".txt"]:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    # Determine the target category
                    category = self._get_file_category("thematic-research", file_path, content)
                    # Build the target path
relative_path = file_path.relative_to(self.thematic_research_path)
target_dir = self.unified_docs_path / category
target_path = target_dir / relative_path.name
                    # Make sure the target directory exists
self._ensure_directory(target_dir)
if not dry_run:
                        # Copy the file
                        file_info = self._copy_file_with_metadata(file_path, target_path)
                        # Write the metadata file
self._create_migration_metadata(file_path, target_path, category)
file_info["category"] = category
migrated_files.append(file_info)
self.migration_report["statistics"]["total_files_migrated"] += 1
self.migration_report["migration_details"]["thematic-research"]["migrated"] += 1
print(f"✓ 已迁移: {file_path.name} -> {category}")
else:
print(f"[模拟] 将迁移: {file_path.name} -> {category}")
except Exception as e:
error_msg = f"迁移失败 {file_path}: {e}"
self.migration_report["errors"].append(error_msg)
self.migration_report["statistics"]["total_errors"] += 1
self.migration_report["migration_details"]["thematic-research"]["skipped"] += 1
print(f"{error_msg}")
return migrated_files
def save_migration_report(self):
"""保存迁移报告"""
report_path = self.unified_docs_path / "migration-report.json"
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(self.migration_report, f, ensure_ascii=False, indent=2)
print(f"迁移报告已保存到: {report_path}")
return report_path
def print_summary(self):
"""打印迁移摘要"""
stats = self.migration_report["statistics"]
details = self.migration_report["migration_details"]
print("\n=== 迁移摘要 ===")
print(f"总扫描文件数: {stats['total_files_scanned']}")
print(f"总迁移文件数: {stats['total_files_migrated']}")
print(f"总跳过文件数: {stats['total_files_skipped']}")
print(f"总错误数: {stats['total_errors']}")
print("\n=== 详细统计 ===")
for source_type, detail in details.items():
print(f"{source_type}:")
print(f" 扫描: {detail['scanned']}")
print(f" 迁移: {detail['migrated']}")
print(f" 跳过: {detail['skipped']}")
if self.migration_report["errors"]:
print("\n=== 错误列表 ===")
for error in self.migration_report["errors"]:
print(f" - {error}")
def main():
"""主函数"""
import sys
migrator = DocumentMigrator()
if len(sys.argv) < 2:
print("用法:")
print(" python doc-migrator.py migrate [--dry-run]")
print(" python doc-migrator.py migrate-core [--dry-run]")
print(" python doc-migrator.py migrate-thematic [--dry-run]")
print(" python doc-migrator.py summary")
return
command = sys.argv[1]
dry_run = "--dry-run" in sys.argv
if command == "migrate":
print("开始完整迁移过程...")
# 迁移core-docs
migrator.migrate_core_docs(dry_run)
# 迁移thematic-research
migrator.migrate_thematic_research(dry_run)
# 保存报告
if not dry_run:
migrator.save_migration_report()
migrator.print_summary()
elif command == "migrate-core":
print("开始迁移 core-docs...")
migrator.migrate_core_docs(dry_run)
if not dry_run:
migrator.save_migration_report()
migrator.print_summary()
elif command == "migrate-thematic":
print("开始迁移 thematic-research...")
migrator.migrate_thematic_research(dry_run)
if not dry_run:
migrator.save_migration_report()
migrator.print_summary()
elif command == "summary":
migrator.print_summary()
else:
print(f"未知命令: {command}")
if __name__ == "__main__":
main()
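
After a real (non-dry-run) migration, the report written by save_migration_report can be inspected programmatically. A minimal sketch, assuming migration-report.json exists at the default unified-docs path:

#!/usr/bin/env python3
# Minimal sketch: summarize migration-report.json after a migration run.
import json
from pathlib import Path

report_path = Path("/home/ben/code/huhan3000/unified-docs/migration-report.json")
report = json.loads(report_path.read_text(encoding="utf-8"))

stats = report["statistics"]
print(f"Migrated {stats['total_files_migrated']} of {stats['total_files_scanned']} scanned files "
      f"({stats['total_errors']} errors)")
for error in report["errors"]:
    print(f"  - {error}")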


@@ -0,0 +1,303 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
胡汉三千年项目文档搜索工具
功能:
1. 全文搜索文档内容
2. 按关键词检索
3. 按类别过滤
4. 支持模糊搜索
作者:胡汉三千年项目团队
版本1.0.0
"""
import os
import json
import re
from pathlib import Path
class DocumentSearcher:
def __init__(self, base_path="/home/ben/code/huhan3000/unified-docs"):
self.base_path = Path(base_path)
self.index_file = self.base_path / "unified-index.json"
self.index_data = self._load_index()
def _load_index(self):
"""加载索引文件"""
if not self.index_file.exists():
print("警告:索引文件不存在,请先运行文档索引工具")
return {"documents": {}}
with open(self.index_file, 'r', encoding='utf-8') as f:
return json.load(f)
def search_by_keyword(self, keyword, category=None, case_sensitive=False):
"""按关键词搜索文档"""
results = []
for cat, docs in self.index_data.get("documents", {}).items():
            # If a category was given, restrict the search to it
if category and cat != category:
continue
for doc in docs:
file_path = self.base_path / doc["path"]
if not file_path.exists():
continue
                # Search the file's contents
matches = self._search_in_file(file_path, keyword, case_sensitive)
if matches:
result = {
"document": doc,
"matches": matches,
"match_count": len(matches)
}
results.append(result)
        # Sort by match count, descending
results.sort(key=lambda x: x["match_count"], reverse=True)
return results
def _search_in_file(self, file_path, keyword, case_sensitive):
"""在单个文件中搜索关键词"""
matches = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 构建搜索模式
if case_sensitive:
pattern = re.escape(keyword)
else:
pattern = re.escape(keyword)
flags = re.IGNORECASE
# 搜索关键词
for match in re.finditer(pattern, content, flags if not case_sensitive else 0):
start_line = content[:match.start()].count('\n') + 1
end_line = content[:match.end()].count('\n') + 1
                # Capture the matched line's surrounding context
lines = content.split('\n')
context_start = max(0, start_line - 3)
context_end = min(len(lines), end_line + 3)
context = '\n'.join(lines[context_start:context_end])
matches.append({
"start_line": start_line,
"end_line": end_line,
"match_text": match.group(),
"context": context
})
except Exception as e:
print(f"搜索文件时出错 {file_path}: {e}")
return matches
def search_by_title(self, title_pattern, category=None):
"""按标题模式搜索文档"""
results = []
for cat, docs in self.index_data.get("documents", {}).items():
            # If a category was given, restrict the search to it
if category and cat != category:
continue
for doc in docs:
if re.search(title_pattern, doc["title"], re.IGNORECASE):
results.append({
"document": doc,
"match_type": "title",
"match_score": self._calculate_match_score(title_pattern, doc["title"])
})
        # Sort by match score, descending
results.sort(key=lambda x: x["match_score"], reverse=True)
return results
def _calculate_match_score(self, pattern, text):
"""计算匹配分数"""
# 简单的匹配分数计算
if pattern.lower() in text.lower():
return 1.0
        # Fuzzy score: fraction of pattern words that appear in the text
pattern_words = set(pattern.lower().split())
text_words = set(text.lower().split())
if pattern_words.intersection(text_words):
return len(pattern_words.intersection(text_words)) / len(pattern_words)
return 0.0
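    # Worked example for _calculate_match_score (illustrative): scoring the
    # pattern "civilization diffusion model" against the title "Civilization
    # Diffusion Studies" finds no exact substring match, but 2 of the 3
    # pattern words overlap, giving a fuzzy score of 2/3 ≈ 0.67.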
def list_documents(self, category=None, sort_by="title"):
"""列出文档"""
documents = []
for cat, docs in self.index_data.get("documents", {}).items():
            # If a category was given, list only that category
if category and cat != category:
continue
documents.extend(docs)
        # Sort by the requested key
if sort_by == "title":
documents.sort(key=lambda x: x["title"])
elif sort_by == "modified":
documents.sort(key=lambda x: x["modified"], reverse=True)
elif sort_by == "size":
documents.sort(key=lambda x: x["size"], reverse=True)
return documents
def get_category_stats(self):
"""获取类别统计信息"""
return self.index_data.get("categories", {})
def get_overall_stats(self):
"""获取总体统计信息"""
return self.index_data.get("statistics", {})
def print_search_results(self, results, max_results=10):
"""打印搜索结果"""
if not results:
print("未找到匹配的文档")
return
print(f"找到 {len(results)} 个匹配结果:")
print("-" * 80)
for i, result in enumerate(results[:max_results]):
doc = result["document"]
print(f"{i+1}. {doc['title']}")
print(f" 文件: {doc['filename']}")
print(f" 类别: {doc['category']}")
print(f" 大小: {round(doc['size']/1024, 1)} KB")
print(f" 修改: {doc['modified'][:10]}")
if "matches" in result:
print(f" 匹配数: {result['match_count']}")
if result["match_count"] > 0:
match = result["matches"][0]
print(f" 示例匹配: 第{match['start_line']}行 - {match['match_text'][:50]}...")
print()
def interactive_search(self):
"""交互式搜索界面"""
print("=== 胡汉三千年项目文档搜索工具 ===")
print("输入 'quit' 退出搜索")
print("输入 'help' 查看帮助")
print("-" * 50)
while True:
try:
query = input("\n搜索关键词: ").strip()
if query.lower() == 'quit':
break
elif query.lower() == 'help':
self._print_help()
continue
elif not query:
continue
                # Parse search options
options = self._parse_search_options(query)
                # Run the search
if options["search_type"] == "content":
results = self.search_by_keyword(
options["keyword"],
options["category"],
options["case_sensitive"]
)
else:
results = self.search_by_title(
options["keyword"],
options["category"]
)
self.print_search_results(results, options["max_results"])
except KeyboardInterrupt:
print("\n搜索已取消")
break
except Exception as e:
print(f"搜索出错: {e}")
def _parse_search_options(self, query):
"""解析搜索选项"""
options = {
"search_type": "content", # content 或 title
"keyword": query,
"category": None,
"case_sensitive": False,
"max_results": 10
}
        # Lightweight option parsing
if query.startswith("title:"):
options["search_type"] = "title"
options["keyword"] = query[6:].strip()
elif query.startswith("cat:"):
parts = query.split(" ")
if len(parts) >= 2:
options["category"] = parts[0][4:]
options["keyword"] = " ".join(parts[1:])
return options
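    # Parsing examples (illustrative):
    #   "音韵"                       -> plain content search
    #   "title:蒙古"                 -> title search for "蒙古"
    #   "cat:01-core-theory 方法论"  -> content search for "方法论" in 01-core-theory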
def _print_help(self):
"""打印帮助信息"""
print("\n搜索语法:")
print(" 普通搜索: 关键词")
print(" 标题搜索: title:关键词")
print(" 类别搜索: cat:类别名 关键词")
print("\n可用类别:")
stats = self.get_category_stats()
for category, info in stats.items():
print(f" {category}: {info.get('count', 0)} 个文档")
print("\n示例:")
print(" 搜索音韵相关内容: 音韵")
print(" 搜索标题包含'蒙古'的文档: title:蒙古")
print(" 在核心理论中搜索'方法论': cat:01-core-theory 方法论")
def main():
"""主函数"""
import sys
searcher = DocumentSearcher()
if len(sys.argv) > 1:
        # Command-line mode
query = " ".join(sys.argv[1:])
options = searcher._parse_search_options(query)
if options["search_type"] == "content":
results = searcher.search_by_keyword(
options["keyword"],
options["category"],
options["case_sensitive"]
)
else:
results = searcher.search_by_title(
options["keyword"],
options["category"]
)
searcher.print_search_results(results, options["max_results"])
else:
        # Interactive mode
searcher.interactive_search()
if __name__ == "__main__":
main()
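
The searcher also works as a library. A minimal sketch, assuming unified-index.json has been generated and doc-search.py is importable as doc_search (e.g. renamed or placed on sys.path; the hyphenated filename is not directly importable):

#!/usr/bin/env python3
# Minimal sketch: drive DocumentSearcher from Python instead of the CLI.
# Assumes the module is importable as doc_search and the index exists.
from doc_search import DocumentSearcher

searcher = DocumentSearcher()
# Content search for "音韵" (phonology) restricted to the core-theory category
results = searcher.search_by_keyword("音韵", category="01-core-theory")
searcher.print_search_results(results, max_results=5)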


@@ -0,0 +1,345 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
胡汉三千年项目文档版本管理工具
功能:
1. 文档版本控制
2. 变更记录管理
3. 版本比较和恢复
4. 变更统计
作者:胡汉三千年项目团队
版本1.0.0
"""
import os
import json
import hashlib
import datetime
import shutil
from pathlib import Path
class VersionManager:
def __init__(self, base_path="/home/ben/code/huhan3000/unified-docs"):
self.base_path = Path(base_path)
self.versions_dir = self.base_path / ".versions"
self.version_db = self.versions_dir / "version-db.json"
        # Initialize the versions directory
self.versions_dir.mkdir(exist_ok=True)
        # Load the version database
self.db = self._load_version_db()
def _load_version_db(self):
"""加载版本数据库"""
if self.version_db.exists():
with open(self.version_db, 'r', encoding='utf-8') as f:
return json.load(f)
else:
return {
"metadata": {
"created_at": datetime.datetime.now().isoformat(),
"last_updated": datetime.datetime.now().isoformat(),
"tool_version": "1.0.0"
},
"documents": {},
"statistics": {
"total_versions": 0,
"total_documents": 0,
"total_changes": 0
}
}
def _save_version_db(self):
"""保存版本数据库"""
self.db["metadata"]["last_updated"] = datetime.datetime.now().isoformat()
with open(self.version_db, 'w', encoding='utf-8') as f:
json.dump(self.db, f, ensure_ascii=False, indent=2)
def _calculate_file_hash(self, file_path):
"""计算文件内容的哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def create_version(self, document_path, comment=""):
"""为文档创建新版本"""
doc_path = Path(document_path)
if not doc_path.exists():
print(f"错误:文档不存在 {doc_path}")
return False
        # Compute the file hash
file_hash = self._calculate_file_hash(doc_path)
        # Collect document info
stat = doc_path.stat()
doc_info = {
"path": str(doc_path.relative_to(self.base_path)),
"size": stat.st_size,
"modified": datetime.datetime.fromtimestamp(stat.st_mtime).isoformat(),
"hash": file_hash
}
        # Make sure this document has a version record
doc_key = str(doc_path.relative_to(self.base_path))
if doc_key not in self.db["documents"]:
self.db["documents"][doc_key] = {
"versions": [],
"created_at": datetime.datetime.now().isoformat(),
"total_versions": 0
}
        # Only create a new version if the content actually changed
current_versions = self.db["documents"][doc_key]["versions"]
if current_versions:
last_version = current_versions[-1]
if last_version["hash"] == file_hash:
print(f"文档 {doc_key} 内容未改变,跳过版本创建")
return False
        # Create the version directory
version_id = f"v{len(current_versions) + 1:04d}"
version_dir = self.versions_dir / doc_key.replace('/', '_') / version_id
version_dir.mkdir(parents=True, exist_ok=True)
        # Save a copy of the file as this version
version_file = version_dir / doc_path.name
shutil.copy2(doc_path, version_file)
        # Record version info
version_info = {
"version_id": version_id,
"timestamp": datetime.datetime.now().isoformat(),
"comment": comment,
"hash": file_hash,
"size": stat.st_size,
"file_path": str(version_file.relative_to(self.versions_dir))
}
self.db["documents"][doc_key]["versions"].append(version_info)
self.db["documents"][doc_key]["total_versions"] = len(current_versions) + 1
self.db["documents"][doc_key]["last_updated"] = datetime.datetime.now().isoformat()
# 更新统计信息
self.db["statistics"]["total_versions"] += 1
self.db["statistics"]["total_documents"] = len(self.db["documents"])
if len(current_versions) > 0:
self.db["statistics"]["total_changes"] += 1
self._save_version_db()
print(f"已为文档 {doc_key} 创建版本 {version_id}")
return True
def list_versions(self, document_path=None):
"""列出文档版本"""
if document_path:
doc_key = str(Path(document_path).relative_to(self.base_path))
if doc_key not in self.db["documents"]:
print(f"文档 {doc_key} 没有版本记录")
return []
return self.db["documents"][doc_key]["versions"]
else:
            # Gather version info across all documents
all_versions = []
for doc_key, doc_info in self.db["documents"].items():
for version in doc_info["versions"]:
version["document"] = doc_key
all_versions.append(version)
            # Sort by timestamp, newest first
all_versions.sort(key=lambda x: x["timestamp"], reverse=True)
return all_versions
def compare_versions(self, document_path, version1, version2):
"""比较两个版本的差异"""
doc_key = str(Path(document_path).relative_to(self.base_path))
if doc_key not in self.db["documents"]:
print(f"文档 {doc_key} 没有版本记录")
return None
versions = self.db["documents"][doc_key]["versions"]
v1_info = next((v for v in versions if v["version_id"] == version1), None)
v2_info = next((v for v in versions if v["version_id"] == version2), None)
if not v1_info or not v2_info:
print(f"版本 {version1}{version2} 不存在")
return None
        # Read both versions' contents
v1_path = self.versions_dir / v1_info["file_path"]
v2_path = self.versions_dir / v2_info["file_path"]
with open(v1_path, 'r', encoding='utf-8') as f:
v1_content = f.read()
with open(v2_path, 'r', encoding='utf-8') as f:
v2_content = f.read()
        # Simple difference summary
diff_result = {
"document": doc_key,
"versions": [version1, version2],
"size_change": v2_info["size"] - v1_info["size"],
"hash_changed": v1_info["hash"] != v2_info["hash"],
"line_count_change": len(v2_content.split('\n')) - len(v1_content.split('\n'))
}
return diff_result
def restore_version(self, document_path, version_id):
"""恢复文档到指定版本"""
doc_key = str(Path(document_path).relative_to(self.base_path))
if doc_key not in self.db["documents"]:
print(f"文档 {doc_key} 没有版本记录")
return False
versions = self.db["documents"][doc_key]["versions"]
version_info = next((v for v in versions if v["version_id"] == version_id), None)
if not version_info:
print(f"版本 {version_id} 不存在")
return False
        # Back up the current file first
current_path = self.base_path / doc_key
if current_path.exists():
backup_path = current_path.with_suffix(current_path.suffix + ".backup")
shutil.copy2(current_path, backup_path)
print(f"当前版本已备份到 {backup_path}")
        # Restore the selected version
version_path = self.versions_dir / version_info["file_path"]
shutil.copy2(version_path, current_path)
print(f"文档 {doc_key} 已恢复到版本 {version_id}")
return True
def get_statistics(self):
"""获取版本管理统计信息"""
return self.db["statistics"]
def print_statistics(self):
"""打印统计信息"""
stats = self.get_statistics()
print("=== 版本管理统计 ===")
print(f"总文档数: {stats['total_documents']}")
print(f"总版本数: {stats['total_versions']}")
print(f"总变更次数: {stats['total_changes']}")
# 按文档统计
print("\n=== 文档版本统计 ===")
for doc_key, doc_info in self.db["documents"].items():
print(f"{doc_key}: {doc_info['total_versions']} 个版本")
def batch_create_versions(self, directory_path, comment=""):
"""批量创建文档版本"""
dir_path = Path(directory_path)
if not dir_path.exists():
print(f"目录不存在: {dir_path}")
return False
created_count = 0
# 扫描目录中的Markdown文件
for md_file in dir_path.rglob("*.md"):
if md_file.name == "README.md":
continue
if self.create_version(md_file, comment):
created_count += 1
print(f"批量创建完成,共创建 {created_count} 个新版本")
return True
def main():
"""主函数"""
import sys
manager = VersionManager()
if len(sys.argv) < 2:
print("用法:")
print(" python version-manager.py create <文档路径> [注释]")
print(" python version-manager.py list [文档路径]")
print(" python version-manager.py compare <文档路径> <版本1> <版本2>")
print(" python version-manager.py restore <文档路径> <版本>")
print(" python version-manager.py stats")
print(" python version-manager.py batch <目录路径> [注释]")
return
command = sys.argv[1]
if command == "create":
if len(sys.argv) < 3:
print("错误:需要指定文档路径")
return
doc_path = sys.argv[2]
comment = sys.argv[3] if len(sys.argv) > 3 else ""
manager.create_version(doc_path, comment)
elif command == "list":
doc_path = sys.argv[2] if len(sys.argv) > 2 else None
versions = manager.list_versions(doc_path)
if versions:
print(f"找到 {len(versions)} 个版本:")
for version in versions:
doc = version.get("document", "当前文档")
print(f" {version['version_id']} - {version['timestamp'][:19]} - {version['comment']} ({doc})")
else:
print("没有找到版本记录")
elif command == "compare":
if len(sys.argv) < 5:
print("错误:需要指定文档路径和两个版本号")
return
doc_path = sys.argv[2]
version1 = sys.argv[3]
version2 = sys.argv[4]
diff = manager.compare_versions(doc_path, version1, version2)
if diff:
print(f"版本比较结果 ({version1} -> {version2}):")
print(f" 大小变化: {diff['size_change']} 字节")
print(f" 哈希变化: {'' if diff['hash_changed'] else ''}")
print(f" 行数变化: {diff['line_count_change']}")
elif command == "restore":
if len(sys.argv) < 4:
print("错误:需要指定文档路径和版本号")
return
doc_path = sys.argv[2]
version_id = sys.argv[3]
manager.restore_version(doc_path, version_id)
elif command == "stats":
manager.print_statistics()
elif command == "batch":
if len(sys.argv) < 3:
print("错误:需要指定目录路径")
return
dir_path = sys.argv[2]
        comment = sys.argv[3] if len(sys.argv) > 3 else "Batch version creation"
manager.batch_create_versions(dir_path, comment)
else:
print(f"未知命令: {command}")
if __name__ == "__main__":
main()
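
The version manager can likewise be driven from Python. A minimal sketch, assuming version-manager.py is importable as version_manager (renamed or on sys.path) and that the document path below, which is purely hypothetical, exists under the default unified-docs tree:

#!/usr/bin/env python3
# Minimal sketch: create and inspect document versions programmatically.
from version_manager import VersionManager

manager = VersionManager()
doc = "/home/ben/code/huhan3000/unified-docs/01-core-theory/example.md"  # hypothetical path
if manager.create_version(doc, comment="initial snapshot"):
    for v in manager.list_versions(doc):
        print(v["version_id"], v["timestamp"][:19], v["comment"])
manager.print_statistics()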