256 lines
9.5 KiB
Python
256 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
三体解读文档分片和翻译工具
|
||
用于将英文的三体解读文档分片并翻译成中文,便于Milvus向量检索
|
||
"""
|
||
|
||
import re
|
||
import os
|
||
from typing import List, Dict, Tuple
|
||
|
||
class ThreeBodyChunker:
|
||
def __init__(self, input_file: str, output_dir: str):
|
||
self.input_file = input_file
|
||
self.output_dir = output_dir
|
||
self.chunks = []
|
||
|
||
def read_file(self) -> str:
|
||
"""读取原始文件"""
|
||
with open(self.input_file, 'r', encoding='utf-8') as f:
|
||
return f.read()
|
||
|
||
def split_by_episodes(self, content: str) -> List[Dict]:
|
||
"""按集数分割内容"""
|
||
# 匹配EP1, EP2等模式
|
||
episode_pattern = r'(EP\d+:.*?)(?=EP\d+:|$)'
|
||
episodes = re.findall(episode_pattern, content, re.DOTALL)
|
||
|
||
chunks = []
|
||
for i, episode in enumerate(episodes, 1):
|
||
# 提取标题
|
||
title_match = re.match(r'EP\d+:\s*(.+)', episode.split('\n')[0])
|
||
title = title_match.group(1) if title_match else f"Episode {i}"
|
||
|
||
chunks.append({
|
||
'id': f'ep{i:02d}',
|
||
'title': title,
|
||
'content': episode.strip(),
|
||
'type': 'episode'
|
||
})
|
||
|
||
return chunks
|
||
|
||
def split_by_paragraphs(self, episode_chunks: List[Dict]) -> List[Dict]:
|
||
"""将每集进一步按段落分割"""
|
||
all_chunks = []
|
||
|
||
for episode in episode_chunks:
|
||
content = episode['content']
|
||
# 按段落分割(两个换行符)
|
||
paragraphs = re.split(r'\n\s*\n', content)
|
||
|
||
for i, paragraph in enumerate(paragraphs):
|
||
if len(paragraph.strip()) > 50: # 过滤太短的段落
|
||
chunk_id = f"{episode['id']}_p{i+1:02d}"
|
||
all_chunks.append({
|
||
'id': chunk_id,
|
||
'episode_id': episode['id'],
|
||
'episode_title': episode['title'],
|
||
'content': paragraph.strip(),
|
||
'type': 'paragraph',
|
||
'length': len(paragraph.strip())
|
||
})
|
||
|
||
return all_chunks
|
||
|
||
def translate_content(self, text: str) -> str:
|
||
"""翻译内容(这里先做标记,实际翻译需要调用翻译API)"""
|
||
# 这里可以集成翻译API,比如Google Translate, DeepL等
|
||
# 现在先返回原文,标记需要翻译
|
||
return f"[需要翻译] {text}"
|
||
|
||
def create_chunk_metadata(self, chunk: Dict) -> Dict:
|
||
"""创建分片元数据"""
|
||
return {
|
||
'chunk_id': chunk['id'],
|
||
'episode_id': chunk.get('episode_id', ''),
|
||
'episode_title': chunk.get('episode_title', ''),
|
||
'content_type': chunk['type'],
|
||
'content_length': chunk.get('length', len(chunk['content'])),
|
||
'language': 'en', # 原文是英文
|
||
'source': 'three_body_analysis',
|
||
'author': 'huhan3000_project'
|
||
}
|
||
|
||
def process(self):
|
||
"""主处理流程"""
|
||
print("开始处理三体解读文档...")
|
||
|
||
# 1. 读取文件
|
||
content = self.read_file()
|
||
print(f"文件读取完成,总长度: {len(content)} 字符")
|
||
|
||
# 2. 按集数分割
|
||
episode_chunks = self.split_by_episodes(content)
|
||
print(f"按集数分割完成,共 {len(episode_chunks)} 集")
|
||
|
||
# 3. 按段落进一步分割
|
||
paragraph_chunks = self.split_by_paragraphs(episode_chunks)
|
||
print(f"按段落分割完成,共 {len(paragraph_chunks)} 个段落")
|
||
|
||
# 4. 创建输出目录
|
||
os.makedirs(self.output_dir, exist_ok=True)
|
||
os.makedirs(f"{self.output_dir}/episodes", exist_ok=True)
|
||
os.makedirs(f"{self.output_dir}/chunks", exist_ok=True)
|
||
os.makedirs(f"{self.output_dir}/metadata", exist_ok=True)
|
||
|
||
# 5. 保存集数级别的分片
|
||
for episode in episode_chunks:
|
||
filename = f"{self.output_dir}/episodes/{episode['id']}_{episode['title'].replace(' ', '_').replace(':', '')}.md"
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
f.write(f"# {episode['title']}\n\n")
|
||
f.write(f"**集数ID**: {episode['id']}\n")
|
||
f.write(f"**类型**: {episode['type']}\n\n")
|
||
f.write("## 原文内容\n\n")
|
||
f.write(episode['content'])
|
||
f.write("\n\n## 中文翻译\n\n")
|
||
f.write("[待翻译]")
|
||
|
||
# 6. 保存段落级别的分片
|
||
for chunk in paragraph_chunks:
|
||
filename = f"{self.output_dir}/chunks/{chunk['id']}.md"
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
f.write(f"# 分片 {chunk['id']}\n\n")
|
||
f.write(f"**所属集数**: {chunk['episode_title']} ({chunk['episode_id']})\n")
|
||
f.write(f"**分片类型**: {chunk['type']}\n")
|
||
f.write(f"**内容长度**: {chunk['length']} 字符\n\n")
|
||
f.write("## 原文内容\n\n")
|
||
f.write(chunk['content'])
|
||
f.write("\n\n## 中文翻译\n\n")
|
||
f.write("[待翻译]")
|
||
|
||
# 7. 生成元数据文件
|
||
import json
|
||
|
||
# 集数元数据
|
||
episodes_metadata = []
|
||
for episode in episode_chunks:
|
||
metadata = {
|
||
'id': episode['id'],
|
||
'title': episode['title'],
|
||
'type': episode['type'],
|
||
'content_length': len(episode['content']),
|
||
'language': 'en',
|
||
'source': 'three_body_analysis'
|
||
}
|
||
episodes_metadata.append(metadata)
|
||
|
||
with open(f"{self.output_dir}/metadata/episodes_metadata.json", 'w', encoding='utf-8') as f:
|
||
json.dump(episodes_metadata, f, ensure_ascii=False, indent=2)
|
||
|
||
# 段落元数据
|
||
chunks_metadata = []
|
||
for chunk in paragraph_chunks:
|
||
metadata = self.create_chunk_metadata(chunk)
|
||
chunks_metadata.append(metadata)
|
||
|
||
with open(f"{self.output_dir}/metadata/chunks_metadata.json", 'w', encoding='utf-8') as f:
|
||
json.dump(chunks_metadata, f, ensure_ascii=False, indent=2)
|
||
|
||
# 8. 生成Milvus导入脚本
|
||
self.generate_milvus_script(paragraph_chunks)
|
||
|
||
print(f"处理完成!")
|
||
print(f"- 集数文件: {len(episode_chunks)} 个")
|
||
print(f"- 分片文件: {len(paragraph_chunks)} 个")
|
||
print(f"- 输出目录: {self.output_dir}")
|
||
|
||
return episode_chunks, paragraph_chunks
|
||
|
||
def generate_milvus_script(self, chunks: List[Dict]):
|
||
"""生成Milvus导入脚本"""
|
||
script_content = '''#!/usr/bin/env python3
|
||
"""
|
||
三体解读文档Milvus导入脚本
|
||
"""
|
||
|
||
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
|
||
import json
|
||
import os
|
||
|
||
def create_collection():
|
||
"""创建Milvus集合"""
|
||
# 定义字段
|
||
fields = [
|
||
FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
|
||
FieldSchema(name="episode_id", dtype=DataType.VARCHAR, max_length=50),
|
||
FieldSchema(name="episode_title", dtype=DataType.VARCHAR, max_length=200),
|
||
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
|
||
FieldSchema(name="content_zh", dtype=DataType.VARCHAR, max_length=10000),
|
||
FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50),
|
||
FieldSchema(name="content_length", dtype=DataType.INT64),
|
||
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768) # 假设使用768维向量
|
||
]
|
||
|
||
# 创建集合schema
|
||
schema = CollectionSchema(fields, "三体解读文档向量数据库")
|
||
|
||
# 创建集合
|
||
collection = Collection("three_body_analysis", schema)
|
||
|
||
# 创建索引
|
||
index_params = {
|
||
"metric_type": "COSINE",
|
||
"index_type": "IVF_FLAT",
|
||
"params": {"nlist": 128}
|
||
}
|
||
collection.create_index("embedding", index_params)
|
||
|
||
return collection
|
||
|
||
def load_and_insert_data(collection, chunks_dir, metadata_file):
|
||
"""加载数据并插入Milvus"""
|
||
# 这里需要实现:
|
||
# 1. 读取分片文件
|
||
# 2. 生成文本向量(使用sentence-transformers等)
|
||
# 3. 插入到Milvus
|
||
pass
|
||
|
||
if __name__ == "__main__":
|
||
# 连接Milvus
|
||
connections.connect("default", host="localhost", port="19530")
|
||
|
||
# 创建集合
|
||
collection = create_collection()
|
||
|
||
# 加载数据
|
||
load_and_insert_data(collection, "chunks", "metadata/chunks_metadata.json")
|
||
|
||
print("数据导入完成!")
|
||
'''
|
||
|
||
with open(f"{self.output_dir}/milvus_import.py", 'w', encoding='utf-8') as f:
|
||
f.write(script_content)
|
||
|
||
def main():
|
||
"""主函数"""
|
||
input_file = "literary-works/analysis/3body/the scripts.md"
|
||
output_dir = "literary-works/analysis/3body/processed"
|
||
|
||
chunker = ThreeBodyChunker(input_file, output_dir)
|
||
episodes, chunks = chunker.process()
|
||
|
||
print("\n=== 处理结果统计 ===")
|
||
print(f"总集数: {len(episodes)}")
|
||
print(f"总分片: {len(chunks)}")
|
||
|
||
# 显示前几个分片的信息
|
||
print("\n=== 前5个分片预览 ===")
|
||
for i, chunk in enumerate(chunks[:5]):
|
||
print(f"{i+1}. {chunk['id']} - {chunk['episode_title']}")
|
||
print(f" 长度: {chunk['length']} 字符")
|
||
print(f" 内容预览: {chunk['content'][:100]}...")
|
||
print()
|
||
|
||
if __name__ == "__main__":
|
||
main() |