Proxy

proxy

ACP proxy — one connection's worth of typed JSON-RPC mediation.

ACPProxy is the bridge behind ACPRoster.attach. It implements both sides of the ACP protocol on the same object:

  • acp.Agent, facing the connected ACP client (Zed, Toad, …). An acp.agent.connection.AgentSideConnection reads the client's frames, deserialises them into typed pydantic models, and dispatches to self.initialize / self.new_session / self.prompt / etc.
  • acp.Client, facing the bound in-container backend wrapper. Once a model has been picked, an acp.client.connection.ClientSideConnection to terok-{agent}-acp reads the wrapper's frames and dispatches backend → client traffic (session/update, request_permission, fs/*, terminal/*) onto the same proxy object so it can forward to the connected client.

Two phases drive the lifecycle:

  • Pre-bind: initialize and session/new answer locally, advertising the aggregated agent:model list in acp.schema.SessionModelState plus a mirroring configOptions[category=model]. No backend process exists yet.
  • Bound: on the first model-picking client request — modern ACP's session/set_model or older Zed's session/set_config_option(category=model), or lazily on the first backend-needing method like session/prompt — the proxy spawns the in-container wrapper through acp.spawn_agent_process, replays initialize + session/new + session/set_model against it, and from then on forwards typed calls in both directions. Backend responses and notifications carrying model ids are re-namespaced on the way out so the client always sees agent:model ids.
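
The re-namespacing in the bound phase can be pictured with a minimal sketch. The helper names `namespace_model_id` and `split_namespaced` are hypothetical and not taken from the proxy source, and the agent and model names are purely illustrative; the only thing assumed from the text above is the `agent:model` id shape.

```python
def namespace_model_id(agent: str, model_id: str) -> str:
    """Prefix a backend-local model id with the agent that serves it."""
    return f"{agent}:{model_id}"


def split_namespaced(namespaced: str) -> tuple[str, str]:
    """Split ``agent:model`` at the first colon only.

    Splitting once keeps any colons inside the backend's own model id intact.
    """
    agent, sep, model_id = namespaced.partition(":")
    if not sep or not agent or not model_id:
        raise ValueError(f"expected 'agent:model', got {namespaced!r}")
    return agent, model_id


# Outbound: the id the client sees.  Inbound: what the bound backend is sent.
seen_by_client = namespace_model_id("claude", "opus")
agent, backend_model = split_namespaced(seen_by_client)
```

Splitting at the first colon is a design choice of this sketch: it lets the agent name select the backend while the remainder is passed through untouched.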

V1 takes shortcuts where the design is still settling: one session per connection (Zed reconnects on every chat; a fix is on the roadmap), one bound agent per session (no cross-agent switches without reconnecting), and no push notifications when the authed-agent set changes mid-connection.

CLIENT_SESSION_ID = 'proxy-1' module-attribute

Synthetic session id the proxy advertises to the connected client.

Backend session ids never reach the client — every backend-originated frame has its sessionId rewritten to this constant on the way out.
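
As an illustration of the rewrite, here is a minimal sketch that operates on a raw JSON-RPC frame held as a dict. This is an assumption made for brevity: the proxy itself works on typed method calls, and `rewrite_session_id` and the `backend-7f3a` id are hypothetical.

```python
CLIENT_SESSION_ID = "proxy-1"


def rewrite_session_id(frame: dict) -> dict:
    """Return a copy of a backend-originated frame carrying the synthetic id."""
    params = frame.get("params")
    if not isinstance(params, dict) or "sessionId" not in params:
        return frame  # nothing session-scoped to rewrite
    return {**frame, "params": {**params, "sessionId": CLIENT_SESSION_ID}}


backend_frame = {
    "jsonrpc": "2.0",
    "method": "session/update",
    "params": {"sessionId": "backend-7f3a", "update": {}},
}
client_frame = rewrite_session_id(backend_frame)
```

The sketch copies rather than mutates, so the backend's own id stays available for frames travelling in the other direction.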

CONTAINER_WORKSPACE = '/workspace' module-attribute

Path the backend session/new runs in.

ACP clients send their host filesystem path in new_session.cwd (Zed: /var/home/user/prog/X), which doesn't exist inside the container. claude-agent-acp chdirs into cwd before exec; an ENOENT there surfaces as the famously misleading "Claude Code native binary not found …". Pinning to the container's workspace mount is a stopgap until the host↔sandbox path strategy lands.
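
The failure mode and the stopgap can be reproduced in isolation. The sketch below is not proxy code: `backend_cwd` and `chdir_errno` are hypothetical helpers, and a freshly invented missing directory stands in for a host path seen from inside the container.

```python
import errno
import os
import tempfile

CONTAINER_WORKSPACE = "/workspace"


def backend_cwd(client_cwd: str) -> str:
    """Ignore the client's host path and pin to the container mount."""
    del client_cwd  # a host path; not meaningful inside the container
    return CONTAINER_WORKSPACE


def chdir_errno(path: str):
    """Return the errno raised by chdir-ing into ``path``, or None on success."""
    previous = os.getcwd()
    try:
        os.chdir(path)
    except OSError as exc:
        return exc.errno
    os.chdir(previous)  # restore the caller's working directory
    return None


# A directory that does not exist, standing in for a host-only project path.
missing = os.path.join(tempfile.mkdtemp(), "host-only-project")
```

Calling `chdir_errno(missing)` yields `errno.ENOENT`, the same error class that the wrapper later reports in a misleading form, while `backend_cwd` always returns the mount path regardless of what the client sent.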

PROXY_AGENT_NAME = 'terok-acp' module-attribute

PROXY_AGENT_TITLE = 'Terok ACP host-proxy' module-attribute

PROXY_AGENT_VERSION = '1' module-attribute

SessionUpdatePayload = UserMessageChunk | AgentMessageChunk | AgentThoughtChunk | ToolCallStart | ToolCallProgress | AgentPlanUpdate | AvailableCommandsUpdate | CurrentModeUpdate | ConfigOptionUpdate | SessionInfoUpdate | UsageUpdate module-attribute

ACPProxy(*, roster)

One client connection's worth of proxy state.

Constructed by attach; lives for the duration of a single client connection. Not reusable — discard after run returns.

Source code in src/terok_executor/acp/proxy.py
def __init__(self, *, roster: ACPRoster) -> None:
    self._roster = roster
    self._client: AgentSideConnection | None = None
    self._backend: ClientSideConnection | None = None
    self._backend_stack = AsyncExitStack()
    self._bind_lock = asyncio.Lock()
    self._bound_agent: str | None = None
    self._backend_session_id: str | None = None
    self._client_mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None
    self._client_additional_directories: list[str] | None = None
    self._client_capabilities: ClientCapabilities | None = None
    self._aggregated_models: list[str] = []
    # Namespaced ``agent:model`` advertised as ``currentModelId`` in
    # ``session/new``.  Lazy-bind target for clients that go straight
    # from ``session/new`` to ``session/prompt`` without an explicit
    # model selection — they trust that value and we follow through.
    self._default_namespaced: str | None = None
    self._session_announced = False
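
The pairing of `_bind_lock` with `_bound_agent` suggests a bind-once pattern. The following is a minimal sketch of that pattern under stated assumptions, not the proxy's actual bind path: `LazyBinder`, `ensure_bound`, `_spawn`, and the agent names are hypothetical.

```python
import asyncio


class LazyBinder:
    """Stand-in for the bind-related slice of the proxy state."""

    def __init__(self) -> None:
        self._bind_lock = asyncio.Lock()
        self._bound_agent = None
        self.spawn_count = 0

    async def _spawn(self, agent: str) -> None:
        await asyncio.sleep(0)  # yield, as a real process spawn would
        self.spawn_count += 1

    async def ensure_bound(self, agent: str) -> str:
        # The lock serialises racing callers; the check inside it makes
        # every caller after the first see the already-bound agent.
        async with self._bind_lock:
            if self._bound_agent is None:
                await self._spawn(agent)
                self._bound_agent = agent
            elif self._bound_agent != agent:
                raise RuntimeError("cross-agent switch needs a reconnect")
        return self._bound_agent


async def demo() -> int:
    binder = LazyBinder()
    # Five concurrent requests race to bind the same agent.
    await asyncio.gather(*(binder.ensure_bound("claude") for _ in range(5)))
    return binder.spawn_count
```

Running `asyncio.run(demo())` spawns once even though five callers raced, which is the property a lock-guarded check provides.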

run(reader, writer) async

Run the typed proxy loop until the client disconnects.

Hands the client side to acp.agent.connection.AgentSideConnection which dispatches typed methods on this object. Always tears the bound backend down on exit, even on cancellation.

Source code in src/terok_executor/acp/proxy.py
async def run(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Run the typed proxy loop until the client disconnects.

    Hands the client side to
    `acp.agent.connection.AgentSideConnection`
    which dispatches typed methods on this object.  Always tears the
    bound backend down on exit, even on cancellation.
    """
    self._client = AgentSideConnection(self, writer, reader, listening=False)
    try:
        await self._client.listen()
    finally:
        await self._teardown_backend()
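
The try/finally shape is what makes teardown survive cancellation. A small self-contained sketch shows the ordering; `run_with_teardown`, `listen`, and `teardown` are hypothetical stand-ins, not the proxy's methods.

```python
import asyncio


async def run_with_teardown(listen, teardown) -> None:
    try:
        await listen()
    finally:
        # Runs on normal return, on error, and when the task is cancelled.
        await teardown()


async def demo() -> list:
    events = []

    async def listen() -> None:
        events.append("listening")
        await asyncio.sleep(3600)  # stands in for a long-lived read loop

    async def teardown() -> None:
        events.append("teardown")

    task = asyncio.create_task(run_with_teardown(listen, teardown))
    await asyncio.sleep(0)  # let the task reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return events
```

In this sketch the teardown entry is recorded before the cancellation reaches the awaiting caller.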

on_connect(_conn)

Required by the acp.Agent / acp.Client protocols.

Source code in src/terok_executor/acp/proxy.py
def on_connect(self, _conn: Any) -> None:
    """Required by the `acp.Agent` / `acp.Client` protocols."""

initialize(protocol_version, client_capabilities=None, client_info=None, **_kw) async

Answer initialize locally and capture the client's caps.

client_capabilities is replayed verbatim on the backend's initialize during bind — whatever the client said it can do, the backend believes. client_info is accepted to satisfy the protocol but discarded (the proxy doesn't relay it).

Source code in src/terok_executor/acp/proxy.py
async def initialize(
    self,
    protocol_version: int,
    client_capabilities: ClientCapabilities | None = None,
    client_info: Implementation | None = None,
    **_kw: Any,
) -> InitializeResponse:
    """Answer ``initialize`` locally and capture the client's caps.

    ``client_capabilities`` is replayed verbatim on the backend's
    ``initialize`` during bind — whatever the client said it can do,
    the backend believes.  ``client_info`` is accepted to satisfy
    the protocol but discarded (the proxy doesn't relay it).
    """
    del protocol_version, client_info
    self._client_capabilities = client_capabilities or ClientCapabilities()
    return InitializeResponse(
        protocol_version=PROTOCOL_VERSION,
        agent_info=Implementation(
            name=PROXY_AGENT_NAME, title=PROXY_AGENT_TITLE, version=PROXY_AGENT_VERSION
        ),
    )

new_session(cwd, additional_directories=None, mcp_servers=None, **_kw) async

Answer session/new with the aggregated model list.

Synthesises CLIENT_SESSION_ID so the client can proceed to model selection before any backend exists. The real backend session id is captured on bind and translated on every forwarded frame.

Source code in src/terok_executor/acp/proxy.py
async def new_session(
    self,
    cwd: str,
    additional_directories: list[str] | None = None,
    mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None,
    **_kw: Any,
) -> NewSessionResponse:
    """Answer ``session/new`` with the aggregated model list.

    Synthesises [`CLIENT_SESSION_ID`][terok_executor.acp.proxy.CLIENT_SESSION_ID]
    so the client can proceed to model selection before any backend
    exists.  The real backend session id is captured on bind and
    translated on every forwarded frame.
    """
    del cwd
    if self._session_announced:
        raise RequestError.invalid_request(
            {"details": "proxy supports one session per connection (v1)"}
        )
    self._session_announced = True
    self._client_mcp_servers = mcp_servers
    self._client_additional_directories = additional_directories
    self._aggregated_models = await self._roster.list_available_agents()
    self._default_namespaced = self._aggregated_models[0] if self._aggregated_models else None
    return build_aggregated_session_new(CLIENT_SESSION_ID, self._aggregated_models)
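
The one-session guard and the default pick can be isolated into a few lines. This is a sketch only: `SessionGate` is a hypothetical class, a plain `RuntimeError` stands in for the protocol-level request error, and the model ids are illustrative.

```python
class SessionGate:
    """Tracks whether this connection has already announced its session."""

    def __init__(self) -> None:
        self._announced = False
        self.default_model = None

    def announce(self, models):
        if self._announced:
            raise RuntimeError("proxy supports one session per connection (v1)")
        self._announced = True
        # The first advertised id doubles as the lazy-bind target; it stays
        # None when no agent is available.
        self.default_model = models[0] if models else None
        return self.default_model


gate = SessionGate()
default = gate.announce(["claude:opus", "codex:gpt"])
```

A second `announce` call on the same gate raises, mirroring the rejection of a repeated session/new on one connection.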

set_session_model(model_id, session_id, **_kw) async

Bind on first call; same-agent re-pick forwards through.

Source code in src/terok_executor/acp/proxy.py
async def set_session_model(
    self, model_id: str, session_id: str, **_kw: Any
) -> SetSessionModelResponse | None:
    """Bind on first call; same-agent re-pick forwards through."""
    self._require_client_session(session_id)
    await self._select_model(model_id)
    return SetSessionModelResponse()

set_config_option(config_id, session_id, value, **_kw) async

Bind on category=model; otherwise forward to the bound backend.

Older ACP clients (Zed v1.0.x at the time of writing) pick the model through session/set_config_option(category="model"); modern clients use the dedicated session/set_model. Accept both. Non-model categories pass through to the bound backend; a non-model option pre-bind is rejected.

Source code in src/terok_executor/acp/proxy.py
async def set_config_option(
    self, config_id: str, session_id: str, value: str | bool, **_kw: Any
) -> SetSessionConfigOptionResponse | None:
    """Bind on ``category=model``; otherwise forward to the bound backend.

    Older ACP clients (Zed v1.0.x at the time of writing) pick the
    model through ``session/set_config_option(category="model")``;
    modern clients use the dedicated ``session/set_model``.  Accept
    both.  Non-model categories pass through to the bound backend;
    a non-model option pre-bind is rejected.
    """
    self._require_client_session(session_id)
    if config_id == MODEL_OPTION_CATEGORY and isinstance(value, str):
        await self._select_model(value)
        opt = build_model_option(self._aggregated_models, current=value)
        return SetSessionConfigOptionResponse(config_options=[opt])
    backend, backend_session = self._require_bound()
    resp = await backend.set_config_option(
        config_id=config_id, session_id=backend_session, value=value
    )
    if resp is not None and self._bound_agent is not None:
        namespace_model_options_in_place(resp.config_options, self._bound_agent)
    return resp
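
The three outcomes of this method can be summarised as a pure routing function. The sketch assumes `MODEL_OPTION_CATEGORY` equals `"model"`, which the docstring implies but the listing does not show; `route_config_option`, its string results, and the example ids are hypothetical.

```python
MODEL_OPTION_CATEGORY = "model"  # assumed value, inferred from the docstring


def route_config_option(config_id: str, value, bound: bool) -> str:
    """Name the path a session/set_config_option call would take."""
    if config_id == MODEL_OPTION_CATEGORY and isinstance(value, str):
        return "select_model"  # binds on first call
    if not bound:
        return "reject"  # non-model option before any backend exists
    return "forward"  # pass through to the bound backend


routes = [
    route_config_option("model", "claude:opus", bound=False),
    route_config_option("theme", "dark", bound=False),
    route_config_option("theme", "dark", bound=True),
    route_config_option("model", True, bound=False),
]
```

The last case follows from the `isinstance(value, str)` check: a boolean sent under the model category is not treated as a model pick and falls through to the bound-backend path.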

prompt(prompt, session_id, message_id=None, **_kw) async

Lazy-bind to the default model if needed, then forward.

Source code in src/terok_executor/acp/proxy.py
async def prompt(
    self,
    prompt: list,
    session_id: str,
    message_id: str | None = None,
    **_kw: Any,
) -> PromptResponse:
    """Lazy-bind to the default model if needed, then forward."""
    self._require_client_session(session_id)
    await self._ensure_bound_for_default()
    backend, backend_session = self._require_bound()
    return await backend.prompt(
        prompt=prompt, session_id=backend_session, message_id=message_id
    )

cancel(session_id, **_kw) async

Fire-and-forget cancel to the backend (no-op if not bound).

Source code in src/terok_executor/acp/proxy.py
async def cancel(self, session_id: str, **_kw: Any) -> None:
    """Fire-and-forget cancel to the backend (no-op if not bound)."""
    del session_id
    if self._backend is not None and self._backend_session_id is not None:
        await self._backend.cancel(session_id=self._backend_session_id)

authenticate(method_id, **_kw) async

Forward to bound backend; pre-bind authenticate is rejected.

Source code in src/terok_executor/acp/proxy.py
async def authenticate(self, method_id: str, **_kw: Any) -> AuthenticateResponse | None:
    """Forward to bound backend; pre-bind authenticate is rejected."""
    backend, _ = self._require_bound()
    return await backend.authenticate(method_id=method_id)

set_session_mode(mode_id, session_id, **_kw) async

Forward to bound backend.

Source code in src/terok_executor/acp/proxy.py
async def set_session_mode(
    self, mode_id: str, session_id: str, **_kw: Any
) -> SetSessionModeResponse | None:
    """Forward to bound backend."""
    self._require_client_session(session_id)
    backend, backend_session = self._require_bound()
    return await backend.set_session_mode(mode_id=mode_id, session_id=backend_session)

load_session(cwd, session_id, additional_directories=None, mcp_servers=None, **_kw) async

Forward to bound backend (v1 advertises no session-load capability).

Source code in src/terok_executor/acp/proxy.py
async def load_session(
    self,
    cwd: str,
    session_id: str,
    additional_directories: list[str] | None = None,
    mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None,
    **_kw: Any,
) -> LoadSessionResponse | None:
    """Forward to bound backend (v1 advertises no session-load capability)."""
    self._require_client_session(session_id)
    backend, backend_session = self._require_bound()
    return await backend.load_session(
        cwd=cwd,
        session_id=backend_session,
        additional_directories=additional_directories,
        mcp_servers=mcp_servers,
    )

list_sessions(additional_directories=None, cursor=None, cwd=None, **_kw) async

Forward to bound backend.

Source code in src/terok_executor/acp/proxy.py
async def list_sessions(
    self,
    additional_directories: list[str] | None = None,
    cursor: str | None = None,
    cwd: str | None = None,
    **_kw: Any,
) -> ListSessionsResponse:
    """Forward to bound backend."""
    backend, _ = self._require_bound()
    return await backend.list_sessions(
        additional_directories=additional_directories, cursor=cursor, cwd=cwd
    )

fork_session(cwd, session_id, additional_directories=None, mcp_servers=None, **_kw) async

Forward to bound backend.

Source code in src/terok_executor/acp/proxy.py
async def fork_session(
    self,
    cwd: str,
    session_id: str,
    additional_directories: list[str] | None = None,
    mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None,
    **_kw: Any,
) -> ForkSessionResponse:
    """Forward to bound backend."""
    self._require_client_session(session_id)
    backend, backend_session = self._require_bound()
    return await backend.fork_session(
        cwd=cwd,
        session_id=backend_session,
        additional_directories=additional_directories,
        mcp_servers=mcp_servers,
    )

resume_session(cwd, session_id, additional_directories=None, mcp_servers=None, **_kw) async

Forward to bound backend.

Source code in src/terok_executor/acp/proxy.py
async def resume_session(
    self,
    cwd: str,
    session_id: str,
    additional_directories: list[str] | None = None,
    mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None,
    **_kw: Any,
) -> ResumeSessionResponse:
    """Forward to bound backend."""
    self._require_client_session(session_id)
    backend, backend_session = self._require_bound()
    return await backend.resume_session(
        cwd=cwd,
        session_id=backend_session,
        additional_directories=additional_directories,
        mcp_servers=mcp_servers,
    )

close_session(session_id, **_kw) async

Forward to bound backend and tear down the wrapper.

v1 keeps one backend per connection; after a successful close the wrapper has nothing more to do, so reap it eagerly instead of leaving it around to be killed by _teardown_backend on disconnect. Returns None when no backend was ever bound.

Source code in src/terok_executor/acp/proxy.py
async def close_session(self, session_id: str, **_kw: Any) -> Any:
    """Forward to bound backend and tear down the wrapper.

    v1 keeps one backend per connection; after a successful close the
    wrapper has nothing more to do, so reap it eagerly instead of
    leaving it around to be killed by ``_teardown_backend`` on
    disconnect.  Returns ``None`` when no backend was ever bound.
    """
    self._require_client_session(session_id)
    if self._backend is None or self._backend_session_id is None:
        return None
    try:
        return await self._backend.close_session(session_id=self._backend_session_id)
    finally:
        await self._teardown_backend()
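
Because the wrapper is torn down eagerly here and again on disconnect, teardown has to tolerate being called twice. One way to get that is `contextlib.AsyncExitStack`, which drops its callbacks once they have run. The sketch assumes this is roughly how `_backend_stack` is used; `BackendHandle` and its methods are hypothetical.

```python
import asyncio
from contextlib import AsyncExitStack


class BackendHandle:
    def __init__(self) -> None:
        self._stack = AsyncExitStack()
        self.bound = False
        self.reaped = 0

    async def _reap(self) -> None:
        self.reaped += 1

    async def bind(self) -> None:
        self._stack.push_async_callback(self._reap)
        self.bound = True

    async def teardown(self) -> None:
        self.bound = False
        await self._stack.aclose()  # runs, then forgets, registered callbacks

    async def close_session(self, fail: bool = False):
        if not self.bound:
            return None  # no backend was ever bound
        try:
            if fail:
                raise RuntimeError("backend close failed")
            return "closed"
        finally:
            await self.teardown()  # reap even when the close itself failed


async def demo():
    handle = BackendHandle()
    await handle.bind()
    result = await handle.close_session()
    await handle.teardown()  # the later disconnect path; nothing left to reap
    return result, handle.reaped
```

In this sketch the reap callback runs exactly once across both teardown calls.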

ext_method(method, params) async

Forward extension methods to the bound backend.

Source code in src/terok_executor/acp/proxy.py
async def ext_method(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
    """Forward extension methods to the bound backend."""
    backend, _ = self._require_bound()
    return await backend.ext_method(method, params)

ext_notification(method, params) async

Forward extension notifications to the bound backend (silent if not bound).

Source code in src/terok_executor/acp/proxy.py
async def ext_notification(self, method: str, params: dict[str, Any]) -> None:
    """Forward extension notifications to the bound backend (silent if not bound)."""
    if self._backend is not None:
        await self._backend.ext_notification(method, params)

session_update(session_id, update, **_kw) async

Rewrite session id and any model ids, then forward to the client.

Source code in src/terok_executor/acp/proxy.py
async def session_update(
    self,
    session_id: str,
    update: SessionUpdatePayload,
    **_kw: Any,
) -> None:
    """Rewrite session id and any model ids, then forward to the client."""
    del session_id
    if isinstance(update, ConfigOptionUpdate) and self._bound_agent is not None:
        namespace_model_options_in_place(update.config_options, self._bound_agent)
    await self._require_client().session_update(session_id=CLIENT_SESSION_ID, update=update)

request_permission(options, session_id, tool_call, **_kw) async

Forward permission request to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def request_permission(
    self,
    options: list[PermissionOption],
    session_id: str,
    tool_call: ToolCallUpdate,
    **_kw: Any,
) -> RequestPermissionResponse:
    """Forward permission request to the connected client."""
    del session_id
    return await self._require_client().request_permission(
        options=options, session_id=CLIENT_SESSION_ID, tool_call=tool_call
    )

read_text_file(path, session_id, limit=None, line=None, **_kw) async

Forward fs read to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def read_text_file(
    self,
    path: str,
    session_id: str,
    limit: int | None = None,
    line: int | None = None,
    **_kw: Any,
) -> ReadTextFileResponse:
    """Forward fs read to the connected client."""
    del session_id
    return await self._require_client().read_text_file(
        path=path, session_id=CLIENT_SESSION_ID, limit=limit, line=line
    )

write_text_file(content, path, session_id, **_kw) async

Forward fs write to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def write_text_file(
    self, content: str, path: str, session_id: str, **_kw: Any
) -> WriteTextFileResponse | None:
    """Forward fs write to the connected client."""
    del session_id
    return await self._require_client().write_text_file(
        content=content, path=path, session_id=CLIENT_SESSION_ID
    )

create_terminal(command, session_id, args=None, cwd=None, env=None, output_byte_limit=None, **_kw) async

Forward terminal create to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def create_terminal(
    self,
    command: str,
    session_id: str,
    args: list[str] | None = None,
    cwd: str | None = None,
    env: list[EnvVariable] | None = None,
    output_byte_limit: int | None = None,
    **_kw: Any,
) -> CreateTerminalResponse:
    """Forward terminal create to the connected client."""
    del session_id
    return await self._require_client().create_terminal(
        command=command,
        session_id=CLIENT_SESSION_ID,
        args=args,
        cwd=cwd,
        env=env,
        output_byte_limit=output_byte_limit,
    )

terminal_output(session_id, terminal_id, **_kw) async

Forward terminal output read to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def terminal_output(
    self, session_id: str, terminal_id: str, **_kw: Any
) -> TerminalOutputResponse:
    """Forward terminal output read to the connected client."""
    del session_id
    return await self._require_client().terminal_output(
        session_id=CLIENT_SESSION_ID, terminal_id=terminal_id
    )

release_terminal(session_id, terminal_id, **_kw) async

Forward terminal release to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def release_terminal(
    self, session_id: str, terminal_id: str, **_kw: Any
) -> ReleaseTerminalResponse | None:
    """Forward terminal release to the connected client."""
    del session_id
    return await self._require_client().release_terminal(
        session_id=CLIENT_SESSION_ID, terminal_id=terminal_id
    )

wait_for_terminal_exit(session_id, terminal_id, **_kw) async

Forward wait-for-exit to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def wait_for_terminal_exit(
    self, session_id: str, terminal_id: str, **_kw: Any
) -> WaitForTerminalExitResponse:
    """Forward wait-for-exit to the connected client."""
    del session_id
    return await self._require_client().wait_for_terminal_exit(
        session_id=CLIENT_SESSION_ID, terminal_id=terminal_id
    )

kill_terminal(session_id, terminal_id, **_kw) async

Forward kill to the connected client.

Source code in src/terok_executor/acp/proxy.py
async def kill_terminal(
    self, session_id: str, terminal_id: str, **_kw: Any
) -> KillTerminalResponse | None:
    """Forward kill to the connected client."""
    del session_id
    return await self._require_client().kill_terminal(
        session_id=CLIENT_SESSION_ID, terminal_id=terminal_id
    )

AgentBindError

Bases: RuntimeError

Surface error raised when the proxy fails to bind a backend agent.

Always converted to a JSON-RPC error response on the wire — never bubbles to the caller of run.
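
A minimal sketch of that conversion follows. Which error code and payload the proxy actually emits is not stated here, so the use of -32603 (the JSON-RPC 2.0 "Internal error" code) is an assumption, and `error_response`, `handle`, and `failing_bind` are hypothetical.

```python
class AgentBindError(RuntimeError):
    """Local stand-in for the proxy's bind failure."""


INTERNAL_ERROR = -32603  # "Internal error" in the JSON-RPC 2.0 specification


def error_response(request_id, exc: Exception) -> dict:
    """Shape an exception as a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": INTERNAL_ERROR, "message": str(exc)},
    }


def handle(request_id, bind) -> dict:
    """Run ``bind`` and answer with a result or an error, never an exception."""
    try:
        return {"jsonrpc": "2.0", "id": request_id, "result": bind()}
    except AgentBindError as exc:
        return error_response(request_id, exc)


def failing_bind():
    raise AgentBindError("no authed agent for 'claude'")


reply = handle(7, failing_bind)
```

The caller of `handle` only ever sees a response dict, which matches the documented guarantee that the error never bubbles to the caller of run.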