""" Agent Identity Management System 管理多个 AI Agent 的身份信息,包括 SSH/GPG 密钥、Git 配置等 """ import os import json import subprocess from pathlib import Path from typing import Dict, List, Optional from dataclasses import dataclass, asdict import logging @dataclass class AgentIdentity: """Agent 身份信息""" name: str email: str ssh_key_path: str gpg_key_id: Optional[str] = None git_username: str = "" description: str = "" repositories: List[str] = None def __post_init__(self): if self.repositories is None: self.repositories = [] if not self.git_username: self.git_username = self.name.lower().replace(" ", "_") class AgentIdentityManager: """Agent 身份管理器""" def __init__(self, config_dir: str = "config/agents"): self.config_dir = Path(config_dir) self.config_dir.mkdir(parents=True, exist_ok=True) self.identities_file = self.config_dir / "identities.json" self.ssh_keys_dir = self.config_dir / "ssh_keys" self.gpg_keys_dir = self.config_dir / "gpg_keys" # 创建必要的目录 self.ssh_keys_dir.mkdir(exist_ok=True) self.gpg_keys_dir.mkdir(exist_ok=True) self.logger = logging.getLogger(__name__) self._load_identities() def _load_identities(self): """加载已有的身份信息""" if self.identities_file.exists(): with open(self.identities_file, 'r', encoding='utf-8') as f: data = json.load(f) self.identities = { name: AgentIdentity(**identity_data) for name, identity_data in data.items() } else: self.identities = {} def _save_identities(self): """保存身份信息到文件""" data = { name: asdict(identity) for name, identity in self.identities.items() } with open(self.identities_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) def create_agent_identity(self, name: str, email: str, description: str = "", generate_keys: bool = True) -> AgentIdentity: """创建新的 Agent 身份""" if name in self.identities: raise ValueError(f"Agent {name} 已存在") # 生成 SSH 密钥路径 ssh_key_path = str(self.ssh_keys_dir / f"{name.lower().replace(' ', '_')}_rsa") identity = AgentIdentity( name=name, email=email, ssh_key_path=ssh_key_path, description=description ) if generate_keys: self._generate_ssh_key(identity) self._generate_gpg_key(identity) self.identities[name] = identity self._save_identities() self.logger.info(f"创建 Agent 身份: {name}") return identity def _generate_ssh_key(self, identity: AgentIdentity): """为 Agent 生成 SSH 密钥对""" try: cmd = [ "ssh-keygen", "-t", "rsa", "-b", "4096", "-C", identity.email, "-f", identity.ssh_key_path, "-N", "" # 无密码 ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise Exception(f"SSH 密钥生成失败: {result.stderr}") # 设置正确的权限 os.chmod(identity.ssh_key_path, 0o600) os.chmod(f"{identity.ssh_key_path}.pub", 0o644) self.logger.info(f"为 {identity.name} 生成 SSH 密钥: {identity.ssh_key_path}") except Exception as e: self.logger.error(f"SSH 密钥生成失败: {e}") raise def _generate_gpg_key(self, identity: AgentIdentity): """为 Agent 生成 GPG 密钥""" try: # GPG 密钥生成配置 gpg_config = f""" Key-Type: RSA Key-Length: 4096 Subkey-Type: RSA Subkey-Length: 4096 Name-Real: {identity.name} Name-Email: {identity.email} Expire-Date: 0 %no-protection %commit """ # 写入临时配置文件 config_file = self.gpg_keys_dir / f"{identity.git_username}_gpg_config" with open(config_file, 'w') as f: f.write(gpg_config) # 生成 GPG 密钥 cmd = ["gpg", "--batch", "--generate-key", str(config_file)] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: self.logger.warning(f"GPG 密钥生成失败: {result.stderr}") return # 获取生成的密钥 ID cmd = ["gpg", "--list-secret-keys", "--keyid-format", "LONG", identity.email] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: # 解析密钥 ID lines = result.stdout.split('\n') for line in lines: if 'sec' in line and 'rsa4096/' in line: key_id = line.split('rsa4096/')[1].split(' ')[0] identity.gpg_key_id = key_id break # 清理临时文件 config_file.unlink() self.logger.info(f"为 {identity.name} 生成 GPG 密钥: {identity.gpg_key_id}") except Exception as e: self.logger.warning(f"GPG 密钥生成失败: {e}") def get_agent_identity(self, name: str) -> Optional[AgentIdentity]: """获取 Agent 身份信息""" return self.identities.get(name) def list_agents(self) -> List[str]: """列出所有 Agent""" return list(self.identities.keys()) def setup_git_config(self, agent_name: str, repo_path: str = "."): """为指定仓库设置 Agent 的 Git 配置""" identity = self.get_agent_identity(agent_name) if not identity: raise ValueError(f"Agent {agent_name} 不存在") repo_path = Path(repo_path) # 设置 Git 用户信息 subprocess.run([ "git", "config", "--local", "user.name", identity.name ], cwd=repo_path) subprocess.run([ "git", "config", "--local", "user.email", identity.email ], cwd=repo_path) # 设置 GPG 签名 if identity.gpg_key_id: subprocess.run([ "git", "config", "--local", "user.signingkey", identity.gpg_key_id ], cwd=repo_path) subprocess.run([ "git", "config", "--local", "commit.gpgsign", "true" ], cwd=repo_path) self.logger.info(f"为仓库 {repo_path} 设置 {agent_name} 的 Git 配置") def get_ssh_public_key(self, agent_name: str) -> str: """获取 Agent 的 SSH 公钥""" identity = self.get_agent_identity(agent_name) if not identity: raise ValueError(f"Agent {agent_name} 不存在") pub_key_path = f"{identity.ssh_key_path}.pub" if not os.path.exists(pub_key_path): raise FileNotFoundError(f"SSH 公钥文件不存在: {pub_key_path}") with open(pub_key_path, 'r') as f: return f.read().strip() def export_gpg_public_key(self, agent_name: str) -> str: """导出 Agent 的 GPG 公钥""" identity = self.get_agent_identity(agent_name) if not identity or not identity.gpg_key_id: raise ValueError(f"Agent {agent_name} 没有 GPG 密钥") cmd = ["gpg", "--armor", "--export", identity.gpg_key_id] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise Exception(f"GPG 公钥导出失败: {result.stderr}") return result.stdout