huhan3000/tools/text-processing/three_body_chunker.py

#!/usr/bin/env python3
"""
Chunking and translation tool for the Three-Body analysis document.

Splits the English-language Three-Body analysis document into chunks and
marks them for Chinese translation, so they can be indexed in Milvus for
vector retrieval.
"""
import json
import os
import re
from typing import Dict, List


class ThreeBodyChunker:
    def __init__(self, input_file: str, output_dir: str):
        self.input_file = input_file
        self.output_dir = output_dir
        self.chunks = []

    def read_file(self) -> str:
        """Read the source file."""
        with open(self.input_file, 'r', encoding='utf-8') as f:
            return f.read()

    def split_by_episodes(self, content: str) -> List[Dict]:
        """Split the content by episode."""
        # Match "EP1:", "EP2:", ... headings
        episode_pattern = r'(EP\d+:.*?)(?=EP\d+:|$)'
        episodes = re.findall(episode_pattern, content, re.DOTALL)

        chunks = []
        for i, episode in enumerate(episodes, 1):
            # Extract the title from the heading line
            title_match = re.match(r'EP\d+:\s*(.+)', episode.split('\n')[0])
            title = title_match.group(1) if title_match else f"Episode {i}"

            chunks.append({
                'id': f'ep{i:02d}',
                'title': title,
                'content': episode.strip(),
                'type': 'episode'
            })
        return chunks
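
    # Illustrative example of the expected input shape (the sample titles
    # below are made up, not taken from the real document): two "EPn:"
    # headings produce two episode chunks.
    #
    #   sample = "EP1: Countdown\nintro text\n\nEP2: Red Coast\nmore text"
    #   ThreeBodyChunker("", "").split_by_episodes(sample)
    #   -> [{'id': 'ep01', 'title': 'Countdown', 'type': 'episode', ...},
    #       {'id': 'ep02', 'title': 'Red Coast', 'type': 'episode', ...}]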

    def split_by_paragraphs(self, episode_chunks: List[Dict]) -> List[Dict]:
        """Split each episode further into paragraphs."""
        all_chunks = []
        for episode in episode_chunks:
            content = episode['content']
            # Split on blank lines (two consecutive newlines)
            paragraphs = re.split(r'\n\s*\n', content)

            for i, paragraph in enumerate(paragraphs):
                if len(paragraph.strip()) > 50:  # skip paragraphs that are too short
                    chunk_id = f"{episode['id']}_p{i+1:02d}"
                    all_chunks.append({
                        'id': chunk_id,
                        'episode_id': episode['id'],
                        'episode_title': episode['title'],
                        'content': paragraph.strip(),
                        'type': 'paragraph',
                        'length': len(paragraph.strip())
                    })
        return all_chunks

    def translate_content(self, text: str) -> str:
        """Mark content for translation (real translation requires calling a translation API)."""
        # A translation API (e.g. Google Translate, DeepL) could be integrated here.
        # For now, return the original text tagged as pending translation.
        return f"[NEEDS TRANSLATION] {text}"
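
    def translate_content_via_api(self, text: str) -> str:
        """Illustrative sketch only: route translation through a third-party API.

        Not part of the original pipeline. Assumes the optional
        `deep_translator` package is installed; this is just one way the
        placeholder in translate_content() could be replaced.
        """
        from deep_translator import GoogleTranslator  # optional dependency

        # Translate the English source text into Simplified Chinese.
        return GoogleTranslator(source='en', target='zh-CN').translate(text)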

    def create_chunk_metadata(self, chunk: Dict) -> Dict:
        """Build the metadata record for a chunk."""
        return {
            'chunk_id': chunk['id'],
            'episode_id': chunk.get('episode_id', ''),
            'episode_title': chunk.get('episode_title', ''),
            'content_type': chunk['type'],
            'content_length': chunk.get('length', len(chunk['content'])),
            'language': 'en',  # the source text is English
            'source': 'three_body_analysis',
            'author': 'huhan3000_project'
        }

    def process(self):
        """Main processing pipeline."""
        print("Processing the Three-Body analysis document...")

        # 1. Read the source file
        content = self.read_file()
        print(f"File read: {len(content)} characters")

        # 2. Split by episode
        episode_chunks = self.split_by_episodes(content)
        print(f"Split by episode: {len(episode_chunks)} episodes")

        # 3. Split further into paragraphs
        paragraph_chunks = self.split_by_paragraphs(episode_chunks)
        print(f"Split by paragraph: {len(paragraph_chunks)} paragraphs")

        # 4. Create the output directories
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(f"{self.output_dir}/episodes", exist_ok=True)
        os.makedirs(f"{self.output_dir}/chunks", exist_ok=True)
        os.makedirs(f"{self.output_dir}/metadata", exist_ok=True)

        # 5. Save episode-level chunks
        for episode in episode_chunks:
            filename = f"{self.output_dir}/episodes/{episode['id']}_{episode['title'].replace(' ', '_').replace(':', '')}.md"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# {episode['title']}\n\n")
                f.write(f"**Episode ID**: {episode['id']}\n")
                f.write(f"**Type**: {episode['type']}\n\n")
                f.write("## Original text\n\n")
                f.write(episode['content'])
                f.write("\n\n## Chinese translation\n\n")
                f.write("[Pending translation]")

        # 6. Save paragraph-level chunks
        for chunk in paragraph_chunks:
            filename = f"{self.output_dir}/chunks/{chunk['id']}.md"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# Chunk {chunk['id']}\n\n")
                f.write(f"**Episode**: {chunk['episode_title']} ({chunk['episode_id']})\n")
                f.write(f"**Chunk type**: {chunk['type']}\n")
                f.write(f"**Content length**: {chunk['length']} characters\n\n")
                f.write("## Original text\n\n")
                f.write(chunk['content'])
                f.write("\n\n## Chinese translation\n\n")
                f.write("[Pending translation]")

        # 7. Write the metadata files
        # Episode metadata
        episodes_metadata = []
        for episode in episode_chunks:
            metadata = {
                'id': episode['id'],
                'title': episode['title'],
                'type': episode['type'],
                'content_length': len(episode['content']),
                'language': 'en',
                'source': 'three_body_analysis'
            }
            episodes_metadata.append(metadata)

        with open(f"{self.output_dir}/metadata/episodes_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(episodes_metadata, f, ensure_ascii=False, indent=2)

        # Paragraph (chunk) metadata
        chunks_metadata = []
        for chunk in paragraph_chunks:
            metadata = self.create_chunk_metadata(chunk)
            chunks_metadata.append(metadata)

        with open(f"{self.output_dir}/metadata/chunks_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(chunks_metadata, f, ensure_ascii=False, indent=2)

        # 8. Generate the Milvus import script
        self.generate_milvus_script(paragraph_chunks)

        print("Done!")
        print(f"- Episode files: {len(episode_chunks)}")
        print(f"- Chunk files: {len(paragraph_chunks)}")
        print(f"- Output directory: {self.output_dir}")
        return episode_chunks, paragraph_chunks

    def generate_milvus_script(self, chunks: List[Dict]):
        """Generate the Milvus import script."""
        script_content = '''#!/usr/bin/env python3
"""
Milvus import script for the Three-Body analysis chunks.
"""
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import json
import os


def create_collection():
    """Create the Milvus collection."""
    # Define the fields
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
        FieldSchema(name="episode_id", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="episode_title", dtype=DataType.VARCHAR, max_length=200),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
        FieldSchema(name="content_zh", dtype=DataType.VARCHAR, max_length=10000),
        FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="content_length", dtype=DataType.INT64),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)  # assumes 768-dim vectors
    ]

    # Build the collection schema
    schema = CollectionSchema(fields, "Vector store for the Three-Body analysis document")

    # Create the collection
    collection = Collection("three_body_analysis", schema)

    # Create the vector index
    index_params = {
        "metric_type": "COSINE",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128}
    }
    collection.create_index("embedding", index_params)
    return collection


def load_and_insert_data(collection, chunks_dir, metadata_file):
    """Load the chunk data and insert it into Milvus."""
    # Still to be implemented:
    # 1. Read the chunk files
    # 2. Generate text embeddings (e.g. with sentence-transformers)
    # 3. Insert the rows into Milvus
    pass


if __name__ == "__main__":
    # Connect to Milvus
    connections.connect("default", host="localhost", port="19530")

    # Create the collection
    collection = create_collection()

    # Load the data
    load_and_insert_data(collection, "chunks", "metadata/chunks_metadata.json")
    print("Data import complete!")
'''
        with open(f"{self.output_dir}/milvus_import.py", 'w', encoding='utf-8') as f:
            f.write(script_content)
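

# --- Illustrative sketch (not part of the original pipeline) ----------------
# One way the load_and_insert_data() stub in the generated script could be
# filled in, assuming the `sentence-transformers` package and a reachable
# Milvus instance. The model name 'all-mpnet-base-v2' is an assumption chosen
# to match the 768-dim embedding field; any 768-dim encoder would do.
def embed_and_insert_chunks(collection, chunks: List[Dict]):
    """Encode chunk contents and insert them into the Milvus collection."""
    from sentence_transformers import SentenceTransformer  # optional dependency

    model = SentenceTransformer('all-mpnet-base-v2')  # 768-dim sentence encoder
    embeddings = model.encode([c['content'] for c in chunks]).tolist()

    # Column order must match the FieldSchema list in create_collection():
    # id, episode_id, episode_title, content, content_zh, content_type,
    # content_length, embedding.
    rows = [
        [c['id'] for c in chunks],
        [c['episode_id'] for c in chunks],
        [c['episode_title'] for c in chunks],
        [c['content'] for c in chunks],
        ['' for _ in chunks],  # content_zh: filled in once translation is done
        [c['type'] for c in chunks],
        [c['length'] for c in chunks],
        embeddings,
    ]
    collection.insert(rows)
    collection.flush()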


def main():
    """Entry point."""
    input_file = "literary-works/analysis/3body/the scripts.md"
    output_dir = "literary-works/analysis/3body/processed"

    chunker = ThreeBodyChunker(input_file, output_dir)
    episodes, chunks = chunker.process()

    print("\n=== Summary ===")
    print(f"Total episodes: {len(episodes)}")
    print(f"Total chunks: {len(chunks)}")

    # Preview the first few chunks
    print("\n=== Preview of the first 5 chunks ===")
    for i, chunk in enumerate(chunks[:5]):
        print(f"{i+1}. {chunk['id']} - {chunk['episode_title']}")
        print(f"   Length: {chunk['length']} characters")
        print(f"   Preview: {chunk['content'][:100]}...")
        print()


if __name__ == "__main__":
    main()