guildhouse-mcp/src/guildhouse_mcp/tools/forgejo.py
Tyler King dffa821c38 feat: Guildhouse MCP server + multi-agent governed dev pipeline
MCP server connecting Claude Code to the Guildhouse ecosystem.

Tools (23):
  Forgejo: repos, files, branches, PRs, CI status
  Chronicle: events, epochs, verify, emit
  GSAP proxy: request_ac, check_operation, get_posture, delegate
  Capstone: agents, tenants
  Tasks: list, get, create, update_status, submit_for_review, submit_review

Multi-agent topology:
  Lead Agent — plans work, delegates, reviews proposals
  Worker Agents — implement in parallel, submit PRs with confidence
  Reviewer Agent — QA's worker output, submits review confidence
  Confidence Gate — auto-merge/flag/propose/reject based on combined score

Confidence thresholds (configurable):
  >= 85: auto-merge (no human needed)
  >= 70: flag-merge (merged, flagged for post-review)
  >= 50: proposal (Tyler reviews)
  < 50: reject (worker revises)

Reviewer can override: 'reject' always rejects, 'request_changes'
caps at propose.

Task definitions (TOML) with phased prompts, success criteria,
and delegation scopes.

Every tool call emits Chronicle MCP_TOOL_CALL (0x3020).
Every gate decision emits Chronicle events (0x4010-0x4013).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 03:24:01 -04:00

143 lines
4.2 KiB
Python

"""Forgejo REST API tools."""
import json
import httpx
from ..config import settings
_client: httpx.AsyncClient | None = None
def _get_client() -> httpx.AsyncClient:
global _client
if _client is None:
_client = httpx.AsyncClient(
base_url=settings.forgejo_url,
headers={
"Authorization": f"token {settings.forgejo_token}",
"Accept": "application/json",
},
timeout=30.0,
)
return _client
async def list_repos(org: str = "", limit: int = 20) -> dict:
client = _get_client()
if org:
resp = await client.get(f"/api/v1/orgs/{org}/repos", params={"limit": limit})
else:
resp = await client.get("/api/v1/repos/search", params={"limit": limit})
resp.raise_for_status()
data = resp.json()
repos = data if isinstance(data, list) else data.get("data", [])
return {
"repos": [
{
"full_name": r["full_name"],
"description": r.get("description", ""),
"default_branch": r.get("default_branch", "main"),
"updated_at": r.get("updated_at", ""),
"stars": r.get("stars_count", 0),
"open_issues": r.get("open_issues_count", 0),
}
for r in repos
],
"count": len(repos),
}
async def read_file(repo: str, path: str, ref: str = "main") -> str:
client = _get_client()
resp = await client.get(f"/api/v1/repos/{repo}/raw/{path}", params={"ref": ref})
resp.raise_for_status()
return resp.text
async def create_branch(repo: str, branch: str, from_ref: str = "main") -> dict:
client = _get_client()
resp = await client.post(
f"/api/v1/repos/{repo}/branches",
json={"new_branch_name": branch, "old_branch_name": from_ref},
)
resp.raise_for_status()
return {"branch": branch, "repo": repo, "status": "created"}
async def create_pr(
repo: str, title: str, body: str,
head: str, base: str = "main",
labels: list[str] | None = None,
metadata: dict | None = None,
) -> dict:
client = _get_client()
full_body = body
if metadata:
full_body += f"\n\n<!-- agent-metadata: {json.dumps(metadata)} -->"
resp = await client.post(
f"/api/v1/repos/{repo}/pulls",
json={
"title": title,
"body": full_body,
"head": head,
"base": base,
},
)
resp.raise_for_status()
pr = resp.json()
return {
"number": pr["number"],
"url": pr.get("html_url", ""),
"state": pr["state"],
}
async def get_pr(repo: str, pr_number: int) -> dict:
client = _get_client()
resp = await client.get(f"/api/v1/repos/{repo}/pulls/{pr_number}")
resp.raise_for_status()
pr = resp.json()
ci = await ci_status(repo, ref=pr.get("head", {}).get("sha", ""))
return {
"number": pr["number"],
"title": pr["title"],
"state": pr["state"],
"mergeable": pr.get("mergeable"),
"body": pr.get("body", ""),
"ci": ci,
"created_at": pr.get("created_at"),
"updated_at": pr.get("updated_at"),
}
async def ci_status(repo: str, ref: str = "main") -> dict:
client = _get_client()
try:
resp = await client.get(f"/api/v1/repos/{repo}/commits/{ref}/status")
if resp.status_code == 200:
data = resp.json()
return {
"state": data.get("state", "unknown"),
"total": data.get("total_count", 0),
"statuses": [
{"context": s.get("context"), "state": s.get("status")}
for s in data.get("statuses", [])
],
}
except Exception:
pass
return {"state": "unknown", "total": 0, "statuses": []}
async def merge_pr(repo: str, pr_number: int, method: str = "merge") -> dict:
"""Merge a PR. Only called by the confidence gate on auto-merge."""
client = _get_client()
resp = await client.post(
f"/api/v1/repos/{repo}/pulls/{pr_number}/merge",
json={"Do": method, "merge_message_field": ""},
)
resp.raise_for_status()
return {"merged": True, "pr_number": pr_number}