liurenchaxin/modules/agent-identity/agents/identity_manager.py

237 lines
8.2 KiB
Python

"""
Agent Identity Management System
管理多个 AI Agent 的身份信息,包括 SSH/GPG 密钥、Git 配置等
"""
import os
import json
import subprocess
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
import logging
@dataclass
class AgentIdentity:
"""Agent 身份信息"""
name: str
email: str
ssh_key_path: str
gpg_key_id: Optional[str] = None
git_username: str = ""
description: str = ""
repositories: List[str] = None
def __post_init__(self):
if self.repositories is None:
self.repositories = []
if not self.git_username:
self.git_username = self.name.lower().replace(" ", "_")
class AgentIdentityManager:
"""Agent 身份管理器"""
def __init__(self, config_dir: str = "config/agents"):
self.config_dir = Path(config_dir)
self.config_dir.mkdir(parents=True, exist_ok=True)
self.identities_file = self.config_dir / "identities.json"
self.ssh_keys_dir = self.config_dir / "ssh_keys"
self.gpg_keys_dir = self.config_dir / "gpg_keys"
# 创建必要的目录
self.ssh_keys_dir.mkdir(exist_ok=True)
self.gpg_keys_dir.mkdir(exist_ok=True)
self.logger = logging.getLogger(__name__)
self._load_identities()
def _load_identities(self):
"""加载已有的身份信息"""
if self.identities_file.exists():
with open(self.identities_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.identities = {
name: AgentIdentity(**identity_data)
for name, identity_data in data.items()
}
else:
self.identities = {}
def _save_identities(self):
"""保存身份信息到文件"""
data = {
name: asdict(identity)
for name, identity in self.identities.items()
}
with open(self.identities_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def create_agent_identity(self,
name: str,
email: str,
description: str = "",
generate_keys: bool = True) -> AgentIdentity:
"""创建新的 Agent 身份"""
if name in self.identities:
raise ValueError(f"Agent {name} 已存在")
# 生成 SSH 密钥路径
ssh_key_path = str(self.ssh_keys_dir / f"{name.lower().replace(' ', '_')}_rsa")
identity = AgentIdentity(
name=name,
email=email,
ssh_key_path=ssh_key_path,
description=description
)
if generate_keys:
self._generate_ssh_key(identity)
self._generate_gpg_key(identity)
self.identities[name] = identity
self._save_identities()
self.logger.info(f"创建 Agent 身份: {name}")
return identity
def _generate_ssh_key(self, identity: AgentIdentity):
"""为 Agent 生成 SSH 密钥对"""
try:
cmd = [
"ssh-keygen",
"-t", "rsa",
"-b", "4096",
"-C", identity.email,
"-f", identity.ssh_key_path,
"-N", "" # 无密码
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"SSH 密钥生成失败: {result.stderr}")
# 设置正确的权限
os.chmod(identity.ssh_key_path, 0o600)
os.chmod(f"{identity.ssh_key_path}.pub", 0o644)
self.logger.info(f"{identity.name} 生成 SSH 密钥: {identity.ssh_key_path}")
except Exception as e:
self.logger.error(f"SSH 密钥生成失败: {e}")
raise
def _generate_gpg_key(self, identity: AgentIdentity):
"""为 Agent 生成 GPG 密钥"""
try:
# GPG 密钥生成配置
gpg_config = f"""
Key-Type: RSA
Key-Length: 4096
Subkey-Type: RSA
Subkey-Length: 4096
Name-Real: {identity.name}
Name-Email: {identity.email}
Expire-Date: 0
%no-protection
%commit
"""
# 写入临时配置文件
config_file = self.gpg_keys_dir / f"{identity.git_username}_gpg_config"
with open(config_file, 'w') as f:
f.write(gpg_config)
# 生成 GPG 密钥
cmd = ["gpg", "--batch", "--generate-key", str(config_file)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
self.logger.warning(f"GPG 密钥生成失败: {result.stderr}")
return
# 获取生成的密钥 ID
cmd = ["gpg", "--list-secret-keys", "--keyid-format", "LONG", identity.email]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
# 解析密钥 ID
lines = result.stdout.split('\n')
for line in lines:
if 'sec' in line and 'rsa4096/' in line:
key_id = line.split('rsa4096/')[1].split(' ')[0]
identity.gpg_key_id = key_id
break
# 清理临时文件
config_file.unlink()
self.logger.info(f"{identity.name} 生成 GPG 密钥: {identity.gpg_key_id}")
except Exception as e:
self.logger.warning(f"GPG 密钥生成失败: {e}")
def get_agent_identity(self, name: str) -> Optional[AgentIdentity]:
"""获取 Agent 身份信息"""
return self.identities.get(name)
def list_agents(self) -> List[str]:
"""列出所有 Agent"""
return list(self.identities.keys())
def setup_git_config(self, agent_name: str, repo_path: str = "."):
"""为指定仓库设置 Agent 的 Git 配置"""
identity = self.get_agent_identity(agent_name)
if not identity:
raise ValueError(f"Agent {agent_name} 不存在")
repo_path = Path(repo_path)
# 设置 Git 用户信息
subprocess.run([
"git", "config", "--local", "user.name", identity.name
], cwd=repo_path)
subprocess.run([
"git", "config", "--local", "user.email", identity.email
], cwd=repo_path)
# 设置 GPG 签名
if identity.gpg_key_id:
subprocess.run([
"git", "config", "--local", "user.signingkey", identity.gpg_key_id
], cwd=repo_path)
subprocess.run([
"git", "config", "--local", "commit.gpgsign", "true"
], cwd=repo_path)
self.logger.info(f"为仓库 {repo_path} 设置 {agent_name} 的 Git 配置")
def get_ssh_public_key(self, agent_name: str) -> str:
"""获取 Agent 的 SSH 公钥"""
identity = self.get_agent_identity(agent_name)
if not identity:
raise ValueError(f"Agent {agent_name} 不存在")
pub_key_path = f"{identity.ssh_key_path}.pub"
if not os.path.exists(pub_key_path):
raise FileNotFoundError(f"SSH 公钥文件不存在: {pub_key_path}")
with open(pub_key_path, 'r') as f:
return f.read().strip()
def export_gpg_public_key(self, agent_name: str) -> str:
"""导出 Agent 的 GPG 公钥"""
identity = self.get_agent_identity(agent_name)
if not identity or not identity.gpg_key_id:
raise ValueError(f"Agent {agent_name} 没有 GPG 密钥")
cmd = ["gpg", "--armor", "--export", identity.gpg_key_id]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"GPG 公钥导出失败: {result.stderr}")
return result.stdout