acp

Per-task host-side ACP (Agent Client Protocol) aggregator.

Bridges a single ACP client (Zed, Toad, …) to one of several in-container agents (claude, codex, copilot, …) by namespacing models as agent:model (e.g. claude:opus-4.6) under ACP's standard category: "model" configOption.

Module map:

  • daemon — Unix-socket server, container lifecycle supervision, and the standalone terok-executor acp entry point. Owns serve_acp and the acp_socket_is_live probe used to distinguish live daemons from stale socket files.
  • roster — per-task aggregation: walks the image's ai.terok.agents label, probes each agent, and answers "what models does this container offer?" Owns ACPRoster and the vault-side list_authenticated_agents.
  • proxy — the typed bidirectional ACP mediator: implements both acp.Agent (toward the connected client) and acp.Client (toward the bound backend wrapper) on one object. Drives the bind handshake on first model pick.
  • probe — the minimal initialize + session/new handshake that extracts an agent's model roster.
  • cache — thread-safe per-agent model cache; survives reconnects, invalidated on credential rotation.
  • endpoint — the ACPEndpointStatus enum the host CLI uses to classify endpoints in terok acp list.
  • model_options — the agent:model namespace vocabulary and the typed builders + rewriter that keep the proxy's frames schema-valid.
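The agent:model namespace can be illustrated with a pair of small helpers. This is a minimal sketch, not the model_options implementation: the helper names join_model_id and split_model_id are hypothetical, and the ":" separator is assumed from the claude:opus-4.6 example above.

```python
MODEL_NAMESPACE_SEP = ":"  # assumed value, inferred from the agent:model examples


def join_model_id(agent: str, model: str) -> str:
    """Build the namespaced id a client sees, e.g. 'claude:opus-4.6'."""
    return f"{agent}{MODEL_NAMESPACE_SEP}{model}"


def split_model_id(namespaced: str) -> tuple:
    """Split a namespaced id back into (agent, model).

    partition() splits on the first separator only, so a model id that
    itself contains the separator survives the round trip.
    """
    agent, sep, model = namespaced.partition(MODEL_NAMESPACE_SEP)
    if not sep or not agent or not model:
        raise ValueError(f"expected 'agent:model', got {namespaced!r}")
    return agent, model
```

Splitting on the first separator keeps the agent prefix unambiguous even if a backend reports model ids with extra punctuation.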

Bind-trigger surfaces: explicit session/set_model / session/set_config_option(configId="model"), or — for clients that trust the advertised currentModelId — lazily on the first backend-needing method (e.g. session/prompt). Cross-agent switching mid-session is out of scope for v1; subsequent picks against a different agent are rejected at the protocol level.
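The bind rules above can be sketched as a tiny state holder. This is an illustration under stated assumptions, not the proxy's code: the BindState class and its method names are hypothetical, the AgentBindError here is a local stand-in, and allowing a later pick against the same agent is an assumption (the text only says picks against a different agent are rejected).

```python
from typing import Optional


class AgentBindError(RuntimeError):
    """Local stand-in for the module's AgentBindError."""


class BindState:
    """Hypothetical tracker for which backend agent a session is bound to."""

    def __init__(self, advertised_model: str) -> None:
        self.current_model = advertised_model  # plays the role of currentModelId
        self.bound_agent: Optional[str] = None

    def pick(self, namespaced: str) -> None:
        """Explicit trigger: a set_model / set_config_option style request."""
        agent, _, _model = namespaced.partition(":")
        if self.bound_agent is not None and agent != self.bound_agent:
            raise AgentBindError(
                f"session is bound to {self.bound_agent!r}; cannot switch to {agent!r}"
            )
        self.bound_agent = agent
        self.current_model = namespaced

    def ensure_bound(self) -> str:
        """Lazy trigger: called before the first backend-needing method."""
        if self.bound_agent is None:
            self.pick(self.current_model)
        return self.bound_agent
```

A client that never sends an explicit pick still ends up bound, because ensure_bound falls back to the advertised model.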

The exports below are re-exported from terok_executor so the host-side caller (terok) doesn't have to reach into the submodules.

__all__ = ['ACPEndpointStatus', 'ACPRoster', 'AgentBindError', 'AgentRosterCache', 'CacheKey', 'ProbeError', 'acp_socket_is_live', 'list_authenticated_agents', 'probe_agent_models', 'serve_acp'] module-attribute

AgentRosterCache()

Thread-safe map from CacheKey to a tuple of model ids.

Models are stored as a tuple so cache entries are immutable once inserted — callers can return them directly without defensive copying. Empty tuples are valid and signal "probe ran but yielded nothing" (saved to avoid hammering a misconfigured agent on every session).

Source code in src/terok_executor/acp/cache.py
def __init__(self) -> None:
    self._models: dict[CacheKey, tuple[str, ...]] = {}
    self._lock = threading.Lock()

get(key)

Return cached models for key, or None if not yet probed.

Source code in src/terok_executor/acp/cache.py
def get(self, key: CacheKey) -> tuple[str, ...] | None:
    """Return cached models for *key*, or ``None`` if not yet probed."""
    with self._lock:
        return self._models.get(key)

put(key, models)

Store models under key, replacing any existing entry.

Source code in src/terok_executor/acp/cache.py
def put(self, key: CacheKey, models: tuple[str, ...]) -> None:
    """Store *models* under *key*, replacing any existing entry."""
    with self._lock:
        self._models[key] = models

invalidate_auth(auth_identity)

Drop every entry tied to auth_identity.

Used when credentials for an identity rotate — the next session/new re-probes affected agents.

Source code in src/terok_executor/acp/cache.py
def invalidate_auth(self, auth_identity: str) -> None:
    """Drop every entry tied to *auth_identity*.

    Used when credentials for an identity rotate — the next
    ``session/new`` re-probes affected agents.
    """
    with self._lock:
        self._models = {
            k: v for k, v in self._models.items() if k.auth_identity != auth_identity
        }

__len__()

Return the number of cached entries (for tests / introspection).

Source code in src/terok_executor/acp/cache.py
def __len__(self) -> int:
    """Return the number of cached entries (for tests / introspection)."""
    with self._lock:
        return len(self._models)

CacheKey(image_id, auth_identity, agent_id) dataclass

Composite key for one agent's roster within one auth scope.

auth_identity is the constant "global" today (terok auth is process-wide); the field exists from day one so per-project auth can slot in without a key-schema migration.

image_id instance-attribute

auth_identity instance-attribute

agent_id instance-attribute
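A short sketch of how such a key behaves as a dictionary key, and how invalidation by auth_identity filters entries. The frozen=True flag is an assumption (a dataclass needs to be hashable to serve as a dict key), and the image id, the "project-x" identity, and the model ids are made-up sample values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)  # frozen is assumed here so instances are hashable
class CacheKey:
    image_id: str
    auth_identity: str
    agent_id: str


def invalidate_auth(models: dict, auth_identity: str) -> dict:
    """Return a copy of the map without entries tied to auth_identity."""
    return {k: v for k, v in models.items() if k.auth_identity != auth_identity}


models = {
    CacheKey("sha256:abc", "global", "claude"): ("model-a",),
    CacheKey("sha256:abc", "global", "codex"): (),  # probed, nothing offered
    CacheKey("sha256:abc", "project-x", "claude"): ("model-a",),
}

# None means "never probed"; an empty tuple means "probed, no models".
never_probed = models.get(CacheKey("sha256:abc", "global", "copilot"))
probed_empty = models.get(CacheKey("sha256:abc", "global", "codex"))

after_rotation = invalidate_auth(models, "global")
```

Because the identity is part of the key, rotating one identity's credentials leaves entries for other identities untouched.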

ACPEndpointStatus

Bases: StrEnum

Live state of a per-task ACP endpoint.

The host classifier (Project.acp_endpoints) attaches one of these to every running task; the value drives both the rendered row in acp list and the decision acp connect makes about whether to spawn a daemon.

ACTIVE = 'active' class-attribute instance-attribute

Daemon up, socket bound, ready for client connections.

READY = 'ready' class-attribute instance-attribute

Task running with at least one authenticated agent — a daemon will spawn on first terok acp connect.

UNSUPPORTED = 'unsupported' class-attribute instance-attribute

Task running but no in-image agents are authenticated. Connect would fail; surface honestly so the user knows to authenticate.
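The three states can be read as a simple decision order. The sketch below is hypothetical: the real classifier is Project.acp_endpoints, whose inputs are not shown here, so the classify function and its parameters are illustrative only. The enum uses a str mixin as a portable stand-in for StrEnum.

```python
from enum import Enum


class ACPEndpointStatus(str, Enum):  # stand-in for the StrEnum described above
    ACTIVE = "active"
    READY = "ready"
    UNSUPPORTED = "unsupported"


def classify(socket_live: bool, image_agents: set, authenticated: set) -> ACPEndpointStatus:
    """Hypothetical classifier for one running task."""
    if socket_live:
        # A daemon is already accepting connections on the task's socket.
        return ACPEndpointStatus.ACTIVE
    if image_agents & authenticated:
        # No daemon yet, but at least one in-image agent has credentials.
        return ACPEndpointStatus.READY
    return ACPEndpointStatus.UNSUPPORTED
```

Checking the socket first means a live daemon is reported as active regardless of what the credential query returns.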

ProbeError

Bases: RuntimeError

Raised when an agent fails to respond to the probe handshake.

The roster catches this error in warm, logs a warning, and returns an empty roster without caching it, so the agent is re-probed on the next session/new. Callers should treat ProbeError as "this agent is currently unusable" rather than bubble it to the user.

AgentBindError

Bases: RuntimeError

Surface error raised when the proxy fails to bind a backend agent.

Always converted to a JSON-RPC error response on the wire — never bubbles to the caller of run.

ACPRoster(*, container_name, image_id, sandbox, auth_identity=DEFAULT_AUTH_IDENTITY, cache=None)

Per-task ACP aggregator.

Construct one per running task. The roster owns the per-agent probe cache lookups and the attach loop that brokers a connected ACP client. It probes the ACP-capable agents declared in the image's ai.terok.agents label; a failed probe (missing wrapper, no credentials, agent crashed) yields an empty roster for that agent and is not cached, so the agent is re-probed on the next session/new. The roster deliberately does not consult the credential vault: that view is incomplete (file-mounted creds aren't there) and the proxy has nothing useful to do with the answer anyway, since a probe that succeeds is, by definition, an authed agent.

Source code in src/terok_executor/acp/roster.py
def __init__(
    self,
    *,
    container_name: str,
    image_id: str,
    sandbox: Sandbox,
    auth_identity: str = DEFAULT_AUTH_IDENTITY,
    cache: AgentRosterCache | None = None,
) -> None:
    self._container_name = container_name
    self._image_id = image_id
    self._sandbox = sandbox
    self._auth_identity = auth_identity
    # Don't ``cache or GLOBAL_CACHE`` here — ``AgentRosterCache`` defines
    # ``__len__``, so an empty cache is falsy and would silently swap in
    # the global singleton.  Explicit ``is None`` check.
    self._cache = cache if cache is not None else GLOBAL_CACHE
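The pitfall described in the comment above is easy to reproduce. The following sketch uses a hypothetical SizedCache class in place of AgentRosterCache; the only property that matters is that it defines __len__ and no __bool__.

```python
class SizedCache:
    """Hypothetical stand-in: defines __len__ but not __bool__."""

    def __init__(self) -> None:
        self._models = {}

    def __len__(self) -> int:
        return len(self._models)


GLOBAL_CACHE = SizedCache()
injected = SizedCache()  # e.g. a fresh cache passed in by a test

# An object whose __len__ returns 0 is falsy, so ``or`` discards it.
picked_with_or = injected or GLOBAL_CACHE

# The explicit None check keeps the injected instance.
picked_with_is_none = injected if injected is not None else GLOBAL_CACHE
```

With the ``or`` form, a test that injects an empty cache would silently share state with the global singleton.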

configured_agents cached property

Agents declared in the image's ai.terok.agents label.

Parsed once per roster instance — the image label is stable for the lifetime of the running task. The label is a comma-separated list (see AGENTS_LABEL).

acp_capable_agents cached property

Subset of configured_agents that ship a terok-{agent}-acp wrapper.

The image label lists every agent in the runtime — claude, opencode, gh, sonar, blablador, etc. Of those, only the ones that actually install an ACP wrapper script (currently claude, codex, copilot, opencode, vibe) can be probed by the proxy; the rest are tools or LLM gateways that don't speak the protocol at all. Probing them anyway costs a full probe_timeout per agent for nothing — and worse, leaves their wrappers as zombie subprocess threads in the executor pool until exec_stdio's own timeout kills them.

Resolved by a single in-container shell call at first use (command -v is built-in to bash, near-zero cost). The property is cached for the roster's lifetime; new wrappers installed mid-task aren't picked up without a daemon restart.

list_available_agents() async

Return agent:model ids ready to surface to a client.

Probes every agent in acp_capable_agents that has no cache entry yet and concatenates the namespaced model ids of those that responded, in label order. Cold-cache agents are probed in parallel via gather, so first-call latency is max(probe_time) rather than sum(probe_time). Successful probes cache the model tuple for the daemon's lifetime; failed probes are not cached so a transient cold start (Node wrapper warming up, OAuth refresh in flight) can recover on the next session/new instead of wedging the roster empty until the daemon restarts.

Source code in src/terok_executor/acp/roster.py
async def list_available_agents(self) -> list[str]:
    """Return ``agent:model`` ids ready to surface to a client.

    Probes every agent in the image's ``ai.terok.agents`` label
    (filtered through the cache) and concatenates the namespaced
    model ids of those that responded.  Cold-cache agents are
    probed in parallel via [`gather`][asyncio.gather], so first-call
    latency is ``max(probe_time)`` rather than ``sum(probe_time)``.
    Successful probes cache the model tuple for the daemon's
    lifetime; failed probes are *not* cached so a transient cold
    start (Node wrapper warming up, OAuth refresh in flight) can
    recover on the next ``session/new`` instead of wedging the
    roster empty until the daemon restarts.
    """
    agents_in_order = self.acp_capable_agents
    cold = [a for a in agents_in_order if self._cache.get(self._cache_key(a)) is None]
    if cold:
        await asyncio.gather(*(self.warm(a) for a in cold))
    out: list[str] = []
    for agent in agents_in_order:
        for model in self._cache.get(self._cache_key(agent)) or ():
            out.append(f"{agent}{MODEL_NAMESPACE_SEP}{model}")
    return out
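The parallel-probe pattern can be tried in isolation. This is a sketch with fake probes, not the roster itself: fake_probe, list_models, FAKE_AGENTS, the delays, and the model ids are all made up, and the ":" separator is assumed from the agent:model examples.

```python
import asyncio

MODEL_NAMESPACE_SEP = ":"  # assumed separator


async def fake_probe(delay: float, models: tuple) -> tuple:
    """Pretend probe: waits, then reports a fixed roster."""
    await asyncio.sleep(delay)
    return models


async def list_models(agents: dict) -> list:
    """Probe all agents concurrently, then namespace results in input order."""
    names = list(agents)
    # gather() returns results in argument order, not completion order.
    rosters = await asyncio.gather(*(fake_probe(*agents[n]) for n in names))
    return [
        f"{name}{MODEL_NAMESPACE_SEP}{model}"
        for name, roster in zip(names, rosters)
        for model in roster
    ]


FAKE_AGENTS = {
    "claude": (0.03, ("model-a", "model-b")),
    "codex": (0.01, ("model-c",)),
    "copilot": (0.02, ()),  # responded, but offers no models
}
```

Running asyncio.run(list_models(FAKE_AGENTS)) waits roughly as long as the slowest fake probe, and the output order follows the input order even though codex finishes first.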

warm(agent_id) async

Probe agent_id and cache the result on success only.

Returns the probed model tuple (possibly empty on failure). Failures are deliberately not cached: a transient cold-start failure (slow Node start, OAuth refresh racing the probe timeout) would otherwise pin the agent at empty for the daemon's lifetime. The trade-off is paid in cold-start latency: an agent that's genuinely unavailable gets re-probed every session/new and adds its full timeout to the response. Successful probes are cached per-daemon and reused across reconnects.

Source code in src/terok_executor/acp/roster.py
async def warm(self, agent_id: str) -> tuple[str, ...]:
    """Probe *agent_id* and cache the result on success only.

    Returns the probed model tuple (possibly empty on failure).
    Failures are deliberately *not* cached: a transient cold-start
    failure (slow Node start, OAuth refresh racing the probe
    timeout) would otherwise pin the agent at empty for the
    daemon's lifetime.  The trade-off is paid in cold-start
    latency: an agent that's *genuinely* unavailable gets
    re-probed every ``session/new`` and adds its full timeout to
    the response.  Successful probes are cached per-daemon and
    reused across reconnects.
    """
    key = self._cache_key(agent_id)
    try:
        models = await self._probe(agent_id)
    except ProbeError as exc:
        _logger.warning("ACP probe failed for agent %r: %s", agent_id, exc)
        return ()
    self._cache.put(key, models)
    return models
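The effect of caching on success only can be seen with a probe that fails once and then recovers. This is a sketch: FlakyProbe, the free-standing warm function, and the plain dict used as a cache are hypothetical stand-ins for the roster's internals.

```python
import asyncio


class ProbeError(RuntimeError):
    """Local stand-in for the module's ProbeError."""


class FlakyProbe:
    """Fails a fixed number of times, then returns a roster."""

    def __init__(self, failures: int) -> None:
        self.failures_left = failures

    async def __call__(self, agent_id: str) -> tuple:
        if self.failures_left > 0:
            self.failures_left -= 1
            raise ProbeError(f"{agent_id} wrapper still warming up")
        return ("model-a",)


async def warm(cache: dict, probe, agent_id: str) -> tuple:
    """Cache the roster only when the probe succeeds."""
    try:
        models = await probe(agent_id)
    except ProbeError:
        return ()  # nothing stored, so the next call probes again
    cache[agent_id] = models
    return models


async def demo() -> tuple:
    cache = {}
    probe = FlakyProbe(failures=1)
    first = await warm(cache, probe, "claude")
    cached_after_first = "claude" in cache
    second = await warm(cache, probe, "claude")
    return first, cached_after_first, second, cache
```

Had the first failure been cached as an empty tuple, the second call would never have reached the probe.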

attach(reader, writer) async

Run the proxy loop for one connected client until disconnect.

Delegates the JSON-RPC state machine to ACPProxy. The roster owns the data (cache + live walk); the proxy owns the protocol.

Source code in src/terok_executor/acp/roster.py
async def attach(
    self,
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
) -> None:
    """Run the proxy loop for one connected client until disconnect.

    Delegates the JSON-RPC state machine to [`ACPProxy`][terok_executor.acp.proxy.ACPProxy].  The
    roster owns the data (cache + live walk); the proxy owns the
    protocol.
    """
    proxy = ACPProxy(roster=self)
    await proxy.run(reader, writer)

wrapper_argv(agent_id)

Return the argv that runs terok-{agent_id}-acp in this container.

Hands back something a caller can pass directly to create_subprocess_exec — both the bind path and the probe path use this so they can attach asyncio's pipe transports to the wrapper subprocess. Currently podman-specific; a krun runtime would need a different shape (which is why this method lives on the roster, not on the proxy or probe).

Source code in src/terok_executor/acp/roster.py
def wrapper_argv(self, agent_id: str) -> list[str]:
    """Return the argv that runs ``terok-{agent_id}-acp`` in this container.

    Hands back something a caller can pass directly to
    [`create_subprocess_exec`][asyncio.create_subprocess_exec] — both
    the bind path and the probe path use this so they can attach
    asyncio's pipe transports to the wrapper subprocess.  Currently
    podman-specific; a krun runtime would need a different shape
    (which is why this method lives on the roster, not on the proxy
    or probe).
    """
    return ["podman", "exec", "-i", self._container_name, f"terok-{agent_id}-acp"]
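An argv list of this shape can be unpacked straight into asyncio.create_subprocess_exec with pipes attached. The sketch below assumes nothing about podman being installed: the free-standing wrapper_argv mirrors the method above, while run_with_pipes and ECHO_ARGV are hypothetical helpers that use the current Python interpreter as a stand-in child process.

```python
import asyncio
import sys


def wrapper_argv(container_name: str, agent_id: str) -> list:
    """Same argv shape as the method above, as a free function."""
    return ["podman", "exec", "-i", container_name, f"terok-{agent_id}-acp"]


async def run_with_pipes(argv: list, payload: bytes) -> bytes:
    """Spawn argv with stdin/stdout pipes, send payload, return stdout."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate(payload)
    return out


# Stand-in child that echoes stdin to stdout, so the sketch runs without podman.
ECHO_ARGV = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]
```

Calling asyncio.run(run_with_pipes(ECHO_ARGV, b"ping")) exercises the same spawn path a caller would use with the real wrapper argv.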

acp_socket_is_live(path)

Return True when a peer is currently accepting on path.

Distinguishes a live ACP daemon from a stale socket file left behind by a crash: a successful connect means a peer is listening, while ECONNREFUSED (and any other OSError) means the file is safe to unlink.

Source code in src/terok_executor/acp/daemon.py
def acp_socket_is_live(path: Path) -> bool:
    """Return ``True`` when a peer is currently accepting on *path*.

    Distinguishes a live ACP daemon from a stale socket file left
    behind by a crash: a successful ``connect`` means a peer is
    listening, while ``ECONNREFUSED`` (and any other ``OSError``)
    means the file is safe to unlink.
    """
    if not path.exists():
        return False
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.2)
            probe.connect(str(path))
    except OSError:
        return False
    return True
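A typical use of such a probe is deciding whether a leftover socket file can be removed before binding. The sketch below re-implements the probe locally so it runs on its own; prepare_socket_path and demo are hypothetical helpers, and the example assumes a platform with Unix domain sockets.

```python
import socket
import tempfile
from pathlib import Path


def socket_is_live(path: Path) -> bool:
    """Local copy of the connect-based liveness probe shown above."""
    if not path.exists():
        return False
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.2)
            probe.connect(str(path))
    except OSError:
        return False
    return True


def prepare_socket_path(path: Path) -> bool:
    """Return True if path is free to bind (unlinking a stale file if needed)."""
    if socket_is_live(path):
        return False  # another daemon owns it; leave the file alone
    path.unlink(missing_ok=True)
    return True


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "acp.sock"
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(str(path))
        server.listen(1)
        live = socket_is_live(path)
        server.close()  # closing does not remove the socket file
        stale = socket_is_live(path)
        freed = prepare_socket_path(path) and not path.exists()
        return live, stale, freed
```

The demo covers both cases the docstring distinguishes: a listening peer, and a socket file whose owner has gone away.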

serve_acp(container_name, socket_path, *, sandbox=None, poll_interval_sec=CONTAINER_POLL_INTERVAL_SEC)

Bind socket_path and run the ACP host-proxy until container_name stops.

Returns the process exit code. When sandbox is None, builds one from the layered config.yml + env (the same SandboxConfig() defaults executor uses everywhere).

Source code in src/terok_executor/acp/daemon.py
def serve_acp(
    container_name: str,
    socket_path: Path,
    *,
    sandbox: Sandbox | None = None,
    poll_interval_sec: float = CONTAINER_POLL_INTERVAL_SEC,
) -> int:
    """Bind *socket_path* and run the ACP host-proxy until *container_name* stops.

    Returns the process exit code.  When *sandbox* is ``None``, builds
    one from the layered ``config.yml`` + env (the same
    ``SandboxConfig()`` defaults executor uses everywhere).
    """
    if sandbox is None:
        sandbox = Sandbox(config=SandboxConfig())
    return asyncio.run(_run(container_name, socket_path, sandbox, poll_interval_sec))

probe_agent_models(*, agent_id, wrapper_argv, timeout=DEFAULT_PROBE_TIMEOUT_SEC, cwd='/workspace') async

Drive the minimal ACP handshake against terok-{agent_id}-acp.

Spawns the wrapper via acp.spawn_agent_process (which owns the asyncio stdio bridging and the graceful subprocess shutdown dance), sends initialize and session/new, reads the models block, and returns the bare model ids.

Raises ProbeError on timeout, transport failure, or any handshake error. The roster's warm catches it, logs a warning, and returns an empty tuple without caching the failure, so the agent is probed again on the next session/new.

Source code in src/terok_executor/acp/probe.py
async def probe_agent_models(
    *,
    agent_id: str,
    wrapper_argv: list[str],
    timeout: float = DEFAULT_PROBE_TIMEOUT_SEC,
    cwd: str = "/workspace",
) -> tuple[str, ...]:
    """Drive the minimal ACP handshake against ``terok-{agent_id}-acp``.

    Spawns the wrapper via `acp.spawn_agent_process`
    (which owns the asyncio stdio bridging and the graceful subprocess
    shutdown dance), sends ``initialize`` and ``session/new``, reads
    the ``models`` block, and returns the bare model ids.

    Raises [`ProbeError`][terok_executor.acp.probe.ProbeError] on
    timeout, transport failure, or any handshake error.  The roster's
    ``warm`` catches it, logs a warning, and returns an empty tuple
    without caching the failure, so the agent is probed again on the
    next ``session/new``.
    """
    command, *args = wrapper_argv
    try:
        async with asyncio.timeout(timeout):
            async with spawn_agent_process(_ProbeClient(), command, *args) as (client, _proc):
                await client.initialize(
                    protocol_version=PROTOCOL_VERSION,
                    client_capabilities=ClientCapabilities(),
                )
                resp = await client.new_session(cwd=cwd, mcp_servers=[])
    except TimeoutError as exc:
        _logger.warning("ACP probe for agent %r timed out after %.1fs", agent_id, timeout)
        raise ProbeError(f"probe timed out for agent {agent_id!r}") from exc
    except Exception as exc:
        raise ProbeError(f"probe failed for agent {agent_id!r}: {exc}") from exc

    if resp.models is None:
        return ()
    return tuple(m.model_id for m in resp.models.available_models)
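The error-mapping part of the probe (every failure mode becomes one exception type) can be sketched without the ACP library. Note the swap: the source uses asyncio.timeout, while this sketch uses asyncio.wait_for so it also runs on older Python versions. probe_with_timeout, the fake handshakes, and the 0.05 s timeout are hypothetical.

```python
import asyncio


class ProbeError(RuntimeError):
    """Local stand-in for the module's ProbeError."""


async def probe_with_timeout(handshake, agent_id: str, timeout: float) -> tuple:
    """Run a handshake coroutine, folding every failure into ProbeError."""
    try:
        return await asyncio.wait_for(handshake(), timeout)
    except asyncio.TimeoutError as exc:
        raise ProbeError(f"probe timed out for agent {agent_id!r}") from exc
    except Exception as exc:
        raise ProbeError(f"probe failed for agent {agent_id!r}: {exc}") from exc


async def demo() -> list:
    async def ok() -> tuple:
        return ("model-a",)

    async def slow() -> tuple:
        await asyncio.sleep(1)  # longer than the timeout below
        return ()

    async def broken() -> tuple:
        raise ConnectionError("pipe closed")

    outcomes = []
    for handshake in (ok, slow, broken):
        try:
            outcomes.append(await probe_with_timeout(handshake, "claude", 0.05))
        except ProbeError as exc:
            outcomes.append(str(exc))
    return outcomes
```

Callers then need a single except clause, which is what lets the roster treat any probe failure as "agent currently unusable".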

list_authenticated_agents(*, db_path=None, scope=DEFAULT_CREDENTIAL_SCOPE)

Return provider names that have stored credentials in scope.

Pure query against CredentialDB — no probing, no container exec. Used by the host-side acp list to classify endpoints in its status display; the roster itself doesn't gate probing on this anymore (file-based auth like Claude's OAuth lives outside the vault, so a vault-only filter would silently hide working agents).

Source code in src/terok_executor/acp/roster.py
def list_authenticated_agents(
    *,
    db_path: Path | None = None,
    scope: str = DEFAULT_CREDENTIAL_SCOPE,
) -> list[str]:
    """Return provider names that have stored credentials in *scope*.

    Pure query against [`CredentialDB`][terok_sandbox.CredentialDB] — no probing,
    no container exec.  Used by the host-side ``acp list`` to classify
    endpoints in its status display; the roster itself doesn't gate
    probing on this anymore (file-based auth like Claude's OAuth lives
    outside the vault, so a vault-only filter would silently hide
    working agents).
    """
    cfg = SandboxConfig()
    # ``db_path`` override exists for tests + multi-instance hosts; the
    # cfg still owns the tier policy so this caller never has to know
    # about the chain mechanism (session-file / systemd-creds /
    # keyring / config).
    db = cfg.open_credential_db(db_path)
    try:
        return list(db.list_credentials(scope))
    finally:
        db.close()