#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Symbol query system.

Query tool for the 胡汉三千年 project. Provides lookup, filtering, search and
analysis helpers on top of the SQLite symbol database (tables ``symbols`` and
``cross_civilization_links``).
"""

import csv
import sqlite3
import pandas as pd
from collections import Counter
from typing import Dict, List, Tuple, Any, Optional
from datetime import datetime
import re


class SymbolQuerySystem:
    """Read-only query facade over the symbol database.

    The instance owns one ``sqlite3`` connection (``self.conn``). Call
    :meth:`close` when done, or use the instance as a context manager.
    """

    # Columns searched by search_by_keyword when no explicit list is given.
    DEFAULT_SEARCH_FIELDS = (
        'symbol_id', 'symbol_form', 'symbol_name',
        'yin_yang_attribute', 'engraving_type',
        'origin_civilization', 'origin_period',
        'geographical_context', 'functional_context',
        'phonetic_context', 'semantic_context',
    )

    # Columns matched against each keyword in advanced_search.
    _ADVANCED_KEYWORD_COLUMNS = (
        'symbol_id', 'symbol_form', 'symbol_name',
        'geographical_context', 'functional_context',
        'phonetic_context', 'semantic_context',
    )

    # advanced_search list filters: query_params key -> symbols column.
    _ADVANCED_LIST_FILTERS = (
        ('yin_yang', 'yin_yang_attribute'),
        ('engraving', 'engraving_type'),
        ('civilization', 'origin_civilization'),
        ('period', 'origin_period'),
    )

    def __init__(self, db_path: str = "symbols.db"):
        """Open a connection to the SQLite database at ``db_path``."""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)

    # ------------------------------------------------------------------
    # Resource management
    # ------------------------------------------------------------------
    def close(self) -> None:
        """Close the underlying database connection."""
        self.conn.close()

    def __enter__(self) -> "SymbolQuerySystem":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _rows_to_dicts(cursor) -> List[Dict]:
        """Fetch all remaining rows of ``cursor`` as column-name -> value dicts."""
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in rows]

    @staticmethod
    def _count_values(symbols: List[Dict], column: str) -> Dict[Any, int]:
        """Count occurrences of ``column`` values, mapping NULL/missing to 'unknown'.

        Row dicts always contain every column key, so a ``dict.get`` default
        alone would never apply; NULL values are mapped explicitly.
        """
        counts = Counter(
            'unknown' if symbol.get(column) is None else symbol[column]
            for symbol in symbols
        )
        return dict(counts)

    @staticmethod
    def _compare_attribute(symbols: List[Dict], column: str) -> Dict[str, Any]:
        """Collect ``column`` across ``symbols`` and report whether all values agree."""
        values = [symbol.get(column, '') for symbol in symbols]
        return {'values': values, 'is_same': len(set(values)) == 1}

    def _quoted_columns(self, fields) -> List[str]:
        """Validate ``fields`` against the ``symbols`` schema and quote them.

        Field names end up inside the SQL text (identifiers cannot be bound as
        parameters), so only names that are real columns of ``symbols`` are
        accepted.

        Raises:
            sqlite3.OperationalError: the ``symbols`` table does not exist.
            ValueError: a field is not a column of ``symbols``.
        """
        known = {
            row[1].lower(): row[1]
            for row in self.conn.execute("PRAGMA table_info(symbols)")
        }
        if not known:
            raise sqlite3.OperationalError("no such table: symbols")

        quoted = []
        for field in fields:
            # SQLite column names are case-insensitive, so compare lowercased.
            column = known.get(field.lower()) if isinstance(field, str) else None
            if column is None:
                raise ValueError(f"Unknown column for table symbols: {field!r}")
            quoted.append('"' + column.replace('"', '""') + '"')
        return quoted

    @staticmethod
    def _family_like_pattern(family_pattern: str) -> str:
        """Translate a ``*`` glob into a LIKE pattern with a literal underscore.

        ``_`` is a single-character wildcard in LIKE; left unescaped, "P_*"
        would also match ids such as "PH_...". ``%`` is left untouched so
        callers that already passed LIKE-style patterns keep working.
        """
        escaped = family_pattern.replace('\\', '\\\\').replace('_', '\\_')
        return escaped.replace('*', '%')

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def search_by_keyword(self, keyword: str,
                          search_fields: Optional[List[str]] = None) -> List[Dict]:
        """
        Search symbols whose fields contain a keyword.

        The keyword is wrapped in ``%...%`` and matched with SQL ``LIKE``
        against every requested field; a row matches if any field matches.
        LIKE wildcards inside ``keyword`` are interpreted by SQLite.

        Args:
            keyword: Substring to search for.
            search_fields: Columns of ``symbols`` to search (optional;
                defaults to DEFAULT_SEARCH_FIELDS). An empty list yields
                no matches.

        Returns:
            Matching symbols as dicts, ordered by symbol_id.

        Raises:
            ValueError: a requested field is not a column of ``symbols``.
        """
        if search_fields is None:
            search_fields = self.DEFAULT_SEARCH_FIELDS
        if not search_fields:
            # No field to search: an empty OR-list would be invalid SQL.
            return []

        columns = self._quoted_columns(search_fields)
        where_clause = " OR ".join(f"{column} LIKE ?" for column in columns)
        params = [f"%{keyword}%"] * len(columns)

        cursor = self.conn.cursor()
        cursor.execute(
            f"SELECT * FROM symbols WHERE {where_clause} ORDER BY symbol_id",
            params,
        )
        return self._rows_to_dicts(cursor)

    def filter_by_attributes(self, filters: Dict[str, Any]) -> List[Dict]:
        """
        Filter symbols by exact attribute values.

        Args:
            filters: Mapping of ``symbols`` column name to required value,
                e.g. yin_yang_attribute, engraving_type, origin_civilization,
                origin_period, geographical_context, functional_context.
                Entries whose value is None are ignored. All remaining
                conditions must hold (AND).

        Returns:
            Matching symbols as dicts, ordered by symbol_id. With no
            effective filter every symbol is returned.

        Raises:
            ValueError: a filter key is not a column of ``symbols``.
        """
        active = [(field, value) for field, value in filters.items()
                  if value is not None]

        if active:
            columns = self._quoted_columns([field for field, _ in active])
            where_clause = " AND ".join(f"{column} = ?" for column in columns)
        else:
            where_clause = "1=1"
        params = [value for _, value in active]

        cursor = self.conn.cursor()
        cursor.execute(
            f"SELECT * FROM symbols WHERE {where_clause} ORDER BY symbol_id",
            params,
        )
        return self._rows_to_dicts(cursor)

    def find_related_symbols(self, symbol_id: str, max_depth: int = 3) -> Dict[str, Any]:
        """
        Find symbols related to a symbol (transmission paths).

        Args:
            symbol_id: Symbol ID to start from.
            max_depth: Maximum number of link hops followed when building
                transmission paths. Direct (depth 1) links are always
                included.

        Returns:
            Dict with:
              - 'symbol_info': the symbol row as a dict ({} if not found)
              - 'direct_links': row tuples (link_id, link_type,
                confidence_level, source_id, source_name, target_id,
                target_name) for links where the symbol is source or target
              - 'transmission_paths': row tuples (source_symbol_id,
                target_symbol_id, depth, path) following links outward
                from the symbol
        """
        cursor = self.conn.cursor()

        # Links in which the symbol takes part directly, on either side.
        cursor.execute("""
            SELECT l.link_id, l.link_type, l.confidence_level,
                   s1.symbol_id as source_id, s1.symbol_name as source_name,
                   s2.symbol_id as target_id, s2.symbol_name as target_name
            FROM cross_civilization_links l
            JOIN symbols s1 ON l.source_symbol_id = s1.symbol_id
            JOIN symbols s2 ON l.target_symbol_id = s2.symbol_id
            WHERE s1.symbol_id = ? OR s2.symbol_id = ?
        """, (symbol_id, symbol_id))
        direct_links = cursor.fetchall()

        # Outgoing paths; the depth bound also guarantees termination when
        # the link graph contains cycles.
        cursor.execute("""
            WITH RECURSIVE symbol_path AS (
                SELECT source_symbol_id, target_symbol_id, 1 as depth,
                       source_symbol_id || '->' || target_symbol_id as path
                FROM cross_civilization_links
                WHERE source_symbol_id = ?
                UNION ALL
                SELECT sp.source_symbol_id, l.target_symbol_id, sp.depth + 1,
                       sp.path || '->' || l.target_symbol_id
                FROM cross_civilization_links l
                JOIN symbol_path sp ON l.source_symbol_id = sp.target_symbol_id
                WHERE sp.depth < ?
            )
            SELECT * FROM symbol_path ORDER BY depth, path
        """, (symbol_id, max_depth))
        transmission_paths = cursor.fetchall()

        # Basic information about the symbol itself.
        cursor.execute("SELECT * FROM symbols WHERE symbol_id = ?", (symbol_id,))
        symbol_info = cursor.fetchone()
        if symbol_info:
            columns = [desc[0] for desc in cursor.description]
            symbol_dict = dict(zip(columns, symbol_info))
        else:
            symbol_dict = {}

        return {
            'symbol_info': symbol_dict,
            'direct_links': direct_links,
            'transmission_paths': transmission_paths
        }

    def compare_symbols(self, symbol_ids: List[str]) -> Dict[str, Any]:
        """
        Compare several symbols.

        Args:
            symbol_ids: Symbol IDs to compare. IDs not present in the
                database are skipped.

        Returns:
            Dict with:
              - 'symbols': the found symbol rows as dicts
              - 'yin_yang_comparison', 'engraving_comparison',
                'civilization_comparison': each {'values': [...],
                'is_same': bool}
              - 'common_links': (link_type, link_count) tuples for links
                touching any of the given ids, most frequent first; an
                empty list when fewer than two ids are given
        """
        cursor = self.conn.cursor()
        symbol_ids = list(symbol_ids)

        symbols_data = []
        for symbol_id in symbol_ids:
            cursor.execute("SELECT * FROM symbols WHERE symbol_id = ?", (symbol_id,))
            symbols_data.extend(self._rows_to_dicts(cursor)[:1])

        comparison = {
            'symbols': symbols_data,
            'yin_yang_comparison':
                self._compare_attribute(symbols_data, 'yin_yang_attribute'),
            'engraving_comparison':
                self._compare_attribute(symbols_data, 'engraving_type'),
            'civilization_comparison':
                self._compare_attribute(symbols_data, 'origin_civilization'),
            # Always present so callers can index it unconditionally.
            'common_links': [],
        }

        if len(symbol_ids) >= 2:
            placeholders = ','.join(['?'] * len(symbol_ids))
            cursor.execute(f"""
                SELECT l.link_type, COUNT(*) as link_count
                FROM cross_civilization_links l
                WHERE l.source_symbol_id IN ({placeholders})
                   OR l.target_symbol_id IN ({placeholders})
                GROUP BY l.link_type
                ORDER BY link_count DESC
            """, symbol_ids * 2)
            comparison['common_links'] = cursor.fetchall()

        return comparison

    def analyze_symbol_family(self, family_pattern: str) -> Dict[str, Any]:
        """
        Analyze a symbol family.

        Args:
            family_pattern: Family pattern matched against symbol_id, where
                ``*`` matches any run of characters (e.g. "P_*" or "T_*").
                Underscores are matched literally.

        Returns:
            {'error': ...} when no symbol matches; otherwise a dict with:
              - 'family_symbols': matching symbol rows as dicts
              - 'family_stats': total count plus value distributions for
                yin/yang attribute, engraving type, civilization and period
                (NULL values are counted under 'unknown')
              - 'internal_links': link row tuples whose source and target
                both belong to the family, highest confidence first
        """
        cursor = self.conn.cursor()
        like_pattern = self._family_like_pattern(family_pattern)

        cursor.execute("""
            SELECT * FROM symbols
            WHERE symbol_id LIKE ? ESCAPE '\\'
            ORDER BY symbol_id
        """, (like_pattern,))
        symbols_list = self._rows_to_dicts(cursor)

        if not symbols_list:
            return {'error': '未找到匹配的符号家族'}

        family_stats = {
            'total_count': len(symbols_list),
            'yin_yang_distribution':
                self._count_values(symbols_list, 'yin_yang_attribute'),
            'engraving_distribution':
                self._count_values(symbols_list, 'engraving_type'),
            'civilization_distribution':
                self._count_values(symbols_list, 'origin_civilization'),
            'period_distribution':
                self._count_values(symbols_list, 'origin_period'),
        }

        # Family membership is re-expressed with the same LIKE pattern rather
        # than one bound parameter per member, so large families cannot hit
        # SQLite's bound-variable limit.
        cursor.execute("""
            SELECT l.link_id, l.link_type, l.confidence_level,
                   s1.symbol_id as source_id, s1.symbol_name as source_name,
                   s2.symbol_id as target_id, s2.symbol_name as target_name
            FROM cross_civilization_links l
            JOIN symbols s1 ON l.source_symbol_id = s1.symbol_id
            JOIN symbols s2 ON l.target_symbol_id = s2.symbol_id
            WHERE s1.symbol_id LIKE ? ESCAPE '\\'
              AND s2.symbol_id LIKE ? ESCAPE '\\'
            ORDER BY l.confidence_level DESC
        """, (like_pattern, like_pattern))
        internal_links = cursor.fetchall()

        return {
            'family_symbols': symbols_list,
            'family_stats': family_stats,
            'internal_links': internal_links
        }

    def advanced_search(self, query_params: Dict[str, Any]) -> List[Dict]:
        """
        Advanced search combining keywords, attribute lists and link filters.

        Args:
            query_params: Query parameters; missing or empty entries are
                ignored.
                - keywords: list of keywords; a symbol matches if any
                  keyword occurs in any searched text column
                - yin_yang: list of accepted yin/yang attributes
                - engraving: list of accepted engraving types
                - civilization: list of accepted civilizations
                - period: list of accepted periods
                - min_confidence: minimum confidence of an attached link
                - link_type: required type of an attached link

        Returns:
            Distinct matching symbols as dicts, ordered by symbol_id.
        """
        conditions = []
        params = []

        # Keyword search: all (keyword, column) pairs are OR-ed together.
        keywords = query_params.get('keywords')
        if keywords:
            keyword_conditions = []
            for keyword in keywords:
                keyword_conditions.extend(
                    f"s.{column} LIKE ?"
                    for column in self._ADVANCED_KEYWORD_COLUMNS
                )
                params.extend(
                    [f"%{keyword}%"] * len(self._ADVANCED_KEYWORD_COLUMNS)
                )
            conditions.append(f"({' OR '.join(keyword_conditions)})")

        # Attribute list filters (IN lists).
        for param_key, column in self._ADVANCED_LIST_FILTERS:
            accepted = query_params.get(param_key)
            if accepted:
                placeholders = ','.join(['?'] * len(accepted))
                conditions.append(f"s.{column} IN ({placeholders})")
                params.extend(accepted)

        # Link filters apply to the joined link row, so symbols without any
        # link never satisfy them.
        if query_params.get('link_type'):
            conditions.append("l.link_type = ?")
            params.append(query_params['link_type'])

        if query_params.get('min_confidence'):
            conditions.append("l.confidence_level >= ?")
            params.append(query_params['min_confidence'])

        where_clause = " AND ".join(conditions) if conditions else "1=1"

        # Symbol columns are qualified with ``s.`` so they cannot become
        # ambiguous against columns of the joined link table.
        query = f"""
            SELECT DISTINCT s.*
            FROM symbols s
            LEFT JOIN cross_civilization_links l
                ON s.symbol_id = l.source_symbol_id
                OR s.symbol_id = l.target_symbol_id
            WHERE {where_clause}
            ORDER BY s.symbol_id
        """

        cursor = self.conn.cursor()
        cursor.execute(query, params)
        return self._rows_to_dicts(cursor)

    def export_search_results(self, symbols: List[Dict], output_file: str) -> str:
        """
        Export search results to a UTF-8 CSV file.

        Args:
            symbols: Symbol dicts to export.
            output_file: Output file path.

        Returns:
            A status message: either a notice that there was nothing to
            export (no file is written) or a confirmation naming the file.
        """
        if not symbols:
            return "没有数据可导出"

        # Ordered union of keys, so rows with extra keys do not make
        # DictWriter raise; missing values are written as empty cells.
        fieldnames = list(dict.fromkeys(
            key for symbol in symbols for key in symbol
        ))

        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(symbols)

        return f"搜索结果已导出至:{output_file}"

    def create_search_report(self, search_params: Dict[str, Any],
                             output_file: str = None) -> str:
        """
        Create a Markdown search report.

        Runs :meth:`advanced_search` with ``search_params`` and summarizes
        the result: total count, value distributions and the first ten
        matches.

        Args:
            search_params: Search parameters (see advanced_search).
            output_file: Output file path (optional). When given, the report
                is also written there and a notice is printed.

        Returns:
            The report content.
        """
        results = self.advanced_search(search_params)

        report = [
            "# 符号搜索报告",
            f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"搜索参数:{search_params}",
            "\n## 搜索结果摘要",
            f"- 找到 {len(results)} 个匹配的符号",
        ]

        if results:
            # Distribution sections: heading -> symbols column.
            sections = (
                ("\n### 阴阳属性分布", 'yin_yang_attribute'),
                ("\n### 刻法类型分布", 'engraving_type'),
                ("\n### 文明分布", 'origin_civilization'),
            )
            for heading, column in sections:
                report.append(heading)
                for value, count in self._count_values(results, column).items():
                    percentage = (count / len(results)) * 100
                    report.append(f"- {value}:{count} ({percentage:.1f}%)")

            # First 10 results.
            report.append("\n### 前10个匹配符号")
            for i, symbol in enumerate(results[:10]):
                report.append(f"\n**{i+1}. {symbol.get('symbol_id', 'N/A')} - {symbol.get('symbol_name', 'N/A')}**")
                report.append(f" - 阴阳属性:{symbol.get('yin_yang_attribute', 'N/A')}")
                report.append(f" - 刻法类型:{symbol.get('engraving_type', 'N/A')}")
                report.append(f" - 起源文明:{symbol.get('origin_civilization', 'N/A')}")
                report.append(f" - 起源时期:{symbol.get('origin_period', 'N/A')}")

        report_content = '\n'.join(report)

        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_content)
            print(f"搜索报告已保存至:{output_file}")

        return report_content


# Usage example
def main():
    """Run a sample of every query against the default database."""
    # The context manager closes the connection even if an example fails.
    with SymbolQuerySystem() as query_system:
        # Example 1: keyword search
        print("=== 关键词搜索示例 ===")
        results = query_system.search_by_keyword("P")
        print(f"找到 {len(results)} 个包含 'P' 的符号")

        # Example 2: attribute filter
        print("\n=== 属性过滤示例 ===")
        filters = {
            'yin_yang_attribute': 'yin',
            'engraving_type': 'yin_engraving'
        }
        results = query_system.filter_by_attributes(filters)
        print(f"找到 {len(results)} 个阴属性阴刻符号")

        # Example 3: related symbols
        print("\n=== 查找相关符号示例 ===")
        related = query_system.find_related_symbols("P_yin_001")
        print(f"符号信息:{related['symbol_info'].get('symbol_name', 'N/A')}")
        print(f"直接关联:{len(related['direct_links'])} 个")

        # Example 4: symbol comparison
        print("\n=== 符号比较示例 ===")
        comparison = query_system.compare_symbols(["P_yin_001", "T_yang_001"])
        print(f"阴阳属性相同:{comparison['yin_yang_comparison']['is_same']}")

        # Example 5: symbol family analysis
        print("\n=== 符号家族分析示例 ===")
        family = query_system.analyze_symbol_family("P_*")
        if 'error' not in family:
            print(f"P家族符号数量:{family['family_stats']['total_count']}")

        # Example 6: advanced search
        print("\n=== 高级搜索示例 ===")
        search_params = {
            'keywords': ['P', 'yin'],
            'yin_yang': ['yin'],
            'civilization': ['Chinese', 'Greek']
        }
        results = query_system.advanced_search(search_params)
        print(f"高级搜索找到 {len(results)} 个符号")

        # Example 7: search report
        print("\n=== 创建搜索报告示例 ===")
        query_system.create_search_report(search_params, "search_report.md")
        print("搜索报告已生成")


if __name__ == "__main__":
    main()