Refactor the program file directory structure and update path references
- Create the new directory structure: research/, tools/ (with subdirectories), and apps/ (layout sketched below)
- Move the core-theory files into research/core-theory/
- Move the Tianshan-theory files into research/specialized/
- Reorganize tools/ into subdirectories: content-generation/, data-processing/, etc.
- Update the path references in all documents, including README.md, 项目结构说明.md, etc.
- Update the path references in workflow files and scripts
- Update the path references in the documentation index files
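For orientation, a sketch of the resulting layout; it shows only the directories named in this commit message and in the file paths below, so sibling directories may be missing:

research/
    core-theory/
    specialized/
tools/
    content-generation/
    data-processing/
        image-processing/
        text-processing/
apps/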
tools/data-processing/image-processing/analyze_large_image.py (new file, 173 lines)
@@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""
Large-image analysis tool
Analyzes the complex diagram structure for the Three-Body (《三体》) project.
"""

import os
from collections import Counter

import numpy as np
from PIL import Image


def analyze_image_basic_info(image_path):
    """Analyze and print basic image information."""
    print(f"Analyzing image: {image_path}")

    try:
        # Remove PIL's decompression-bomb pixel limit so very large images open
        Image.MAX_IMAGE_PIXELS = None

        # Open without a context manager: the image is returned to the caller,
        # so it must stay open after this function returns (a `with` block
        # would close it and make later operations fail).
        img = Image.open(image_path)
        print(f"Format: {img.format}")
        print(f"Mode: {img.mode}")
        print(f"Size: {img.size} (width x height)")
        print(f"Total pixels: {img.size[0] * img.size[1]:,}")

        # Report the file size
        file_size = os.path.getsize(image_path)
        print(f"File size: {file_size / (1024*1024):.1f} MB")

        return img
    except Exception as e:
        print(f"Error opening image: {e}")
        return None


def analyze_image_colors(img, sample_size=1000):
    """Analyze the image's color distribution."""
    print("\n=== Color analysis ===")

    # Convert to RGB mode if necessary
    if img.mode != 'RGB':
        img = img.convert('RGB')

    # Subsample (much faster than an exhaustive scan of a large image)
    pixels = list(img.getdata())
    if len(pixels) > sample_size * sample_size:
        # Uniform subsampling
        step = len(pixels) // (sample_size * sample_size)
        pixels = pixels[::step]

    # Count the dominant colors
    color_counter = Counter(pixels)
    print(f"Sampled pixels: {len(pixels):,}")
    print("Dominant colors (RGB value, occurrences):")

    for color, count in color_counter.most_common(10):
        percentage = (count / len(pixels)) * 100
        print(f"  RGB{color}: {count:,} times ({percentage:.1f}%)")


def detect_content_regions(img, threshold=240):
    """Detect the content region of the image."""
    print("\n=== Content region detection ===")

    # Convert to grayscale
    gray = img.convert('L')
    pixels = np.array(gray)

    # Find non-white pixels (assumes a white background)
    non_white = pixels < threshold

    # Find the bounding box
    rows, cols = np.where(non_white)
    if len(rows) > 0:
        min_row, max_row = rows.min(), rows.max()
        min_col, max_col = cols.min(), cols.max()

        print("Content region bounds:")
        print(f"  Rows: {min_row} - {max_row} (height: {max_row - min_row + 1})")
        print(f"  Columns: {min_col} - {max_col} (width: {max_col - min_col + 1})")

        return (min_row, max_row, min_col, max_col)
    else:
        print("No obvious content region detected")
        return None


def extract_text_regions(img, region_bounds=None):
    """Extract likely text regions."""
    print("\n=== Text region analysis ===")

    # If bounds were given, analyze only that region
    if region_bounds:
        min_row, max_row, min_col, max_col = region_bounds
        img_cropped = img.crop((min_col, min_row, max_col, max_row))
    else:
        img_cropped = img

    # Convert to grayscale; cast to float so the squared gradients below
    # do not overflow the uint8 range
    gray = img_cropped.convert('L')
    pixels = np.asarray(gray, dtype=float)

    # Simple text detection: look for high-contrast areas
    # via local gradients (requires scipy)
    from scipy import ndimage

    # Sobel edge detection
    sobel_x = ndimage.sobel(pixels, axis=1)
    sobel_y = ndimage.sobel(pixels, axis=0)
    edges = np.sqrt(sobel_x**2 + sobel_y**2)

    # Keep the regions with the highest edge density
    edge_threshold = np.percentile(edges, 90)
    high_edge_regions = edges > edge_threshold

    # Report the edge statistics
    high_edge_pixels = np.sum(high_edge_regions)
    total_pixels = pixels.size

    print(f"High-edge-density pixels: {high_edge_pixels:,} / {total_pixels:,} ({high_edge_pixels/total_pixels*100:.1f}%)")

    return high_edge_regions


def create_overview_image(img, output_path="overview.png", max_dimension=2000):
    """Create an overview (thumbnail) of the image."""
    print("\n=== Creating overview image ===")

    # Compute the scale factor
    scale = min(max_dimension / img.size[0], max_dimension / img.size[1], 1.0)

    if scale < 1.0:
        new_size = (int(img.size[0] * scale), int(img.size[1] * scale))
        print(f"Resizing to: {new_size}")
        overview = img.resize(new_size, Image.Resampling.LANCZOS)
    else:
        overview = img.copy()

    # Save the overview image
    overview.save(output_path)
    print(f"Overview saved: {output_path}")

    return overview


def main():
    """Entry point."""
    image_path = "/home/ben/code/huhan3000/3body/三体结构3.drawio.png"

    print("=" * 50)
    print("Three-Body project large-image analysis tool")
    print("=" * 50)

    # Basic information
    img = analyze_image_basic_info(image_path)
    if img is None:
        return

    # Color distribution
    analyze_image_colors(img)

    # Content region detection
    regions = detect_content_regions(img)

    # Text regions
    extract_text_regions(img, regions)

    # Overview image
    create_overview_image(img, "/home/ben/code/huhan3000/3body/overview.png")

    print("\n" + "=" * 50)
    print("Analysis complete!")
    print("=" * 50)


if __name__ == "__main__":
    main()
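Since main() hard-codes one image path, here is a minimal sketch of driving the same pipeline on another file; "my_diagram.png" and "my_overview.png" are placeholder names, not files from this repository:

# Hypothetical usage sketch, assuming this module is importable as analyze_large_image
from analyze_large_image import analyze_image_basic_info, detect_content_regions, create_overview_image

img = analyze_image_basic_info("my_diagram.png")
if img is not None:
    bounds = detect_content_regions(img)  # (min_row, max_row, min_col, max_col) or None
    create_overview_image(img, "my_overview.png", max_dimension=1000)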
tools/data-processing/image-processing/deepzoom_generator.py (new file, 274 lines)
@@ -0,0 +1,274 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Deep Zoom tile-set generator
Converts a high-resolution PNG image into the Deep Zoom (DZI) format, used for
displaying the Three-Body project's large historical diagrams.

Usage:
python deepzoom_generator.py --input <input_image.png> --output <output_dir> --tile_size <tile_size> --overlap <overlap>

Arguments:
--input: path to the input PNG image
--output: path to the Deep Zoom output directory
--tile_size: tile size, default 512
--overlap: tile overlap in pixels, default 1
--format: output tile format, jpg or png, default jpg
--quality: JPEG quality (1-100), default 90

Example:
python deepzoom_generator.py --input "三体结构3.drawio.png" --output deepzoom_output
"""

import os
import argparse
import math
from PIL import Image
from xml.dom import minidom
import logging
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class DeepZoomGenerator:
    """Deep Zoom tile-set generator."""

    def __init__(self, input_image_path, output_dir, tile_size=512, overlap=1,
                 output_format='jpg', quality=90):
        """
        Initialize the DeepZoomGenerator.

        Args:
            input_image_path: path to the input image
            output_dir: output directory path
            tile_size: tile size
            overlap: tile overlap in pixels
            output_format: output format (jpg or png)
            quality: JPEG quality
        """
        self.input_image_path = input_image_path
        self.output_dir = output_dir
        self.tile_size = tile_size
        self.overlap = overlap
        self.output_format = output_format.lower()
        self.quality = quality

        # Validate the parameters
        self._validate_params()

        # Create the output directories
        self._create_output_dirs()

        # Load the image
        self.image = self._load_image()
        self.width, self.height = self.image.size

        # Compute the number of pyramid levels
        self.levels = self._calculate_levels()

        logger.info(f"Input image: {input_image_path}")
        logger.info(f"Image size: {self.width}x{self.height}")
        logger.info(f"Output directory: {output_dir}")
        logger.info(f"Tile size: {tile_size}, overlap: {overlap}")
        logger.info(f"Output format: {output_format}")
        logger.info(f"Pyramid levels: {self.levels}")

    def _validate_params(self):
        """Validate the input parameters."""
        # The input image must exist
        if not os.path.exists(self.input_image_path):
            raise FileNotFoundError(f"Input image does not exist: {self.input_image_path}")

        # Check the output format
        if self.output_format not in ['jpg', 'png']:
            raise ValueError(f"Unsupported output format: {self.output_format}; only jpg and png are supported")

        # Check the quality setting
        if not (1 <= self.quality <= 100):
            raise ValueError(f"JPEG quality must be between 1 and 100: {self.quality}")

        # Check the tile size
        if self.tile_size <= 0:
            raise ValueError(f"Tile size must be positive: {self.tile_size}")

        # Check the overlap
        if self.overlap < 0:
            raise ValueError(f"Overlap cannot be negative: {self.overlap}")

    def _create_output_dirs(self):
        """Create the output directory structure."""
        # Main output directory
        os.makedirs(self.output_dir, exist_ok=True)

        # Base file name (without extension)
        base_name = os.path.splitext(os.path.basename(self.input_image_path))[0]

        # DZI file name and tile directory
        self.dzi_filename = f"{base_name}.dzi"
        self.tiles_dir = f"{base_name}_files"
        self.tiles_dir_path = os.path.join(self.output_dir, self.tiles_dir)

        # Tile directory
        os.makedirs(self.tiles_dir_path, exist_ok=True)

    def _load_image(self):
        """Load the input image."""
        try:
            image = Image.open(self.input_image_path)
            # Ensure the image is in RGB mode
            if image.mode != 'RGB':
                image = image.convert('RGB')
            return image
        except Exception as e:
            raise IOError(f"Could not load image: {e}")

    def _calculate_levels(self):
        """Compute the number of pyramid levels."""
        # Largest dimension
        max_dim = max(self.width, self.height)
        # Per the Deep Zoom spec, the top level is the full resolution and
        # level 0 is 1x1, so the level count is ceil(log2(max_dim)) + 1
        # (ceil, not floor, or non-power-of-two images never reach 1 pixel).
        levels = math.ceil(math.log2(max_dim)) + 1
        return levels

    def _create_dzi_file(self):
        """Create the DZI XML descriptor."""
        # Build the XML document
        doc = minidom.getDOMImplementation().createDocument(None, 'Image', None)
        root = doc.documentElement
        root.setAttribute('xmlns', 'http://schemas.microsoft.com/deepzoom/2008')
        root.setAttribute('Format', self.output_format)
        root.setAttribute('Overlap', str(self.overlap))
        root.setAttribute('TileSize', str(self.tile_size))

        # Size element
        size_element = doc.createElement('Size')
        size_element.setAttribute('Height', str(self.height))
        size_element.setAttribute('Width', str(self.width))
        root.appendChild(size_element)

        # Write the XML file
        dzi_file_path = os.path.join(self.output_dir, self.dzi_filename)
        with open(dzi_file_path, 'w', encoding='utf-8') as f:
            root.writexml(f, indent=' ', addindent=' ', newl='\n')

        logger.info(f"Created DZI file: {dzi_file_path}")

    def _generate_tiles(self):
        """Generate the tiles for every pyramid level."""
        current_image = self.image.copy()
        current_width, current_height = current_image.size

        # Walk from the highest resolution down. In the DZI layout the
        # full-resolution tiles live in directory (levels - 1) and every
        # lower-numbered directory halves the image, down to 1x1 at level 0.
        for level in range(self.levels - 1, -1, -1):
            # Directory for this level
            level_dir = os.path.join(self.tiles_dir_path, str(level))
            os.makedirs(level_dir, exist_ok=True)

            # Tile grid for this level; the overlap extends tiles inward
            # across boundaries rather than adding extra rows or columns
            tiles_x = max(1, math.ceil(current_width / self.tile_size))
            tiles_y = max(1, math.ceil(current_height / self.tile_size))

            logger.info(f"Generating level {level} tiles: {tiles_x}x{tiles_y}")

            # Progress bar over every tile in this level
            total_tiles = tiles_x * tiles_y
            with tqdm(total=total_tiles, desc=f"Level {level}", unit="tile") as pbar:
                # Generate each tile
                for y in range(tiles_y):
                    for x in range(tiles_x):
                        self._generate_single_tile(current_image, x, y, level_dir)
                        pbar.update(1)

            # Halve the image for the next (lower) level
            if level > 0:
                new_width = max(1, current_width // 2)
                new_height = max(1, current_height // 2)
                current_image = current_image.resize((new_width, new_height), Image.Resampling.LANCZOS)
                current_width, current_height = current_image.size

    def _generate_single_tile(self, image, tile_x, tile_y, level_dir):
        """Generate a single tile."""
        width, height = image.size

        # Tile bounds in level coordinates: columns and rows start on
        # tile_size boundaries, and each interior edge is extended outward
        # by the overlap
        start_x = max(0, tile_x * self.tile_size - self.overlap)
        start_y = max(0, tile_y * self.tile_size - self.overlap)
        end_x = min(width, (tile_x + 1) * self.tile_size + self.overlap)
        end_y = min(height, (tile_y + 1) * self.tile_size + self.overlap)

        # Crop the tile; edge tiles are saved at their natural (smaller)
        # size rather than padded with white, which is what Deep Zoom
        # viewers expect
        tile = image.crop((start_x, start_y, end_x, end_y))

        # Save the tile
        tile_filename = os.path.join(level_dir, f"{tile_x}_{tile_y}.{self.output_format}")

        if self.output_format == 'jpg':
            tile.save(tile_filename, 'JPEG', quality=self.quality, optimize=True)
        else:
            tile.save(tile_filename, 'PNG', optimize=True)

    def generate(self):
        """Generate the complete Deep Zoom tile set."""
        logger.info("Generating the Deep Zoom tile set...")

        # DZI descriptor
        self._create_dzi_file()

        # Tiles
        self._generate_tiles()

        logger.info("Deep Zoom tile set generated!")
        logger.info(f"DZI file: {os.path.join(self.output_dir, self.dzi_filename)}")
        logger.info(f"Tile directory: {self.tiles_dir_path}")


def parse_args():
    """Parse the command-line arguments."""
    parser = argparse.ArgumentParser(description='Deep Zoom tile-set generator')
    parser.add_argument('--input', '-i', required=True, help='path to the input PNG image')
    parser.add_argument('--output', '-o', required=True, help='path to the Deep Zoom output directory')
    parser.add_argument('--tile_size', '-t', type=int, default=512, help='tile size, default 512')
    parser.add_argument('--overlap', '-l', type=int, default=1, help='tile overlap in pixels, default 1')
    parser.add_argument('--format', '-f', default='jpg', choices=['jpg', 'png'], help='output tile format, default jpg')
    parser.add_argument('--quality', '-q', type=int, default=90, help='JPEG quality (1-100), default 90')
    return parser.parse_args()


def main():
    """Entry point."""
    args = parse_args()

    try:
        # Build the generator
        generator = DeepZoomGenerator(
            input_image_path=args.input,
            output_dir=args.output,
            tile_size=args.tile_size,
            overlap=args.overlap,
            output_format=args.format,
            quality=args.quality
        )

        # Generate the Deep Zoom tile set
        generator.generate()

    except Exception as e:
        logger.error(f"Error generating the Deep Zoom tile set: {e}")
        raise


if __name__ == '__main__':
    main()
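Besides the CLI shown in the docstring, the class can be driven programmatically. A minimal sketch, where "chart.png" and "out/" are placeholder paths; the resulting .dzi descriptor plus _files/ directory is the layout that Deep Zoom viewers such as OpenSeadragon consume:

# Hypothetical usage sketch, assuming this module is importable as deepzoom_generator
from deepzoom_generator import DeepZoomGenerator

gen = DeepZoomGenerator(
    input_image_path="chart.png",
    output_dir="out",
    tile_size=512,
    overlap=1,
    output_format="jpg",
    quality=90,
)
gen.generate()  # writes out/chart.dzi and out/chart_files/<level>/<x>_<y>.jpg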
tools/data-processing/image-processing/image_converter.py (new file, 250 lines)
@@ -0,0 +1,250 @@
#!/usr/bin/env python3
"""
胡汉三千年 (huhan3000) project - image conversion tool
Supports PPM -> PNG/JPG/SVG conversion
"""

import os
import sys
from pathlib import Path
import argparse

def install_requirements():
    """Install the required packages."""
    import subprocess

    packages = [
        'Pillow>=10.0.0',        # modern successor to PIL
        'opencv-python>=4.8.0',  # OpenCV
        'svgwrite>=1.4.0',       # SVG generation
        'numpy>=1.24.0',         # numerical computing
    ]

    print("🔧 Installing the required Python packages...")
    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"✅ {package} installed")
        except subprocess.CalledProcessError as e:
            print(f"❌ {package} failed to install: {e}")
            return False
    return True

def convert_ppm_to_png(ppm_path, output_path=None, quality=95):
    """Convert a PPM file to PNG."""
    try:
        from PIL import Image

        if output_path is None:
            output_path = str(Path(ppm_path).with_suffix('.png'))

        # Open the PPM file
        with Image.open(ppm_path) as img:
            # Convert to RGB mode (PPM is normally RGB already)
            if img.mode != 'RGB':
                img = img.convert('RGB')

            # Save as PNG
            img.save(output_path, 'PNG', optimize=True)
            print(f"✅ PPM -> PNG: {ppm_path} -> {output_path}")
            return output_path

    except ImportError:
        print("❌ Pillow is required: pip install Pillow")
        return None
    except Exception as e:
        print(f"❌ Conversion failed: {e}")
        return None

def convert_ppm_to_jpg(ppm_path, output_path=None, quality=95):
    """Convert a PPM file to JPG."""
    try:
        from PIL import Image

        if output_path is None:
            output_path = str(Path(ppm_path).with_suffix('.jpg'))

        with Image.open(ppm_path) as img:
            if img.mode != 'RGB':
                img = img.convert('RGB')

            img.save(output_path, 'JPEG', quality=quality, optimize=True)
            print(f"✅ PPM -> JPG: {ppm_path} -> {output_path}")
            return output_path

    except ImportError:
        print("❌ Pillow is required: pip install Pillow")
        return None
    except Exception as e:
        print(f"❌ Conversion failed: {e}")
        return None

def create_svg_template(image_path, output_path=None):
    """Create an SVG template for an image."""
    try:
        import svgwrite
        from PIL import Image

        if output_path is None:
            output_path = str(Path(image_path).with_suffix('.svg'))

        # Get the image dimensions
        with Image.open(image_path) as img:
            width, height = img.size

        # Create the SVG document
        dwg = svgwrite.Drawing(output_path, size=(f"{width}px", f"{height}px"))

        # Background rectangle
        dwg.add(dwg.rect(insert=(0, 0), size=(width, height),
                         fill='white', stroke='black', stroke_width=1))

        # Title
        dwg.add(dwg.text('胡汉三千年 - image template',
                         insert=(width//2, 30),
                         text_anchor='middle',
                         font_size=16,
                         font_family='Arial'))

        # Caption
        dwg.add(dwg.text('This SVG template still needs its content added by hand',
                         insert=(width//2, height-30),
                         text_anchor='middle',
                         font_size=12,
                         font_family='Arial'))

        dwg.save()
        print(f"✅ SVG template created: {output_path}")
        return output_path

    except ImportError:
        print("❌ svgwrite is required: pip install svgwrite")
        return None
    except Exception as e:
        print(f"❌ SVG creation failed: {e}")
        return None

def batch_convert_directory(directory_path, formats=('png', 'jpg')):
    """Batch-convert every PPM file in a directory."""
    directory = Path(directory_path)
    if not directory.exists():
        print(f"❌ Directory does not exist: {directory_path}")
        return

    ppm_files = list(directory.rglob('*.ppm'))
    if not ppm_files:
        print(f"❌ No PPM files found in {directory_path}")
        return

    print(f"🔍 Found {len(ppm_files)} PPM files")

    converted_count = 0
    for ppm_file in ppm_files:
        print(f"\n📁 Processing: {ppm_file}")

        for format_type in formats:
            result = None  # stays None for unrecognized formats
            if format_type == 'png':
                result = convert_ppm_to_png(str(ppm_file))
            elif format_type == 'jpg':
                result = convert_ppm_to_jpg(str(ppm_file))
            elif format_type == 'svg':
                result = create_svg_template(str(ppm_file))

            if result:
                converted_count += 1

    print(f"\n🎉 Batch conversion done! {converted_count} files converted")

def analyze_image_content(image_path):
    """Analyze an image and print a description."""
    try:
        from PIL import Image
        import numpy as np

        with Image.open(image_path) as img:
            width, height = img.size
            mode = img.mode

            # Convert to a numpy array for analysis
            img_array = np.array(img)

        print(f"📊 Image analysis: {image_path}")
        print(f"   Size: {width} x {height}")
        print(f"   Mode: {mode}")
        print(f"   Dtype: {img_array.dtype}")
        print(f"   Shape: {img_array.shape}")

        # Color distribution
        if len(img_array.shape) == 3:  # RGB image
            unique_colors = len(np.unique(img_array.reshape(-1, img_array.shape[-1]), axis=0))
            print(f"   Unique colors: {unique_colors}")

        return {
            'width': width,
            'height': height,
            'mode': mode,
            'shape': img_array.shape
        }

    except Exception as e:
        print(f"❌ Image analysis failed: {e}")
        return None

def main():
    parser = argparse.ArgumentParser(description='胡汉三千年 (huhan3000) project - image conversion tool')
    parser.add_argument('--install', action='store_true', help='install the required packages')
    parser.add_argument('--convert', type=str, help='convert a single PPM file')
    parser.add_argument('--batch', type=str, help='batch-convert every PPM file in a directory')
    parser.add_argument('--analyze', type=str, help='analyze an image')
    parser.add_argument('--formats', nargs='+', default=['png', 'jpg'],
                        help='output formats (png, jpg, svg)')

    args = parser.parse_args()

    if args.install:
        if install_requirements():
            print("🎉 All dependencies installed!")
        else:
            print("❌ Dependency installation failed")
            sys.exit(1)

    elif args.convert:
        ppm_path = args.convert
        if not os.path.exists(ppm_path):
            print(f"❌ File does not exist: {ppm_path}")
            sys.exit(1)

        print(f"🔄 Converting file: {ppm_path}")
        for format_type in args.formats:
            if format_type == 'png':
                convert_ppm_to_png(ppm_path)
            elif format_type == 'jpg':
                convert_ppm_to_jpg(ppm_path)
            elif format_type == 'svg':
                create_svg_template(ppm_path)

    elif args.batch:
        print(f"🔄 Batch-converting directory: {args.batch}")
        batch_convert_directory(args.batch, args.formats)

    elif args.analyze:
        image_path = args.analyze
        if not os.path.exists(image_path):
            print(f"❌ File does not exist: {image_path}")
            sys.exit(1)

        analyze_image_content(image_path)

    else:
        print("🎯 胡汉三千年 (huhan3000) project - image conversion tool")
        print("\nUsage:")
        print("  python image_converter.py --install              # install dependencies")
        print("  python image_converter.py --convert file.ppm     # convert a single file")
        print("  python image_converter.py --batch images/        # batch-convert a directory")
        print("  python image_converter.py --analyze file.png     # analyze an image")
        print("  python image_converter.py --formats png jpg svg  # choose output formats")

if __name__ == '__main__':
    main()
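The argparse CLI in main() is the intended entry point, but the converters can also be called directly. A brief sketch; "images/" and "images/sample.ppm" are placeholder paths:

# Hypothetical usage sketch, assuming this module is importable as image_converter
from image_converter import convert_ppm_to_png, batch_convert_directory

convert_ppm_to_png("images/sample.ppm")                    # single file -> images/sample.png
batch_convert_directory("images", formats=("png", "svg"))  # every *.ppm under images/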
tools/data-processing/image-processing/quick_convert.py (new file, 103 lines)
@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Quick image-conversion script - handles the 胡汉三千年 (huhan3000) project's PPM files
"""

import os
import sys
from pathlib import Path

def quick_install():
    """Quickly install the dependencies."""
    import subprocess

    print("🔧 Installing image-processing dependencies...")
    packages = ['Pillow', 'svgwrite']

    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"✅ {package} installed")
        except subprocess.CalledProcessError:
            print(f"❌ {package} failed to install")

def convert_ppm_to_png_simple(ppm_path):
    """Minimal PPM-to-PNG conversion."""
    try:
        from PIL import Image

        # Open the PPM file
        img = Image.open(ppm_path)

        # Convert to RGB if needed
        if img.mode != 'RGB':
            img = img.convert('RGB')

        # Build the output file name
        output_path = str(Path(ppm_path).with_suffix('.png'))

        # Save as PNG
        img.save(output_path, 'PNG', optimize=True)

        # Compare the file sizes
        original_size = os.path.getsize(ppm_path) / (1024*1024)  # MB
        new_size = os.path.getsize(output_path) / (1024*1024)  # MB

        print(f"✅ {Path(ppm_path).name} -> {Path(output_path).name}")
        print(f"   Original: {original_size:.1f}MB -> converted: {new_size:.1f}MB")
        print(f"   Compression: {(1-new_size/original_size)*100:.1f}%")

        return output_path

    except ImportError:
        print("❌ Pillow is required: pip install Pillow")
        return None
    except Exception as e:
        print(f"❌ Conversion failed: {e}")
        return None

def batch_convert_images():
    """Batch-convert every PPM file under the images directory."""
    images_dir = Path("images")

    if not images_dir.exists():
        print("❌ The images directory does not exist")
        return

    # Find all PPM files
    ppm_files = list(images_dir.rglob('*.ppm'))

    if not ppm_files:
        print("❌ No PPM files found")
        return

    print(f"🔍 Found {len(ppm_files)} PPM files")

    total_original_size = 0
    total_new_size = 0
    converted_count = 0

    for ppm_file in ppm_files:
        print(f"\n📁 Processing: {ppm_file.relative_to(images_dir)}")

        result = convert_ppm_to_png_simple(str(ppm_file))
        if result:
            converted_count += 1
            total_original_size += os.path.getsize(str(ppm_file))
            total_new_size += os.path.getsize(result)

    print(f"\n🎉 Batch conversion done!")
    print(f"   Files converted: {converted_count}/{len(ppm_files)}")
    if total_original_size > 0:  # avoid dividing by zero when nothing converted
        print(f"   Total size: {total_original_size/(1024*1024):.1f}MB -> {total_new_size/(1024*1024):.1f}MB")
        print(f"   Overall compression: {(1-total_new_size/total_original_size)*100:.1f}%")

def main():
    if len(sys.argv) > 1 and sys.argv[1] == '--install':
        quick_install()
    else:
        batch_convert_images()

if __name__ == '__main__':
    main()
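Because batch_convert_images() hard-codes the images/ location, the script has to be run from the directory that contains it; a likely invocation, following the same convention as the deepzoom_generator docstring:

python quick_convert.py --install   # install Pillow and svgwrite first
python quick_convert.py             # then convert every images/**/*.ppm in place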
tools/data-processing/text-processing/three_body_chunker.py (new file, 256 lines)
@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""
Three-Body analysis document chunking and translation tool
Splits the English Three-Body analysis document into chunks and prepares them
for Chinese translation and Milvus vector retrieval.
"""

import re
import os
import json
from typing import List, Dict

class ThreeBodyChunker:
    def __init__(self, input_file: str, output_dir: str):
        self.input_file = input_file
        self.output_dir = output_dir
        self.chunks = []

    def read_file(self) -> str:
        """Read the source file."""
        with open(self.input_file, 'r', encoding='utf-8') as f:
            return f.read()

    def split_by_episodes(self, content: str) -> List[Dict]:
        """Split the content by episode."""
        # Match "EP1:", "EP2:", ... headings; each episode runs until the next heading
        episode_pattern = r'(EP\d+:.*?)(?=EP\d+:|$)'
        episodes = re.findall(episode_pattern, content, re.DOTALL)

        chunks = []
        for i, episode in enumerate(episodes, 1):
            # Extract the title from the heading line
            title_match = re.match(r'EP\d+:\s*(.+)', episode.split('\n')[0])
            title = title_match.group(1) if title_match else f"Episode {i}"

            chunks.append({
                'id': f'ep{i:02d}',
                'title': title,
                'content': episode.strip(),
                'type': 'episode'
            })

        return chunks

    def split_by_paragraphs(self, episode_chunks: List[Dict]) -> List[Dict]:
        """Split each episode further into paragraphs."""
        all_chunks = []

        for episode in episode_chunks:
            content = episode['content']
            # Split on blank lines (two consecutive newlines)
            paragraphs = re.split(r'\n\s*\n', content)

            for i, paragraph in enumerate(paragraphs):
                if len(paragraph.strip()) > 50:  # drop paragraphs that are too short
                    chunk_id = f"{episode['id']}_p{i+1:02d}"
                    all_chunks.append({
                        'id': chunk_id,
                        'episode_id': episode['id'],
                        'episode_title': episode['title'],
                        'content': paragraph.strip(),
                        'type': 'paragraph',
                        'length': len(paragraph.strip())
                    })

        return all_chunks

    def translate_content(self, text: str) -> str:
        """Translate content (placeholder; a real translation API goes here)."""
        # A translation API (Google Translate, DeepL, ...) could be plugged in here.
        # For now, return the original text marked as needing translation.
        return f"[needs translation] {text}"

    def create_chunk_metadata(self, chunk: Dict) -> Dict:
        """Create the metadata for a chunk."""
        return {
            'chunk_id': chunk['id'],
            'episode_id': chunk.get('episode_id', ''),
            'episode_title': chunk.get('episode_title', ''),
            'content_type': chunk['type'],
            'content_length': chunk.get('length', len(chunk['content'])),
            'language': 'en',  # the source text is English
            'source': 'three_body_analysis',
            'author': 'huhan3000_project'
        }

    def process(self):
        """Main processing pipeline."""
        print("Processing the Three-Body analysis document...")

        # 1. Read the file
        content = self.read_file()
        print(f"File read; total length: {len(content)} characters")

        # 2. Split by episode
        episode_chunks = self.split_by_episodes(content)
        print(f"Split into {len(episode_chunks)} episodes")

        # 3. Split further into paragraphs
        paragraph_chunks = self.split_by_paragraphs(episode_chunks)
        print(f"Split into {len(paragraph_chunks)} paragraphs")

        # 4. Create the output directories
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(f"{self.output_dir}/episodes", exist_ok=True)
        os.makedirs(f"{self.output_dir}/chunks", exist_ok=True)
        os.makedirs(f"{self.output_dir}/metadata", exist_ok=True)

        # 5. Write the episode-level chunks
        for episode in episode_chunks:
            filename = f"{self.output_dir}/episodes/{episode['id']}_{episode['title'].replace(' ', '_').replace(':', '')}.md"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# {episode['title']}\n\n")
                f.write(f"**Episode ID**: {episode['id']}\n")
                f.write(f"**Type**: {episode['type']}\n\n")
                f.write("## Original text\n\n")
                f.write(episode['content'])
                f.write("\n\n## Chinese translation\n\n")
                f.write("[to be translated]")

        # 6. Write the paragraph-level chunks
        for chunk in paragraph_chunks:
            filename = f"{self.output_dir}/chunks/{chunk['id']}.md"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# Chunk {chunk['id']}\n\n")
                f.write(f"**Episode**: {chunk['episode_title']} ({chunk['episode_id']})\n")
                f.write(f"**Chunk type**: {chunk['type']}\n")
                f.write(f"**Content length**: {chunk['length']} characters\n\n")
                f.write("## Original text\n\n")
                f.write(chunk['content'])
                f.write("\n\n## Chinese translation\n\n")
                f.write("[to be translated]")

        # 7. Write the metadata files
        # Episode metadata
        episodes_metadata = []
        for episode in episode_chunks:
            metadata = {
                'id': episode['id'],
                'title': episode['title'],
                'type': episode['type'],
                'content_length': len(episode['content']),
                'language': 'en',
                'source': 'three_body_analysis'
            }
            episodes_metadata.append(metadata)

        with open(f"{self.output_dir}/metadata/episodes_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(episodes_metadata, f, ensure_ascii=False, indent=2)

        # Paragraph metadata
        chunks_metadata = []
        for chunk in paragraph_chunks:
            metadata = self.create_chunk_metadata(chunk)
            chunks_metadata.append(metadata)

        with open(f"{self.output_dir}/metadata/chunks_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(chunks_metadata, f, ensure_ascii=False, indent=2)

        # 8. Write the Milvus import script
        self.generate_milvus_script(paragraph_chunks)

        print("Done!")
        print(f"- Episode files: {len(episode_chunks)}")
        print(f"- Chunk files: {len(paragraph_chunks)}")
        print(f"- Output directory: {self.output_dir}")

        return episode_chunks, paragraph_chunks

    def generate_milvus_script(self, chunks: List[Dict]):
        """Write the Milvus import script."""
        script_content = '''#!/usr/bin/env python3
"""
Milvus import script for the Three-Body analysis chunks
"""

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import json
import os

def create_collection():
    """Create the Milvus collection."""
    # Field definitions
    fields = [
        FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
        FieldSchema(name="episode_id", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="episode_title", dtype=DataType.VARCHAR, max_length=200),
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
        FieldSchema(name="content_zh", dtype=DataType.VARCHAR, max_length=10000),
        FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="content_length", dtype=DataType.INT64),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)  # assumes 768-dim vectors
    ]

    # Collection schema
    schema = CollectionSchema(fields, "Three-Body analysis vector database")

    # Create the collection
    collection = Collection("three_body_analysis", schema)

    # Create the index
    index_params = {
        "metric_type": "COSINE",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128}
    }
    collection.create_index("embedding", index_params)

    return collection

def load_and_insert_data(collection, chunks_dir, metadata_file):
    """Load the data and insert it into Milvus."""
    # Still to be implemented:
    # 1. read the chunk files
    # 2. embed the text (with sentence-transformers or similar)
    # 3. insert into Milvus
    pass

if __name__ == "__main__":
    # Connect to Milvus
    connections.connect("default", host="localhost", port="19530")

    # Create the collection
    collection = create_collection()

    # Load the data
    load_and_insert_data(collection, "chunks", "metadata/chunks_metadata.json")

    print("Import finished!")
'''

        with open(f"{self.output_dir}/milvus_import.py", 'w', encoding='utf-8') as f:
            f.write(script_content)

def main():
    """Entry point."""
    input_file = "literary-works/analysis/3body/the scripts.md"
    output_dir = "literary-works/analysis/3body/processed"

    chunker = ThreeBodyChunker(input_file, output_dir)
    episodes, chunks = chunker.process()

    print("\n=== Summary ===")
    print(f"Episodes: {len(episodes)}")
    print(f"Chunks: {len(chunks)}")

    # Preview the first few chunks
    print("\n=== First 5 chunks ===")
    for i, chunk in enumerate(chunks[:5]):
        print(f"{i+1}. {chunk['id']} - {chunk['episode_title']}")
        print(f"   Length: {chunk['length']} characters")
        print(f"   Preview: {chunk['content'][:100]}...")
        print()

if __name__ == "__main__":
    main()
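The generated import script deliberately leaves load_and_insert_data as a stub. One way it could be filled in, sketched under two assumptions not made by the source: sentence-transformers is installed with its all-mpnet-base-v2 model (768-dimensional, matching the schema above), and a recent pymilvus that accepts row dictionaries in insert():

# A sketch only; assumes `pip install sentence-transformers pymilvus` and the
# chunk/metadata layout written by ThreeBodyChunker.process()
import json
from sentence_transformers import SentenceTransformer

def load_and_insert_data(collection, chunks_dir, metadata_file):
    """Embed every chunk and insert it into the collection."""
    model = SentenceTransformer("all-mpnet-base-v2")  # 768-dim, matches the schema

    with open(metadata_file, encoding="utf-8") as f:
        metadata = json.load(f)

    rows = []
    for meta in metadata:
        # Chunk files are named <chunk_id>.md by the chunker
        with open(f"{chunks_dir}/{meta['chunk_id']}.md", encoding="utf-8") as f:
            text = f.read()
        rows.append({
            "id": meta["chunk_id"],
            "episode_id": meta["episode_id"],
            "episode_title": meta["episode_title"],
            "content": text[:10000],       # respect the VARCHAR max_length
            "content_zh": "",              # filled in once translated
            "content_type": meta["content_type"],
            "content_length": meta["content_length"],
            "embedding": model.encode(text).tolist(),
        })

    collection.insert(rows)
    collection.flush()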
tools/data-processing/text-processing/translator.py (new file, 0 lines, empty placeholder)