Roster

roster

Per-task ACP roster: aggregates in-container agents into one endpoint.

ACPRoster owns the per-task state for the ACP host-proxy:

  • the cache lookup that answers "what models does this agent advertise?"
  • the live walk that answers "what agents are currently authenticated for this image?" — re-evaluated on every session/new so newly-authed agents appear without daemon restart
  • the proxy attach loop (delegated to proxy) that brokers JSON-RPC frames between the connected client and the chosen backend

The class follows the shape of AgentRunner: lazy-init properties for cross-cutting subsystems, OOP over free functions, no mutable state in __init__ beyond the parameters themselves.

DEFAULT_AUTH_IDENTITY = 'global' module-attribute

Sentinel auth identity, used wherever terok auth is still process-wide.

Future per-project auth will turn this into a variable; the cache key already accommodates that change without a schema migration.

DEFAULT_CREDENTIAL_SCOPE = 'default' module-attribute

Scope name used by CredentialDB for the process-wide credential set. Mirrors what authenticate writes.

ACPRoster(*, container_name, image_id, sandbox, auth_identity=DEFAULT_AUTH_IDENTITY, cache=None)

Per-task ACP aggregator.

Construct one per running task. The roster owns the per-agent probe cache lookups and the attach loop that brokers a connected ACP client. It probes the ACP-capable agents among those declared in the image's ai.terok.agents label. Failed probes (missing wrapper, no credentials, agent crashed) are not cached, so a transient failure can recover on the next session/new. The roster deliberately does not consult the credential vault: that view is incomplete (file-mounted creds aren't there), and the proxy has nothing useful to do with the answer anyway, since a probe that succeeds is, by definition, an authed agent.

Source code in src/terok_executor/acp/roster.py
def __init__(
    self,
    *,
    container_name: str,
    image_id: str,
    sandbox: Sandbox,
    auth_identity: str = DEFAULT_AUTH_IDENTITY,
    cache: AgentRosterCache | None = None,
) -> None:
    self._container_name = container_name
    self._image_id = image_id
    self._sandbox = sandbox
    self._auth_identity = auth_identity
    # Don't ``cache or GLOBAL_CACHE`` here — ``AgentRosterCache`` defines
    # ``__len__``, so an empty cache is falsy and would silently swap in
    # the global singleton.  Explicit ``is None`` check.
    self._cache = cache if cache is not None else GLOBAL_CACHE
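The pitfall described in the constructor comment can be reproduced in isolation. The following is a minimal sketch, assuming a hypothetical `TinyCache` stand-in (not the real `AgentRosterCache`) whose only relevant trait is that it defines `__len__`:

```python
from __future__ import annotations


class TinyCache:
    """Hypothetical stand-in for a cache type that defines ``__len__``."""

    def __init__(self) -> None:
        self._entries: dict[str, tuple[str, ...]] = {}

    def __len__(self) -> int:
        return len(self._entries)


GLOBAL = TinyCache()  # stand-in for the module-level singleton


def pick_with_or(cache: TinyCache | None) -> TinyCache:
    # Buggy: an empty cache has len() == 0, so it is falsy and gets replaced.
    return cache or GLOBAL


def pick_with_is_none(cache: TinyCache | None) -> TinyCache:
    # Correct: only a missing argument falls back to the singleton.
    return cache if cache is not None else GLOBAL


mine = TinyCache()
print(pick_with_or(mine) is mine)       # False: silently swapped for GLOBAL
print(pick_with_is_none(mine) is mine)  # True
```

Any object that defines `__len__` without `__bool__` is falsy when empty, which is why the explicit `is None` comparison matters for a freshly created cache passed in by a test.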

configured_agents cached property

Agents declared in the image's ai.terok.agents label.

Parsed once per roster instance, since the image label is stable for the lifetime of the running task. The label is a comma-separated list (see AGENTS_LABEL).
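Parsing a comma-separated label value might look like the sketch below. The helper name `parse_agents_label` is hypothetical, and tolerating stray whitespace and empty entries is an assumption rather than documented behaviour:

```python
def parse_agents_label(raw: str) -> tuple[str, ...]:
    """Split a comma-separated label value into agent names.

    Assumption: surrounding whitespace and empty entries are ignored.
    """
    return tuple(name.strip() for name in raw.split(",") if name.strip())


print(parse_agents_label("claude, codex,,gh "))  # ('claude', 'codex', 'gh')
```

Returning a tuple keeps the parsed value immutable, which suits a result that is cached for the roster's lifetime.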

acp_capable_agents cached property

Subset of configured_agents that ship a terok-{agent}-acp wrapper.

The image label lists every agent in the runtime — claude, opencode, gh, sonar, blablador, etc. Of those, only the ones that actually install an ACP wrapper script (currently claude, codex, copilot, opencode, vibe) can be probed by the proxy; the rest are tools or LLM gateways that don't speak the protocol at all. Probing them anyway costs a full probe_timeout per agent for nothing — and worse, leaves their wrappers as zombie subprocess threads in the executor pool until exec_stdio's own timeout kills them.

Resolved by a single in-container shell call at first use (command -v is built-in to bash, near-zero cost). The property is cached for the roster's lifetime; new wrappers installed mid-task aren't picked up without a daemon restart.
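One way to check several wrappers with a single shell invocation is sketched below. The helper names and the exact script shape are assumptions for illustration; only the `terok-{agent}-acp` naming and the use of `command -v` come from the description above:

```python
import shlex


def build_wrapper_check(agents: list[str]) -> str:
    """Build one sh script that echoes each agent whose wrapper is on PATH."""
    checks = []
    for agent in agents:
        wrapper = shlex.quote(f"terok-{agent}-acp")
        checks.append(f"command -v {wrapper} >/dev/null 2>&1 && echo {shlex.quote(agent)}")
    # ';' continues after a miss; the trailing 'true' keeps the exit status 0.
    return "; ".join(checks + ["true"])


def parse_wrapper_check(stdout: str, agents: list[str]) -> tuple[str, ...]:
    """Keep the configured order, retaining only agents the script echoed."""
    found = set(stdout.split())
    return tuple(a for a in agents if a in found)


script = build_wrapper_check(["claude", "gh"])
# Suppose the container printed only "claude":
print(parse_wrapper_check("claude\n", ["claude", "gh"]))  # ('claude',)
```

The script would be run once inside the container (for example via `bash -c`), so the cost stays at a single exec regardless of how many agents the label lists.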

list_available_agents() async

Return agent:model ids ready to surface to a client.

Probes the ACP-capable agents from the image's ai.terok.agents label (skipping those already cached) and concatenates the namespaced model ids of those that responded. Cold-cache agents are probed in parallel via gather, so first-call latency is max(probe_time) rather than sum(probe_time). Successful probes cache the model tuple for the daemon's lifetime; failed probes are not cached so a transient cold start (Node wrapper warming up, OAuth refresh in flight) can recover on the next session/new instead of wedging the roster empty until the daemon restarts.

Source code in src/terok_executor/acp/roster.py
async def list_available_agents(self) -> list[str]:
    """Return ``agent:model`` ids ready to surface to a client.

    Probes the ACP-capable agents from the image's ``ai.terok.agents``
    label (skipping those already cached) and concatenates the
    namespaced model ids of those that responded.  Cold-cache agents are
    probed in parallel via [`gather`][asyncio.gather], so first-call
    latency is ``max(probe_time)`` rather than ``sum(probe_time)``.
    Successful probes cache the model tuple for the daemon's
    lifetime; failed probes are *not* cached so a transient cold
    start (Node wrapper warming up, OAuth refresh in flight) can
    recover on the next ``session/new`` instead of wedging the
    roster empty until the daemon restarts.
    """
    agents_in_order = self.acp_capable_agents
    cold = [a for a in agents_in_order if self._cache.get(self._cache_key(a)) is None]
    if cold:
        await asyncio.gather(*(self.warm(a) for a in cold))
    out: list[str] = []
    for agent in agents_in_order:
        for model in self._cache.get(self._cache_key(agent)) or ():
            out.append(f"{agent}{MODEL_NAMESPACE_SEP}{model}")
    return out
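The parallel cold-cache probing can be illustrated with a self-contained sketch. Everything here is a stand-in: `fake_probe`, the placeholder model names, a plain `dict` as the cache, and `":"` as the separator are assumptions, not the module's real objects:

```python
import asyncio

MODEL_SEP = ":"  # assumed separator, matching the agent:model ids described above


async def fake_probe(agent: str) -> tuple[str, ...]:
    """Hypothetical probe: sleeps briefly, then returns placeholder models."""
    await asyncio.sleep(0.05)
    return {"claude": ("m1", "m2"), "codex": ("m3",)}.get(agent, ())


async def list_models(agents: list[str], cache: dict[str, tuple[str, ...]]) -> list[str]:
    cold = [a for a in agents if a not in cache]
    # gather runs the probes concurrently and returns results in input order.
    results = await asyncio.gather(*(fake_probe(a) for a in cold))
    for agent, models in zip(cold, results):
        if models:  # only successful probes are stored
            cache[agent] = models
    return [f"{a}{MODEL_SEP}{m}" for a in agents for m in cache.get(a, ())]


cache: dict[str, tuple[str, ...]] = {}
print(asyncio.run(list_models(["claude", "codex", "vibe"], cache)))
# ['claude:m1', 'claude:m2', 'codex:m3']
```

Because the three sleeps overlap, the call takes roughly one probe's duration rather than three, and the agent that returned nothing stays out of the cache so a later call probes it again.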

warm(agent_id) async

Probe agent_id and cache the result on success only.

Returns the probed model tuple (possibly empty on failure). Failures are deliberately not cached: a transient cold-start failure (slow Node start, OAuth refresh racing the probe timeout) would otherwise pin the agent at empty for the daemon's lifetime. The trade-off is paid in cold-start latency: an agent that's genuinely unavailable gets re-probed every session/new and adds its full timeout to the response. Successful probes are cached per-daemon and reused across reconnects.

Source code in src/terok_executor/acp/roster.py
async def warm(self, agent_id: str) -> tuple[str, ...]:
    """Probe *agent_id* and cache the result on success only.

    Returns the probed model tuple (possibly empty on failure).
    Failures are deliberately *not* cached: a transient cold-
    start failure (slow Node start, OAuth refresh racing the
    probe timeout) would otherwise pin the agent at empty for
    the daemon's lifetime.  The trade-off is paid in cold-start
    latency: an agent that's *genuinely* unavailable gets re-
    probed every ``session/new`` and adds its full timeout to
    the response.  Successful probes are cached per-daemon and
    reused across reconnects.
    """
    key = self._cache_key(agent_id)
    try:
        models = await self._probe(agent_id)
    except ProbeError as exc:
        _logger.warning("ACP probe failed for agent %r: %s", agent_id, exc)
        return ()
    self._cache.put(key, models)
    return models
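The effect of caching on success only can be seen with a probe that fails once and then recovers. This is a minimal sketch with local stand-ins: `FlakyProber`, the local `ProbeError` class, and the `dict` cache are hypothetical and merely mirror the shape of `warm`:

```python
import asyncio


class ProbeError(Exception):
    """Local stand-in for the module's probe failure exception."""


class FlakyProber:
    """Hypothetical prober: the first call fails, later calls succeed."""

    def __init__(self) -> None:
        self.calls = 0

    async def probe(self, agent_id: str) -> tuple[str, ...]:
        self.calls += 1
        if self.calls == 1:
            raise ProbeError("wrapper still warming up")
        return ("m1",)


async def warm(agent_id: str, prober: FlakyProber, cache: dict) -> tuple[str, ...]:
    try:
        models = await prober.probe(agent_id)
    except ProbeError:
        return ()  # nothing is written, so the next call probes again
    cache[agent_id] = models
    return models


async def demo():
    prober, cache = FlakyProber(), {}
    first = await warm("claude", prober, cache)
    second = await warm("claude", prober, cache)
    return first, second, cache


print(asyncio.run(demo()))  # ((), ('m1',), {'claude': ('m1',)})
```

Had the empty result been stored after the first call, the second call would have been answered from the cache and the agent would have stayed invisible.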

attach(reader, writer) async

Run the proxy loop for one connected client until disconnect.

Delegates the JSON-RPC state machine to ACPProxy. The roster owns the data (cache + live walk); the proxy owns the protocol.

Source code in src/terok_executor/acp/roster.py
async def attach(
    self,
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
) -> None:
    """Run the proxy loop for one connected client until disconnect.

    Delegates the JSON-RPC state machine to [`ACPProxy`][terok_executor.acp.proxy.ACPProxy].  The
    roster owns the data (cache + live walk); the proxy owns the
    protocol.
    """
    proxy = ACPProxy(roster=self)
    await proxy.run(reader, writer)

wrapper_argv(agent_id)

Return the argv that runs terok-{agent_id}-acp in this container.

Hands back something a caller can pass directly to create_subprocess_exec — both the bind path and the probe path use this so they can attach asyncio's pipe transports to the wrapper subprocess. Currently podman-specific; a krun runtime would need a different shape (which is why this method lives on the roster, not on the proxy or probe).

Source code in src/terok_executor/acp/roster.py
def wrapper_argv(self, agent_id: str) -> list[str]:
    """Return the argv that runs ``terok-{agent_id}-acp`` in this container.

    Hands back something a caller can pass directly to
    [`create_subprocess_exec`][asyncio.create_subprocess_exec] — both
    the bind path and the probe path use this so they can attach
    asyncio's pipe transports to the wrapper subprocess.  Currently
    podman-specific; a krun runtime would need a different shape
    (which is why this method lives on the roster, not on the proxy
    or probe).
    """
    return ["podman", "exec", "-i", self._container_name, f"terok-{agent_id}-acp"]
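How a caller might consume such an argv is sketched below. To stay runnable without podman, the sketch substitutes a hypothetical `ECHO_ARGV` (a Python child that copies stdin to stdout) for the list that `wrapper_argv` would return; the `roundtrip` helper is likewise an illustration, not part of the module:

```python
import asyncio
import sys

# Stand-in for roster.wrapper_argv(agent_id): a child that echoes stdin to stdout.
ECHO_ARGV = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())",
]


async def roundtrip(argv: list[str], payload: bytes) -> bytes:
    """Spawn argv with piped stdio, send payload, and return the child's stdout."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # communicate() writes the payload, closes stdin, and reads stdout to EOF.
    stdout, _ = await proc.communicate(payload)
    return stdout


print(asyncio.run(roundtrip(ECHO_ARGV, b'{"jsonrpc": "2.0"}\n')))
```

A long-lived proxy would keep `proc.stdin` and `proc.stdout` open and stream frames instead of calling `communicate()`, but the spawn call takes the argv list in the same way.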

list_authenticated_agents(*, db_path=None, scope=DEFAULT_CREDENTIAL_SCOPE)

Return provider names that have stored credentials in scope.

Pure query against CredentialDB — no probing, no container exec. Used by the host-side acp list to classify endpoints in its status display; the roster itself doesn't gate probing on this anymore (file-based auth like Claude's OAuth lives outside the vault, so a vault-only filter would silently hide working agents).

Source code in src/terok_executor/acp/roster.py
def list_authenticated_agents(
    *,
    db_path: Path | None = None,
    scope: str = DEFAULT_CREDENTIAL_SCOPE,
) -> list[str]:
    """Return provider names that have stored credentials in *scope*.

    Pure query against [`CredentialDB`][terok_sandbox.CredentialDB] — no probing,
    no container exec.  Used by the host-side ``acp list`` to classify
    endpoints in its status display; the roster itself doesn't gate
    probing on this anymore (file-based auth like Claude's OAuth lives
    outside the vault, so a vault-only filter would silently hide
    working agents).
    """
    cfg = SandboxConfig()
    # ``db_path`` override exists for tests + multi-instance hosts; the
    # cfg still owns the tier policy so this caller never has to know
    # about the chain mechanism (session-file / systemd-creds /
    # keyring / config).
    db = cfg.open_credential_db(db_path)
    try:
        return list(db.list_credentials(scope))
    finally:
        db.close()