#!/usr/bin/env python3
"""
Agent Identity Manager

Gives every AI agent its own git identity and the ability to commit with it.

Each agent gets:
- its own SSH key pair
- its own GPG signing key id (currently a placeholder, see
  ``AgentIdentityManager._generate_gpg_key``)
- its own git configuration (name, email)
- a traceable commit history

The aim is to model real team collaboration rather than an internal discussion.
"""

import json
import logging
import os
import shlex
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional


class AgentIdentity:
    """Identity record for a single agent."""

    def __init__(self, name: str, email: str, role: str):
        self.name = name
        self.email = email
        self.role = role
        # Both are filled in by AgentIdentityManager once keys exist.
        self.ssh_key_path: Optional[Path] = None
        self.gpg_key_id: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return a JSON-serialisable dict describing this identity."""
        return {
            "name": self.name,
            "email": self.email,
            "role": self.role,
            "ssh_key_path": str(self.ssh_key_path) if self.ssh_key_path else None,
            "gpg_key_id": self.gpg_key_id,
        }


class AgentIdentityManager:
    """Manage all agent identities and perform git operations on their behalf."""

    def __init__(self, base_dir: str = "/home/ben/github/liurenchaxin"):
        """Set up the on-disk layout under *base_dir* and load saved identities.

        Layout: ``<base_dir>/agents/identities.json`` holds the identity
        records and ``<base_dir>/agents/keys/`` holds the SSH keys.
        """
        self.base_dir = Path(base_dir)
        self.agents_dir = self.base_dir / "agents"
        self.keys_dir = self.agents_dir / "keys"
        self.config_file = self.agents_dir / "identities.json"

        # parents=True so a base_dir that does not exist yet is created
        # instead of failing with FileNotFoundError.
        self.agents_dir.mkdir(parents=True, exist_ok=True)
        self.keys_dir.mkdir(parents=True, exist_ok=True)

        self.identities: Dict[str, AgentIdentity] = {}
        self.load_identities()

    def load_identities(self):
        """Load agent identities from the config file, if it exists.

        ``name``, ``email`` and ``role`` are required in every record;
        ``ssh_key_path`` and ``gpg_key_id`` are optional and default to None.
        """
        if not self.config_file.exists():
            return

        with open(self.config_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        for name, identity_data in data.items():
            identity = AgentIdentity(
                identity_data["name"],
                identity_data["email"],
                identity_data["role"],
            )
            # .get(): a record without the optional keys must not abort
            # loading of every identity.
            ssh_key_path = identity_data.get("ssh_key_path")
            identity.ssh_key_path = Path(ssh_key_path) if ssh_key_path else None
            identity.gpg_key_id = identity_data.get("gpg_key_id")
            self.identities[name] = identity

    def save_identities(self):
        """Write all agent identities to the config file as JSON."""
        data = {name: identity.to_dict() for name, identity in self.identities.items()}
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def create_agent(self, name: str, email: str, role: str) -> AgentIdentity:
        """Create a new agent identity, generate its keys and persist it.

        Raises:
            ValueError: if an agent called *name* is already registered.
            subprocess.CalledProcessError: if SSH key generation fails.
        """
        if name in self.identities:
            raise ValueError(f"Agent {name} 已存在")

        identity = AgentIdentity(name, email, role)

        # Generate the SSH key pair.
        ssh_key_path = self.keys_dir / f"{name}_rsa"
        self._generate_ssh_key(name, email, ssh_key_path)
        identity.ssh_key_path = ssh_key_path

        # Generate the GPG key id.
        identity.gpg_key_id = self._generate_gpg_key(name, email)

        self.identities[name] = identity
        self.save_identities()

        logging.info("创建agent: %s (%s)", name, role)
        return identity

    def _generate_ssh_key(self, name: str, email: str, key_path: Path):
        """Generate a 4096-bit RSA SSH key pair for an agent at *key_path*.

        Raises:
            subprocess.CalledProcessError: if ``ssh-keygen`` exits non-zero.
        """
        cmd = [
            "ssh-keygen",
            "-t", "rsa",
            "-b", "4096",
            "-C", email,
            "-f", str(key_path),
            "-N", "",  # empty passphrase
        ]

        try:
            # stdin is /dev/null so that an "Overwrite (y/n)?" prompt for an
            # already existing key file fails fast instead of blocking on an
            # invisible prompt (stdout/stderr are captured).
            subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                stdin=subprocess.DEVNULL,
            )
            logging.info("SSH key已生成: %s", key_path)
        except subprocess.CalledProcessError as e:
            logging.error("生成SSH key失败: %s", e)
            raise

    def _generate_gpg_key(self, name: str, email: str) -> str:
        """Return a GPG key id for an agent.

        Simplified placeholder: no real key is generated, a mock id derived
        from *name* is returned. A real implementation should create a key,
        e.g. with the python-gnupg library.
        """
        return f"{name.upper()}12345678"

    def switch_to_agent(self, agent_name: str):
        """Switch the repository's git identity to the given agent.

        Sets ``user.name``, ``user.email``, ``user.signingkey`` and
        ``commit.gpgsign`` in the repository at ``base_dir`` and, when the
        agent's SSH key file exists, points git at it.

        Raises:
            ValueError: if *agent_name* is not a registered agent.
            subprocess.CalledProcessError: if a ``git config`` call fails.
        """
        if agent_name not in self.identities:
            raise ValueError(f"Agent {agent_name} 不存在")

        identity = self.identities[agent_name]

        # Apply the git configuration.
        commands = [
            ["git", "config", "user.name", identity.name],
            ["git", "config", "user.email", identity.email],
            ["git", "config", "user.signingkey", identity.gpg_key_id],
            ["git", "config", "commit.gpgsign", "true"],
        ]

        for cmd in commands:
            try:
                subprocess.run(cmd, check=True, cwd=self.base_dir)
            except subprocess.CalledProcessError as e:
                logging.error("设置git配置失败: %s", e)
                raise

        # Select the SSH key (via GIT_SSH_COMMAND).
        if identity.ssh_key_path and identity.ssh_key_path.exists():
            self._setup_ssh_agent(identity.ssh_key_path)

        logging.info("已切换到agent: %s", agent_name)

    def _setup_ssh_agent(self, key_path: Path):
        """Make git use *key_path* for SSH by setting ``GIT_SSH_COMMAND``.

        Simplified: this only sets an environment variable of the current
        process; it does not manage a real ssh-agent.
        """
        # git hands GIT_SSH_COMMAND to the shell, so the path must be quoted
        # or a key path containing spaces/metacharacters breaks the command.
        # shlex.quote leaves plain paths untouched.
        os.environ["GIT_SSH_COMMAND"] = f"ssh -i {shlex.quote(str(key_path))}"

    def commit_as_agent(self, agent_name: str, message: str, files: Optional[List[str]] = None):
        """Stage files and create a signed commit as the given agent.

        Args:
            agent_name: registered agent to commit as.
            message: commit message.
            files: paths to stage; when empty or None everything under the
                repository root is staged (``git add .``).

        Raises:
            ValueError: if *agent_name* is not a registered agent.
            subprocess.CalledProcessError: if any git command fails.
        """
        self.switch_to_agent(agent_name)

        # Stage files.
        if files:
            subprocess.run(["git", "add"] + files, check=True, cwd=self.base_dir)
        else:
            subprocess.run(["git", "add", "."], check=True, cwd=self.base_dir)

        # Commit (GPG-signed).
        subprocess.run(
            ["git", "commit", "-S", "-m", message],
            check=True,
            cwd=self.base_dir,
        )

        logging.info("Agent %s 提交: %s", agent_name, message)

    def list_agents(self) -> List[Dict]:
        """Return every registered agent as a dict (see ``AgentIdentity.to_dict``)."""
        return [identity.to_dict() for identity in self.identities.values()]

    def get_agent_stats(self, agent_name: str) -> Dict:
        """Return git commit statistics for an agent.

        The result holds ``agent_name``, ``total_commits`` and ``commits``
        (at most the 10 most recent, formatted as
        ``hash|author name|author email|date|subject``). If ``git log``
        fails, zeroed statistics are returned.

        Raises:
            ValueError: if *agent_name* is not a registered agent.
        """
        if agent_name not in self.identities:
            raise ValueError(f"Agent {agent_name} 不存在")

        identity = self.identities[agent_name]

        # Collect the agent's commits, matched by author email.
        cmd = [
            "git", "log", "--author", identity.email,
            "--pretty=format:%h|%an|%ae|%ad|%s",
            "--date=short",
        ]

        try:
            # check=True is what makes the CalledProcessError handler below
            # reachable when git exits non-zero (e.g. not a repository).
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
                cwd=self.base_dir,
            )
        except subprocess.CalledProcessError:
            return {
                "agent_name": agent_name,
                "total_commits": 0,
                "commits": [],
            }

        output = result.stdout.strip()
        commits = output.split('\n') if output else []

        return {
            "agent_name": agent_name,
            "total_commits": len(commits),
            "commits": commits[:10],  # 10 most recent
        }


# Usage example and initial setup.
if __name__ == "__main__":
    manager = AgentIdentityManager()

    # Create the example agents.
    agents_config = [
        {"name": "claude-ai", "email": "claude@ai-collaboration.local", "role": "架构师"},
        {"name": "gemini-dev", "email": "gemini@ai-collaboration.local", "role": "开发者"},
        {"name": "qwen-ops", "email": "qwen@ai-collaboration.local", "role": "运维"},
        {"name": "llama-research", "email": "llama@ai-collaboration.local", "role": "研究员"},
    ]

    for agent in agents_config:
        try:
            manager.create_agent(agent["name"], agent["email"], agent["role"])
            print(f"✅ 创建agent: {agent['name']}")
        except ValueError as e:
            print(f"⚠️ {e}")

    print("\n📊 当前agent列表:")
    for agent in manager.list_agents():
        print(f" - {agent['name']} ({agent['role']}) - {agent['email']}")