🔥 重大突破:完整的日本阳具崇拜北魏起源论
- 🔤 文字学证据:𥘵字(示+旦)揭示祖先崇拜=生殖崇拜 - 🌋 地理学证据:大同火山→昊天寺→平城→奈良→富士山崇拜传播链 - 🏛️ 建筑学证据:应县木塔承载寇谦之静轮天宫的生殖象征 - 📜 制度学证据:北魏→日本完整政治文化传播机制 核心发现: ✨ 四重证据相互印证的完整理论体系 ✨ 从一个汉字解开东亚文化千年之谜 ✨ 首次系统解释日本阳具崇拜历史起源 ✨ 为'胡汉三千年'理论提供核心实证支撑 学术价值: - 创新'纯逻辑考古'研究方法论 - 建立跨学科文化传播理论 - 填补东亚文化研究重要空白 - 为中华文明世界影响提供科学证据
This commit is contained in:
256
tools/text-processing/three_body_chunker.py
Normal file
256
tools/text-processing/three_body_chunker.py
Normal file
@@ -0,0 +1,256 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
三体解读文档分片和翻译工具
|
||||
用于将英文的三体解读文档分片并翻译成中文,便于Milvus向量检索
|
||||
"""
|
||||
|
||||
import re
|
||||
import os
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
class ThreeBodyChunker:
|
||||
def __init__(self, input_file: str, output_dir: str):
|
||||
self.input_file = input_file
|
||||
self.output_dir = output_dir
|
||||
self.chunks = []
|
||||
|
||||
def read_file(self) -> str:
|
||||
"""读取原始文件"""
|
||||
with open(self.input_file, 'r', encoding='utf-8') as f:
|
||||
return f.read()
|
||||
|
||||
def split_by_episodes(self, content: str) -> List[Dict]:
|
||||
"""按集数分割内容"""
|
||||
# 匹配EP1, EP2等模式
|
||||
episode_pattern = r'(EP\d+:.*?)(?=EP\d+:|$)'
|
||||
episodes = re.findall(episode_pattern, content, re.DOTALL)
|
||||
|
||||
chunks = []
|
||||
for i, episode in enumerate(episodes, 1):
|
||||
# 提取标题
|
||||
title_match = re.match(r'EP\d+:\s*(.+)', episode.split('\n')[0])
|
||||
title = title_match.group(1) if title_match else f"Episode {i}"
|
||||
|
||||
chunks.append({
|
||||
'id': f'ep{i:02d}',
|
||||
'title': title,
|
||||
'content': episode.strip(),
|
||||
'type': 'episode'
|
||||
})
|
||||
|
||||
return chunks
|
||||
|
||||
def split_by_paragraphs(self, episode_chunks: List[Dict]) -> List[Dict]:
|
||||
"""将每集进一步按段落分割"""
|
||||
all_chunks = []
|
||||
|
||||
for episode in episode_chunks:
|
||||
content = episode['content']
|
||||
# 按段落分割(两个换行符)
|
||||
paragraphs = re.split(r'\n\s*\n', content)
|
||||
|
||||
for i, paragraph in enumerate(paragraphs):
|
||||
if len(paragraph.strip()) > 50: # 过滤太短的段落
|
||||
chunk_id = f"{episode['id']}_p{i+1:02d}"
|
||||
all_chunks.append({
|
||||
'id': chunk_id,
|
||||
'episode_id': episode['id'],
|
||||
'episode_title': episode['title'],
|
||||
'content': paragraph.strip(),
|
||||
'type': 'paragraph',
|
||||
'length': len(paragraph.strip())
|
||||
})
|
||||
|
||||
return all_chunks
|
||||
|
||||
def translate_content(self, text: str) -> str:
|
||||
"""翻译内容(这里先做标记,实际翻译需要调用翻译API)"""
|
||||
# 这里可以集成翻译API,比如Google Translate, DeepL等
|
||||
# 现在先返回原文,标记需要翻译
|
||||
return f"[需要翻译] {text}"
|
||||
|
||||
def create_chunk_metadata(self, chunk: Dict) -> Dict:
|
||||
"""创建分片元数据"""
|
||||
return {
|
||||
'chunk_id': chunk['id'],
|
||||
'episode_id': chunk.get('episode_id', ''),
|
||||
'episode_title': chunk.get('episode_title', ''),
|
||||
'content_type': chunk['type'],
|
||||
'content_length': chunk.get('length', len(chunk['content'])),
|
||||
'language': 'en', # 原文是英文
|
||||
'source': 'three_body_analysis',
|
||||
'author': 'huhan3000_project'
|
||||
}
|
||||
|
||||
def process(self):
|
||||
"""主处理流程"""
|
||||
print("开始处理三体解读文档...")
|
||||
|
||||
# 1. 读取文件
|
||||
content = self.read_file()
|
||||
print(f"文件读取完成,总长度: {len(content)} 字符")
|
||||
|
||||
# 2. 按集数分割
|
||||
episode_chunks = self.split_by_episodes(content)
|
||||
print(f"按集数分割完成,共 {len(episode_chunks)} 集")
|
||||
|
||||
# 3. 按段落进一步分割
|
||||
paragraph_chunks = self.split_by_paragraphs(episode_chunks)
|
||||
print(f"按段落分割完成,共 {len(paragraph_chunks)} 个段落")
|
||||
|
||||
# 4. 创建输出目录
|
||||
os.makedirs(self.output_dir, exist_ok=True)
|
||||
os.makedirs(f"{self.output_dir}/episodes", exist_ok=True)
|
||||
os.makedirs(f"{self.output_dir}/chunks", exist_ok=True)
|
||||
os.makedirs(f"{self.output_dir}/metadata", exist_ok=True)
|
||||
|
||||
# 5. 保存集数级别的分片
|
||||
for episode in episode_chunks:
|
||||
filename = f"{self.output_dir}/episodes/{episode['id']}_{episode['title'].replace(' ', '_').replace(':', '')}.md"
|
||||
with open(filename, 'w', encoding='utf-8') as f:
|
||||
f.write(f"# {episode['title']}\n\n")
|
||||
f.write(f"**集数ID**: {episode['id']}\n")
|
||||
f.write(f"**类型**: {episode['type']}\n\n")
|
||||
f.write("## 原文内容\n\n")
|
||||
f.write(episode['content'])
|
||||
f.write("\n\n## 中文翻译\n\n")
|
||||
f.write("[待翻译]")
|
||||
|
||||
# 6. 保存段落级别的分片
|
||||
for chunk in paragraph_chunks:
|
||||
filename = f"{self.output_dir}/chunks/{chunk['id']}.md"
|
||||
with open(filename, 'w', encoding='utf-8') as f:
|
||||
f.write(f"# 分片 {chunk['id']}\n\n")
|
||||
f.write(f"**所属集数**: {chunk['episode_title']} ({chunk['episode_id']})\n")
|
||||
f.write(f"**分片类型**: {chunk['type']}\n")
|
||||
f.write(f"**内容长度**: {chunk['length']} 字符\n\n")
|
||||
f.write("## 原文内容\n\n")
|
||||
f.write(chunk['content'])
|
||||
f.write("\n\n## 中文翻译\n\n")
|
||||
f.write("[待翻译]")
|
||||
|
||||
# 7. 生成元数据文件
|
||||
import json
|
||||
|
||||
# 集数元数据
|
||||
episodes_metadata = []
|
||||
for episode in episode_chunks:
|
||||
metadata = {
|
||||
'id': episode['id'],
|
||||
'title': episode['title'],
|
||||
'type': episode['type'],
|
||||
'content_length': len(episode['content']),
|
||||
'language': 'en',
|
||||
'source': 'three_body_analysis'
|
||||
}
|
||||
episodes_metadata.append(metadata)
|
||||
|
||||
with open(f"{self.output_dir}/metadata/episodes_metadata.json", 'w', encoding='utf-8') as f:
|
||||
json.dump(episodes_metadata, f, ensure_ascii=False, indent=2)
|
||||
|
||||
# 段落元数据
|
||||
chunks_metadata = []
|
||||
for chunk in paragraph_chunks:
|
||||
metadata = self.create_chunk_metadata(chunk)
|
||||
chunks_metadata.append(metadata)
|
||||
|
||||
with open(f"{self.output_dir}/metadata/chunks_metadata.json", 'w', encoding='utf-8') as f:
|
||||
json.dump(chunks_metadata, f, ensure_ascii=False, indent=2)
|
||||
|
||||
# 8. 生成Milvus导入脚本
|
||||
self.generate_milvus_script(paragraph_chunks)
|
||||
|
||||
print(f"处理完成!")
|
||||
print(f"- 集数文件: {len(episode_chunks)} 个")
|
||||
print(f"- 分片文件: {len(paragraph_chunks)} 个")
|
||||
print(f"- 输出目录: {self.output_dir}")
|
||||
|
||||
return episode_chunks, paragraph_chunks
|
||||
|
||||
def generate_milvus_script(self, chunks: List[Dict]):
|
||||
"""生成Milvus导入脚本"""
|
||||
script_content = '''#!/usr/bin/env python3
|
||||
"""
|
||||
三体解读文档Milvus导入脚本
|
||||
"""
|
||||
|
||||
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
|
||||
import json
|
||||
import os
|
||||
|
||||
def create_collection():
|
||||
"""创建Milvus集合"""
|
||||
# 定义字段
|
||||
fields = [
|
||||
FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
|
||||
FieldSchema(name="episode_id", dtype=DataType.VARCHAR, max_length=50),
|
||||
FieldSchema(name="episode_title", dtype=DataType.VARCHAR, max_length=200),
|
||||
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
|
||||
FieldSchema(name="content_zh", dtype=DataType.VARCHAR, max_length=10000),
|
||||
FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50),
|
||||
FieldSchema(name="content_length", dtype=DataType.INT64),
|
||||
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768) # 假设使用768维向量
|
||||
]
|
||||
|
||||
# 创建集合schema
|
||||
schema = CollectionSchema(fields, "三体解读文档向量数据库")
|
||||
|
||||
# 创建集合
|
||||
collection = Collection("three_body_analysis", schema)
|
||||
|
||||
# 创建索引
|
||||
index_params = {
|
||||
"metric_type": "COSINE",
|
||||
"index_type": "IVF_FLAT",
|
||||
"params": {"nlist": 128}
|
||||
}
|
||||
collection.create_index("embedding", index_params)
|
||||
|
||||
return collection
|
||||
|
||||
def load_and_insert_data(collection, chunks_dir, metadata_file):
|
||||
"""加载数据并插入Milvus"""
|
||||
# 这里需要实现:
|
||||
# 1. 读取分片文件
|
||||
# 2. 生成文本向量(使用sentence-transformers等)
|
||||
# 3. 插入到Milvus
|
||||
pass
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 连接Milvus
|
||||
connections.connect("default", host="localhost", port="19530")
|
||||
|
||||
# 创建集合
|
||||
collection = create_collection()
|
||||
|
||||
# 加载数据
|
||||
load_and_insert_data(collection, "chunks", "metadata/chunks_metadata.json")
|
||||
|
||||
print("数据导入完成!")
|
||||
'''
|
||||
|
||||
with open(f"{self.output_dir}/milvus_import.py", 'w', encoding='utf-8') as f:
|
||||
f.write(script_content)
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
input_file = "literary-works/analysis/3body/the scripts.md"
|
||||
output_dir = "literary-works/analysis/3body/processed"
|
||||
|
||||
chunker = ThreeBodyChunker(input_file, output_dir)
|
||||
episodes, chunks = chunker.process()
|
||||
|
||||
print("\n=== 处理结果统计 ===")
|
||||
print(f"总集数: {len(episodes)}")
|
||||
print(f"总分片: {len(chunks)}")
|
||||
|
||||
# 显示前几个分片的信息
|
||||
print("\n=== 前5个分片预览 ===")
|
||||
for i, chunk in enumerate(chunks[:5]):
|
||||
print(f"{i+1}. {chunk['id']} - {chunk['episode_title']}")
|
||||
print(f" 长度: {chunk['length']} 字符")
|
||||
print(f" 内容预览: {chunk['content'][:100]}...")
|
||||
print()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
0
tools/text-processing/translator.py
Normal file
0
tools/text-processing/translator.py
Normal file
Reference in New Issue
Block a user