#!/usr/bin/env python3
"""
Chunking and translation tool for the Three-Body analysis document.

Splits the English Three-Body analysis document into chunks and prepares them
for Chinese translation, so they can be indexed for vector retrieval in Milvus.
"""

import json
import os
import re
from typing import Dict, List


class ThreeBodyChunker:
    def __init__(self, input_file: str, output_dir: str):
        self.input_file = input_file
        self.output_dir = output_dir
        self.chunks = []

    def read_file(self) -> str:
        """Read the source file."""
        with open(self.input_file, 'r', encoding='utf-8') as f:
            return f.read()

    def split_by_episodes(self, content: str) -> List[Dict]:
        """Split the content by episode."""
        # Match headings such as "EP1:", "EP2:", ...
        episode_pattern = r'(EP\d+:.*?)(?=EP\d+:|$)'
        episodes = re.findall(episode_pattern, content, re.DOTALL)

        chunks = []
        for i, episode in enumerate(episodes, 1):
            # Extract the episode title from the first line
            title_match = re.match(r'EP\d+:\s*(.+)', episode.split('\n')[0])
            title = title_match.group(1) if title_match else f"Episode {i}"

            chunks.append({
                'id': f'ep{i:02d}',
                'title': title,
                'content': episode.strip(),
                'type': 'episode'
            })

        return chunks

    def split_by_paragraphs(self, episode_chunks: List[Dict]) -> List[Dict]:
        """Split each episode further into paragraphs."""
        all_chunks = []

        for episode in episode_chunks:
            content = episode['content']
            # Split on blank lines (two consecutive newlines)
            paragraphs = re.split(r'\n\s*\n', content)

            for i, paragraph in enumerate(paragraphs):
                if len(paragraph.strip()) > 50:  # Skip paragraphs that are too short
                    chunk_id = f"{episode['id']}_p{i+1:02d}"
                    all_chunks.append({
                        'id': chunk_id,
                        'episode_id': episode['id'],
                        'episode_title': episode['title'],
                        'content': paragraph.strip(),
                        'type': 'paragraph',
                        'length': len(paragraph.strip())
                    })

        return all_chunks

    def translate_content(self, text: str) -> str:
        """Translate content (placeholder; real translation requires a translation API)."""
        # A translation API such as Google Translate or DeepL could be integrated here.
        # For now, return the original text with a marker showing it still needs translation.
        return f"[needs translation] {text}"
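    # The sketch below shows one way translate_content could be wired to a real
    # translation backend. It is only an illustration under stated assumptions:
    # it assumes the third-party `deep_translator` package is installed, and the
    # method name `translate_content_via_api` is hypothetical (not used elsewhere
    # in this project).
    def translate_content_via_api(self, text: str) -> str:
        """Hypothetical variant of translate_content backed by deep_translator."""
        try:
            from deep_translator import GoogleTranslator  # assumed dependency
        except ImportError:
            # Fall back to the marker-only behaviour if the package is missing
            return self.translate_content(text)
        # Translate the English source text into Simplified Chinese
        return GoogleTranslator(source='en', target='zh-CN').translate(text)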
    def create_chunk_metadata(self, chunk: Dict) -> Dict:
        """Build the metadata record for a chunk."""
        return {
            'chunk_id': chunk['id'],
            'episode_id': chunk.get('episode_id', ''),
            'episode_title': chunk.get('episode_title', ''),
            'content_type': chunk['type'],
            'content_length': chunk.get('length', len(chunk['content'])),
            'language': 'en',  # the source text is English
            'source': 'three_body_analysis',
            'author': 'huhan3000_project'
        }

    def process(self):
        """Main processing pipeline."""
        print("Processing the Three-Body analysis document...")

        # 1. Read the source file
        content = self.read_file()
        print(f"File read, total length: {len(content)} characters")

        # 2. Split by episode
        episode_chunks = self.split_by_episodes(content)
        print(f"Episode split done, {len(episode_chunks)} episodes")

        # 3. Split further into paragraphs
        paragraph_chunks = self.split_by_paragraphs(episode_chunks)
        print(f"Paragraph split done, {len(paragraph_chunks)} paragraphs")

        # 4. Create the output directories
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(f"{self.output_dir}/episodes", exist_ok=True)
        os.makedirs(f"{self.output_dir}/chunks", exist_ok=True)
        os.makedirs(f"{self.output_dir}/metadata", exist_ok=True)

        # 5. Save the episode-level chunks
        for episode in episode_chunks:
            filename = (f"{self.output_dir}/episodes/"
                        f"{episode['id']}_{episode['title'].replace(' ', '_').replace(':', '')}.md")
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# {episode['title']}\n\n")
                f.write(f"**Episode ID**: {episode['id']}\n")
                f.write(f"**Type**: {episode['type']}\n\n")
                f.write("## Original Content\n\n")
                f.write(episode['content'])
                f.write("\n\n## Chinese Translation\n\n")
                f.write("[to be translated]")

        # 6. Save the paragraph-level chunks
        for chunk in paragraph_chunks:
            filename = f"{self.output_dir}/chunks/{chunk['id']}.md"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# Chunk {chunk['id']}\n\n")
                f.write(f"**Episode**: {chunk['episode_title']} ({chunk['episode_id']})\n")
                f.write(f"**Chunk type**: {chunk['type']}\n")
                f.write(f"**Content length**: {chunk['length']} characters\n\n")
                f.write("## Original Content\n\n")
                f.write(chunk['content'])
                f.write("\n\n## Chinese Translation\n\n")
                f.write("[to be translated]")

        # 7. Write the metadata files
        episodes_metadata = []
        for episode in episode_chunks:
            metadata = {
                'id': episode['id'],
                'title': episode['title'],
                'type': episode['type'],
                'content_length': len(episode['content']),
                'language': 'en',
                'source': 'three_body_analysis'
            }
            episodes_metadata.append(metadata)

        with open(f"{self.output_dir}/metadata/episodes_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(episodes_metadata, f, ensure_ascii=False, indent=2)

        chunks_metadata = []
        for chunk in paragraph_chunks:
            metadata = self.create_chunk_metadata(chunk)
            chunks_metadata.append(metadata)

        with open(f"{self.output_dir}/metadata/chunks_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(chunks_metadata, f, ensure_ascii=False, indent=2)

        # 8. Generate the Milvus import script
        self.generate_milvus_script(paragraph_chunks)

        print("Processing finished!")
        print(f"- Episode files: {len(episode_chunks)}")
        print(f"- Chunk files: {len(paragraph_chunks)}")
        print(f"- Output directory: {self.output_dir}")

        return episode_chunks, paragraph_chunks

    def generate_milvus_script(self, chunks: List[Dict]):
        """Generate the Milvus import script."""
        script_content = '''#!/usr/bin/env python3
"""
Milvus import script for the Three-Body analysis chunks.
"""

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import json
import os


def create_collection():
    """Create the Milvus collection."""
    # Field definitions
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
        FieldSchema(name="episode_id", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="episode_title", dtype=DataType.VARCHAR, max_length=200),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
        FieldSchema(name="content_zh", dtype=DataType.VARCHAR, max_length=10000),
        FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="content_length", dtype=DataType.INT64),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)  # assumes 768-dim embeddings
    ]

    # Collection schema
    schema = CollectionSchema(fields, "Vector store for the Three-Body analysis document")

    # Create the collection
    collection = Collection("three_body_analysis", schema)

    # Create the vector index
    index_params = {
        "metric_type": "COSINE",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128}
    }
    collection.create_index("embedding", index_params)

    return collection


def load_and_insert_data(collection, chunks_dir, metadata_file):
    """Load the chunk data and insert it into Milvus."""
    # Still to be implemented:
    # 1. Read the chunk files
    # 2. Generate text embeddings (e.g. with sentence-transformers)
    # 3. Insert the records into Milvus
    pass


if __name__ == "__main__":
    # Connect to Milvus
    connections.connect("default", host="localhost", port="19530")

    # Create the collection
    collection = create_collection()

    # Load the data
    load_and_insert_data(collection, "chunks", "metadata/chunks_metadata.json")

    print("Data import finished!")
'''

        with open(f"{self.output_dir}/milvus_import.py", 'w', encoding='utf-8') as f:
            f.write(script_content)


def main():
    """Entry point."""
    input_file = "literary-works/analysis/3body/the scripts.md"
    output_dir = "literary-works/analysis/3body/processed"

    chunker = ThreeBodyChunker(input_file, output_dir)
    episodes, chunks = chunker.process()

    print("\n=== Processing summary ===")
    print(f"Total episodes: {len(episodes)}")
    print(f"Total chunks: {len(chunks)}")

    # Preview the first few chunks
    print("\n=== Preview of the first 5 chunks ===")
    for i, chunk in enumerate(chunks[:5]):
        print(f"{i+1}. {chunk['id']} - {chunk['episode_title']}")
        print(f"   Length: {chunk['length']} characters")
        print(f"   Content preview: {chunk['content'][:100]}...")
        print()


if __name__ == "__main__":
    main()
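

# -----------------------------------------------------------------------------
# The sketch below illustrates how the `load_and_insert_data` stub written into
# the generated milvus_import.py could be filled in. It is only an illustration
# under stated assumptions: it assumes the `sentence-transformers` and `pymilvus`
# packages are installed, that the chosen model
# ('paraphrase-multilingual-mpnet-base-v2') produces 768-dim vectors matching the
# embedding field in the generated schema, and the function name
# `example_embed_and_insert` is hypothetical (not part of this project).
# -----------------------------------------------------------------------------
def example_embed_and_insert(collection, chunks_metadata_file: str, chunks_dir: str):
    """Hypothetical sketch: embed chunk files and insert them into Milvus."""
    from sentence_transformers import SentenceTransformer  # assumed dependency

    with open(chunks_metadata_file, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    # Read the markdown chunk files produced by ThreeBodyChunker; for simplicity
    # the whole file text is embedded rather than just the "Original Content" section.
    ids, episode_ids, titles, contents, types, lengths = [], [], [], [], [], []
    for record in metadata:
        chunk_path = os.path.join(chunks_dir, f"{record['chunk_id']}.md")
        with open(chunk_path, 'r', encoding='utf-8') as f:
            text = f.read()
        ids.append(record['chunk_id'])
        episode_ids.append(record['episode_id'])
        titles.append(record['episode_title'])
        contents.append(text)
        types.append(record['content_type'])
        lengths.append(record['content_length'])

    # Multilingual model that outputs 768-dim vectors
    model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
    embeddings = model.encode(contents).tolist()

    # Column order must match the collection schema; content_zh stays empty
    # until the chunks have actually been translated.
    collection.insert([
        ids, episode_ids, titles, contents,
        ["" for _ in ids], types, lengths, embeddings
    ])
    collection.flush()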