huhan3000/documentation/analysis/phallic-worship-analysis/database/connections.py

350 lines
12 KiB
Python

"""
数据库连接管理模块
支持Neo4j知识图谱和PostgreSQL关系数据库
"""
import logging
from typing import Optional, Dict, Any
import psycopg2
from psycopg2.extras import RealDictCursor
from neo4j import GraphDatabase
import json
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Neo4jConnection:
"""Neo4j知识图谱数据库连接"""
def __init__(self, uri: str, user: str, password: str, database: str = "neo4j"):
self.uri = uri
self.user = user
self.password = password
self.database = database
self.driver = None
def connect(self):
"""建立连接"""
try:
self.driver = GraphDatabase.driver(
self.uri,
auth=(self.user, self.password)
)
logger.info(f"Successfully connected to Neo4j at {self.uri}")
except Exception as e:
logger.error(f"Failed to connect to Neo4j: {e}")
raise
def close(self):
"""关闭连接"""
if self.driver:
self.driver.close()
logger.info("Neo4j connection closed")
def execute_query(self, query: str, parameters: Dict[str, Any] = None) -> list:
"""执行Cypher查询"""
if not self.driver:
self.connect()
with self.driver.session(database=self.database) as session:
result = session.run(query, parameters or {})
return [record.data() for record in result]
def create_cultural_transmission_node(self, transmission_data: Dict[str, Any]) -> str:
"""创建文化传播节点"""
query = """
CREATE (ct:CulturalTransmission {
source_region: $source_region,
target_region: $target_region,
transmission_period: $transmission_period,
mechanism: $mechanism,
cultural_type: $cultural_type,
reliability: $reliability,
evidence_count: $evidence_count
})
RETURN id(ct) as node_id
"""
result = self.execute_query(query, transmission_data)
return result[0]['node_id'] if result else None
def create_relationship(self, from_node_id: str, to_node_id: str,
relationship_type: str, properties: Dict[str, Any] = None):
"""创建节点关系"""
query = f"""
MATCH (a), (b)
WHERE id(a) = $from_id AND id(b) = $to_id
CREATE (a)-[r:{relationship_type}]->(b)
"""
if properties:
props_str = ", ".join([f"r.{k} = ${k}" for k in properties.keys()])
query += f" SET {props_str}"
query += " RETURN r"
params = {"from_id": from_node_id, "to_id": to_node_id}
if properties:
params.update(properties)
return self.execute_query(query, params)
class PostgreSQLConnection:
"""PostgreSQL关系数据库连接"""
def __init__(self, host: str, port: int, database: str, user: str, password: str):
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.connection = None
def connect(self):
"""建立连接"""
try:
self.connection = psycopg2.connect(
host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password,
cursor_factory=RealDictCursor
)
logger.info(f"Successfully connected to PostgreSQL at {self.host}:{self.port}")
except Exception as e:
logger.error(f"Failed to connect to PostgreSQL: {e}")
raise
def close(self):
"""关闭连接"""
if self.connection:
self.connection.close()
logger.info("PostgreSQL connection closed")
def execute_query(self, query: str, parameters: tuple = None) -> list:
"""执行SQL查询"""
if not self.connection:
self.connect()
with self.connection.cursor() as cursor:
cursor.execute(query, parameters or ())
if query.strip().upper().startswith('SELECT'):
return cursor.fetchall()
else:
self.connection.commit()
return []
def create_tables(self):
"""创建数据表"""
tables = {
"emperors": """
CREATE TABLE IF NOT EXISTS emperors (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
reign_period VARCHAR(50),
birth_year INTEGER,
death_year INTEGER,
lifespan INTEGER,
reign_duration INTEGER,
death_cause TEXT,
offspring_count INTEGER,
fertility_anxiety_score FLOAT,
religious_activities JSONB,
sources JSONB,
reliability VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"religious_buildings": """
CREATE TABLE IF NOT EXISTS religious_buildings (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
location JSONB,
construction_period VARCHAR(50),
architect VARCHAR(100),
purpose JSONB,
architectural_features JSONB,
religious_function JSONB,
political_significance TEXT,
modern_status VARCHAR(100),
fertility_elements JSONB,
dragon_symbolism JSONB,
sources JSONB,
reliability VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"folk_customs": """
CREATE TABLE IF NOT EXISTS folk_customs (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
region VARCHAR(100),
historical_period VARCHAR(50),
practice_description TEXT,
cultural_meaning JSONB,
religious_aspects JSONB,
social_function JSONB,
modern_practice BOOLEAN,
variations JSONB,
fertility_connection BOOLEAN,
dragon_elements JSONB,
phallic_symbolism JSONB,
sources JSONB,
reliability VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"cultural_transmissions": """
CREATE TABLE IF NOT EXISTS cultural_transmissions (
id SERIAL PRIMARY KEY,
source_region VARCHAR(100),
target_region VARCHAR(100),
transmission_period VARCHAR(50),
transmission_mechanism TEXT,
cultural_carriers JSONB,
adaptations JSONB,
evidence JSONB,
reliability VARCHAR(20),
cultural_type VARCHAR(50),
transmission_route JSONB,
time_span INTEGER,
success_indicators JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"dragon_worship_documents": """
CREATE TABLE IF NOT EXISTS dragon_worship_documents (
id SERIAL PRIMARY KEY,
title VARCHAR(300) NOT NULL,
author VARCHAR(100),
period VARCHAR(50),
content TEXT,
dragon_characteristics JSONB,
sexual_symbolism JSONB,
cultural_context TEXT,
cross_references JSONB,
reliability VARCHAR(20),
phallic_connections JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"linguistic_evidence": """
CREATE TABLE IF NOT EXISTS linguistic_evidence (
id SERIAL PRIMARY KEY,
word VARCHAR(50) NOT NULL,
pronunciation VARCHAR(100),
meaning TEXT,
etymology TEXT,
region VARCHAR(100),
period VARCHAR(50),
related_words JSONB,
symbolism JSONB,
evidence JSONB,
phonetic_evolution JSONB,
dragon_connection BOOLEAN,
phallic_connection BOOLEAN,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"nihon_shoki_analysis": """
CREATE TABLE IF NOT EXISTS nihon_shoki_analysis (
id SERIAL PRIMARY KEY,
section VARCHAR(100),
content TEXT,
northern_wei_elements JSONB,
packaging_strategies JSONB,
myth_construction JSONB,
political_purpose TEXT,
cultural_inferiority_indicators JSONB,
imagination_community_elements JSONB,
sources JSONB,
analysis_confidence FLOAT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
}
for table_name, create_sql in tables.items():
try:
self.execute_query(create_sql)
logger.info(f"Table '{table_name}' created successfully")
except Exception as e:
logger.error(f"Failed to create table '{table_name}': {e}")
class DatabaseManager:
"""数据库管理器"""
def __init__(self, config: Dict[str, Dict[str, Any]]):
self.config = config
self.neo4j_conn = None
self.postgres_conn = None
def initialize_neo4j(self):
"""初始化Neo4j连接"""
neo4j_config = self.config.get('neo4j', {})
self.neo4j_conn = Neo4jConnection(
uri=neo4j_config.get('uri'),
user=neo4j_config.get('user'),
password=neo4j_config.get('password'),
database=neo4j_config.get('database', 'neo4j')
)
self.neo4j_conn.connect()
def initialize_postgresql(self):
"""初始化PostgreSQL连接"""
pg_config = self.config.get('postgresql', {})
self.postgres_conn = PostgreSQLConnection(
host=pg_config.get('host'),
port=pg_config.get('port'),
database=pg_config.get('database'),
user=pg_config.get('user'),
password=pg_config.get('password')
)
self.postgres_conn.connect()
self.postgres_conn.create_tables()
def initialize_all(self):
"""初始化所有数据库连接"""
self.initialize_neo4j()
self.initialize_postgresql()
logger.info("All database connections initialized successfully")
def close_all(self):
"""关闭所有连接"""
if self.neo4j_conn:
self.neo4j_conn.close()
if self.postgres_conn:
self.postgres_conn.close()
logger.info("All database connections closed")
def get_neo4j(self) -> Neo4jConnection:
"""获取Neo4j连接"""
if not self.neo4j_conn:
self.initialize_neo4j()
return self.neo4j_conn
def get_postgresql(self) -> PostgreSQLConnection:
"""获取PostgreSQL连接"""
if not self.postgres_conn:
self.initialize_postgresql()
return self.postgres_conn
# 默认配置
DEFAULT_CONFIG = {
"neo4j": {
"uri": "bolt://localhost:7687",
"user": "neo4j",
"password": "password",
"database": "phallic_worship_analysis"
},
"postgresql": {
"host": "localhost",
"port": 5432,
"database": "phallic_worship_db",
"user": "postgres",
"password": "password"
}
}
# 全局数据库管理器实例
db_manager = DatabaseManager(DEFAULT_CONFIG)