Source code for contextbench.core.repo

"""Git repository checkout management."""

import os
import re
import shutil
import subprocess
import sys
import tempfile
from typing import Optional, List, Iterable

def checkout(
    repo_url: str,
    commit: str,
    cache_dir: str,
    verbose: bool = True,
    sparse_paths: Optional[List[str]] = None,
) -> Optional[str]:
    """Check out ``repo_url`` at ``commit`` and return the worktree path.

    Concurrency-safe strategy:

    - Keep one shared "base" clone per repo under ``cache_dir`` (never switch
      its HEAD).
    - Create a dedicated detached worktree per commit under a temp root
      (``$CONTEXTBENCH_TMP_ROOT`` if set, otherwise the system temp dir).

    This prevents multiple commits from fighting over the same working
    directory.

    Args:
        repo_url: Remote URL, or the path of an existing local clone (a
            directory containing a ``.git`` directory), which is then used as
            the base repository directly.
        commit: Commit to check out. It is also used as the name of the
            worktree directory.
        cache_dir: Directory holding the shared base clones and lock files;
            created if missing.
        verbose: Print progress messages to stderr and let git stream its own
            output.
        sparse_paths: Optional repo-relative paths. When any remain after
            normalization, the worktree is created without a full checkout and
            restricted to these paths via sparse-checkout.

    Returns:
        The worktree directory, or ``None`` when an argument is empty, the
        base clone fails three times, the worktree cannot be created, or the
        worktree does not end up at ``commit``.
    """
    if not repo_url or not commit or not cache_dir:
        return None

    sparse_list = _normalize_sparse_paths(sparse_paths)
    repo_key = _normalize_url(repo_url)
    os.makedirs(cache_dir, exist_ok=True)

    # If repo_url is an existing local git clone, use it as the base directly.
    if os.path.isdir(repo_url) and os.path.isdir(os.path.join(repo_url, ".git")):
        base_dir = repo_url
    else:
        base_dir = os.path.join(cache_dir, repo_key)

    tmp_root = os.environ.get("CONTEXTBENCH_TMP_ROOT") or tempfile.gettempdir()
    worktree_root = os.path.join(tmp_root, "contextbench_worktrees", repo_key)
    worktree_dir = os.path.join(worktree_root, commit)

    # Fast path (no lock taken): worktree already exists at the right commit.
    reused = _reuse_worktree(worktree_dir, commit, sparse_list, verbose)
    if reused is not None:
        return reused

    lock_path = os.path.join(cache_dir, f"{repo_key}.lock")
    with _file_lock(lock_path):
        # Ensure base repo exists (only when not using an existing local clone).
        if not os.path.isdir(os.path.join(base_dir, ".git")):
            if verbose:
                print(f" Cloning base repo {repo_url}", file=sys.stderr)
            for attempt in range(1, 4):
                # A leftover directory from a failed attempt would make
                # `git clone` refuse to run, so remove it first (best-effort).
                if os.path.isdir(base_dir):
                    shutil.rmtree(base_dir, ignore_errors=True)
                result = _git(
                    [
                        "clone",
                        "--filter=blob:none",
                        "--no-checkout",
                        "--progress",
                        repo_url,
                        base_dir,
                    ],
                    show_progress=verbose,
                    timeout=1800,
                )
                if result.returncode == 0 and os.path.isdir(
                    os.path.join(base_dir, ".git")
                ):
                    break
                if verbose:
                    print(
                        f" Base clone failed (attempt {attempt}/3)",
                        file=sys.stderr,
                    )
            else:
                # All attempts exhausted without a usable clone.
                return None

        # Fetch the desired commit into the base repo (does not change HEAD).
        # The result is not checked here: if the commit is still unavailable,
        # `worktree add` below fails and None is returned.
        _git(
            ["fetch", "--depth", "1", "--filter=blob:none", "origin", commit],
            cwd=base_dir,
            show_progress=verbose,
            timeout=1800,
        )

        # Clean up stale worktree registrations (best-effort).
        _git(["worktree", "prune"], cwd=base_dir, timeout=600)

        # If another process created it while we waited for the lock, reuse it.
        reused = _reuse_worktree(worktree_dir, commit, sparse_list, verbose)
        if reused is not None:
            return reused

        os.makedirs(worktree_root, exist_ok=True)

        # Create a detached worktree for the specific commit.
        wt_args = ["worktree", "add", "--detach"]
        if sparse_list:
            # Avoid checking out the full tree (inode explosion on huge repos).
            wt_args.append("--no-checkout")
        wt_args += [worktree_dir, commit]
        wt = _git(wt_args, cwd=base_dir, show_progress=verbose, timeout=1800)
        if wt.returncode != 0:
            # If it failed because the directory/worktree exists, try to reuse.
            return _reuse_worktree(worktree_dir, commit, sparse_list, verbose)

        if sparse_list:
            _ensure_sparse_checkout(
                worktree_dir, commit, sparse_list, verbose=verbose
            )
        return worktree_dir if _verify_commit(worktree_dir, commit) else None


def _reuse_worktree(
    worktree_dir: str,
    commit: str,
    sparse_list: List[str],
    verbose: bool,
) -> Optional[str]:
    """Return ``worktree_dir`` if it already sits at ``commit``, else ``None``.

    When sparse paths are requested, the sparse-checkout configuration is
    (re)applied before the directory is handed back.
    """
    if not (os.path.isdir(worktree_dir) and _verify_commit(worktree_dir, commit)):
        return None
    if sparse_list:
        _ensure_sparse_checkout(worktree_dir, commit, sparse_list, verbose=verbose)
    return worktree_dir
def _normalize_url(url: str) -> str:
    """Convert git URL to directory-safe name.

    Strips a leading ``http(s)://`` or ``git@``, turns ``:`` and ``/`` into
    separators (``/`` becomes ``__``), drops a trailing ``.git`` and replaces
    every run of characters outside ``[A-Za-z0-9_.-]`` with ``_``. Falls back
    to ``"repo"`` when nothing is left.
    """
    s = re.sub(r"^https?://", "", url.strip())
    s = re.sub(r"^git@", "", s).replace(":", "/").rstrip("/")
    # Only a trailing ".git" is the clone suffix; removing it anywhere would
    # mangle names such as "user.github.io".
    if s.endswith(".git"):
        s = s[: -len(".git")]
    s = s.replace("/", "__")
    return re.sub(r"[^A-Za-z0-9_.-]+", "_", s) or "repo"


class _file_lock:
    """Context manager holding an exclusive ``flock`` on ``path``.

    The lock file (and its parent directory) is created on entry. Where
    ``fcntl``/``flock`` is unavailable the block runs without a lock.
    """

    def __init__(self, path: str):
        self.path = path
        self.f = None  # open lock-file handle while inside the context

    def __enter__(self):
        os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
        self.f = open(self.path, "a+", encoding="utf-8")
        try:
            import fcntl  # not available on every platform

            fcntl.flock(self.f.fileno(), fcntl.LOCK_EX)
        except (ImportError, OSError):
            # If flock is unavailable, proceed without a lock.
            pass
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            if self.f:
                try:
                    import fcntl

                    fcntl.flock(self.f.fileno(), fcntl.LOCK_UN)
                except (ImportError, OSError):
                    pass
                self.f.close()
        finally:
            self.f = None
        # Never suppress an exception raised inside the with-block.
        return False


def _git(args, cwd=None, show_progress=False, timeout: int = 600):
    """Run a git command and return its ``subprocess.CompletedProcess``.

    Args:
        args: Arguments following ``git``.
        cwd: Working directory for the command.
        show_progress: When true, git inherits this process's stdio so output
            appears live; otherwise stdout/stderr are captured as text.
        timeout: Seconds before the command is abandoned.

    Returns:
        The completed process. A timeout (return code 124) or a failure to
        start git at all, e.g. a missing executable or ``cwd`` (return code
        127), is reported as a non-zero result instead of an exception, so
        callers only need to check ``returncode``.
    """
    env = os.environ.copy()
    # Prevent git-lfs from downloading large blobs during checkout.
    env.setdefault("GIT_LFS_SKIP_SMUDGE", "1")

    cmd = ["git"] + list(args)
    io_kwargs = {} if show_progress else {"capture_output": True, "text": True}
    try:
        return subprocess.run(
            cmd, cwd=cwd, check=False, timeout=timeout, env=env, **io_kwargs
        )
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(
            cmd, 124, stdout="", stderr=f"git timed out after {timeout}s"
        )
    except OSError as exc:
        return subprocess.CompletedProcess(cmd, 127, stdout="", stderr=str(exc))


def _verify_commit(work_dir: str, expected: str) -> bool:
    """Check if working directory is at expected commit.

    ``expected`` matches when it equals ``git rev-parse HEAD`` ignoring case,
    or when it is an abbreviated hex SHA (at least 7 characters) that HEAD
    starts with.
    """
    result = _git(["rev-parse", "HEAD"], cwd=work_dir)
    if result.returncode != 0:
        return False
    head = (result.stdout or "").strip().lower()
    wanted = expected.strip().lower()
    if not head or not wanted:
        return False
    if head == wanted:
        return True
    return bool(re.fullmatch(r"[0-9a-f]{7,64}", wanted)) and head.startswith(wanted)


def _normalize_sparse_paths(paths: Optional[Iterable[str]]) -> List[str]:
    """Normalize sparse paths to repo-relative POSIX-style paths.

    Surrounding whitespace and quotes are removed, backslashes become ``/``,
    leading ``./`` and ``/`` are stripped, and empty entries or entries
    starting with ``..`` are dropped. Duplicates are removed, keeping the
    first occurrence.
    """
    if not paths:
        return []
    cleaned: List[str] = []
    for raw in paths:
        if not raw:
            continue
        s = str(raw).strip().strip("'\"").replace("\\", "/")
        while s.startswith("./"):
            s = s[2:]
        s = s.lstrip("/")
        if not s or s.startswith(".."):
            continue
        cleaned.append(s)
    # dict preserves insertion order, giving a stable de-duplication.
    return list(dict.fromkeys(cleaned))


def _ensure_sparse_checkout(
    worktree_dir: str,
    commit: str,
    sparse_paths: List[str],
    verbose: bool = True,
) -> None:
    """Configure sparse checkout for this worktree and checkout commit.

    Does nothing when ``sparse_paths`` is empty. The results of the git
    commands are not inspected here.
    """
    if not sparse_paths:
        return
    # Initialize sparse-checkout for this worktree (non-cone to allow file paths).
    _git(
        ["sparse-checkout", "init", "--no-cone"],
        cwd=worktree_dir,
        show_progress=verbose,
        timeout=600,
    )
    # Set sparse paths (prunes everything else from the worktree).
    _git(
        ["sparse-checkout", "set", "--no-cone", "--"] + list(sparse_paths),
        cwd=worktree_dir,
        show_progress=verbose,
        timeout=600,
    )
    # Ensure the intended commit is checked out (with sparse rules applied).
    _git(
        ["checkout", "--detach", commit],
        cwd=worktree_dir,
        show_progress=verbose,
        timeout=1800,
    )