# Listing metadata: 314 lines, 11 KiB, Python.
"""
|
||
Git 协作管理系统
|
||
管理 Agent 之间基于 Git 的真实协作
|
||
"""
|
||
|
||
import os
|
||
import subprocess
|
||
import json
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Any
|
||
from dataclasses import dataclass
|
||
import logging
|
||
from .identity_manager import AgentIdentityManager
|
||
|
||
@dataclass
class Repository:
    """Record describing one managed repository."""

    # Key under which the repository is registered.
    name: str
    # Path of the local working copy, stored as a string.
    local_path: str
    # Mapping of remote name -> remote URL.
    remotes: Dict[str, str]
    # Name of the agent whose identity is currently configured, if any.
    current_agent: Optional[str] = None
|
||
|
||
class GitCollaborationManager:
    """Manage Git-based collaboration between agents.

    The manager keeps a registry of repositories, persisted as JSON in
    ``config/repositories.json`` (relative to the current working directory),
    switches the Git/SSH identity used inside a repository, and drives commit
    and push workflows by invoking the ``git`` command-line tool.
    """

    def __init__(self, identity_manager: "AgentIdentityManager"):
        """Store the identity manager and load the persisted registry.

        Args:
            identity_manager: Object used to configure identities; this class
                calls its ``setup_git_config`` and ``get_agent_identity``
                methods.
        """
        # The annotation above is a string so that it is not evaluated when
        # the class body is executed.
        self.identity_manager = identity_manager
        self.logger = logging.getLogger(__name__)
        self.repositories = {}
        self._load_repositories()

    def _run_git(self, args: List[str], cwd) -> "subprocess.CompletedProcess":
        """Run ``git <args>`` in *cwd* and return the completed process.

        stdout and stderr are captured as text so callers can log them.
        """
        return subprocess.run(
            ["git", *args], cwd=cwd, capture_output=True, text=True
        )

    def _load_repositories(self):
        """Load the repository registry from ``config/repositories.json``.

        Does nothing when the file does not exist. Each JSON entry is passed
        as keyword arguments to ``Repository``.
        """
        config_file = Path("config/repositories.json")
        if not config_file.exists():
            return

        with open(config_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        self.repositories = {
            name: Repository(**repo_data)
            for name, repo_data in data.items()
        }

    def _save_repositories(self):
        """Write the repository registry to ``config/repositories.json``."""
        config_file = Path("config/repositories.json")
        config_file.parent.mkdir(parents=True, exist_ok=True)

        data = {
            name: {
                'name': repo.name,
                'local_path': repo.local_path,
                'remotes': repo.remotes,
                'current_agent': repo.current_agent,
            }
            for name, repo in self.repositories.items()
        }

        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def setup_progressive_deployment(self,
                                     repo_name: str,
                                     gitea_url: str,
                                     bitbucket_url: str,
                                     github_url: str,
                                     local_path: Optional[str] = None):
        """Register a repository with the three progressive-release remotes.

        Creates the local directory if needed, runs ``git init`` when no
        ``.git`` entry exists, then adds (or updates) the ``gitea``,
        ``bitbucket`` and ``github`` remotes and persists the record.

        Args:
            repo_name: Registry key of the repository.
            gitea_url: URL for the ``gitea`` remote.
            bitbucket_url: URL for the ``bitbucket`` remote.
            github_url: URL for the ``github`` remote.
            local_path: Working-copy directory; defaults to
                ``./repos/<repo_name>`` when empty or ``None``.

        Returns:
            The ``Repository`` record that was stored.
        """
        local_path_str = local_path if local_path else f"./repos/{repo_name}"
        local_path_obj = Path(local_path_str)
        local_path_obj.mkdir(parents=True, exist_ok=True)

        # Every git command runs in the resolved directory. Passing the raw
        # ``local_path`` argument as cwd would make git operate on the
        # process's current directory whenever the argument is None.
        if not (local_path_obj / ".git").exists():
            init_result = self._run_git(["init"], local_path_obj)
            if init_result.returncode != 0:
                self.logger.error(f"git init 失败: {init_result.stderr}")

        remotes = {
            "gitea": gitea_url,
            "bitbucket": bitbucket_url,
            "github": github_url,
        }

        for remote_name, remote_url in remotes.items():
            # ``get-url`` fails when the remote is not configured yet.
            existing = self._run_git(
                ["remote", "get-url", remote_name], local_path_obj
            )
            action = "add" if existing.returncode != 0 else "set-url"
            result = self._run_git(
                ["remote", action, remote_name, remote_url], local_path_obj
            )
            if result.returncode != 0:
                self.logger.error(
                    f"设置远程 {remote_name} 失败: {result.stderr}"
                )

        repository = Repository(
            name=repo_name,
            local_path=str(local_path_str),
            remotes=remotes,
        )

        self.repositories[repo_name] = repository
        self._save_repositories()

        self.logger.info(f"设置渐进发布仓库: {repo_name}")
        return repository

    def switch_agent_context(self, repo_name: str, agent_name: str):
        """Make *agent_name* the active identity for *repo_name*.

        Applies the agent's Git configuration through the identity manager,
        loads its SSH key when an identity is found, then records and
        persists the new current agent.

        Raises:
            ValueError: If *repo_name* is not registered.
        """
        if repo_name not in self.repositories:
            raise ValueError(f"仓库 {repo_name} 不存在")

        repository = self.repositories[repo_name]

        self.identity_manager.setup_git_config(agent_name, repository.local_path)

        identity = self.identity_manager.get_agent_identity(agent_name)
        if identity:
            self._setup_ssh_agent(identity.ssh_key_path)

        repository.current_agent = agent_name
        self._save_repositories()

        self.logger.info(f"切换仓库 {repo_name} 到 Agent: {agent_name}")

    def _setup_ssh_agent(self, ssh_key_path: str):
        """Ensure an ssh-agent is reachable and add *ssh_key_path* to it.

        Best effort: any exception is logged as a warning and swallowed.
        """
        try:
            probe = subprocess.run(
                ["ssh-add", "-l"], capture_output=True, text=True
            )

            # ssh-add exits with 2 when it cannot contact an agent; 1 only
            # means the running agent holds no identities, in which case
            # spawning another agent would orphan the existing one.
            if probe.returncode == 2:
                started = subprocess.run(
                    ["ssh-agent", "-s"], capture_output=True, text=True
                )

                if started.returncode == 0:
                    # Output lines look like
                    # ``SSH_AUTH_SOCK=/path; export SSH_AUTH_SOCK;``.
                    for line in started.stdout.split('\n'):
                        assignment = line.split(';', 1)[0]
                        var_name, sep, value = assignment.partition('=')
                        if sep and var_name in ('SSH_AUTH_SOCK', 'SSH_AGENT_PID'):
                            os.environ[var_name] = value

            # Output is not captured so ssh-add can interact with the
            # terminal as before.
            added = subprocess.run(["ssh-add", ssh_key_path])
            if added.returncode != 0:
                self.logger.warning(
                    f"添加 SSH 密钥失败: {ssh_key_path} (exit {added.returncode})"
                )

        except Exception as e:
            self.logger.warning(f"SSH Agent 设置失败: {e}")

    def commit_as_agent(self,
                        repo_name: str,
                        message: str,
                        files: Optional[List[str]] = None,
                        sign: bool = True) -> bool:
        """Stage files and commit them in *repo_name*.

        Args:
            repo_name: Registry key of the repository.
            message: Commit message.
            files: Paths to stage; when empty or ``None`` everything under
                the working copy is staged (``git add .``).
            sign: Append ``-S`` to the commit command when true.

        Returns:
            ``True`` when the commit succeeded; ``False`` when staging or
            committing failed or an exception occurred.

        Raises:
            ValueError: If *repo_name* is not registered.
        """
        if repo_name not in self.repositories:
            raise ValueError(f"仓库 {repo_name} 不存在")

        repository = self.repositories[repo_name]
        repo_path = Path(repository.local_path)

        try:
            # ``--`` keeps a path that starts with "-" from being parsed as
            # an option.
            targets = files if files else ["."]
            for target in targets:
                add_result = self._run_git(["add", "--", target], repo_path)
                if add_result.returncode != 0:
                    self.logger.error(f"添加文件失败: {add_result.stderr}")
                    return False

            commit_args = ["commit", "-m", message]
            if sign:
                commit_args.append("-S")

            result = self._run_git(commit_args, repo_path)

            if result.returncode == 0:
                self.logger.info(f"Agent {repository.current_agent} 提交成功: {message}")
                return True

            self.logger.error(f"提交失败: {result.stderr}")
            return False

        except Exception as e:
            self.logger.error(f"提交过程出错: {e}")
            return False

    def progressive_push(self, repo_name: str, branch: str = "main") -> Dict[str, bool]:
        """Push *branch* to gitea, then bitbucket, then github.

        Pushing stops at the first remote that fails. Remotes that are not
        configured for the repository are skipped.

        Returns:
            Mapping of each attempted remote to whether its push succeeded;
            remotes that were never attempted are absent.

        Raises:
            ValueError: If *repo_name* is not registered.
        """
        if repo_name not in self.repositories:
            raise ValueError(f"仓库 {repo_name} 不存在")

        repository = self.repositories[repo_name]
        repo_path = Path(repository.local_path)

        results = {}

        for remote in ("gitea", "bitbucket", "github"):
            if remote not in repository.remotes:
                continue

            try:
                result = self._run_git(["push", remote, branch], repo_path)
            except Exception as e:
                self.logger.error(f"推送到 {remote} 出错: {e}")
                results[remote] = False
                break

            results[remote] = result.returncode == 0

            if result.returncode != 0:
                self.logger.error(f"推送到 {remote} 失败: {result.stderr}")
                # A failure on an earlier platform blocks the later ones.
                break

            self.logger.info(f"推送到 {remote} 成功")

        return results

    def create_pull_request_workflow(self,
                                     repo_name: str,
                                     source_agent: str,
                                     target_agent: str,
                                     feature_branch: str,
                                     title: str,
                                     description: str = "") -> bool:
        """Prepare a feature branch for a pull request between two agents.

        Switches to *source_agent*, checks out *feature_branch* (creating it
        when necessary) and pushes it to the ``gitea`` remote. No pull request
        is created on any platform; *title* and *description* are currently
        unused.

        Returns:
            ``True`` when the branch was checked out and pushed; ``False``
            when a git command failed or an exception occurred.

        Raises:
            KeyError: If *repo_name* is not registered.
        """
        repository = self.repositories[repo_name]
        repo_path = Path(repository.local_path)

        try:
            self.switch_agent_context(repo_name, source_agent)

            # ``checkout -b`` fails when the branch already exists; fall back
            # to switching to it so the workflow can be re-run.
            checkout = self._run_git(["checkout", "-b", feature_branch], repo_path)
            if checkout.returncode != 0:
                checkout = self._run_git(["checkout", feature_branch], repo_path)
            if checkout.returncode != 0:
                self.logger.error(
                    f"创建或切换分支 {feature_branch} 失败: {checkout.stderr}"
                )
                return False

            push = self._run_git(["push", "-u", "gitea", feature_branch], repo_path)
            if push.returncode != 0:
                self.logger.error(f"推送分支 {feature_branch} 失败: {push.stderr}")
                return False

            # An API call creating the actual PR could be integrated here;
            # the implementation depends on the Git platform in use.

            self.logger.info(f"创建 PR 工作流: {source_agent} -> {target_agent}")
            return True

        except Exception as e:
            self.logger.error(f"创建 PR 工作流失败: {e}")
            return False

    def get_repository_status(self, repo_name: str) -> Dict[str, Any]:
        """Return a status summary for *repo_name*.

        Returns:
            Dict with keys ``current_agent``, ``branch`` (``None`` when it
            could not be determined), ``uncommitted_changes`` and
            ``remotes``. Errors while querying git are logged and leave the
            default values in place.

        Raises:
            ValueError: If *repo_name* is not registered.
        """
        if repo_name not in self.repositories:
            raise ValueError(f"仓库 {repo_name} 不存在")

        repository = self.repositories[repo_name]
        repo_path = Path(repository.local_path)

        status = {
            "current_agent": repository.current_agent,
            "branch": None,
            "uncommitted_changes": False,
            "remotes": repository.remotes,
        }

        try:
            branch_result = self._run_git(["branch", "--show-current"], repo_path)
            if branch_result.returncode == 0:
                status["branch"] = branch_result.stdout.strip()

            # Any porcelain output means there are pending changes.
            porcelain = self._run_git(["status", "--porcelain"], repo_path)
            status["uncommitted_changes"] = bool(porcelain.stdout.strip())

        except Exception as e:
            self.logger.error(f"获取仓库状态失败: {e}")

        return status