terok_sandbox

terok-sandbox: hardened Podman container runner with gate and shield integration.

Delegates to domain subsystems:

  • gate — authenticated git serving: HTTP server, token CRUD, upstream mirror management, systemd/daemon lifecycle.
  • vault — secret injection: per-container token broker with phantom credentials, SSH signing proxy, SQLite credential store.
  • shield — egress firewall adapter (delegates to terok-shield).
  • runtime — Podman CLI wrapper (state queries, GPU, log streaming).
  • sandbox — facade composing the above behind SandboxConfig.
  • commands — CLI command registry and handler implementations.

The top-level surface here is the published contract that terok_executor and terok consume. Internal helpers (raw config schema fragments, runtime concrete types like Container/LogStream/PortReservation, SSH keypair helpers, selinux probe internals, port-registry primitives, shield error classes) stay in their submodules; reach into terok_sandbox.<sub> when you need them.

CONTAINER_RUNTIME_DIR = '/run/terok' module-attribute

Container-side mount point for the host runtime directory (socket mode).

SERVICES_TCP_OPTOUT_YAML = 'services: {mode: tcp}' module-attribute

User-facing opt-out snippet shown in SELinux hints — keep in one place so setup, sickbay, tests and docs stay in sync.

ServicesMode = Literal['tcp', 'socket'] module-attribute

Type alias for the services.mode Literal; re-exported from RawServicesSection.model_fields['mode'] so downstream modules (sandbox's SandboxConfig, terok's make_sandbox_config) can annotate without re-declaring the shape.

DEFAULT_GUEST_SSHD_PORT = 22 module-attribute

DEFAULT_SSH_HOST = '127.0.0.1' module-attribute

READY_MARKER = '>> init complete' module-attribute

Default log line emitted by init-ssh-and-repo.sh when the container is ready.

CODEX_SHARED_OAUTH_MARKER = 'terok-proxy-codex-oauth-marker:vault-handles-real-auth' module-attribute

PHANTOM_CREDENTIALS_MARKER = 'terok-proxy-phantom-token:vault-handles-real-auth' module-attribute

__version__ = _meta_version('terok-sandbox') module-attribute

__all__ = ['CONTAINER_RUNTIME_DIR', 'ConfigScope', 'RawRunSection', 'RawSSHSection', 'SERVICES_TCP_OPTOUT_YAML', 'Sandbox', 'SandboxConfig', 'SandboxConfigView', 'ServicesMode', 'gate_use_personal_ssh_default', 'SetupVerdict', 'installed_versions', 'needs_setup', 'read_stamp', 'sandbox_uninstall', 'stamp_path', 'GateServer', 'PerContainerResources', 'allocate_per_container_resources', 'mint_gate_token', 'ContainerRuntime', 'DEFAULT_GUEST_SSHD_PORT', 'DEFAULT_SSH_HOST', 'ExecResult', 'GpuConfigError', 'Image', 'KrunRuntime', 'LifecycleHooks', 'NullRuntime', 'PodmanRuntime', 'READY_MARKER', 'RunSpec', 'Sharing', 'TcpSSHTransport', 'VolumeSpec', 'check_gpu_available', 'podman_port_resolver', 'EnvironmentCheck', 'ShieldHooks', 'ShieldManager', 'check_environment', 'resolve_container_state_dir', 'GateAuthNotConfigured', 'GateStalenessInfo', 'GitGate', 'is_ssh_url', 'CODEX_SHARED_OAUTH_MARKER', 'CredentialDB', 'NoPassphraseError', 'PHANTOM_CREDENTIALS_MARKER', 'RecoveryStatus', 'WrongPassphraseError', 'systemd_creds_has_tpm2', 'handle_vault_seal', 'handle_vault_to_keyring', 'SSHInitResult', 'SSHManager', 'ensure_infra_keypair', 'public_line_of', 'claim_port', 'release_port', 'CheckVerdict', 'DoctorCheck', 'sandbox_doctor_checks', 'SelinuxCheckResult', 'SelinuxStatus', 'check_selinux_status', 'selinux_install_command', 'selinux_install_script', 'AppArmorCheckResult', 'AppArmorStatus', 'check_apparmor_status', 'apparmor_install_command', 'apparmor_install_script', 'CommandTree', 'bold', 'red', 'stage_line', 'yaml_update_section', 'yellow', '__version__'] module-attribute

AppArmorCheckResult(status) dataclass

Structured outcome of check_status.

status instance-attribute

AppArmorStatus

Bases: Enum

Outcome of check_status.

NOT_APPLICABLE = 'not_applicable' class-attribute instance-attribute

No AppArmor, no dnsmasq, or no dnsmasq profile — nothing to do.

PROFILE_MISSING = 'profile_missing' class-attribute instance-attribute

dnsmasq is AppArmor-profiled but the terok addendum isn't installed.

OK = 'ok' class-attribute instance-attribute

The terok addendum is installed.

SelinuxCheckResult(status, missing_policy_tools=tuple()) dataclass

Structured outcome of check_status.

Callers decide how to present the result; this struct only carries the decision tree's output so that terok setup (printed multi-line warnings) and terok sickbay (tuple-based check result) can share one source of truth for the branching.

status instance-attribute

Which branch of the decision tree fired.

missing_policy_tools = field(default_factory=tuple) class-attribute instance-attribute

Names of missing compile tools (only populated for POLICY_MISSING).

SelinuxStatus

Bases: Enum

Outcome of check_status — the single decision tree behind both terok setup's prereq check and terok sickbay's health check.

NOT_APPLICABLE_TCP_MODE = 'not_applicable_tcp_mode' class-attribute instance-attribute

Transport is tcp; the terok_socket_t policy is irrelevant.

NOT_APPLICABLE_PERMISSIVE = 'not_applicable_permissive' class-attribute instance-attribute

Socket transport, but SELinux is disabled or permissive.

POLICY_MISSING = 'policy_missing' class-attribute instance-attribute

Enforcing host, socket transport, but terok_socket module is not loaded.

POLICY_OUTDATED = 'policy_outdated' class-attribute instance-attribute

Enforcing host, socket transport, terok_socket loaded — but an older revision missing the container_runtime_t rule the per-container supervisor needs. Re-running the installer rebuilds + upgrades it.

LIBSELINUX_MISSING = 'libselinux_missing' class-attribute instance-attribute

Policy is loaded but libselinux.so.1 cannot be dlopen'd: the silent-failure case where sockets would bind as unconfined_t regardless.

OK = 'ok' class-attribute instance-attribute

Enforcing, policy installed, libselinux loadable — all good.
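The member descriptions above imply a single ordered decision tree. The following is a minimal sketch of how such a tree could be written; `Status` and `classify` are hypothetical stand-ins rather than the package's code, and the branch order is an assumption inferred from the member docstrings.

```python
from enum import Enum


class Status(Enum):
    """Stand-in mirroring the documented SelinuxStatus members."""

    NOT_APPLICABLE_TCP_MODE = "not_applicable_tcp_mode"
    NOT_APPLICABLE_PERMISSIVE = "not_applicable_permissive"
    POLICY_MISSING = "policy_missing"
    POLICY_OUTDATED = "policy_outdated"
    LIBSELINUX_MISSING = "libselinux_missing"
    OK = "ok"


def classify(
    *,
    services_mode: str,
    enforcing: bool,
    module_loaded: bool,
    module_current: bool,
    libselinux_loadable: bool,
) -> Status:
    """Hypothetical decision tree; inputs are pre-probed facts about the host."""
    if services_mode == "tcp":
        return Status.NOT_APPLICABLE_TCP_MODE  # socket policy is irrelevant
    if not enforcing:
        return Status.NOT_APPLICABLE_PERMISSIVE  # disabled or permissive host
    if not module_loaded:
        return Status.POLICY_MISSING
    if not module_current:
        return Status.POLICY_OUTDATED  # loaded, but an older revision
    if not libselinux_loadable:
        return Status.LIBSELINUX_MISSING  # policy present, library not loadable
    return Status.OK
```

Keeping the probes outside the function makes the branching testable without an SELinux host, which matches the stated goal of one shared source of truth for setup and sickbay.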

SandboxConfig(state_dir=_state_root(), runtime_dir=_runtime_root(), config_dir=_config_root(), vault_dir=_vault_root(), gate_port=_default_gate_port(), token_broker_port=_default_token_broker_port(), ssh_signer_port=_default_ssh_signer_port(), shield_profiles=('dev-standard',), shield_audit=_default_shield_audit(), shield_bypass=False, credentials_passphrase=_default_credentials_passphrase(), credentials_use_keyring=_default_credentials_use_keyring(), credentials_passphrase_command=_default_credentials_passphrase_command(), services_mode=_default_services_mode(), experimental=_default_experimental()) dataclass

Immutable configuration for the sandbox layer.

All paths default to the XDG/FHS-resolved values from paths. Override individual fields when constructing from terok's global config or when using terok-sandbox standalone.

state_dir = field(default_factory=_state_root) class-attribute instance-attribute

Writable state root (tokens, gate repos, task data).

runtime_dir = field(default_factory=_runtime_root) class-attribute instance-attribute

Transient runtime directory (PID files, sockets).

config_dir = field(default_factory=_config_root) class-attribute instance-attribute

Sandbox-scoped configuration root.

Note: shield profiles are resolved by shield_profiles_dir via namespace_config_root, not from this directory.

vault_dir = field(default_factory=_vault_root) class-attribute instance-attribute

Shared vault directory (DB, routes, env mounts).

gate_port = field(default_factory=_default_gate_port) class-attribute instance-attribute

HTTP port for the gate server (None = auto-allocate via registry).

Default-factory reads gate_server.port from config.yml; missing or unset keys fall through to None so the port registry can pick one. Direct SandboxConfig(gate_port=…) always wins.
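The precedence described here (explicit argument, then config.yml, then None for the registry) follows from how dataclass default factories work. A minimal sketch, assuming a hypothetical `LOADED_CONFIG` dict in place of the real layered config loader and a hypothetical `PortDefaults` class in place of SandboxConfig:

```python
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical stand-in for the parsed, layered config.yml.
LOADED_CONFIG: dict = {"gate_server": {"port": 9418}}


def _default_gate_port() -> Optional[int]:
    """Return gate_server.port if configured, else None so a registry can pick."""
    return LOADED_CONFIG.get("gate_server", {}).get("port")


@dataclass(frozen=True)
class PortDefaults:
    # The factory only runs when the caller does not pass gate_port.
    gate_port: Optional[int] = field(default_factory=_default_gate_port)


from_file = PortDefaults()                # reads 9418 from the stand-in config
explicit = PortDefaults(gate_port=18080)  # the constructor argument wins
```

Because the factory runs at construction time, each instance captures whatever the config said at that moment.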

token_broker_port = field(default_factory=_default_token_broker_port) class-attribute instance-attribute

TCP port for the vault's token broker (None = auto-allocate via registry).

Default-factory reads vault.port from config.yml.

ssh_signer_port = field(default_factory=_default_ssh_signer_port) class-attribute instance-attribute

TCP port for the vault's SSH signer (None = auto-allocate via registry).

Default-factory reads vault.ssh_signer_port from config.yml.

shield_profiles = ('dev-standard',) class-attribute instance-attribute

Shield egress firewall profile names.

shield_audit = field(default_factory=_default_shield_audit) class-attribute instance-attribute

Whether shield audit logging is enabled.

Default-factory reads shield.audit from the layered config.yml via the RawShieldSection schema; missing/typo'd keys fall back to the schema's True default. Direct SandboxConfig(shield_audit=…) always wins.

shield_bypass = False class-attribute instance-attribute

DANGEROUS: when True, the egress firewall is completely disabled.

Hardcoded False here — sandbox refuses to read this field from config.yml because the layered chain includes a user-writable scope (~/.config/terok/config.yml) and an $ENV-controllable override (TEROK_CONFIG_FILE), so anything that drops a file in $HOME could silently disable the egress firewall. Orchestrators that want bypass must pass it explicitly to SandboxConfig(shield_bypass=True) after resolving from their own trusted source.

credentials_passphrase = field(default_factory=_default_credentials_passphrase) class-attribute instance-attribute

Headless-no-keyring fallback for the SQLCipher passphrase.

Read from credentials.passphrase in config.yml at construct time. None (the default) means "no config-file fallback set" — callers fall through to the next tier in the resolution chain.

credentials_use_keyring = field(default_factory=_default_credentials_use_keyring) class-attribute instance-attribute

Opt-in switch for the OS keyring tier in the passphrase resolution chain.

Off by default. Linux Secret Service has per-collection (not per-item) ACLs, so authorising terok against the default collection grants read access to every other secret stored there. Operators opt in via terok setup after weighing that trade-off.

credentials_passphrase_command = field(default_factory=_default_credentials_passphrase_command) class-attribute instance-attribute

Operator-supplied shell command that prints the SQLCipher passphrase on stdout.

Resolver tier slotted between keyring and config. Canonical headless option for hosts without systemd ≥ 257 — same shape as git config credential.helper or BORG_PASSCOMMAND. Read from credentials.passphrase_command in config.yml at construct time; None (the default) means "no helper configured" and the resolver skips this tier.

services_mode = field(default_factory=_default_services_mode) class-attribute instance-attribute

Transport for host↔container IPC, resolved once at construction.

Validated through the same RawServicesSection schema terok's RawGlobalConfig composes, so standalone and embedded paths agree on the value. Lives as an instance attribute rather than a free-function call per site so downstream code can't bypass config resolution — no manager without a SandboxConfig, every SandboxConfig carries a resolved mode.

experimental = field(default_factory=_default_experimental) class-attribute instance-attribute

Whether the ecosystem-wide experimental: opt-in is on.

Cross-package switch: gates terok's krun runtime at task launch and sandbox's krun-only prereq probes (currently just ip) at terok-sandbox setup. Read from the top-level experimental: key in the layered config.yml at construct time; missing / typo'd values fall back to False. Direct SandboxConfig(experimental=…) always wins.

gate_base_path property

Return the gate server's repo base path.

shield_profiles_dir property

Return the directory for terok-managed shield profiles.

db_path property

Return the path to the vault sqlite3 database.

vault_socket_path property

Return the Unix socket path for the vault.

vault_pid_path property

Return the PID file path for the managed vault daemon.

vault_passphrase_file property

Return the session-unlock tmpfs path for the SQLCipher passphrase.

Lives under runtime_dir ($XDG_RUNTIME_DIR/...), so it is RAM-backed and cleared on reboot. Written by terok-sandbox vault unlock; read at daemon startup as the highest-priority tier of the passphrase resolution chain.

vault_systemd_creds_file property

Return the sealed-credential path for the systemd-creds tier.

Lives under vault_dir (persistent state, 0o600) — the credential is machine-bound (TPM2 or host key), so persistence across reboots is the whole point. Written by terok-sandbox vault seal; read on every chain walk via terok_sandbox.vault.store.systemd_creds.

vault_recovery_marker_file property

Return the sidecar marker path for "operator saved the recovery passphrase".

Lives next to the sealed-credential file (persistent state, 0o600). Contents are the SHA-256 fingerprint of the acknowledged passphrase, so a re-key invalidates the marker and re-prompts on the next surface that reads it (terok_sandbox.vault.store.recovery).
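The invalidation-on-re-key behaviour falls out of storing a digest rather than a flag. A minimal sketch, assuming the marker holds a hex-encoded SHA-256 of the UTF-8 passphrase (the exact encoding is an assumption; `fingerprint` and `marker_is_current` are hypothetical names):

```python
import hashlib


def fingerprint(passphrase: str) -> str:
    """Hex SHA-256 of the passphrase (encoding choice is an assumption)."""
    return hashlib.sha256(passphrase.encode("utf-8")).hexdigest()


def marker_is_current(marker_contents: str, current_passphrase: str) -> bool:
    """True only if the marker was written for the passphrase now in use."""
    return marker_contents.strip() == fingerprint(current_passphrase)


marker = fingerprint("correct horse battery staple")
```

After a re-key the stored digest no longer matches, so the check fails and the operator is prompted again without the marker ever containing the passphrase itself.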

routes_path property

Return the path to the vault route configuration JSON.

credential_audit_log_path property

Return the path to the credential-use audit JSONL.

One file under the vault state dir, shared across every subject the broker has ever served — sandbox doesn't model "subject" semantically, so per-subject layout is the consumer's concern (terok's review CLI filters by scope / subject).

ssh_signer_socket_path property

Return the Unix socket path for the vault's SSH signer.

The vault binds this socket and serves the SSH-agent protocol on it (clients use it as $SSH_AUTH_SOCK). Filename uses the protocol name so its purpose is recognisable to anyone tracing socket activity.

clone_cache_base_path property

Return the base directory for per-scope non-bare clone caches.

ssh_keys_dir property

Return the base directory for per-scope SSH keys.

with_resolved_ports()

Return a copy with TCP ports allocated via the shared port registry.

Idempotent — returns self (no copy) when there is nothing to allocate: socket mode never needs TCP listeners, and already-fully-resolved cfgs short-circuit.

Side-effectful: allocation hits the shared port registry, bind-tests each candidate, and persists the claim to state_dir/port-claims.json. Keep this call OUT of construction paths that don't actually launch services (sickbay checks, config inspection, tests) — that's why it's opt-in rather than baked into __post_init__. The consumers that do need real ports (ShieldManager, Sandbox) wrap their stored cfg in self._cfg = self._cfg.with_resolved_ports() at construction time so downstream code never sees None for the port it needs.

Source code in src/terok_sandbox/config.py
def with_resolved_ports(self) -> SandboxConfig:
    """Return a copy with TCP ports allocated via the shared port registry.

    Idempotent — returns ``self`` (no copy) when there is nothing
    to allocate: socket mode never needs TCP listeners, and
    already-fully-resolved cfgs short-circuit.

    **Side-effectful**: allocation hits the shared port registry,
    bind-tests each candidate, and persists the claim to
    ``state_dir/port-claims.json``.  Keep this call OUT of
    construction paths that don't actually launch services
    (sickbay checks, config inspection, tests) — that's why it's
    opt-in rather than baked into ``__post_init__``.  The
    consumers that *do* need real ports (``ShieldManager``,
    ``Sandbox``) wrap their stored cfg in
    ``self._cfg = self._cfg.with_resolved_ports()`` at construction
    time so downstream code never sees ``None`` for the port it
    needs.
    """
    if self.services_mode == "socket":
        return self
    if (
        self.gate_port is not None
        and self.token_broker_port is not None
        and self.ssh_signer_port is not None
    ):
        return self
    from dataclasses import replace

    from .port_registry import resolve_service_ports

    ports = resolve_service_ports(
        self.gate_port,
        self.token_broker_port,
        self.ssh_signer_port,
        gate_explicit=self.gate_port is not None,
        proxy_explicit=self.token_broker_port is not None,
        ssh_explicit=self.ssh_signer_port is not None,
        state_dir=self.state_dir,
    )
    return replace(
        self,
        gate_port=self.gate_port if self.gate_port is not None else ports.gate,
        token_broker_port=(
            self.token_broker_port if self.token_broker_port is not None else ports.proxy
        ),
        ssh_signer_port=(
            self.ssh_signer_port if self.ssh_signer_port is not None else ports.ssh_agent
        ),
    )
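To make the three outcomes of the method above concrete (socket mode, already resolved, partially resolved), here is a toy model. `MiniConfig` and the counter-based `_fake_registry` are hypothetical stand-ins; the real allocation bind-tests candidates and persists claims, which this sketch does not attempt.

```python
import itertools
from dataclasses import dataclass, replace
from typing import Optional

# Hypothetical allocator standing in for the shared port registry.
_fake_registry = itertools.count(18700)


@dataclass(frozen=True)
class MiniConfig:
    services_mode: str = "tcp"
    gate_port: Optional[int] = None
    token_broker_port: Optional[int] = None
    ssh_signer_port: Optional[int] = None

    def with_resolved_ports(self) -> "MiniConfig":
        if self.services_mode == "socket":
            return self  # socket mode never needs TCP listeners
        current = (self.gate_port, self.token_broker_port, self.ssh_signer_port)
        if all(port is not None for port in current):
            return self  # already resolved: no copy, no registry access
        # Explicit ports are kept; only the missing ones are allocated.
        gate, broker, signer = (
            port if port is not None else next(_fake_registry) for port in current
        )
        return replace(
            self, gate_port=gate, token_broker_port=broker, ssh_signer_port=signer
        )


partial = MiniConfig(gate_port=9418)
resolved = partial.with_resolved_ports()
```

Calling the method again on `resolved` returns the same object, which is what makes it safe for several consumers to wrap the same config at construction time.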

open_credential_db(db_path=None, *, prompt_on_tty=False)

Open the credentials DB with this config's resolution-chain knobs.

Single seam over open_credential_db so call sites never plumb tier-selection kwargs by hand — adding a new tier is one entry in the private _chain_kwargs helper, no cross-package fan-out.

db_path defaults to self.db_path; callers that already hold a path (a sidecar-pinned DB path, or a test override) pass it explicitly so the open targets that DB while still using this config's tier policy. CLI consumers pass prompt_on_tty=True to unlock the interactive fallback; the per-container supervisor leaves it off.

Source code in src/terok_sandbox/config.py
def open_credential_db(
    self, db_path: Path | None = None, *, prompt_on_tty: bool = False
) -> Any:
    """Open the credentials DB with this config's resolution-chain knobs.

    Single seam over [`open_credential_db`][terok_sandbox.vault.store.db.open_credential_db]
    so call sites never plumb tier-selection kwargs by hand — adding
    a new tier is one entry in the private ``_chain_kwargs`` helper,
    no cross-package fan-out.

    *db_path* defaults to ``self.db_path``; callers that already
    hold a path (a sidecar-pinned DB path, or a test override) pass
    it explicitly so the open targets that DB while still using
    this config's tier policy.  CLI consumers pass
    ``prompt_on_tty=True`` to unlock the interactive fallback;
    the per-container supervisor leaves it off.
    """
    from .vault.store.db import open_credential_db  # noqa: PLC0415

    return open_credential_db(
        db_path if db_path is not None else self.db_path,
        **self._chain_kwargs(prompt_on_tty=prompt_on_tty),
    )

open_credential_db_with_source(db_path=None, *, prompt_on_tty=False)

Same as open_credential_db but also returns which tier of the chain hit.

db_path override semantics match open_credential_db. The returned source lets callers (status reports, the supervisor startup log) name which tier unlocked the vault instead of second-guessing the resolver.

Source code in src/terok_sandbox/config.py
def open_credential_db_with_source(
    self, db_path: Path | None = None, *, prompt_on_tty: bool = False
) -> tuple[CredentialDB, PassphraseSource]:
    """Same as [`open_credential_db`][terok_sandbox.SandboxConfig.open_credential_db]
    but also returns which tier of the chain hit.

    *db_path* override semantics match
    [`open_credential_db`][terok_sandbox.SandboxConfig.open_credential_db].
    The returned source lets callers (status reports, the
    supervisor startup log) name which tier unlocked the vault
    instead of second-guessing the resolver.
    """
    from .vault.store.db import open_credential_db_with_source  # noqa: PLC0415

    return open_credential_db_with_source(
        db_path if db_path is not None else self.db_path,
        **self._chain_kwargs(prompt_on_tty=prompt_on_tty),
    )

open_sqlcipher_connection(db_path=None, **connect_kwargs)

Open a raw sqlcipher3 connection via the chain (vault daemon path).

Source code in src/terok_sandbox/config.py
def open_sqlcipher_connection(self, db_path: Path | None = None, **connect_kwargs: Any) -> Any:
    """Open a raw sqlcipher3 connection via the chain (vault daemon path)."""
    from .vault.store.encryption import open_sqlcipher_via_chain  # noqa: PLC0415

    return open_sqlcipher_via_chain(
        db_path or self.db_path,
        **self._chain_kwargs(prompt_on_tty=False),
        **connect_kwargs,
    )

resolve_passphrase(*, prompt_on_tty=False)

Walk the resolution chain with this config's knobs; return the passphrase or None.

Diagnostic seam — never opens the DB. Used by host-side doctor / sickbay and by vault seal to reuse whatever tier currently has the key. Same chain order as open_credential_db because both delegate here.

Source code in src/terok_sandbox/config.py
def resolve_passphrase(self, *, prompt_on_tty: bool = False) -> str | None:
    """Walk the resolution chain with this config's knobs; return the passphrase or ``None``.

    Diagnostic seam — never opens the DB.  Used by host-side
    doctor / sickbay and by ``vault seal`` to reuse whatever tier
    currently has the key.  Same chain order as
    [`open_credential_db`][terok_sandbox.SandboxConfig.open_credential_db]
    because both delegate here.
    """
    from .vault.store.encryption import resolve_passphrase  # noqa: PLC0415

    return resolve_passphrase(**self._chain_kwargs(prompt_on_tty=prompt_on_tty))

resolve_passphrase_with_source(*, prompt_on_tty=False)

Walk the resolution chain with this config's knobs; return (passphrase, source).

Diagnostic counterpart to resolve_passphrase — feeds the daemon startup log so the operator sees which tier unlocked the vault on this boot.

Source code in src/terok_sandbox/config.py
def resolve_passphrase_with_source(
    self, *, prompt_on_tty: bool = False
) -> tuple[str | None, PassphraseSource | None]:
    """Walk the resolution chain with this config's knobs; return ``(passphrase, source)``.

    Diagnostic counterpart to
    [`resolve_passphrase`][terok_sandbox.SandboxConfig.resolve_passphrase]
    — feeds the daemon startup log so the operator sees *which*
    tier unlocked the vault on this boot.
    """
    from .vault.store.encryption import resolve_passphrase_with_source  # noqa: PLC0415

    return resolve_passphrase_with_source(**self._chain_kwargs(prompt_on_tty=prompt_on_tty))
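The resolution chain these methods walk can be pictured as an ordered list of tiers where the first one that yields a value wins. The sketch below is illustrative only: `build_tiers` and `walk_chain` are hypothetical, the tier labels are invented, and the systemd-creds and TTY-prompt tiers are left out because their position in the chain is not stated on this page. The order shown (session file first, passphrase command between keyring and config) follows the field descriptions above.

```python
from typing import Callable, List, Optional, Tuple

# A tier is a label plus a zero-argument probe returning a passphrase or None.
Tier = Tuple[str, Callable[[], Optional[str]]]


def build_tiers(
    *,
    session_file: Optional[str],
    keyring: Optional[str],
    command: Optional[str],
    config: Optional[str],
    use_keyring: bool = False,
) -> List[Tier]:
    """Assemble tiers in priority order; the keyring tier is opt-in."""
    tiers: List[Tier] = [("session-file", lambda: session_file)]
    if use_keyring:
        tiers.append(("keyring", lambda: keyring))
    tiers.append(("passphrase-command", lambda: command))
    tiers.append(("config", lambda: config))
    return tiers


def walk_chain(tiers: List[Tier]) -> Tuple[Optional[str], Optional[str]]:
    """Return (passphrase, source) from the first tier that yields a value."""
    for source, probe in tiers:
        passphrase = probe()
        if passphrase:
            return passphrase, source
    return None, None
```

Returning the source alongside the passphrase is what lets a startup log name the tier that unlocked the vault.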

ssh_signer_local_socket_path(scope)

Return the per-scope vault SSH-agent socket path for scope.

The vault binds one 0600 Unix socket per scope with at least one assigned key, under the same runtime_dir as the main signer. Host-side gate-sync points SSH_AUTH_SOCK at this path.

Rejects unsafe scope names with InvalidScopeName as a belt-and-braces guard — writers in the DB layer enforce the same policy, but the socket path is public API and may be called without a preceding DB write.

Source code in src/terok_sandbox/config.py
def ssh_signer_local_socket_path(self, scope: str) -> Path:
    """Return the per-scope vault SSH-agent socket path for *scope*.

    The vault binds one 0600 Unix socket per scope with at least one
    assigned key, under the same ``runtime_dir`` as the main signer.
    Host-side ``gate-sync`` points ``SSH_AUTH_SOCK`` at this path.

    Rejects unsafe scope names with [`InvalidScopeName`][terok_sandbox.vault.store.db.InvalidScopeName]
    as a belt-and-braces guard — writers in the DB layer enforce the
    same policy, but the socket path is public API and may be called
    without a preceding DB write.
    """
    from .vault.store.db import _require_safe_scope

    _require_safe_scope(scope)
    return self.runtime_dir / f"ssh-agent-local-{scope}.sock"
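The guard matters because the scope is interpolated straight into a filename: a value such as `../evil` would otherwise escape the runtime directory. A minimal sketch of that kind of check, assuming a hypothetical allow-list pattern (the real policy lives in `_require_safe_scope` and is not shown on this page):

```python
import re
from pathlib import Path

# Hypothetical policy: alphanumeric first character, then [A-Za-z0-9._-].
_SAFE_SCOPE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


class UnsafeScope(ValueError):
    """Stand-in for the documented InvalidScopeName error."""


def local_socket_path(runtime_dir: Path, scope: str) -> Path:
    """Validate the scope, then build the per-scope socket path."""
    if not _SAFE_SCOPE.fullmatch(scope):
        raise UnsafeScope(f"unsafe scope name: {scope!r}")
    return runtime_dir / f"ssh-agent-local-{scope}.sock"


path = local_socket_path(Path("/run/user/1000/terok"), "my-project")
```

Validating at the path builder as well as in the DB layer means a caller that never wrote to the DB still cannot produce a path outside `runtime_dir`.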

RawRunSection

Bases: BaseModel

The run: section — "how the container runs".

Covers OCI-runtime selection, container resource limits, capability toggles, environment, and lifecycle hooks. Sandbox owns this because every field translates to a podman/runtime flag or annotation sandbox emits at launch time.

Inheritable in both directions:

  • At the global level, defaults apply to every project (e.g. set runtime: krun once to opt the whole installation into microVM isolation).
  • At the project level, fields override the global default one-by-one via the orchestrator's merge logic.

model_config = ConfigDict(extra='forbid') class-attribute instance-attribute

shutdown_timeout = Field(default=10, description='Seconds to wait before SIGKILL on container stop') class-attribute instance-attribute

gpus = Field(default=None, description='GPU passthrough: ``true``, ``"all"``, or omit to disable') class-attribute instance-attribute

memory = Field(default=None, description='Podman ``--memory`` value (e.g. ``"4g"``, ``"512m"``, ``"4gib"``, plain ``"1024"`` for bytes); ``None`` = unlimited. Format mirrors what podman accepts — see ``man podman-run(1)`` --memory.') class-attribute instance-attribute

cpus = Field(default=None, description='Podman ``--cpus`` value (e.g. ``"2.0"``, ``"0.5"``); ``None`` = unlimited. Non-negative decimal.') class-attribute instance-attribute

nested_containers = Field(default=False, description='Declares that the project runs podman/docker inside its container. When true, the outer container is launched with ``--security-opt label=nested`` and ``--device /dev/fuse`` so rootless nested containers work under SELinux without disabling labels wholesale.') class-attribute instance-attribute

runtime = Field(default=None, description='OCI runtime: ``crun`` (default) for conventional containers, or ``krun`` for KVM-microVM isolation (experimental). ``None`` resolves to ``crun`` — the OCI runtime podman picks by default on every supported distro. ``krun`` requires the global ``experimental: true`` flag at task launch.') class-attribute instance-attribute

timezone = Field(default=None, description="IANA timezone for the task container (e.g. ``Europe/Prague``, ``UTC``). Propagated as ``TZ`` — resolved against the image's ``tzdata``. Unset (default) means follow the host's timezone.") class-attribute instance-attribute

hooks = Field(default_factory=RawHooksSection) class-attribute instance-attribute
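Since every field in this section maps to a podman flag or annotation, a small translation function shows the idea. This is a sketch under assumptions, not the package's launch code: `podman_run_flags` is hypothetical, it covers only the fields whose flags are named in the descriptions above, and passing the timezone as `--env TZ=...` is one plausible reading of "propagated as TZ".

```python
from typing import List, Optional


def podman_run_flags(
    *,
    memory: Optional[str] = None,
    cpus: Optional[str] = None,
    nested_containers: bool = False,
    timezone: Optional[str] = None,
) -> List[str]:
    """Translate a few run: fields into podman run arguments."""
    flags: List[str] = []
    if memory is not None:
        flags += ["--memory", memory]  # None means unlimited: emit nothing
    if cpus is not None:
        flags += ["--cpus", cpus]
    if nested_containers:
        # Lets rootless nested containers work under SELinux.
        flags += ["--security-opt", "label=nested", "--device", "/dev/fuse"]
    if timezone is not None:
        flags += ["--env", f"TZ={timezone}"]  # assumption: TZ set via --env
    return flags
```

Unset fields contribute no arguments, so an empty section leaves podman's own defaults in force.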

RawSSHSection

Bases: BaseModel

The ssh: section — auth strategy for the host-side gate.

Default is None (not False) so model_dump(exclude_none=True) can distinguish unset from explicitly false. Higher layers may combine this with a project.yml ssh: section of the same shape; the None sentinel keeps the project layer from stomping the global value when the user omits it. The effective False default is applied at the consumer end.

model_config = ConfigDict(extra='forbid') class-attribute instance-attribute

use_personal = Field(default=None, description="Opt in to the user's ``~/.ssh`` keys for host-side ``gate-sync``. Default ``false`` — terok uses only its vault-managed key. Resolves through ConfigStack: ``terok-global config.yml`` → ``project.yml`` → CLI ``--use-personal-ssh`` (highest).") class-attribute instance-attribute
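The reason for the None sentinel is easiest to see with a layered merge. The sketch below uses plain dicts instead of the real ConfigStack; `merge_layers` and `effective_use_personal` are hypothetical helpers that mimic dropping unset keys before merging, with later layers taking priority.

```python
from typing import Any, Dict, Optional


def merge_layers(*layers: Dict[str, Any]) -> Dict[str, Any]:
    """Merge left to right, skipping keys whose value is None (unset)."""
    merged: Dict[str, Any] = {}
    for layer in layers:
        merged.update({key: value for key, value in layer.items() if value is not None})
    return merged


def effective_use_personal(
    global_ssh: Dict[str, Any],
    project_ssh: Dict[str, Any],
    cli_flag: Optional[bool] = None,
) -> bool:
    """Global config, then project, then CLI; False only if nobody set it."""
    merged = merge_layers(global_ssh, project_ssh, {"use_personal": cli_flag})
    return bool(merged.get("use_personal", False))
```

Had the schema default been False, a project file that simply omitted the key would be indistinguishable from one that disabled it, and would override a global opt-in.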

SandboxConfigView

Bases: BaseModel

The slice of config.yml sandbox owns and validates.

extra="allow" at the top level so unknown sections (executor's image:, terok's tui: / logs: / tasks: / git: / hooks:) pass through silently when sandbox is run standalone — the ecosystem's shared config file is expected to contain every package's keys, and rejecting them would make python -m terok_sandbox crash on any complete config.

Higher layers compose by inheriting from this class and adding their own typed fields:

  • terok_executor.config_schema.ExecutorConfigView inherits and adds the image: section.
  • terok's RawGlobalConfig inherits and adds the remaining five terok-owned sections, then flips to extra="forbid" — the topmost layer knows every section, so a typo at the top level is caught there.

model_config = ConfigDict(extra='allow') class-attribute instance-attribute

credentials = Field(default_factory=RawCredentialsSection) class-attribute instance-attribute

paths = Field(default_factory=RawPathsSection) class-attribute instance-attribute

shield = Field(default_factory=RawShieldSection) class-attribute instance-attribute

services = Field(default_factory=RawServicesSection) class-attribute instance-attribute

vault = Field(default_factory=RawVaultSection) class-attribute instance-attribute

gate_server = Field(default_factory=RawGateServerSection) class-attribute instance-attribute

network = Field(default_factory=RawNetworkSection) class-attribute instance-attribute

ssh = Field(default_factory=RawSSHSection) class-attribute instance-attribute

run = Field(default_factory=RawRunSection) class-attribute instance-attribute

experimental = Field(default=False, description="Cross-package opt-in for experimental features. Gates terok's krun runtime and sandbox's krun-only host-binary prereq probes (``ip``). Lives on the top level rather than in any one section because it's shared between sandbox, executor, and terok — the topmost layer (terok) inherits this declaration.") class-attribute instance-attribute

CheckVerdict(severity, detail, fixable=False) dataclass

Result of evaluating a single health check probe.

severity instance-attribute

"ok", "warn", or "error".

detail instance-attribute

Human-readable explanation.

fixable = False class-attribute instance-attribute

Whether fix_cmd should be offered to the operator.

DoctorCheck(category, label, probe_cmd, evaluate, fix_cmd=None, fix_description='', host_side=False) dataclass

A single health check to run inside (or against) a container.

The probe_cmd is executed via podman exec <cname> ... by the orchestrator. The evaluate callable interprets the result. If fix_cmd is set, the orchestrator may offer it when the check fails with fixable=True.

Dual execution modes:

  • Container mode (host_side=False): the orchestrator runs probe_cmd via podman exec and passes the result to evaluate. The standalone doctor command runs the same probe_cmd directly via subprocess on the host.
  • Host-side mode (host_side=True): the orchestrator bypasses probe_cmd entirely and performs the check via Python APIs (e.g. ShieldManager), then passes resolved state to evaluate. The standalone doctor command calls evaluate(0, "", "") and the function performs the check itself or reports a neutral result.

category instance-attribute

Grouping key: "bridge", "env", "mount", "network", "shield", "git".

label instance-attribute

Human-readable check name shown in output.

probe_cmd instance-attribute

Shell command to run inside the container via podman exec.

evaluate instance-attribute

(returncode, stdout, stderr) → CheckVerdict.

fix_cmd = None class-attribute instance-attribute

Optional remediation command for podman exec.

fix_description = '' class-attribute instance-attribute

Shown to the operator before applying the fix.

host_side = False class-attribute instance-attribute

If True, the check runs on the host (not via podman exec). The orchestrator calls evaluate(0, "", "") and the evaluate function performs the host-side check itself.
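The split between probe and evaluation can be sketched with two small dataclasses and an injected runner. Everything here is a hypothetical stand-in (`Verdict`, `Check`, `run_check`, `evaluate_git`); the probe command is modelled as a plain string, and the host-side branch follows the `evaluate(0, "", "")` convention described above.

```python
from dataclasses import dataclass
from typing import Callable, Tuple

ProbeResult = Tuple[int, str, str]  # (returncode, stdout, stderr)


@dataclass(frozen=True)
class Verdict:
    severity: str  # "ok", "warn", or "error"
    detail: str
    fixable: bool = False


@dataclass(frozen=True)
class Check:
    label: str
    probe_cmd: str
    evaluate: Callable[[int, str, str], Verdict]
    host_side: bool = False


def evaluate_git(returncode: int, stdout: str, stderr: str) -> Verdict:
    """Interpret the result of a version probe."""
    if returncode == 0:
        return Verdict("ok", stdout.strip())
    return Verdict("error", stderr.strip() or "git not found", fixable=True)


def run_check(check: Check, run_probe: Callable[[str], ProbeResult]) -> Verdict:
    """Run the probe through the injected runner unless the check is host-side."""
    if check.host_side:
        return check.evaluate(0, "", "")  # evaluate performs the check itself
    return check.evaluate(*run_probe(check.probe_cmd))


git_check = Check(label="git available", probe_cmd="git --version", evaluate=evaluate_git)
```

Injecting `run_probe` is what allows one check definition to serve both execution modes: an orchestrator can pass a runner that wraps `podman exec`, while a standalone command can pass one that runs the probe locally.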

GateAuthNotConfigured(scope)

Bases: RuntimeError

Raised when a scope has no vault key and personal-SSH fallback is not opted in.

Callers (the gate-sync CLI dispatch) turn this into a two-door remediation hint:

  • generate a terok-managed key with terok ssh-init <project> and register it upstream, or
  • opt in to the user's own ~/.ssh keys with --use-personal-ssh (or ssh.use_personal: true in the project YAML).

Source code in src/terok_sandbox/gate/mirror.py
def __init__(self, scope: str) -> None:
    self.scope = scope
    super().__init__(
        f"No SSH key is assigned to scope {scope!r} and personal-SSH "
        "fallback is not enabled.  Either run `terok ssh-init` to "
        "generate one, or pass --use-personal-ssh."
    )

scope = scope instance-attribute

GateStalenessInfo(branch, gate_head, upstream_head, is_stale, commits_behind, commits_ahead, last_checked, error) dataclass

Result of comparing gate vs upstream.

branch instance-attribute

gate_head instance-attribute

upstream_head instance-attribute

is_stale instance-attribute

commits_behind instance-attribute

commits_ahead instance-attribute

last_checked instance-attribute

error instance-attribute

GitGate(*, scope, gate_path, upstream_url=None, default_branch=None, use_personal_ssh=False, validate_gate_fn=None, clone_cache_base=None)

Repository + Gateway for a host-side git gate mirror.

Manages the bare git mirror that containers clone from. Provides operations for initial creation, incremental sync from upstream, selective branch fetching, and staleness detection.

Constructor takes plain parameters — no terok-specific types.

Initialise with plain parameters.

Parameters

  • scope: Credential scope for this gate's owner. Used to locate the per-scope vault SSH-agent socket.
  • gate_path: Path to the bare git mirror on the host.
  • upstream_url: Git upstream URL to sync from.
  • default_branch: Branch name used for staleness comparisons.
  • use_personal_ssh: When True, skip the vault socket entirely and let git fall through to the user's ~/.ssh keys / loaded agent. Default False: "terok never touches your real keys" is the advertised property. Opt in per-invocation (--use-personal-ssh) or per-project (ssh.use_personal: true in project YAML).
  • validate_gate_fn: Optional callback (scope) -> None that validates no other scope uses the same gate with a different upstream. Injected by the orchestration layer; omitted for standalone use.
  • clone_cache_base: Base directory for non-bare clone caches. When set, sync refreshes a working-tree cache at clone_cache_base / scope after updating the bare mirror. The cache accelerates task startup by enabling a host-side file copy instead of a full git clone.

Source code in src/terok_sandbox/gate/mirror.py
def __init__(
    self,
    *,
    scope: str,
    gate_path: Path | str,
    upstream_url: str | None = None,
    default_branch: str | None = None,
    use_personal_ssh: bool = False,
    validate_gate_fn: Callable[[str], None] | None = None,
    clone_cache_base: Path | str | None = None,
) -> None:
    """Initialise with plain parameters.

    Parameters
    ----------
    scope:
        Credential scope for this gate's owner.  Used to locate the
        per-scope vault SSH-agent socket.
    gate_path:
        Path to the bare git mirror on the host.
    upstream_url:
        Git upstream URL to sync from.
    default_branch:
        Branch name used for staleness comparisons.
    use_personal_ssh:
        When ``True``, skip the vault socket entirely and let git fall
        through to the user's ``~/.ssh`` keys / loaded agent.  Default
        ``False`` — "terok never touches your real keys" is the advertised
        property.  Opt in per-invocation (``--use-personal-ssh``) or
        per-project (``ssh.use_personal: true`` in project YAML).
    validate_gate_fn:
        Optional callback ``(scope) -> None`` that validates no other
        scope uses the same gate with a different upstream.  Injected by
        the orchestration layer; omitted for standalone use.
    clone_cache_base:
        Base directory for non-bare clone caches.  When set,
        [`sync`][terok_sandbox.gate.mirror.GitGate.sync] refreshes a working-tree cache at
        ``clone_cache_base / scope`` after updating the bare mirror.
        The cache accelerates task startup by enabling a host-side
        file copy instead of a full ``git clone``.
    """
    self._scope = scope
    self._gate_path = Path(gate_path)
    self._upstream_url = upstream_url
    self._default_branch = default_branch
    self._use_personal_ssh = use_personal_ssh
    self._validate_gate_fn = validate_gate_fn
    self._clone_cache_base = Path(clone_cache_base) if clone_cache_base else None
    self._signer: _EphemeralSigner | None = None

cache_path property

Clone cache directory for this scope, or None if caching is disabled.

close()

Stop the ephemeral signer this gate started, if any.

Idempotent. Long-lived processes (the TUI) should call this explicitly so the signer thread and temp socket don't outlive the gate's last use.

Source code in src/terok_sandbox/gate/mirror.py
def close(self) -> None:
    """Stop the ephemeral signer this gate started, if any.

    Idempotent.  Long-lived processes (the TUI) should call this
    explicitly so the signer thread and temp socket don't outlive
    the gate's last use.
    """
    if self._signer is not None:
        self._signer.stop()
        self._signer = None

__del__()

Best-effort signer teardown on GC.

Source code in src/terok_sandbox/gate/mirror.py
def __del__(self) -> None:
    """Best-effort signer teardown on GC."""
    with contextlib.suppress(Exception):  # __del__ never raises
        self.close()

sync(branches=None, force_reinit=False)

Sync the host-side git mirror gate from upstream.

With an upstream configured, clones (or fetches) from it using the project's SSH setup. Without one, initialises a bare repo in place and returns a no-op sync — the gate then serves as a local-only remote that the container can push to, giving the agent somewhere to stage commits even when there is nothing external to mirror.

A remoteless gate that already exists is a proper no-op: nothing re-initialises, and the returned branch list is empty.

Source code in src/terok_sandbox/gate/mirror.py
def sync(
    self,
    branches: list[str] | None = None,
    force_reinit: bool = False,
) -> GateSyncResult:
    """Sync the host-side git mirror gate from upstream.

    With an upstream configured, clones (or fetches) from it using the
    project's SSH setup.  Without one, initialises a bare repo in place
    and returns a no-op sync — the gate then serves as a local-only
    remote that the container can push to, giving the agent somewhere
    to stage commits even when there is nothing external to mirror.

    A remoteless gate that already exists is a proper no-op: nothing
    re-initialises, and the returned branch list is empty.
    """
    self._validate_gate()

    gate_dir = self._gate_path
    gate_exists = gate_dir.exists()
    gate_dir.parent.mkdir(parents=True, exist_ok=True)

    env = self._ssh_env()
    created = False
    if force_reinit and gate_exists:
        try:
            if gate_dir.is_dir():
                shutil.rmtree(gate_dir)
        except Exception as exc:
            logger.warning(f"Failed to remove gate dir {gate_dir}: {exc}")
        gate_exists = False

    if not gate_exists:
        if self._upstream_url:
            _clone_gate_mirror(self._upstream_url, gate_dir, env)
        else:
            _init_remoteless_gate(gate_dir)
        created = True

    # A remoteless gate has nothing to fetch — skip ``git remote update``
    # (which would fail on a repo with no origin) and the clone-cache
    # refresh (there is no bare mirror to track).
    if not self._upstream_url:
        return {
            "path": str(gate_dir),
            "upstream_url": None,
            "created": created,
            "success": True,
            "updated_branches": [],
            "errors": [],
            "cache_refreshed": False,
        }

    sync_result = self.sync_branches(branches)

    # Refresh the non-bare clone cache from the bare mirror (best-effort).
    cache_refreshed = False
    if sync_result["success"] and self._clone_cache_base:
        cache_refreshed = self._refresh_clone_cache()

    return {
        "path": str(gate_dir),
        "upstream_url": self._upstream_url,
        "created": created,
        "success": sync_result["success"],
        "updated_branches": sync_result["updated_branches"],
        "errors": sync_result["errors"],
        "cache_refreshed": cache_refreshed,
    }

sync_branches(branches=None)

Sync specific branches in the gate from upstream.

Parameters:

  • branches (list[str] | None, default: None): List of branches to sync (default: all via remote update)

Returns:

  • BranchSyncResult: Dict with keys: success, updated_branches, errors

Source code in src/terok_sandbox/gate/mirror.py
def sync_branches(self, branches: list[str] | None = None) -> BranchSyncResult:
    """Sync specific branches in the gate from upstream.

    Args:
        branches: List of branches to sync (default: all via remote update)

    Returns:
        Dict with keys: success, updated_branches, errors
    """
    gate_dir = self._gate_path

    if not gate_dir.exists():
        return {"success": False, "updated_branches": [], "errors": ["Gate not initialized"]}

    self._validate_gate()

    env = self._ssh_env()
    errors: list[str] = []
    updated: list[str] = []

    try:
        cmd = ["git", "-C", str(gate_dir), "remote", "update", "--prune"]
        result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=120)  # nosec B603 — argv is a fixed list controlled by this module

        if result.returncode != 0:
            errors.append(f"remote update failed: {result.stderr}")
        else:
            updated = branches if branches else ["all"]

    except subprocess.TimeoutExpired:
        errors.append("Sync timed out")
    except Exception as e:
        errors.append(str(e))

    return {"success": len(errors) == 0, "updated_branches": updated, "errors": errors}
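
The method reports failures in the result dict rather than raising. The same pattern can be sketched in isolation; here `run_collecting_errors` is a hypothetical helper and the Python interpreter stands in for `git` so the example runs anywhere:

```python
import subprocess
import sys


def run_collecting_errors(cmd: list[str], timeout: float) -> dict:
    """Run *cmd*, returning a result dict instead of raising."""
    errors: list[str] = []
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        if result.returncode != 0:
            errors.append(f"command failed: {result.stderr.strip()}")
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before raising this.
        errors.append("timed out")
    except OSError as exc:  # e.g. the executable is missing
        errors.append(str(exc))
    return {"success": not errors, "errors": errors}


ok = run_collecting_errors([sys.executable, "-c", "pass"], timeout=30)
bad = run_collecting_errors(
    [sys.executable, "-c", "import sys; sys.exit('boom')"], timeout=30
)
```

Callers then branch on `success` and surface `errors` to the operator, which keeps a single failed fetch from aborting a longer workflow.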

compare_vs_upstream(branch=None)

Compare gate HEAD vs upstream HEAD for a branch.

Parameters:

  • branch (str | None, default: None): Branch to compare (default: configured default_branch)

Returns:

  • GateStalenessInfo: GateStalenessInfo with comparison results

Source code in src/terok_sandbox/gate/mirror.py
def compare_vs_upstream(self, branch: str | None = None) -> GateStalenessInfo:
    """Compare gate HEAD vs upstream HEAD for a branch.

    Args:
        branch: Branch to compare (default: configured default_branch)

    Returns:
        GateStalenessInfo with comparison results
    """
    branch = branch or self._default_branch
    now = datetime.now().isoformat()

    if not branch:
        return GateStalenessInfo(
            branch=None,
            gate_head=None,
            upstream_head=None,
            is_stale=False,
            commits_behind=None,
            commits_ahead=None,
            last_checked=now,
            error="No branch configured",
        )

    env = self._ssh_env()

    # Get gate HEAD
    gate_head = _get_gate_branch_head(self._gate_path, branch, env)
    if gate_head is None:
        return GateStalenessInfo(
            branch=branch,
            gate_head=None,
            upstream_head=None,
            is_stale=False,
            commits_behind=None,
            commits_ahead=None,
            last_checked=now,
            error="Gate not initialized",
        )

    # Get upstream HEAD
    if not self._upstream_url:
        return GateStalenessInfo(
            branch=branch,
            gate_head=gate_head,
            upstream_head=None,
            is_stale=False,
            commits_behind=None,
            commits_ahead=None,
            last_checked=now,
            error="No upstream URL configured",
        )

    upstream_info = _get_upstream_head(self._upstream_url, branch, env)
    if upstream_info is None:
        return GateStalenessInfo(
            branch=branch,
            gate_head=gate_head,
            upstream_head=None,
            is_stale=False,
            commits_behind=None,
            commits_ahead=None,
            last_checked=now,
            error="Could not reach upstream",
        )

    upstream_head = upstream_info["commit_hash"]
    is_stale = gate_head != upstream_head

    commits_behind = None
    commits_ahead = None
    if is_stale:
        commits_behind = _count_commits_range(self._gate_path, gate_head, upstream_head, env)
        commits_ahead = _count_commits_range(self._gate_path, upstream_head, gate_head, env)

    return GateStalenessInfo(
        branch=branch,
        gate_head=gate_head,
        upstream_head=upstream_head,
        is_stale=is_stale,
        commits_behind=commits_behind if is_stale else 0,
        commits_ahead=commits_ahead if is_stale else 0,
        last_checked=now,
        error=None,
    )

last_commit()

Get information about the last commit on the configured branch.

Returns None if the gate doesn't exist or is not accessible.

Source code in src/terok_sandbox/gate/mirror.py
def last_commit(self) -> CommitInfo | None:
    """Get information about the last commit on the configured branch.

    Returns ``None`` if the gate doesn't exist or is not accessible.
    """
    try:
        gate_dir = self._gate_path

        if not gate_dir.exists() or not gate_dir.is_dir():
            return None

        env = self._ssh_env()

        rev = f"refs/heads/{self._default_branch}" if self._default_branch else "HEAD"
        cmd = [
            "git",
            "-C",
            str(gate_dir),
            "log",
            "-1",
            rev,
            "--pretty=format:%H%x00%ad%x00%an%x00%s",
            "--date=iso",
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, env=env)  # nosec B603 — argv is a fixed list controlled by this module
        if result.returncode != 0 and self._default_branch:
            cmd[5] = "HEAD"
            result = subprocess.run(cmd, capture_output=True, text=True, env=env)  # nosec B603 — argv is a fixed list controlled by this module
        if result.returncode != 0:
            return None

        parts = result.stdout.strip().split("\x00", 3)
        if len(parts) == 4:
            return {
                "commit_hash": parts[0],
                "commit_date": parts[1],
                "commit_author": parts[2],
                "commit_message": parts[3],
            }
        return None

    except Exception:
        return None

GateServer(*, mirror_root, token, scope, socket_path=None, host=None, port=None)

Per-container git gate, composed by the supervisor alongside the vault.

Serves the task's repo out of the shared per-project bare mirror at mirror_root, gated on the single token (scoped to scope). Binds either a per-container Unix socket (socket_path) or a per-container 127.0.0.1 TCP port (host + port); exactly one transport must be supplied.

Stateless and self-contained — the only terok dependency is the SELinux socket-labelling helper the Unix listener needs.

Bind the gate's configuration; start brings the listener up.

Source code in src/terok_sandbox/gate/server.py
def __init__(
    self,
    *,
    mirror_root: Path,
    token: str,
    scope: str,
    socket_path: Path | None = None,
    host: str | None = None,
    port: int | None = None,
) -> None:
    """Bind the gate's configuration; ``start`` brings the listener up."""
    self._mirror_root = mirror_root
    self._token = token
    self._scope = scope
    self._socket_path = socket_path
    self._host = host
    self._port = port
    self._server: HTTPServer | None = None
    self._thread: threading.Thread | None = None

start() async

Bind the listener and serve it on a daemon thread.

Source code in src/terok_sandbox/gate/server.py
async def start(self) -> None:
    """Bind the listener and serve it on a daemon thread."""
    import asyncio

    handler = _make_handler_class(
        self._mirror_root, _SingleTokenStore(self._token, self._scope)
    )
    if self._socket_path is not None:
        server: HTTPServer = await asyncio.get_running_loop().run_in_executor(
            None, _create_unix_server, handler, self._socket_path
        )
    elif self._host and self._port:
        server = _ThreadingHTTPServer((self._host, self._port), handler)
    else:
        raise ValueError("GateServer needs either socket_path or host+port")
    self._server = server
    self._thread = threading.Thread(target=server.serve_forever, daemon=True, name="terok-gate")
    self._thread.start()

stop() async

Stop the listener and join the serving thread.

shutdown() blocks until the accept loop exits, so it runs in an executor rather than inline on the event loop — calling it on the loop thread would deadlock.

Source code in src/terok_sandbox/gate/server.py
async def stop(self) -> None:
    """Stop the listener and join the serving thread.

    ``shutdown()`` blocks until the accept loop exits, so it runs in
    an executor rather than inline on the event loop — calling it on
    the loop thread would deadlock.
    """
    import asyncio

    if self._server is None:
        return
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, self._server.shutdown)
    self._server.server_close()
    if self._thread is not None:
        self._thread.join(timeout=2.0)
    self._server = None
    self._thread = None
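
The start/stop lifecycle (serve on a daemon thread, run the blocking `shutdown()` in an executor) can be reproduced with the standard library alone. The sketch below is not the gate server: `_Hello`, `_fetch`, and `demo` are hypothetical, and port 0 is used so the OS picks a free port.

```python
import asyncio
import http.client
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class _Hello(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args) -> None:  # keep the demo quiet
        pass


def _fetch(port: int) -> bytes:
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", "/")
        return conn.getresponse().read()
    finally:
        conn.close()


async def demo() -> bytes:
    loop = asyncio.get_running_loop()
    server = ThreadingHTTPServer(("127.0.0.1", 0), _Hello)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        # The client call blocks, so it also goes through the executor.
        return await loop.run_in_executor(None, _fetch, server.server_port)
    finally:
        # shutdown() blocks until serve_forever() returns; keep it off the loop.
        await loop.run_in_executor(None, server.shutdown)
        server.server_close()
        thread.join(timeout=2.0)


result = asyncio.run(demo())
```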

ShieldHooks

Host-wide OCI hooks installer — no task context.

Thin pass-through to terok-shield's HooksInstaller. Kept as a class so the sandbox setup aggregator can swap it out in tests without poking around terok-shield internals.

install() staticmethod

Install global OCI hooks for shield egress firewalling.

Global hooks are required on all podman versions to survive container stop/start cycles (terok-shield#122). Single layout: scripts, ballast, and JSON descriptors all land in namespace_state_dir("shield") / "hooks"; containers.conf is patched to register that path.

Source code in src/terok_sandbox/integrations/shield.py
@staticmethod
def install() -> None:
    """Install global OCI hooks for shield egress firewalling.

    Global hooks are required on all podman versions to survive
    container stop/start cycles (terok-shield#122).  Single
    layout: scripts, ballast, and JSON descriptors all land in
    ``namespace_state_dir("shield") / "hooks"``;
    ``containers.conf`` is patched to register that path.
    """
    HooksInstaller().install()

uninstall() staticmethod

Remove the global OCI hooks install writes.

Idempotent — missing files are tolerated.

Source code in src/terok_sandbox/integrations/shield.py
@staticmethod
def uninstall() -> None:
    """Remove the global OCI hooks [`install`][terok_sandbox.integrations.shield.ShieldHooks.install] writes.

    Idempotent — missing files are tolerated.
    """
    HooksInstaller().uninstall()

ShieldManager(task_dir, cfg=None, *, runtime=ShieldRuntime.DEFAULT, loopback_ports_override=None)

Per-task wrapper around Shield.

Holds the (task_dir, cfg, runtime) tuple a Shield is built from and caches the constructed instance — the previous free-function surface rebuilt a Shield on every call, which paid the ShieldConfig + collaborator-wiring cost twice for every transition pair (pre_startup, updown, …).

Bypassable methods (pre_start, up, down) short-circuit when shield_bypass is set on the configuration. Non-bypassable methods (quarantine, state) always run — panic overrides every safety bypass, and state probes report what nft actually sees regardless of operator intent.

Bind the manager to a task directory and shield configuration.

runtime selects the container runtime category — DEFAULT for crun/runc/youki (dnsmasq on netns 127.0.0.1), KRUN for the libkrun microVM path (dnsmasq on a link-local address the guest can reach via passt). Callers that drive the launch path map their runtime string (RunSpec.runtime) to the enum.

loopback_ports_override replaces the cfg-derived (gate_port, token_broker_port, ssh_signer_port) triple — the per-container launch path passes the freshly-allocated broker and signer ports so shield's nft rules allow the actual host ports the supervisor binds.

Source code in src/terok_sandbox/integrations/shield.py
def __init__(
    self,
    task_dir: Path,
    cfg: SandboxConfig | None = None,
    *,
    runtime: ShieldRuntime = ShieldRuntime.DEFAULT,
    loopback_ports_override: tuple[int, ...] | None = None,
) -> None:
    """Bind the manager to a task directory and shield configuration.

    *runtime* selects the container runtime category — ``DEFAULT``
    for crun/runc/youki (dnsmasq on netns ``127.0.0.1``), ``KRUN``
    for the libkrun microVM path (dnsmasq on a link-local address
    the guest can reach via passt).  Callers that drive the launch
    path map their runtime string (``RunSpec.runtime``) to the
    enum.

    *loopback_ports_override* replaces the cfg-derived
    ``(gate_port, token_broker_port, ssh_signer_port)`` triple — the
    per-container launch path passes the freshly-allocated broker
    and signer ports so shield's nft rules allow the actual host
    ports the supervisor binds.
    """
    self._task_dir = task_dir
    self._cfg = cfg or SandboxConfig()
    self._runtime = runtime
    self._loopback_ports_override = loopback_ports_override

state_dir property

Per-task shield state directory: {task_dir}/shield.

bypass property

True when shield_bypass is set on the sandbox configuration.

shield cached property

Lazily constructed Shield instance.

Built from a ShieldConfig whose loopback_ports reflect the actual gate/broker/signer ports. Auto-allocated configs default those fields to None; left unresolved, that would silently produce an empty tuple and a shield ruleset with no tcp dport <p> ip daddr 169.254.1.2 accept rules, so container→host TCP traffic would fall through to the private-range reject (#156 regression follow-up).

pre_start(container)

Return extra podman run args for egress firewalling.

Returns an empty list (no firewall args) when the dangerous bypass_firewall_no_protection override is active.

Raises SystemExit with setup instructions when the podman environment requires one-time hook installation.

Source code in src/terok_sandbox/integrations/shield.py
def pre_start(self, container: str) -> list[str]:
    """Return extra ``podman run`` args for egress firewalling.

    Returns an empty list (no firewall args) when the dangerous
    ``bypass_firewall_no_protection`` override is active.

    Raises [`SystemExit`][SystemExit] with setup instructions when
    the podman environment requires one-time hook installation.
    """
    if self.bypass:
        warnings.warn(_BYPASS_WARNING, stacklevel=2)
        return []
    try:
        return self.shield.pre_start(container)
    except ShieldNeedsSetup as exc:
        raise SystemExit(str(exc)) from None

up(container, container_id)

Set shield to deny-all mode for a running container.

container is the operator-facing podman name (audit-log key); container_id is the full podman UUID, which keys terok-shield's per-container hub socket. Both are mandatory: terok-shield removed the global-hub fallback in feat/per-container-supervisor.

Source code in src/terok_sandbox/integrations/shield.py
def up(self, container: str, container_id: str) -> None:
    """Set shield to deny-all mode for a running container.

    *container* is the operator-facing podman name (audit-log key);
    *container_id* is the full podman UUID, which keys terok-shield's
    per-container hub socket.  Both are mandatory: terok-shield
    removed the global-hub fallback in
    ``feat/per-container-supervisor``.
    """
    if self.bypass:
        return
    self.shield.up(container, container_id)

down(container, container_id, *, allow_all=False)

Set shield to bypass mode (allow egress) for a running container.

container / container_id — see up. When allow_all is True, also permits private-range (RFC 1918) traffic.

Source code in src/terok_sandbox/integrations/shield.py
def down(self, container: str, container_id: str, *, allow_all: bool = False) -> None:
    """Set shield to bypass mode (allow egress) for a running container.

    *container* / *container_id* — see
    [`up`][terok_sandbox.integrations.shield.ShieldManager.up].  When
    *allow_all* is True, also permits private-range (RFC 1918)
    traffic.
    """
    if self.bypass:
        return
    self.shield.down(container, container_id, allow_all=allow_all)

quarantine(container)

Total network blackout — drop all traffic, log dropped traffic.

Ignores shield_bypass because panic overrides every safety bypass.

Source code in src/terok_sandbox/integrations/shield.py
def quarantine(self, container: str) -> None:
    """Total network blackout — drop all traffic, log dropped traffic.

    Ignores ``shield_bypass`` because panic overrides every safety bypass.
    """
    self.shield.quarantine(container)

state(container)

Return the live shield state for a running container.

Queries actual nft state even when bypass is set, because containers started before bypass was enabled may still have active rules.

Source code in src/terok_sandbox/integrations/shield.py
def state(self, container: str) -> ShieldState:
    """Return the live shield state for a running container.

    Queries actual nft state even when bypass is set, because
    containers started *before* bypass was enabled may still have
    active rules.
    """
    return self.shield.state(container)
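
The split between bypassable and non-bypassable methods, together with the lazily cached `shield` property, can be sketched with stand-in classes. `FakeShield` and `Manager` are hypothetical and only mimic the control flow shown above:

```python
from functools import cached_property


class FakeShield:
    """Hypothetical stand-in for the expensive-to-build Shield."""

    built = 0  # counts constructions across all instances

    def __init__(self) -> None:
        FakeShield.built += 1
        self.calls: list[str] = []

    def up(self, container: str) -> None:
        self.calls.append(f"up:{container}")

    def quarantine(self, container: str) -> None:
        self.calls.append(f"quarantine:{container}")


class Manager:
    def __init__(self, bypass: bool = False) -> None:
        self.bypass = bypass

    @cached_property
    def shield(self) -> FakeShield:
        return FakeShield()  # constructed on first access, then reused

    def up(self, container: str) -> None:
        if self.bypass:  # bypassable: return without building the shield
            return
        self.shield.up(container)

    def quarantine(self, container: str) -> None:
        self.shield.quarantine(container)  # never bypassed
```

With `bypass=True`, calling `up` never touches `shield`, so the cached instance is not even constructed until a non-bypassable method needs it.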

status()

Return shield status dict from the sandbox configuration.

Reads only the sandbox configuration — does not instantiate the underlying Shield, so callers that only want configuration-level shape don't pay the Shield wire-up cost.

Source code in src/terok_sandbox/integrations/shield.py
def status(self) -> dict:
    """Return shield status dict from the sandbox configuration.

    Reads only the sandbox configuration — does not instantiate
    the underlying Shield, so callers that only want
    configuration-level shape don't pay the Shield wire-up cost.
    """
    result: dict = {
        "mode": "hook",
        "profiles": list(self._cfg.shield_profiles),
        "audit_enabled": self._cfg.shield_audit,
    }
    if self.bypass:
        result["bypass_firewall_no_protection"] = True
    return result

check_environment()

Check the podman environment for shield compatibility.

Returns a synthetic EnvironmentCheck with bypass info when the dangerous bypass override is active.

Source code in src/terok_sandbox/integrations/shield.py
def check_environment(self) -> EnvironmentCheck:
    """Check the podman environment for shield compatibility.

    Returns a synthetic [`EnvironmentCheck`][terok_shield.EnvironmentCheck]
    with bypass info when the dangerous bypass override is active.
    """
    if self.bypass:
        return EnvironmentCheck(
            ok=False,
            health="bypass",
            issues=["bypass_firewall_no_protection is set — egress firewall disabled"],
        )
    return self.shield.check_environment()

interactive_session(container)

Run the terminal clearance fallback for this task's shield.

Thin wrapper that spares callers from reaching into terok_shield.simple_clearance and rebuilding the state_dir themselves. Refuses to run when the D-Bus clearance hub is already handling the session.

Source code in src/terok_sandbox/integrations/shield.py
def interactive_session(self, container: str) -> None:
    """Run the terminal clearance fallback for this task's shield.

    Thin wrapper that spares callers from reaching into
    [`terok_shield.simple_clearance`][terok_shield.simple_clearance]
    and rebuilding the ``state_dir`` themselves.  Refuses to run
    when the D-Bus clearance hub is already handling the session.
    """
    from terok_shield.simple_clearance import run_simple_clearance

    run_simple_clearance(self.state_dir, container)

watch_session(container)

Stream shield blocked-access events for this task as JSON lines.

Thin wrapper that spares callers from reaching into terok_shield.watch and rebuilding the state_dir themselves.

Source code in src/terok_sandbox/integrations/shield.py
def watch_session(self, container: str) -> None:
    """Stream shield blocked-access events for this task as JSON lines.

    Thin wrapper that spares callers from reaching into
    [`terok_shield.watch`][terok_shield.watch] and rebuilding the
    ``state_dir`` themselves.
    """
    from terok_shield.watch import run_watch

    run_watch(self.state_dir, container)

PerContainerResources(container_runtime_dir, token_broker_port, ssh_signer_port, gate_port) dataclass

Per-container socket directory plus, in TCP mode, the allocated ports.

Allocated once per launch so the same values reach mount flags, env vars, and the sidecar JSON the supervisor reads. Keeps concurrent containers from colliding on host-global filenames or ports.

container_runtime_dir instance-attribute

Host-side directory that becomes /run/terok/ inside the container. Contains the supervisor-bound vault.sock / ssh-agent.sock. Created (mode 0700) before the bind mount.

token_broker_port instance-attribute

Per-container TCP port for the vault proxy in TCP mode; None in socket mode.

ssh_signer_port instance-attribute

Per-container TCP port for the SSH signer in TCP mode; None in socket mode.

gate_port instance-attribute

Per-container TCP port for the git gate in TCP mode; None in socket mode.

ContainerRuntime

Bases: Protocol

The container runtime — factory for handles, plus operations that have no single-object receiver.

One instance per process, typically constructed at the top-level entry point and threaded down through higher layers (Sandbox, executor's AgentRunner, terok's CLI/TUI).

container(name)

Return a handle to the container named name.

Does not verify existence; call Container.state for that.

Source code in src/terok_sandbox/runtime/protocol.py
def container(self, name: str) -> Container:
    """Return a handle to the container named *name*.

    Does not verify existence; call [`Container.state`][terok_sandbox.runtime.protocol.Container.state] for that.
    """
    ...

containers_with_prefix(prefix)

Return handles for every container whose name starts with prefix.

Source code in src/terok_sandbox/runtime/protocol.py
def containers_with_prefix(self, prefix: str) -> list[Container]:
    """Return handles for every container whose name starts with *prefix*."""
    ...

image(ref)

Return a handle to the image identified by tag or ID ref.

Does not verify existence; call Image.exists for that.

Source code in src/terok_sandbox/runtime/protocol.py
def image(self, ref: str) -> Image:
    """Return a handle to the image identified by tag or ID *ref*.

    Does not verify existence; call [`Image.exists`][terok_sandbox.runtime.protocol.Image.exists] for that.
    """
    ...

images(*, dangling_only=False)

Enumerate local images.

dangling_only narrows to untagged images (those listed as <none>:<none>).

Source code in src/terok_sandbox/runtime/protocol.py
def images(self, *, dangling_only: bool = False) -> list[Image]:
    """Enumerate local images.

    *dangling_only* narrows to untagged images (those listed as
    ``<none>:<none>``).
    """
    ...

exec(container, cmd, *, timeout=None)

Run cmd inside container and return its completion record.

The operation that diverges most across backends: podman uses podman exec; the krun backend uses SSH over a passt-forwarded TCP port.

Source code in src/terok_sandbox/runtime/protocol.py
def exec(
    self,
    container: Container,
    cmd: list[str],
    *,
    timeout: float | None = None,
) -> ExecResult:
    """Run *cmd* inside *container* and return its completion record.

    The operation that diverges most across backends: podman uses
    ``podman exec``; the krun backend uses SSH over a passt-forwarded
    TCP port.
    """
    ...

exec_stdio(container, cmd, *, stdin, stdout, stderr=None, env=None, timeout=None)

Run cmd inside container with stdio bridged to caller-supplied streams.

Forwards bytes bidirectionally between stdin/stdout/stderr and the spawned process — distinct from exec, which captures output into an ExecResult. Used by the host-side ACP proxy to bridge a Unix socket to an in-container ACP-stdio agent without the runtime ever materialising the conversation.

Blocks until the child exits; returns the exit code. EOF on either side terminates forwarding cleanly. Implementations are expected to be transport-agnostic — stdin/stdout are arbitrary byte streams (a socket's file-object face, a pipe end, a test buffer).

Source code in src/terok_sandbox/runtime/protocol.py
def exec_stdio(
    self,
    container: Container,
    cmd: list[str],
    *,
    stdin: BinaryIO,
    stdout: BinaryIO,
    stderr: BinaryIO | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
) -> int:
    """Run *cmd* inside *container* with stdio bridged to caller-supplied streams.

    Forwards bytes bidirectionally between *stdin*/*stdout*/*stderr* and the
    spawned process — distinct from [`exec`][terok_sandbox.runtime.null.NullRuntime.exec], which captures output into
    an [`ExecResult`][terok_sandbox.runtime.protocol.ExecResult].  Used by the host-side ACP proxy to bridge a Unix
    socket to an in-container ACP-stdio agent without the runtime ever
    materialising the conversation.

    Blocks until the child exits; returns the exit code.  EOF on either
    side terminates forwarding cleanly.  Implementations are expected to
    be transport-agnostic — *stdin*/*stdout* are arbitrary byte streams
    (a socket's file-object face, a pipe end, a test buffer).
    """
    ...

force_remove(containers)

Forcibly stop and remove containers.

Best-effort — continues through individual failures and returns one ContainerRemoveResult per input. An already-absent container counts as removed (the post-condition holds).

Source code in src/terok_sandbox/runtime/protocol.py
def force_remove(self, containers: list[Container]) -> list[ContainerRemoveResult]:
    """Forcibly stop and remove *containers*.

    Best-effort — continues through individual failures and returns
    one [`ContainerRemoveResult`][terok_sandbox.runtime.protocol.ContainerRemoveResult] per input.  An already-absent
    container counts as *removed* (the post-condition holds).
    """
    ...
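
Because removal is best-effort, callers usually need to separate the successes from the failures. The following is a minimal sketch of that step. `RemoveResult` is a hypothetical stand-in that mirrors the `name`, `removed` and `error` fields seen in the listings on this page (the default for `error` is an assumption), and the sample data is invented for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class RemoveResult:
    """Hypothetical stand-in for ContainerRemoveResult (name, removed, error)."""

    name: str
    removed: bool
    error: str | None = None  # assumed default; the real class is not shown here


def summarize(results: list[RemoveResult]) -> tuple[list[str], dict[str, str]]:
    """Split results into removed names and a {name: reason} map of failures."""
    removed = [r.name for r in results if r.removed]
    failed = {r.name: r.error or "unknown error" for r in results if not r.removed}
    return removed, failed


# Invented sample shaped like a best-effort removal: one success, one failure.
sample = [
    RemoveResult(name="task-1", removed=True),
    RemoveResult(name="task-2", removed=False, error="timed out after 30s"),
]
removed, failed = summarize(sample)
```

Since one result comes back per input, a caller can report every failure in a single pass instead of stopping at the first one.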

reserve_port(host='127.0.0.1')

Reserve a free TCP port on host.

The returned PortReservation exposes the port number via reservation.port and releases the socket on close. Use it to pass a pre-reserved port to an external process.

Source code in src/terok_sandbox/runtime/protocol.py
def reserve_port(self, host: str = "127.0.0.1") -> PortReservation:
    """Reserve a free TCP port on *host*.

    The returned [`PortReservation`][terok_sandbox.runtime.protocol.PortReservation] exposes the port number via
    ``reservation.port`` and releases the socket on close.  Use it to
    pass a pre-reserved port to an external process.
    """
    ...
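
One common way to reserve a free port is to bind a socket to port 0 and keep it open until the port is handed over. The sketch below illustrates that technique only. `LoopbackPortReservation` is a hypothetical class, not the package's `PortReservation` implementation, which is not shown on this page.

```python
import socket


class LoopbackPortReservation:
    """Hypothetical reservation: binds port 0 and holds the socket open."""

    def __init__(self, host: str = "127.0.0.1") -> None:
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind((host, 0))  # port 0 asks the kernel for a free port
        self.port: int = self._sock.getsockname()[1]

    def close(self) -> None:
        """Release the socket so another process can bind the port."""
        self._sock.close()


reservation = LoopbackPortReservation()
reserved_port = reservation.port
reservation.close()
```

With this approach there is a short window between `close()` and the external process binding the port, so the reservation is best closed as late as possible.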

ExecResult(exit_code, stdout, stderr) dataclass

Outcome of ContainerRuntime.exec.

Backend-neutral so the SSH-over-passt krun backend can fill it from an SSH response without pretending to be a subprocess.CompletedProcess.

exit_code instance-attribute

stdout instance-attribute

stderr instance-attribute

ok property

Convenience — True when the command exited with code 0.
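
As an illustration of the backend-neutral shape, here is a minimal sketch of a result record with an `ok` convenience property. `ExecOutcome` is a hypothetical stand-in with the same three fields, and the sample values are invented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExecOutcome:
    """Hypothetical stand-in with the same fields as ExecResult."""

    exit_code: int
    stdout: str
    stderr: str

    @property
    def ok(self) -> bool:
        """True when the command exited with code 0."""
        return self.exit_code == 0


# Any backend can fill the record from plain values, for example an SSH reply.
success = ExecOutcome(exit_code=0, stdout="done\n", stderr="")
failure = ExecOutcome(exit_code=2, stdout="", stderr="no such file\n")
```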

GpuConfigError(message, *, hint=_CDI_HINT)

Bases: RuntimeError

CDI/NVIDIA misconfiguration detected during container launch.

Store the CDI hint alongside the standard error message.

Source code in src/terok_sandbox/runtime/podman.py
def __init__(self, message: str, *, hint: str = _CDI_HINT) -> None:
    """Store the CDI *hint* alongside the standard error *message*."""
    self.hint = hint
    super().__init__(message)

hint = hint instance-attribute

Image

Bases: Protocol

Handle to a local container image. Cheap to construct.

ref instance-attribute

Tag ("terok-l2-cli:abcd") or ID ("sha256:...") used on lookup.

id property

Resolved image ID, or None if the image is not present.

repository property

Repository portion of the tag ("<none>" for dangling).

tag property

Tag portion ("<none>" for dangling).

size property

Podman-rendered human-readable size ("1.2GB").

created property

Podman-rendered creation timestamp.

exists()

Return True if the image is present locally.

Source code in src/terok_sandbox/runtime/protocol.py
def exists(self) -> bool:
    """Return ``True`` if the image is present locally."""
    ...

labels()

Return the OCI Config.Labels as a flat string dict.

Source code in src/terok_sandbox/runtime/protocol.py
def labels(self) -> dict[str, str]:
    """Return the OCI ``Config.Labels`` as a flat string dict."""
    ...

history()

Return the CreatedBy string of each layer, top to bottom.

Source code in src/terok_sandbox/runtime/protocol.py
def history(self) -> list[str]:
    """Return the ``CreatedBy`` string of each layer, top to bottom."""
    ...

remove()

Remove the image; return True on success.

Source code in src/terok_sandbox/runtime/protocol.py
def remove(self) -> bool:
    """Remove the image; return ``True`` on success."""
    ...

KrunRuntime(*, transport, podman=None)

Container runtime that launches tasks inside KVM microVMs.

Composition, not inheritance: holds a PodmanRuntime for every lifecycle verb (podman --runtime krun is just podman driving a different OCI runtime) and a KrunTransport for the one verb that can't go through podman — exec.

The transport is required: there is no sensible default beyond a real SSH-over-passt-TCP implementation, and the fake exists explicitly for tests. Production callers wire the real transport at the ContainerRuntime selection point in the orchestrator.

Source code in src/terok_sandbox/runtime/krun.py
def __init__(
    self,
    *,
    transport: KrunTransport,
    podman: PodmanRuntime | None = None,
) -> None:
    self._podman = podman or PodmanRuntime()
    self._transport = transport
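
The composition described above can be sketched with two small stand-ins: lifecycle verbs are forwarded to the wrapped runtime and only exec goes to the transport. All class names here (`StubPodman`, `RecordingTransport`, `ComposedRuntime`) are hypothetical and exist only to show the delegation pattern.

```python
from __future__ import annotations


class StubPodman:
    """Hypothetical stand-in for the podman-backed runtime."""

    def image(self, ref: str) -> str:
        return f"podman-image:{ref}"


class RecordingTransport:
    """Hypothetical transport that records exec calls instead of using SSH."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, tuple[str, ...]]] = []

    def exec(self, container: str, cmd: list[str], *, timeout: float | None = None) -> int:
        self.calls.append((container, tuple(cmd)))
        return 0


class ComposedRuntime:
    """Delegates lifecycle verbs to podman and exec to the transport."""

    def __init__(self, *, transport: RecordingTransport, podman: StubPodman | None = None) -> None:
        self._podman = podman or StubPodman()
        self._transport = transport

    def image(self, ref: str) -> str:
        return self._podman.image(ref)  # lifecycle verb: plain delegation

    def exec(self, container: str, cmd: list[str], *, timeout: float | None = None) -> int:
        if not cmd:
            raise ValueError("exec argv must not be empty")
        return self._transport.exec(container, cmd, timeout=timeout)


transport = RecordingTransport()
runtime = ComposedRuntime(transport=transport)
image_handle = runtime.image("demo:latest")
exec_code = runtime.exec("task-1", ["true"])
```

Passing a fake transport in this way is also how a test can exercise the runtime without a microVM.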

transport property

Return the transport used for exec.

container(name)

Return a KrunContainer handle wrapping the podman container — same lifecycle, krun-aware login_command.

Return type stays the Container Protocol rather than the narrower concrete class: mypy treats Protocol method return types as invariant, so a narrower annotation breaks structural ContainerRuntime matching for downstream consumers (terok's _runtime: ContainerRuntime assignment was the loud failure). The runtime value is genuinely a KrunContainer — callers needing the concrete type cast at the call site.

Source code in src/terok_sandbox/runtime/krun.py
def container(self, name: str) -> Container:
    """Return a [`KrunContainer`][terok_sandbox.runtime.krun.KrunContainer]
    handle wrapping the podman container — same lifecycle, krun-aware
    ``login_command``.

    Return type stays the [`Container`][terok_sandbox.runtime.protocol.Container]
    Protocol rather than the narrower concrete class: mypy treats
    Protocol method return types as invariant, so a narrower
    annotation breaks structural ``ContainerRuntime`` matching for
    downstream consumers (terok's ``_runtime: ContainerRuntime``
    assignment was the loud failure).  The runtime value is
    genuinely a ``KrunContainer`` — callers needing the concrete
    type ``cast`` at the call site.
    """
    return KrunContainer(name, runtime=self._podman, transport=self._transport)

containers_with_prefix(prefix)

Same prefix lookup as podman; rewrap each handle as a KrunContainer so its login_command routes through the TCP-SSH transport.

Same Protocol-invariance rationale as container for the wider declared return type.

Source code in src/terok_sandbox/runtime/krun.py
def containers_with_prefix(self, prefix: str) -> list[Container]:
    """Same prefix lookup as podman; rewrap each handle as a
    [`KrunContainer`][terok_sandbox.runtime.krun.KrunContainer] so its
    ``login_command`` routes through the TCP-SSH transport.

    Same Protocol-invariance rationale as
    [`container`][terok_sandbox.runtime.krun.KrunRuntime.container]
    for the wider declared return type.
    """
    return [
        KrunContainer(c.name, runtime=self._podman, transport=self._transport)
        for c in self._podman.containers_with_prefix(prefix)
    ]

image(ref)

Delegate image-handle construction to podman.

Source code in src/terok_sandbox/runtime/krun.py
def image(self, ref: str) -> Image:
    """Delegate image-handle construction to podman."""
    return self._podman.image(ref)

images(*, dangling_only=False)

Delegate image enumeration to podman.

Source code in src/terok_sandbox/runtime/krun.py
def images(self, *, dangling_only: bool = False) -> list[Image]:
    """Delegate image enumeration to podman."""
    return self._podman.images(dangling_only=dangling_only)

exec(container, cmd, *, timeout=None)

Route to the transport — typically SSH-over-passt-TCP.

Source code in src/terok_sandbox/runtime/krun.py
def exec(
    self,
    container: Container,
    cmd: list[str],
    *,
    timeout: float | None = None,
) -> ExecResult:
    """Route to the transport — typically SSH-over-passt-TCP."""
    if not cmd:
        raise ValueError("exec argv must not be empty")
    return self._transport.exec(container, cmd, timeout=timeout)

exec_stdio(container, cmd, *, stdin, stdout, stderr=None, env=None, timeout=None)

Route stdio-bridged exec to the transport.

Source code in src/terok_sandbox/runtime/krun.py
def exec_stdio(
    self,
    container: Container,
    cmd: list[str],
    *,
    stdin: BinaryIO,
    stdout: BinaryIO,
    stderr: BinaryIO | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
) -> int:
    """Route stdio-bridged exec to the transport."""
    if not cmd:
        raise ValueError("exec_stdio argv must not be empty")
    return self._transport.exec_stdio(
        container,
        cmd,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        env=env,
        timeout=timeout,
    )

force_remove(containers)

Delegate forcible removal to podman.

Source code in src/terok_sandbox/runtime/krun.py
def force_remove(self, containers: list[Container]) -> list[ContainerRemoveResult]:
    """Delegate forcible removal to podman."""
    return self._podman.force_remove(containers)

reserve_port(host='127.0.0.1')

Delegate port reservation to podman.

Source code in src/terok_sandbox/runtime/krun.py
def reserve_port(self, host: str = "127.0.0.1") -> PortReservation:
    """Delegate port reservation to podman."""
    return self._podman.reserve_port(host)

NullRuntime()

Stub ContainerRuntime for tests and dry-run modes.

All state lives in dictionaries and call-record lists on the runtime instance. Tests pre-populate fixtures through helpers such as set_container_state and add_image.

Source code in src/terok_sandbox/runtime/null.py
def __init__(self) -> None:
    self._container_states: dict[str, str] = {}
    self._container_images: dict[str, str] = {}
    self._container_rw_sizes: dict[str, int] = {}
    self._container_exit_codes: dict[str, int] = {}
    self._ready_results: dict[str, bool] = {}
    self._image_records: dict[str, dict[str, str]] = {}
    self._image_labels: dict[str, dict[str, str]] = {}
    self._image_history: dict[str, tuple[str, ...]] = {}
    self._exec_results: dict[tuple[str, tuple[str, ...]], ExecResult] = {}
    self._exec_stdio_scripts: dict[
        tuple[str, tuple[str, ...]], tuple[tuple[ExecStdioStep, ...], int]
    ] = {}
    self._exec_stdio_calls: list[tuple[str, tuple[str, ...], dict[str, str]]] = []
    self._copy_in_calls: list[tuple[str, Path, str]] = []
    self._force_remove_calls: list[list[str]] = []

set_container_state(name, state)

Record state ("running", "exited", ...) for container name.

Source code in src/terok_sandbox/runtime/null.py
def set_container_state(self, name: str, state: str) -> None:
    """Record *state* (``"running"``, ``"exited"``, ...) for container *name*."""
    self._container_states[name] = state

set_container_image(name, image_ref)

Record the image ref behind container name.

Source code in src/terok_sandbox/runtime/null.py
def set_container_image(self, name: str, image_ref: str) -> None:
    """Record the image ref behind container *name*."""
    self._container_images[name] = image_ref

set_container_rw_size(name, bytes_)

Record the writable-layer size of container name.

Source code in src/terok_sandbox/runtime/null.py
def set_container_rw_size(self, name: str, bytes_: int) -> None:
    """Record the writable-layer size of container *name*."""
    self._container_rw_sizes[name] = bytes_

set_exit_code(name, code)

Record the exit code Container.wait will return for name.

Source code in src/terok_sandbox/runtime/null.py
def set_exit_code(self, name: str, code: int) -> None:
    """Record the exit code [`Container.wait`][terok_sandbox.runtime.Container.wait] will return for *name*."""
    self._container_exit_codes[name] = code

set_ready_result(name, ready)

Record the outcome Container.stream_initial_logs returns.

Source code in src/terok_sandbox/runtime/null.py
def set_ready_result(self, name: str, ready: bool) -> None:
    """Record the outcome [`Container.stream_initial_logs`][terok_sandbox.runtime.Container.stream_initial_logs] returns."""
    self._ready_results[name] = ready

add_image(ref, *, repository='', tag='', size='', created='', labels=None, history=())

Register an image fixture.

Source code in src/terok_sandbox/runtime/null.py
def add_image(
    self,
    ref: str,
    *,
    repository: str = "",
    tag: str = "",
    size: str = "",
    created: str = "",
    labels: dict[str, str] | None = None,
    history: tuple[str, ...] = (),
) -> None:
    """Register an image fixture."""
    self._image_records[ref] = {
        "repository": repository,
        "tag": tag,
        "size": size,
        "created": created,
    }
    if labels:
        self._image_labels[ref] = dict(labels)
    if history:
        self._image_history[ref] = tuple(history)

set_exec_result(container_name, cmd, result)

Pre-register the result that exec returns for the exact cmd.

Source code in src/terok_sandbox/runtime/null.py
def set_exec_result(
    self,
    container_name: str,
    cmd: tuple[str, ...],
    result: ExecResult,
) -> None:
    """Pre-register the result that [`exec`][terok_sandbox.runtime.null.NullRuntime.exec] returns for the exact *cmd*."""
    self._exec_results[(container_name, cmd)] = result

set_exec_stdio_script(container_name, cmd, script, *, exit_code=0)

Pre-register a stdio interaction for exec_stdio.

script is a sequence of ("read", bytes) / ("write", bytes) steps replayed in order: read consumes the matching prefix from the caller-supplied stdin; write emits the bytes to stdout. Use this to drive deterministic ACP-handshake tests without spinning up a real container.

Source code in src/terok_sandbox/runtime/null.py
def set_exec_stdio_script(
    self,
    container_name: str,
    cmd: tuple[str, ...],
    script: tuple[ExecStdioStep, ...],
    *,
    exit_code: int = 0,
) -> None:
    """Pre-register a stdio interaction for [`exec_stdio`][terok_sandbox.runtime.null.NullRuntime.exec_stdio].

    *script* is a sequence of ``("read", bytes)`` / ``("write", bytes)``
    steps replayed in order: ``read`` consumes the matching prefix from
    the caller-supplied *stdin*; ``write`` emits the bytes to *stdout*.
    Use this to drive deterministic ACP-handshake tests without spinning
    up a real container.
    """
    self._exec_stdio_scripts[(container_name, cmd)] = (tuple(script), exit_code)
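
To make the script format concrete, here is a standalone sketch of replaying `("read", bytes)` / `("write", bytes)` steps against in-memory streams. The `replay` function is a hypothetical helper modelled on the behaviour described above, and the JSON payloads are invented placeholders, not real ACP messages.

```python
import io
from typing import BinaryIO, Literal

Step = tuple[Literal["read", "write"], bytes]


def replay(script: tuple[Step, ...], stdin: BinaryIO, stdout: BinaryIO) -> None:
    """Replay steps in order: check each read, emit each write."""
    for direction, payload in script:
        if direction == "read":
            got = stdin.read(len(payload))
            if got != payload:
                raise AssertionError(f"expected {payload!r}, got {got!r}")
        elif direction == "write":
            stdout.write(payload)
            stdout.flush()
        else:
            raise ValueError(f"unknown direction: {direction!r}")


# Invented handshake: the caller sends one line, the scripted agent answers.
script: tuple[Step, ...] = (
    ("read", b'{"method":"initialize"}\n'),
    ("write", b'{"result":"ok"}\n'),
)
caller_stdin = io.BytesIO(b'{"method":"initialize"}\n')
caller_stdout = io.BytesIO()
replay(script, caller_stdin, caller_stdout)
```

Because both streams are plain `io.BytesIO` buffers, the exchange is fully deterministic and needs no container.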

container(name)

Return a NullContainer handle.

Source code in src/terok_sandbox/runtime/null.py
def container(self, name: str) -> Container:
    """Return a [`NullContainer`][terok_sandbox.runtime.null.NullContainer] handle."""
    return NullContainer(name, runtime=self)

containers_with_prefix(prefix)

Return fixtures whose name starts with prefix-.

Source code in src/terok_sandbox/runtime/null.py
def containers_with_prefix(self, prefix: str) -> list[Container]:
    """Return fixtures whose name starts with ``prefix-``."""
    return [
        NullContainer(name, runtime=self)
        for name in self._container_states
        if name.startswith(f"{prefix}-")
    ]

image(ref)

Return a NullImage handle.

Source code in src/terok_sandbox/runtime/null.py
def image(self, ref: str) -> Image:
    """Return a [`NullImage`][terok_sandbox.runtime.null.NullImage] handle."""
    return NullImage(ref, runtime=self)

images(*, dangling_only=False)

Return fixture images; dangling_only filters by tag == "<none>".

Source code in src/terok_sandbox/runtime/null.py
def images(self, *, dangling_only: bool = False) -> list[Image]:
    """Return fixture images; *dangling_only* filters by ``tag == "<none>"``."""
    images: list[Image] = []
    for ref, rec in self._image_records.items():
        if dangling_only and rec.get("tag") != "<none>":
            continue
        images.append(NullImage(ref, runtime=self))
    return images

exec(container, cmd, *, timeout=None)

Return a pre-registered result, or a default empty success.

Source code in src/terok_sandbox/runtime/null.py
def exec(
    self,
    container: Container,
    cmd: list[str],
    *,
    timeout: float | None = None,
) -> ExecResult:
    """Return a pre-registered result, or a default empty success."""
    key = (container.name, tuple(cmd))
    return self._exec_results.get(key, ExecResult(exit_code=0, stdout="", stderr=""))

exec_stdio(container, cmd, *, stdin, stdout, stderr=None, env=None, timeout=None)

Replay a pre-registered stdio script, or no-op with exit code 0.

Records every call (with env) for test inspection. When a script is registered for (container, cmd), replays it in order: read consumes from stdin and asserts a match; write pushes bytes to stdout. Without a script, returns immediately with exit code 0 — matches the empty-success default of exec.

Source code in src/terok_sandbox/runtime/null.py
def exec_stdio(
    self,
    container: Container,
    cmd: list[str],
    *,
    stdin: BinaryIO,
    stdout: BinaryIO,
    stderr: BinaryIO | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
) -> int:
    """Replay a pre-registered stdio script, or no-op with exit code 0.

    Records every call (with env) for test inspection.  When a script is
    registered for ``(container, cmd)``, replays it in order: ``read``
    consumes from *stdin* and asserts a match; ``write`` pushes bytes to
    *stdout*.  Without a script, returns immediately with exit code 0
    — matches the empty-success default of [`exec`][terok_sandbox.runtime.null.NullRuntime.exec].
    """
    key = (container.name, tuple(cmd))
    self._exec_stdio_calls.append((container.name, tuple(cmd), dict(env or {})))
    script_entry = self._exec_stdio_scripts.get(key)
    if script_entry is None:
        return 0
    script, exit_code = script_entry
    for direction, payload in script:
        if direction == "read":
            got = stdin.read(len(payload))
            if got != payload:
                raise AssertionError(
                    f"NullRuntime.exec_stdio script mismatch for {cmd!r}: "
                    f"expected {payload!r}, got {got!r}"
                )
        elif direction == "write":
            stdout.write(payload)
            stdout.flush()
        else:
            raise ValueError(f"unknown exec_stdio script direction: {direction!r}")
    return exit_code

force_remove(containers)

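Record the call and clear the per-container fixtures (state, image, writable-layer size, exit code, ready result, exec results) for each container. Registered exec_stdio scripts are left in place.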

Source code in src/terok_sandbox/runtime/null.py
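def force_remove(self, containers: list[Container]) -> list[ContainerRemoveResult]:
    """Record the call and clear the per-container fixtures for each container.

    Clears state, image, writable-layer size, exit code, ready result and
    exec results.  Registered ``exec_stdio`` scripts are left in place.
    """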
    names = [c.name for c in containers]
    self._force_remove_calls.append(names)
    for name in names:
        self._container_states.pop(name, None)
        self._container_images.pop(name, None)
        self._container_rw_sizes.pop(name, None)
        self._container_exit_codes.pop(name, None)
        self._ready_results.pop(name, None)
        # Drop any pre-registered exec results keyed by this container name
        self._exec_results = {
            key: result for key, result in self._exec_results.items() if key[0] != name
        }
    return [ContainerRemoveResult(name=n, removed=True) for n in names]

reserve_port(host='127.0.0.1')

Reserve a real host port (even null backend callers want a live port).

Source code in src/terok_sandbox/runtime/null.py
def reserve_port(self, host: str = "127.0.0.1") -> PortReservation:
    """Reserve a real host port (even null backend callers want a live port)."""
    return NullPortReservation(host)

PodmanRuntime

The default ContainerRuntime — talks to the podman CLI.

container(name)

Return a handle to the container named name.

Source code in src/terok_sandbox/runtime/podman.py
def container(self, name: str) -> Container:
    """Return a handle to the container named *name*."""
    return PodmanContainer(name, runtime=self)

containers_with_prefix(prefix)

Return handles for every container whose name starts with prefix-.

Single podman ps -a call under the hood; the returned handles are lazy (fresh inspect on property access).

Source code in src/terok_sandbox/runtime/podman.py
def containers_with_prefix(self, prefix: str) -> list[Container]:
    """Return handles for every container whose name starts with *prefix-*.

    Single ``podman ps -a`` call under the hood; the returned handles
    are lazy (fresh inspect on property access).
    """
    try:
        out = subprocess.check_output(  # nosec B603 B607 — argv built from fixed verbs + caller-controlled scope/container names — binary PATH lookup is the cross-distro contract
            [
                "podman",
                "ps",
                "-a",
                "--filter",
                f"name=^{prefix}-",
                "--format",
                "{{.Names}}",
                "--no-trunc",
            ],
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return []
    return [PodmanContainer(name, runtime=self) for name in out.strip().splitlines() if name]

image(ref)

Return a handle to the image identified by tag or ID ref.

Source code in src/terok_sandbox/runtime/podman.py
def image(self, ref: str) -> Image:
    """Return a handle to the image identified by tag or ID *ref*."""
    return PodmanImage(ref)

images(*, dangling_only=False)

Enumerate local images.

dangling_only narrows to untagged <none>:<none> entries.

Source code in src/terok_sandbox/runtime/podman.py
def images(self, *, dangling_only: bool = False) -> list[Image]:
    """Enumerate local images.

    *dangling_only* narrows to untagged ``<none>:<none>`` entries.
    """
    cmd = ["podman", "images", "--format", _IMAGES_FORMAT, "--no-trunc"]
    if dangling_only:
        cmd[2:2] = ["--filter", "dangling=true"]
    try:
        result = subprocess.run(  # nosec B603 — argv is a fixed list controlled by this module
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
            check=False,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return []
    if result.returncode != 0:
        return []

    images: list[Image] = []
    for line in result.stdout.strip().splitlines():
        parts = line.split("\t")
        if len(parts) == 5:
            repo, tag, image_id, size, created = parts
            images.append(
                PodmanImage(
                    ref=image_id,
                    repository=repo,
                    tag=tag,
                    size=size,
                    created=created,
                )
            )
    return images
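
The parsing step can be tried in isolation on sample text. The sketch below assumes the format string yields five tab-separated fields in the order repository, tag, ID, size, created, which matches the unpacking in the listing; the actual `_IMAGES_FORMAT` value is not shown on this page. `ImageRow`, `parse_image_rows` and the sample output are hypothetical.

```python
from typing import NamedTuple


class ImageRow(NamedTuple):
    """Hypothetical record for one parsed line of image output."""

    repository: str
    tag: str
    image_id: str
    size: str
    created: str


def parse_image_rows(output: str) -> list[ImageRow]:
    """Keep only lines that split into exactly five tab-separated fields."""
    rows: list[ImageRow] = []
    for line in output.strip().splitlines():
        parts = line.split("\t")
        if len(parts) == 5:
            rows.append(ImageRow(*parts))
    return rows


# Invented sample: one tagged image, one dangling image, one malformed line.
sample_output = (
    "localhost/terok-l2-cli\tabcd\tsha256:1111\t1.2GB\t2 hours ago\n"
    "<none>\t<none>\tsha256:2222\t300MB\t3 days ago\n"
    "malformed line without tabs\n"
)
rows = parse_image_rows(sample_output)
dangling = [row for row in rows if row.tag == "<none>"]
```

Lines that do not split into five fields are skipped silently, so a partially garbled listing still yields the well-formed entries.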

exec(container, cmd, *, timeout=None)

Run cmd inside container via podman exec.

Lets FileNotFoundError (podman missing) and subprocess.TimeoutExpired propagate unchanged.

Raises ValueError if cmd is empty — podman exec with no argv is never a valid request and catching it here avoids a later IndexError in the debug log.

Source code in src/terok_sandbox/runtime/podman.py
def exec(
    self,
    container: Container,
    cmd: list[str],
    *,
    timeout: float | None = None,
) -> ExecResult:
    """Run *cmd* inside *container* via ``podman exec``.

    Lets [`FileNotFoundError`][FileNotFoundError] (podman missing) and
    [`subprocess.TimeoutExpired`][subprocess.TimeoutExpired] propagate unchanged.

    Raises [`ValueError`][ValueError] if *cmd* is empty — podman exec with
    no argv is never a valid request and catching it here avoids a
    later ``IndexError`` in the debug log.
    """
    if not cmd:
        raise ValueError("exec argv must not be empty")
    log_debug(
        f"PodmanRuntime.exec({container.name}, cmd[0]={cmd[0]!r}, "
        f"argc={len(cmd)}, timeout={timeout})"
    )
    proc = subprocess.run(  # nosec B603 B607 — argv built from fixed verbs + caller-controlled scope/container names — binary PATH lookup is the cross-distro contract
        ["podman", "exec", container.name, *cmd],
        capture_output=True,
        text=True,
        timeout=timeout,
        check=False,
    )
    return ExecResult(
        exit_code=proc.returncode,
        stdout=proc.stdout or "",
        stderr=proc.stderr or "",
    )
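
Since `FileNotFoundError` and `subprocess.TimeoutExpired` propagate, the caller decides how to handle them. The sketch below shows one way to do that around a plain `subprocess.run` call. `run_with_fallback` is a hypothetical helper, and the sentinel codes 127 and 124 are arbitrary choices borrowed from shell convention, not values used by the package.

```python
from __future__ import annotations

import subprocess
import sys


def run_with_fallback(argv: list[str], *, timeout: float | None = None) -> tuple[int, str]:
    """Run argv and map the two propagated exceptions to sentinel results."""
    if not argv:
        raise ValueError("argv must not be empty")
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout, check=False
        )
    except FileNotFoundError:
        return 127, "binary not found"  # arbitrary sentinel
    except subprocess.TimeoutExpired:
        return 124, "timed out"  # arbitrary sentinel
    return proc.returncode, proc.stdout


# The current interpreter stands in for the real binary in this sketch.
ok_result = run_with_fallback([sys.executable, "-c", "print('hi')"])
missing_result = run_with_fallback(["definitely-not-a-real-binary-xyz"])
slow_result = run_with_fallback(
    [sys.executable, "-c", "import time; time.sleep(5)"], timeout=0.2
)
```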

exec_stdio(container, cmd, *, stdin, stdout, stderr=None, env=None, timeout=None)

Bridge byte streams to podman exec -i for cmd inside container.

Synchronous: spawns the child, runs three daemon pump threads (one per stream) that copy bytes until either side reaches EOF or the child exits, joins the pumps, and returns the exit code. Async callers drive this via run_in_executor.

Lets FileNotFoundError (podman missing) propagate. On timeout, terminates the child (terminate → 2 s wait → kill) and re-raises TimeoutExpired. Raises ValueError if cmd is empty, as exec does.

Source code in src/terok_sandbox/runtime/podman.py
def exec_stdio(
    self,
    container: Container,
    cmd: list[str],
    *,
    stdin: BinaryIO,
    stdout: BinaryIO,
    stderr: BinaryIO | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
) -> int:
    """Bridge byte streams to ``podman exec -i`` for *cmd* inside *container*.

    Synchronous: spawns the child, runs three daemon pump threads
    (one per stream) that copy bytes until either side reaches EOF
    or the child exits, joins the pumps, and returns the exit code.
    Async callers drive this via
    [`run_in_executor`][asyncio.loop.run_in_executor].

    Lets [`FileNotFoundError`][] (podman missing) propagate.  On
    timeout, terminates the child (terminate → 2 s wait → kill) and
    re-raises [`TimeoutExpired`][subprocess.TimeoutExpired].  Raises
    [`ValueError`][ValueError] if *cmd* is empty, as ``exec`` does.
    """
    if not cmd:
        raise ValueError("exec_stdio argv must not be empty")
    log_debug(
        f"PodmanRuntime.exec_stdio({container.name}, cmd[0]={cmd[0]!r}, "
        f"argc={len(cmd)}, timeout={timeout})"
    )
    argv = ["podman", "exec", "-i"]
    for k, v in (env or {}).items():
        argv += ["-e", f"{k}={v}"]
    argv += [container.name, *cmd]

    proc = subprocess.Popen(  # noqa: S603 — argv built above  # nosec B603 — argv is a fixed list controlled by this module
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE if stderr is not None else subprocess.DEVNULL,
    )

    pumps = _start_stdio_pumps(proc, stdin, stdout, stderr)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        raise
    finally:
        # Join pumps before closing parent-side pipes — the stdout
        # and stderr pumps drain whatever the child wrote before
        # exiting, and closing the read end first would chop off
        # tail bytes that are still in the kernel buffer.
        for t in pumps:
            t.join(timeout=1)
        _close_proc_streams(proc)
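
For async callers, a blocking bridge like this one can be moved off the event loop with `loop.run_in_executor`. The sketch below uses a hypothetical `blocking_bridge` function in place of the real method, and the command it receives is an invented placeholder.

```python
from __future__ import annotations

import asyncio
import time
from functools import partial


def blocking_bridge(cmd: list[str], *, timeout: float | None = None) -> int:
    """Hypothetical blocking call standing in for a synchronous stdio bridge."""
    time.sleep(0.05)  # simulate waiting for the child to exit
    return 0


async def run_bridge() -> int:
    loop = asyncio.get_running_loop()
    # run_in_executor forwards positional arguments only, so keyword
    # arguments are bound with functools.partial first.
    call = partial(blocking_bridge, ["agent", "--stdio"], timeout=5.0)
    return await loop.run_in_executor(None, call)


exit_code = asyncio.run(run_bridge())
```

Passing `None` as the executor selects the loop's default thread pool, which is usually enough for a handful of concurrent bridges.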

force_remove(containers)

Best-effort podman rm -f of each container.

Continues through individual failures. An already-absent container counts as removed — the post-condition holds.

Source code in src/terok_sandbox/runtime/podman.py
def force_remove(self, containers: list[Container]) -> list[ContainerRemoveResult]:
    """Best-effort ``podman rm -f`` of each container.

    Continues through individual failures.  An already-absent
    container counts as *removed* — the post-condition holds.
    """
    results: list[ContainerRemoveResult] = []
    for container in containers:
        name = container.name
        try:
            log_debug(f"force_remove: podman rm -f {name} (start)")
            proc = subprocess.run(  # nosec B603 B607 — argv built from fixed verbs + caller-controlled scope/container names — binary PATH lookup is the cross-distro contract
                ["podman", "rm", "-f", name],
                check=False,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
                timeout=_CONTAINER_REMOVE_TIMEOUT,
            )
            if proc.returncode == 0:
                log_debug(f"force_remove: {name} (done)")
                results.append(ContainerRemoveResult(name=name, removed=True))
            elif "no such container" in (proc.stderr or "").lower():
                log_debug(f"force_remove: {name} already absent")
                results.append(ContainerRemoveResult(name=name, removed=True))
            else:
                reason = (proc.stderr or "").strip() or f"exit code {proc.returncode}"
                log_debug(f"force_remove: {name} failed: {reason}")
                results.append(ContainerRemoveResult(name=name, removed=False, error=reason))
        except subprocess.TimeoutExpired:
            log_debug(f"force_remove: {name} timed out")
            results.append(
                ContainerRemoveResult(
                    name=name,
                    removed=False,
                    error=f"timed out after {_CONTAINER_REMOVE_TIMEOUT}s",
                )
            )
        except FileNotFoundError:
            log_debug(f"force_remove: podman not found for {name}")
            results.append(
                ContainerRemoveResult(name=name, removed=False, error="podman not found")
            )
        except Exception as exc:  # noqa: BLE001
            log_debug(f"force_remove: {name} failed: {exc}")
            results.append(ContainerRemoveResult(name=name, removed=False, error=str(exc)))
    return results

reserve_port(host='127.0.0.1')

Reserve a free TCP port; release on close.

Source code in src/terok_sandbox/runtime/podman.py
def reserve_port(self, host: str = "127.0.0.1") -> PortReservation:
    """Reserve a free TCP port; release on close."""
    return PodmanPortReservation(host)

container_states(prefix)

Return {container_name: state} for matching containers.

Optimisation over [c.state for c in containers_with_prefix(prefix)] — single podman ps -a instead of N inspects. Backend-specific; not part of the ContainerRuntime protocol.

Source code in src/terok_sandbox/runtime/podman.py
def container_states(self, prefix: str) -> dict[str, str]:
    """Return ``{container_name: state}`` for matching containers.

    Optimisation over ``[c.state for c in containers_with_prefix(prefix)]``
    — single ``podman ps -a`` instead of N inspects.  Backend-specific;
    not part of the [`ContainerRuntime`][terok_sandbox.ContainerRuntime] protocol.
    """
    try:
        out = subprocess.check_output(  # nosec B603 B607 — argv built from fixed verbs + caller-controlled scope/container names — binary PATH lookup is the cross-distro contract
            [
                "podman",
                "ps",
                "-a",
                "--filter",
                f"name=^{prefix}-",
                "--format",
                "{{.Names}} {{.State}}",
                "--no-trunc",
            ],
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return {}

    result: dict[str, str] = {}
    for line in out.strip().splitlines():
        parts = line.split(None, 1)
        if len(parts) == 2:
            result[parts[0]] = parts[1].lower()
    return result

container_rw_sizes(prefix)

Return {container_name: rw_bytes} for matching containers.

Single podman ps --size call — --size is expensive (overlay diffs) but one bulk call beats N inspects. Backend-specific; not part of the ContainerRuntime protocol.

Source code in src/terok_sandbox/runtime/podman.py
def container_rw_sizes(self, prefix: str) -> dict[str, int]:
    """Return ``{container_name: rw_bytes}`` for matching containers.

    Single ``podman ps --size`` call — ``--size`` is expensive (overlay
    diffs) but one bulk call beats N inspects.  Backend-specific; not
    part of the [`ContainerRuntime`][terok_sandbox.ContainerRuntime] protocol.
    """
    try:
        out = subprocess.check_output(  # nosec B603 B607 — argv built from fixed verbs + caller-controlled scope/container names — binary PATH lookup is the cross-distro contract
            [
                "podman",
                "ps",
                "-a",
                "--size",
                "--filter",
                f"name=^{prefix}-",
                "--format",
                "{{.Names}}\t{{.Size}}",
                "--no-trunc",
            ],
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=120,
        )
    except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
        return {}

    result: dict[str, int] = {}
    for line in out.strip().splitlines():
        parts = line.split("\t", 1)
        if len(parts) == 2:
            parsed = _parse_human_size(parts[1])
            if parsed is not None:
                result[parts[0]] = parsed
    return result
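The `_parse_human_size` helper is not shown on this page. As a rough idea of what such a parser might do, here is a hypothetical stand-in. It assumes the size column starts with a number and a decimal unit suffix (for example `1.5kB (virtual 2GB)`) and ignores everything after that; both the input format and the unit table are assumptions, not the library's implementation.

```python
import re
from typing import Optional

# Decimal multipliers; assumed unit suffixes, compared case-insensitively.
_UNITS = {"B": 1, "KB": 1000, "MB": 1000**2, "GB": 1000**3, "TB": 1000**4}
_SIZE_RE = re.compile(r"^\s*([0-9]+(?:\.[0-9]+)?)\s*([A-Za-z]+)")


def parse_human_size(text: str) -> Optional[int]:
    """Return the leading size in bytes, or None when it cannot be parsed."""
    match = _SIZE_RE.match(text)
    if match is None:
        return None
    multiplier = _UNITS.get(match.group(2).upper())
    if multiplier is None:
        return None
    # round() avoids off-by-one results from float multiplication.
    return round(float(match.group(1)) * multiplier)
```

Returning None for unparseable input matches how container_rw_sizes treats the helper: rows that fail to parse are left out of the result.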

TcpSSHTransport(*, identity_file, endpoint_resolver, ssh_user=DEFAULT_SSH_USER, ssh_binary='ssh')

OpenSSH-over-loopback-TCP implementation of KrunTransport.

Holds the host-side identity (private key path) and an endpoint resolver that maps a Container to a TcpEndpoint. The transport never touches the credentials vault directly — the orchestrator exports the %host key to a tmpfs file and passes that path in, keeping vault access out of the runtime layer.

Source code in src/terok_sandbox/runtime/krun_transport.py
def __init__(
    self,
    *,
    identity_file: Path,
    endpoint_resolver: Callable[[Container], TcpEndpoint],
    ssh_user: str = DEFAULT_SSH_USER,
    ssh_binary: str = "ssh",
) -> None:
    self._identity_file = identity_file
    self._resolver = endpoint_resolver
    self._user = ssh_user
    self._ssh = ssh_binary

exec(container, cmd, *, timeout=None)

Run cmd in the guest and return its outcome.

Each cmd token is shlex.quoted into a single remote command string so the in-guest shell treats embedded metacharacters as literal data — argv semantics are preserved across the inherently-shell-parsed ssh wire format.

Source code in src/terok_sandbox/runtime/krun_transport.py
def exec(
    self,
    container: Container,
    cmd: list[str],
    *,
    timeout: float | None = None,
) -> ExecResult:
    """Run *cmd* in the guest and return its outcome.

    Each *cmd* token is ``shlex.quote``d into a single remote
    command string so the in-guest shell treats embedded
    metacharacters as literal data — argv semantics are preserved
    across the inherently-shell-parsed ssh wire format.
    """
    endpoint = self._resolver(container)
    remote_str = _remote_command(cmd)
    argv = [*self._ssh_argv(endpoint), "--", remote_str]
    proc = subprocess.run(  # nosec B603 — argv built from fixed verbs + caller-controlled scope/container names
        argv,
        capture_output=True,
        text=True,
        timeout=timeout,
        check=False,
    )
    return ExecResult(
        exit_code=proc.returncode,
        stdout=proc.stdout or "",
        stderr=proc.stderr or "",
    )
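The quoting step can be illustrated in isolation. `_remote_command` itself is not shown here, so the function below is a minimal stand-in that assumes it does nothing more than quote and join the tokens.

```python
import shlex


def remote_command(cmd: list[str]) -> str:
    """Join argv tokens into one shell-safe string for the ssh wire."""
    return " ".join(shlex.quote(token) for token in cmd)


argv = ["sh", "-c", "echo $HOME; ls *.py"]
wire = remote_command(argv)
# A POSIX-style parse of the wire string recovers the original tokens.
recovered = shlex.split(wire)
```

Because the third token is single-quoted on the wire, the remote login shell passes `$HOME`, `;` and `*` through to `sh -c` untouched instead of expanding them itself.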

exec_stdio(container, cmd, *, stdin, stdout, stderr=None, env=None, timeout=None)

Bridge byte streams to cmd in the guest; return its exit code.

Environment variables are propagated via a remote env prefix rather than SendEnv so the transport doesn't depend on the guest's AcceptEnv whitelist. Env var names are validated against [A-Za-z_][A-Za-z0-9_]* because the remote env command expects bare identifiers; values and cmd tokens are shlex.quoted so embedded shell metacharacters cross the wire as literal data.

Source code in src/terok_sandbox/runtime/krun_transport.py
def exec_stdio(
    self,
    container: Container,
    cmd: list[str],
    *,
    stdin: BinaryIO,
    stdout: BinaryIO,
    stderr: BinaryIO | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
) -> int:
    """Bridge byte streams to *cmd* in the guest; return its exit code.

    Environment variables are propagated via a remote ``env`` prefix
    rather than ``SendEnv`` so the transport doesn't depend on the
    guest's ``AcceptEnv`` whitelist.  Env var **names** are
    validated against ``[A-Za-z_][A-Za-z0-9_]*`` because the remote
    ``env`` command expects bare identifiers; values and *cmd*
    tokens are ``shlex.quote``d so embedded shell metacharacters
    cross the wire as literal data.
    """
    endpoint = self._resolver(container)
    remote_str = _remote_command(cmd, env=env)
    argv = [*self._ssh_argv(endpoint), "--", remote_str]

    proc = subprocess.Popen(  # noqa: S603 — argv built above  # nosec B603 — argv is built from fixed verbs + caller-controlled scope/container names
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE if stderr is not None else subprocess.DEVNULL,
    )
    _start_stdio_pumps(proc, stdin, stdout, stderr)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        raise
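The env-prefix approach described above might look like the following sketch. It is a hypothetical reimplementation, not the module's `_remote_command`; it assumes that names are checked against the documented pattern and that invalid names raise `ValueError`.

```python
import re
import shlex
from collections.abc import Mapping
from typing import Optional

_ENV_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def remote_command_with_env(
    cmd: list[str], env: Optional[Mapping[str, str]] = None
) -> str:
    """Build ``env NAME=value ... cmd ...`` with values and tokens quoted."""
    tokens: list[str] = []
    if env:
        tokens.append("env")
        for name, value in env.items():
            # Names go on the wire bare, so they must be plain identifiers.
            if not _ENV_NAME.fullmatch(name):
                raise ValueError(f"invalid environment variable name: {name!r}")
            tokens.append(f"{name}={shlex.quote(value)}")
    tokens.extend(shlex.quote(token) for token in cmd)
    return " ".join(tokens)


wire = remote_command_with_env(
    ["printenv", "GREETING"], env={"GREETING": "hi there; id"}
)
```

Validating names rather than quoting them matters because a quoted name containing `=` or whitespace would change which variable `env` sets.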

login_command(container, *, command=())

Return an ssh argv that attaches a PTY to the guest's shell.

Mirrors what PodmanContainer.login_command does for the conventional runtime — emits the argv the operator (or terok login) execs into. Adds -tt so sshd allocates a real PTY even when stdin isn't a terminal (the caller may be running under tmux or an IDE proxy).

Both the empty-command path (interactive login → bash -l) and the explicit-command path land at /workspace via _at_workspace, so the operator's starting cwd matches what podman exec gives under crun. Argv tokens past -- are shlex.quoted (same helper the exec paths use) so the SSH wire format preserves argv semantics across the login-shell parse on the far side.

Source code in src/terok_sandbox/runtime/krun_transport.py
def login_command(
    self,
    container: Container,
    *,
    command: tuple[str, ...] = (),
) -> list[str]:
    """Return an ``ssh`` argv that attaches a PTY to the guest's shell.

    Mirrors what [`PodmanContainer.login_command`][terok_sandbox.runtime.podman.PodmanContainer.login_command]
    does for the conventional runtime — emits the argv the operator
    (or ``terok login``) execs into.  Adds ``-tt`` so sshd allocates
    a real PTY even when stdin isn't a terminal (the caller may be
    running under tmux or an IDE proxy).

    Both the empty-*command* path (interactive login → ``bash -l``)
    and the explicit-*command* path land at ``/workspace`` via
    ``_at_workspace``, so the operator's starting cwd matches what
    ``podman exec`` gives under crun.  Argv tokens past ``--`` are ``shlex.quote``d
    (same helper the exec paths use) so the SSH wire format
    preserves argv semantics across the login-shell parse on the
    far side.
    """
    endpoint = self._resolver(container)
    argv = self._ssh_argv(endpoint, interactive=True)
    remote = _remote_command(list(command)) if command else _at_workspace("bash -l")
    return [*argv, "--", remote]

LifecycleHooks(pre_start=None, post_start=None, post_ready=None, post_stop=None) dataclass

Optional callbacks fired at container lifecycle transitions.

All slots are None by default. Sandbox.run() fires pre_start before podman run and post_start after a successful launch. post_ready and post_stop are available for callers to invoke at the appropriate time (e.g. after log streaming or container exit).

pre_start = None class-attribute instance-attribute

Fired before podman run.

post_start = None class-attribute instance-attribute

Fired after a successful podman run.

post_ready = None class-attribute instance-attribute

Fired when the container reports ready (caller responsibility).

post_stop = None class-attribute instance-attribute

Fired after the container exits (caller responsibility).
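A small self-contained model of the firing order may help. `Hooks` and `launch` below are illustrative stand-ins for LifecycleHooks and the launch path, with a list recording what ran in place of a real podman call.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Hooks:
    """Stand-in for LifecycleHooks; every slot is optional."""

    pre_start: Optional[Callable[[], None]] = None
    post_start: Optional[Callable[[], None]] = None
    post_ready: Optional[Callable[[], None]] = None
    post_stop: Optional[Callable[[], None]] = None


def launch(hooks: Optional[Hooks], events: list[str]) -> None:
    """Mimic the documented order: pre_start, launch, post_start."""
    if hooks and hooks.pre_start:
        hooks.pre_start()
    events.append("podman run")  # placeholder for the real launch
    if hooks and hooks.post_start:
        hooks.post_start()


events: list[str] = []
hooks = Hooks(
    pre_start=lambda: events.append("pre_start"),
    post_start=lambda: events.append("post_start"),
    post_stop=lambda: events.append("post_stop"),
)
launch(hooks, events)
# post_ready and post_stop stay unfired until the caller invokes them.
```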

RunSpec(container_name, image, env, volumes, command, task_dir, gpu_enabled=False, memory=None, cpus=None, extra_args=(), unrestricted=True, sealed=False, hostname=None, runtime=None, annotations=(lambda: MappingProxyType({}))(), loopback_ports=()) dataclass

Everything needed for a single podman run invocation.

container_name instance-attribute

Unique container name.

image instance-attribute

Image tag to run (e.g. terok-l1-cli:ubuntu-24.04).

env instance-attribute

Environment variables injected into the container.

volumes instance-attribute

Host↔container directory bindings (mounted or injected per sealed).

command instance-attribute

Command to execute inside the container.

task_dir instance-attribute

Host-side task directory (for shield state, logs, etc.).

gpu_enabled = False class-attribute instance-attribute

Whether to pass GPU device args to podman.

memory = None class-attribute instance-attribute

Podman --memory value (e.g. "4g", "512m"). None = unlimited.

cpus = None class-attribute instance-attribute

Podman --cpus value (e.g. "2.0", "0.5"). None = unlimited.

extra_args = () class-attribute instance-attribute

Additional podman run arguments (e.g. port publishing).

unrestricted = True class-attribute instance-attribute

When False, adds --security-opt no-new-privileges.

sealed = False class-attribute instance-attribute

When True, volumes are injected via podman cp instead of bind-mounted.

hostname = None class-attribute instance-attribute

Override the in-container hostname (podman --hostname).

When None (default), podman assigns the short container ID as the hostname. Orchestrators may set this to a value that correlates with their own task/container identity — e.g. so a shell prompt inside the container matches the name the operator sees in task lists. Must be a valid DNS hostname (letters/digits/hyphens, ≤253 chars); podman enforces the rule when parsing the flag.

runtime = None class-attribute instance-attribute

OCI runtime to use (podman --runtime).

None (default) lets podman pick — its built-in default is crun. Set to "krun" to launch the task inside a KVM microVM (Phase 3 KrunRuntime). Backend-neutral here; the runtime string is passed through verbatim and any compatibility decisions live higher up (e.g. orchestrator config validation).

annotations = field(default_factory=(lambda: MappingProxyType({}))) class-attribute instance-attribute

OCI annotations forwarded as podman --annotation k=v entries.

Keys must be on SAFE_ANNOTATION_KEYS. Declared as Mapping so callers can pass plain dicts; __post_init__ snapshots into a MappingProxyType so the frozen-dataclass guarantee holds against caller mutation.

loopback_ports = () class-attribute instance-attribute

Per-container host ports shield's nft rules must allow.

Empty falls back to the cfg-resolved (gate_port, token_broker_port, ssh_signer_port) triple (legacy / single-daemon shape). The per-container launch path passes (gate_port, per_container.token_broker_port, per_container.ssh_signer_port) so shield allows the actual ports the supervisor binds — without this override, shield blocks the per-container broker/signer with "No route to host".

__post_init__()

Snapshot annotations so a caller-owned dict can't mutate the spec.

Callers may legitimately pass a plain dict (Pydantic, JSON-load, tests) — we'd lose the frozen guarantee if we kept the live reference. Take a copy, wrap it in a MappingProxyType, and write it back through object.__setattr__ since the dataclass itself is frozen=True.

Source code in src/terok_sandbox/sandbox.py
def __post_init__(self) -> None:
    """Snapshot ``annotations`` so a caller-owned dict can't mutate the spec.

    Callers may legitimately pass a plain ``dict`` (Pydantic, JSON-load,
    tests) — we'd lose the frozen guarantee if we kept the live
    reference.  Take a copy, wrap it in a ``MappingProxyType``, and
    write it back through ``object.__setattr__`` since the dataclass
    itself is ``frozen=True``.
    """
    object.__setattr__(self, "annotations", MappingProxyType(dict(self.annotations)))
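The snapshot technique works the same way on any frozen dataclass. The sketch below uses a hypothetical one-field `Spec` class to show that later mutation of the caller's dict does not reach the stored mapping.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field
from types import MappingProxyType


@dataclass(frozen=True)
class Spec:
    """Hypothetical one-field stand-in for RunSpec."""

    annotations: Mapping[str, str] = field(
        default_factory=lambda: MappingProxyType({})
    )

    def __post_init__(self) -> None:
        # frozen=True blocks normal assignment, so write through object.__setattr__.
        object.__setattr__(
            self, "annotations", MappingProxyType(dict(self.annotations))
        )


caller_dict = {"io.example/key": "v1"}
spec = Spec(annotations=caller_dict)
caller_dict["io.example/key"] = "mutated"  # does not affect spec.annotations
```

The `dict(...)` copy is what isolates the spec. Wrapping the caller's dict directly in a `MappingProxyType` would only give a read-only view that still tracks the caller's changes.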

Sandbox(config=None, *, runtime=None)

Per-task orchestrator composing runtime + services.

Holds a ContainerRuntime (defaulting to PodmanRuntime) and a SandboxConfig, and exposes gate / shield / lifecycle verbs bundled in one place. Container lifecycle verbs delegate to the runtime; the launch path (run, create) still drives podman directly because shield / gate integration is podman-specific today.

Source code in src/terok_sandbox/sandbox.py
def __init__(
    self,
    config: SandboxConfig | None = None,
    *,
    runtime: ContainerRuntime | None = None,
) -> None:
    # ``Sandbox`` is the facade that launches containers + composes
    # gate/vault managers; resolve TCP ports here so the same
    # registry pass covers everyone downstream.  ``cfg`` itself
    # stays pure — only the cfg ``Sandbox`` carries is allocated.
    self._cfg = (config or SandboxConfig()).with_resolved_ports()
    self._runtime: ContainerRuntime = runtime or PodmanRuntime()

config property

Return the sandbox configuration.

runtime property

Return the injected container runtime.

mint_gate_token()

Mint a fresh per-container gate token.

The gate lives in each container's supervisor; the token travels to the container via the sidecar and is validated in-process, so there is nothing to persist.

Source code in src/terok_sandbox/sandbox.py
def mint_gate_token(self) -> str:
    """Mint a fresh per-container gate token.

    The gate lives in each container's supervisor; the token
    travels to the container via the sidecar and is validated
    in-process, so there is nothing to persist.
    """
    from .gate.tokens import mint_gate_token

    return mint_gate_token()

gate_url(repo_path, token)

Build the in-container HTTP URL for gate access to repo_path.

Always uses the fixed loopback bridge port (see _CONTAINER_GATE_PORT): the container reaches the per-container gate through the socat bridge in both transport modes, so the URL carries no host address (gate_port is None in socket mode).

Source code in src/terok_sandbox/sandbox.py
def gate_url(self, repo_path: Path, token: str) -> str:
    """Build the in-container HTTP URL for gate access to *repo_path*.

    Always uses the fixed loopback bridge port (see
    `_CONTAINER_GATE_PORT`): the container reaches the per-container
    gate through the socat bridge in both transport modes, so the URL
    carries no host address (``gate_port`` is ``None`` in socket mode).
    """
    rel = repo_path.relative_to(self._cfg.gate_base_path).as_posix()
    return f"http://{token}@localhost:{_CONTAINER_GATE_PORT}/{rel}"
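The URL construction can be tried standalone. In this sketch the port value is a made-up placeholder (the real `_CONTAINER_GATE_PORT` is not shown on this page), and the base path is passed as an argument instead of being read from the config.

```python
from pathlib import PurePosixPath

CONTAINER_GATE_PORT = 9418  # hypothetical value, for illustration only


def gate_url(gate_base_path: PurePosixPath, repo_path: PurePosixPath, token: str) -> str:
    """Build a loopback gate URL for a repo that lives under the gate base path."""
    # relative_to raises ValueError when repo_path is outside gate_base_path.
    rel = repo_path.relative_to(gate_base_path).as_posix()
    return f"http://{token}@localhost:{CONTAINER_GATE_PORT}/{rel}"


url = gate_url(
    PurePosixPath("/srv/gate"),
    PurePosixPath("/srv/gate/org/repo.git"),
    "tok123",
)
```

One consequence of `relative_to` is that a repository outside the gate base path fails loudly with `ValueError` rather than producing a URL that points somewhere unexpected.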

pre_start_args(container, task_dir, *, runtime=None, loopback_ports=())

Return extra podman args for shield integration.

runtime is the podman --runtime selector — passed to ShieldRuntime.from_runtime_name so shield picks the right dnsmasq bind for the krun guest's isolated loopback.

loopback_ports overrides shield's cfg-derived allowlist with per-container ports (see RunSpec.loopback_ports).

Source code in src/terok_sandbox/sandbox.py
def pre_start_args(
    self,
    container: str,
    task_dir: Path,
    *,
    runtime: str | None = None,
    loopback_ports: tuple[int, ...] = (),
) -> list[str]:
    """Return extra podman args for shield integration.

    *runtime* is the podman ``--runtime`` selector — passed to
    [`ShieldRuntime.from_runtime_name`][terok_shield.ShieldRuntime.from_runtime_name]
    so shield picks the right dnsmasq bind for the krun guest's
    isolated loopback.

    *loopback_ports* overrides shield's cfg-derived allowlist
    with per-container ports (see ``RunSpec.loopback_ports``).
    """
    from .integrations.shield import ShieldManager, ShieldRuntime

    return ShieldManager(
        task_dir,
        self._cfg,
        runtime=ShieldRuntime.from_runtime_name(runtime),
        loopback_ports_override=loopback_ports or None,
    ).pre_start(container)

shield_down(container, container_id, task_dir)

Remove shield rules for a container (allow all egress).

container is the operator-facing podman name (audit-log key); container_id is the full podman UUID, on which terok-shield's per-container hub socket is keyed. Both are mandatory.

Source code in src/terok_sandbox/sandbox.py
def shield_down(self, container: str, container_id: str, task_dir: Path) -> None:
    """Remove shield rules for a container (allow all egress).

    *container* is the operator-facing podman name (audit-log key);
    *container_id* is the full podman UUID — terok-shield's per-
    container hub socket is keyed on it.  Both are mandatory.
    """
    from .integrations.shield import ShieldManager

    ShieldManager(task_dir, self._cfg).down(container, container_id)

run(spec, *, hooks=None)

Launch a detached container from spec.

In shared mode (default), assembles and executes a single podman run -d with bind mounts.

In sealed mode (spec.sealed), splits into create → inject → start: the container is created without volumes, directories are copied in via podman cp, and the container is then started.

Fires hooks.pre_start before creation and hooks.post_start after a successful start. Raises GpuConfigError when the launch fails due to NVIDIA CDI misconfiguration.

Source code in src/terok_sandbox/sandbox.py
def run(self, spec: RunSpec, *, hooks: LifecycleHooks | None = None) -> None:
    """Launch a detached container from *spec*.

    In **shared** mode (default), assembles and executes a single
    ``podman run -d`` with bind mounts.

    In **sealed** mode (``spec.sealed``), splits into create → inject →
    start: the container is created without volumes, directories are
    copied in via ``podman cp``, and the container is then started.

    Fires *hooks.pre_start* before creation and *hooks.post_start*
    after a successful start.  Raises [`GpuConfigError`][terok_sandbox.GpuConfigError] when the
    launch fails due to NVIDIA CDI misconfiguration.
    """
    if spec.sealed:
        self.create(spec, hooks=hooks)
        # ``live`` volumes are bind-mounted (handled by _build_cmd);
        # only the rest get copied in here.
        present = tuple(v for v in spec.volumes if not v.live and v.host_path.exists())
        # Drop overlay file mounts (a file landing inside a sibling
        # dir mount); the dir-copy already wrote them, and podman cp
        # refuses to overwrite.
        dir_targets = tuple(v.container_path for v in present if v.host_path.is_dir())

        def _under_dir_mount(path: str) -> bool:
            return any(path == d or path.startswith(d.rstrip("/") + "/") for d in dir_targets)

        effective = tuple(
            v for v in present if v.host_path.is_dir() or not _under_dir_mount(v.container_path)
        )
        self._ensure_parents(spec.container_name, effective)
        for vol in effective:
            self.copy_to(spec.container_name, vol.host_path, vol.container_path)
        self.start(spec.container_name, hooks=hooks)
        return

    cmd = self._build_cmd(spec, verb="run")
    print("$", shlex.join(redact_env_args(cmd)))

    if hooks and hooks.pre_start:
        hooks.pre_start()

    self._exec_podman(cmd)

    if hooks and hooks.post_start:
        hooks.post_start()
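The overlay filter in the sealed branch hinges on the path-prefix test in `_under_dir_mount`. The sketch below lifts that test into a standalone function with made-up mount targets, to show why the trailing-slash handling matters.

```python
def under_dir_mount(path: str, dir_targets: tuple[str, ...]) -> bool:
    """True when path equals a directory target or lies beneath one."""
    return any(
        path == d or path.startswith(d.rstrip("/") + "/") for d in dir_targets
    )


# Illustrative container-side directory targets.
dir_targets = ("/home/dev/.config", "/workspace/")

inside = under_dir_mount("/home/dev/.config/token.json", dir_targets)
sibling = under_dir_mount("/home/dev/.configs/token.json", dir_targets)
```

Appending `/` before the prefix comparison keeps `/home/dev/.configs` from being mistaken for a child of `/home/dev/.config`, which a bare `startswith(d)` would get wrong.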

create(spec, *, hooks=None)

Create a container without starting it.

Returns the container name. Fires hooks.pre_start before podman create. The container can then receive injected files via copy_to before being started with start.

Source code in src/terok_sandbox/sandbox.py
def create(self, spec: RunSpec, *, hooks: LifecycleHooks | None = None) -> str:
    """Create a container without starting it.

    Returns the container name.  Fires *hooks.pre_start* before
    ``podman create``.  The container can then receive injected files
    via [`copy_to`][terok_sandbox.sandbox.Sandbox.copy_to] before being started with [`start`][terok_sandbox.sandbox.Sandbox.start].
    """
    cmd = self._build_cmd(spec, verb="create")
    print("$", shlex.join(redact_env_args(cmd)))

    if hooks and hooks.pre_start:
        hooks.pre_start()

    self._exec_podman(cmd)
    return spec.container_name

start(container_name, *, hooks=None)

Start a previously created container via the runtime.

Fires hooks.post_start after a successful start.

Source code in src/terok_sandbox/sandbox.py
def start(self, container_name: str, *, hooks: LifecycleHooks | None = None) -> None:
    """Start a previously created container via the runtime.

    Fires *hooks.post_start* after a successful start.
    """
    self._runtime.container(container_name).start()
    if hooks and hooks.post_start:
        hooks.post_start()

copy_to(container_name, src, dest)

Copy a host path into a stopped container via the runtime.

Source code in src/terok_sandbox/sandbox.py
def copy_to(self, container_name: str, src: Path, dest: str) -> None:
    """Copy a host path into a stopped container via the runtime."""
    self._runtime.container(container_name).copy_in(src, dest)

stream_logs(container, *, timeout=None, ready_check=None)

Stream container logs until ready_check matches or timeout.

Source code in src/terok_sandbox/sandbox.py
def stream_logs(
    self,
    container: str,
    *,
    timeout: float | None = None,
    ready_check: Callable[[str], bool] | None = None,
) -> bool:
    """Stream container logs until *ready_check* matches or timeout."""
    check = ready_check or (lambda line: READY_MARKER in line)
    return self._runtime.container(container).stream_initial_logs(check, timeout)
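The default readiness predicate is a substring test against READY_MARKER. A rough sketch of how that default and a custom check behave, using an in-memory list of lines instead of a live log stream (the real method also handles the timeout, which is left out here):

```python
from typing import Callable, Iterable, Optional

READY_MARKER = ">> init complete"


def wait_until_ready(
    lines: Iterable[str],
    ready_check: Optional[Callable[[str], bool]] = None,
) -> bool:
    """Return True as soon as one line satisfies the readiness check."""
    check = ready_check or (lambda line: READY_MARKER in line)
    return any(check(line) for line in lines)


log = ["cloning repo", "[init] >> init complete", "shell ready"]
default_ready = wait_until_ready(log)
custom_ready = wait_until_ready(log, ready_check=lambda line: line == "server up")
```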

wait_for_exit(container, timeout=None)

Block until container exits; return its exit code.

Source code in src/terok_sandbox/sandbox.py
def wait_for_exit(self, container: str, timeout: float | None = None) -> int:
    """Block until *container* exits; return its exit code."""
    return self._runtime.container(container).wait(timeout)

stop(containers)

Best-effort stop and remove containers.

Returns one ContainerRemoveResult per entry.

Source code in src/terok_sandbox/sandbox.py
def stop(self, containers: list[str]) -> list[ContainerRemoveResult]:
    """Best-effort stop and remove *containers*.

    Returns one [`ContainerRemoveResult`][terok_sandbox.runtime.ContainerRemoveResult] per entry.
    """
    handles = [self._runtime.container(name) for name in containers]
    return self._runtime.force_remove(handles)

task_state_dir(container)

Per-container state directory used by the launch / cleanup verbs.

The path is consumed by the launch module: compose writes the plan + readiness markers under it, and launch.cleanup removes it on teardown. The facade owns the derivation (state_dir / "sandbox" / "runs" / {container}), so the runs subtree layout has a single canonical owner.

Source code in src/terok_sandbox/sandbox.py
def task_state_dir(self, container: str) -> Path:
    """Per-container state directory used by the launch / cleanup verbs.

    The path is consumed by the
    [`launch`][terok_sandbox.launch] module: ``compose`` writes
    the plan + readiness markers under it, and
    [`launch.cleanup`][terok_sandbox.launch.cleanup] removes it on
    teardown.  The facade owns the *derivation* — ``state_dir /
    "sandbox" / "runs" / {container}`` — so the runs subtree
    layout has a single canonical owner.
    """
    return self._cfg.state_dir / "sandbox" / "runs" / container

init_ssh(scope)

Create an SSH manager for scope that owns its own CredentialDB.

Callers receive an SSHManager whose DB connection is opened against SandboxConfig.db_path. Use it as a context manager (with sandbox.init_ssh(scope) as m: ...) or call SSHManager.close when done.

Source code in src/terok_sandbox/sandbox.py
def init_ssh(self, scope: str) -> SSHManager:
    """Create an SSH manager for *scope* that owns its own ``CredentialDB``.

    Callers receive an ``SSHManager`` whose DB connection is opened
    against [`SandboxConfig.db_path`][terok_sandbox.SandboxConfig.db_path].  Use it as a context
    manager (``with sandbox.init_ssh(scope) as m: ...``) or call
    [`SSHManager.close`][terok_sandbox.SSHManager.close] when done.
    """
    from .vault.ssh.manager import SSHManager

    # Library code never prompts: a locked vault raises rather than
    # spinning up a prompt_toolkit prompt (which cannot own a running
    # event loop).  The frontend unlocks before calling in.
    return SSHManager.open_for_config(scope=scope, cfg=self._cfg, prompt_on_tty=False)

Sharing

Directory sharing semantics — expresses intent, not backend details.

The sandbox translates these into backend-specific flags (e.g. SELinux relabel :z / :Z for Podman) and uses them to drive sealed-mode decisions (private dirs are injected, shared dirs may be skipped).

PRIVATE = 'private' class-attribute instance-attribute

Exclusive to one container — no other container accesses this directory.

SHARED = 'shared' class-attribute instance-attribute

Shared across multiple containers (e.g. agent auth/config directories).

VolumeSpec(host_path, container_path, sharing=Sharing.SHARED, read_only=False, live=False) dataclass

Typed description of a host↔container directory binding.

Replaces raw volume strings ("host:container:z") with structured data so the sandbox can decide how to materialise each binding — as a bind mount (shared mode) or a podman cp injection (sealed mode).

sharing expresses the caller's intent (private vs shared); the sandbox translates that into backend-specific flags (e.g. SELinux relabeling for Podman). In sealed mode, sharing semantics can also drive whether a directory is injected (private) or skipped (shared config that the vault replaces).

host_path instance-attribute

Absolute host-side path to mount or copy in.

container_path instance-attribute

Absolute path inside the container (e.g. "/workspace").

sharing = Sharing.SHARED class-attribute instance-attribute

Sharing semantics: Sharing.PRIVATE or Sharing.SHARED.

read_only = False class-attribute instance-attribute

When True, mount the volume read-only inside the container.

Used to layer immutable views on top of writable directory mounts — e.g. exposing a credential file to the agent while preventing it from overwriting the host-side phantom token.

live = False class-attribute instance-attribute

When True, this volume is bind-mounted even in sealed mode.

Service plumbing (per-container vault/ssh-agent socket dir, gate socket, sourced-at-runtime bridge scripts) must be live: sealed-mode podman cp would snapshot an empty dir on the container side and the supervisor's later-bound sockets would never appear inside. Operator state (workspace, agent config) leaves this False so sealed mode gets fresh copies as designed.

to_mount_arg()

Format as a -v flag value for podman run.

Source code in src/terok_sandbox/sandbox.py
def to_mount_arg(self) -> str:
    """Format as a ``-v`` flag value for ``podman run``."""
    try:
        relabel = _SHARING_TO_RELABEL[self.sharing]
    except KeyError:
        raise ValueError(f"Unknown sharing mode: {self.sharing!r}") from None
    opts = relabel + (",ro" if self.read_only else "")
    return f"{self.host_path}:{self.container_path}:{opts}"
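To make the mount-argument format concrete, here is a self-contained sketch. The relabel table is an assumption: `_SHARING_TO_RELABEL` is not shown on this page, and the mapping below follows the usual Podman convention of `Z` for a private label and `z` for a shared one. `Sharing` and `Volume` are simplified stand-ins for the real classes.

```python
from dataclasses import dataclass
from enum import Enum


class Sharing(Enum):
    PRIVATE = "private"
    SHARED = "shared"


# Assumed mapping, not taken from the terok_sandbox source.
_SHARING_TO_RELABEL = {Sharing.PRIVATE: "Z", Sharing.SHARED: "z"}


@dataclass(frozen=True)
class Volume:
    host_path: str
    container_path: str
    sharing: Sharing = Sharing.SHARED
    read_only: bool = False

    def to_mount_arg(self) -> str:
        """Format as host:container:options for a -v flag."""
        opts = _SHARING_TO_RELABEL[self.sharing] + (",ro" if self.read_only else "")
        return f"{self.host_path}:{self.container_path}:{opts}"


workspace = Volume("/home/dev/task1", "/workspace", Sharing.PRIVATE).to_mount_arg()
config = Volume("/home/dev/.agent", "/agent", read_only=True).to_mount_arg()
```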

SetupVerdict

Bases: Enum

Result of needs_setup — five possible states a launch can be in.

OK = 'ok' class-attribute instance-attribute

Stamp matches all installed package versions exactly.

FIRST_RUN = 'first_run' class-attribute instance-attribute

No stamp on disk — the user has never run setup (or wiped state).

STALE_AFTER_UPDATE = 'stale_after_update' class-attribute instance-attribute

At least one installed package is newer than the stamped version.

STALE_AFTER_DOWNGRADE = 'stale_after_downgrade' class-attribute instance-attribute

At least one installed package is older than the stamped version.

Downgrades aren't tested and can leave systemd units / state DB in forms the older code can't interpret. Frontends should treat this as a hard stop until the user explicitly overrides.

STAMP_CORRUPT = 'stamp_corrupt' class-attribute instance-attribute

Stamp file exists but can't be parsed. Frontends should treat it as FIRST_RUN.
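One way a frontend might act on the five verdicts is sketched below. The enum is re-declared locally so the example runs on its own. The action strings, the `allow_downgrade` flag, and the choice to run setup for FIRST_RUN and STALE_AFTER_UPDATE are assumptions; only the downgrade hard stop and the STAMP_CORRUPT handling come from the descriptions above.

```python
from enum import Enum


class SetupVerdict(Enum):
    OK = "ok"
    FIRST_RUN = "first_run"
    STALE_AFTER_UPDATE = "stale_after_update"
    STALE_AFTER_DOWNGRADE = "stale_after_downgrade"
    STAMP_CORRUPT = "stamp_corrupt"


def next_action(verdict: SetupVerdict, *, allow_downgrade: bool = False) -> str:
    """Map a verdict to a hypothetical frontend action."""
    if verdict is SetupVerdict.OK:
        return "launch"
    if verdict is SetupVerdict.STALE_AFTER_DOWNGRADE and not allow_downgrade:
        # Hard stop until the user explicitly overrides.
        return "abort"
    # FIRST_RUN, STAMP_CORRUPT (treated like FIRST_RUN), STALE_AFTER_UPDATE,
    # and an explicitly overridden downgrade all lead back to setup.
    return "run-setup"
```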

SSHInitResult

Bases: TypedDict

Public summary of an ssh-init invocation.

key_id instance-attribute

key_type instance-attribute

fingerprint instance-attribute

comment instance-attribute

public_line instance-attribute

SSHManager(*, scope, db)

Mints SSH keypairs for a scope and stores them in the vault.

Each scope may hold multiple keys (e.g. GitHub + GitLab), each with a distinct fingerprint. init is idempotent for the default invocation: re-running ssh-init on a scope that already has a tk-main: key returns that key without minting a new one — the operator sees the same public line they registered upstream rather than a fresh side key they'd have to re-register. force=True rotates atomically (new key takes the scope in a single transaction that revokes prior assignments), and a custom comment opts back into the additive path so multi-deploy-key setups (GitHub + GitLab on one scope) still work — but only when asked for explicitly.

Two constructors for two ownership stories:

  • SSHManager(scope=..., db=...) binds the manager to a caller-owned CredentialDB. The manager uses it and never closes it. Right shape for tests and pooled connections.
  • SSHManager.open_for_config opens its own DB via the supplied config's chain seam (cfg.open_credential_db) and closes it on close / context exit / garbage collection. Right shape for one-shot CLI commands. Pass db_path when the caller already holds a runtime path (typically VaultStatus.db_path) so the open targets that DB while still using cfg's tier policy.

Bind the manager to a caller-provided CredentialDB.

Source code in src/terok_sandbox/vault/ssh/manager.py
def __init__(self, *, scope: str, db: CredentialDB) -> None:
    """Bind the manager to a caller-provided [`CredentialDB`][terok_sandbox.CredentialDB]."""
    self._scope = scope
    self._db = db
    self._owned_db: CredentialDB | None = None

open_for_config(*, scope, cfg, db_path=None, prompt_on_tty=False) classmethod

Return a manager that owns a connection opened via cfg.open_credential_db.

db_path defaults to cfg.db_path; callers with a runtime path override (e.g. the daemon's actual VaultStatus.db_path) pass it explicitly. Tier knobs always come from cfg — no cross-package fan-out when sandbox adds a new chain tier.

Source code in src/terok_sandbox/vault/ssh/manager.py
@classmethod
def open_for_config(
    cls,
    *,
    scope: str,
    cfg: SandboxConfig,
    db_path: Path | None = None,
    prompt_on_tty: bool = False,
) -> SSHManager:
    """Return a manager that owns a connection opened via ``cfg.open_credential_db``.

    *db_path* defaults to ``cfg.db_path``; callers with a runtime
    path override (e.g. the daemon's actual ``VaultStatus.db_path``)
    pass it explicitly.  Tier knobs always come from *cfg* — no
    cross-package fan-out when sandbox adds a new chain tier.
    """
    db = cfg.open_credential_db(db_path, prompt_on_tty=prompt_on_tty)
    manager = cls(scope=scope, db=db)
    manager._owned_db = db
    return manager

close()

Close the DB connection if this manager opened it (idempotent).

Source code in src/terok_sandbox/vault/ssh/manager.py
def close(self) -> None:
    """Close the DB connection if this manager opened it (idempotent)."""
    if self._owned_db is not None:
        self._owned_db.close()
        self._owned_db = None

__enter__()

Enter the runtime context; returns self.

Source code in src/terok_sandbox/vault/ssh/manager.py
def __enter__(self) -> SSHManager:
    """Enter the runtime context; returns self."""
    return self

__exit__(*exc)

Close the owned DB on exit.

Source code in src/terok_sandbox/vault/ssh/manager.py
def __exit__(self, *exc: object) -> None:
    """Close the owned DB on exit."""
    self.close()

__del__()

Best-effort close on garbage collection.

Source code in src/terok_sandbox/vault/ssh/manager.py
def __del__(self) -> None:
    """Best-effort close on garbage collection."""
    try:
        self.close()
    except Exception:  # noqa: BLE001  # nosec B110 — best-effort __del__ close on GC
        pass
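
The ownership rule in the lifecycle methods above (a manager closes only a DB it opened itself) can be sketched without the real classes. This is a minimal illustration: FakeDB, Manager and open_owned are hypothetical stand-ins, not terok_sandbox types.

```python
from typing import Optional


class FakeDB:
    """Hypothetical stand-in for a DB handle; records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class Manager:
    """Closes the DB only when it opened the DB itself."""

    def __init__(self, db: FakeDB) -> None:
        self._db = db
        self._owned_db: Optional[FakeDB] = None  # borrowed by default

    @classmethod
    def open_owned(cls) -> "Manager":
        db = FakeDB()
        manager = cls(db)
        manager._owned_db = db  # the manager is now responsible for closing
        return manager

    def close(self) -> None:
        if self._owned_db is not None:
            self._owned_db.close()
            self._owned_db = None  # makes a second close() a no-op

    def __enter__(self) -> "Manager":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()


shared = FakeDB()
with Manager(shared):
    pass  # borrowed: the caller keeps control of `shared`

with Manager.open_owned() as owned:
    owned_db = owned._db  # owned: closed when the block exits
```

After both blocks run, shared is still open and owned_db is closed, which is the distinction between the two construction paths.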

init(key_type='ed25519', comment=None, force=False)

Provision a keypair for the scope.

Parameters:

  • key_type (str, default 'ed25519'): "ed25519" (default) or "rsa".
  • comment (str | None, default None): Comment to embed in the public key. When None, falls back to tk-main:<scope> on first init and to idempotent reuse on subsequent inits. A non-None value (including "") opts back into additive generation: the value lands verbatim and the call always mints a new key.
  • force (bool, default False): When True, rotate: the new key takes the scope in a single transaction that drops every prior assignment.

Returns:

  • SSHInitResult: Metadata sufficient to display the key to the user or register it with a remote. No filesystem paths.

Raises:

  • InvalidScopeName: if the scope fails validation. Checked before any key material is generated so a rejected call leaves no orphaned row in ssh_keys.

Source code in src/terok_sandbox/vault/ssh/manager.py
def init(
    self,
    key_type: str = "ed25519",
    comment: str | None = None,
    force: bool = False,
) -> SSHInitResult:
    """Provision a keypair for the scope.

    Args:
        key_type: ``"ed25519"`` (default) or ``"rsa"``.
        comment: Comment to embed in the public key.  When ``None``,
            falls back to ``tk-main:<scope>`` on first init and to
            idempotent reuse on subsequent inits.  A non-``None``
            value (including ``""``) opts back into additive
            generation — the value lands verbatim and the call
            always mints a new key.
        force: When ``True``, rotate — the new key takes the scope in
            a single transaction that drops every prior assignment.

    Returns:
        Metadata sufficient to display the key to the user or register
        it with a remote.  No filesystem paths.

    Raises:
        InvalidScopeName: if the scope fails validation.  Checked
            *before* any key material is generated so a rejected
            call leaves no orphaned row in ``ssh_keys``.
    """
    _require_safe_scope(self._scope)

    # Idempotent default path: a bare ``ssh-init`` on a scope that
    # already carries a primary key returns the existing one rather
    # than minting a side key the user would have to re-register
    # upstream.  An explicit ``comment`` or ``force`` is treated as
    # the operator opting back into "make a new key": ``comment``
    # for additive multi-deploy-key setups, ``force`` for rotation.
    if not force and comment is None:
        for record in self._db.load_ssh_keys_for_scope(self._scope):
            if record.comment.startswith("tk-main:"):
                return SSHInitResult(
                    key_id=record.id,
                    key_type=record.key_type,
                    fingerprint=record.fingerprint,
                    comment=record.comment,
                    public_line=public_line_of(record),
                )

    existing = self._db.list_ssh_keys_for_scope(self._scope)
    # After a force-rotation the new key is the scope's only key, so it
    # *is* the primary even when prior keys existed.  An explicit empty
    # comment is honored; only ``None`` falls back to the derived default.
    primary = force or not existing
    effective_comment = (
        comment
        if comment is not None
        else self._default_comment(existing_count=len(existing), primary=primary)
    )

    keypair = generate_keypair(key_type, comment=effective_comment)
    key_id = self._db.store_ssh_key(
        key_type=keypair.key_type,
        private_der=keypair.private_der,
        public_blob=keypair.public_blob,
        comment=keypair.comment,
        fingerprint=keypair.fingerprint,
    )
    if force:
        self._db.replace_ssh_keys_for_scope(self._scope, keep_key_id=key_id)
    else:
        self._db.assign_ssh_key(self._scope, key_id)

    return SSHInitResult(
        key_id=key_id,
        key_type=keypair.key_type,
        fingerprint=keypair.fingerprint,
        comment=keypair.comment,
        public_line=keypair.public_line,
    )
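
The branch order in init() can be summarised as a small decision function. This is only a sketch of the control flow in the listing above: plan_init and its return labels are hypothetical, and it neither touches a database nor generates keys.

```python
from typing import Optional


def plan_init(existing_comments: list[str], comment: Optional[str], force: bool) -> str:
    """Return which path init() would take: 'reuse', 'rotate' or 'add'."""
    if not force and comment is None:
        # Bare call: hand back an existing primary key if the scope has one.
        if any(c.startswith("tk-main:") for c in existing_comments):
            return "reuse"
    if force:
        # Rotation: the new key replaces every prior assignment.
        return "rotate"
    # First key for the scope, or an explicit comment asking for a side key.
    return "add"


bare_first = plan_init([], None, False)
bare_again = plan_init(["tk-main:proj"], None, False)
explicit_empty = plan_init(["tk-main:proj"], "", False)
rotation = plan_init(["tk-main:proj"], None, True)
```

Note that an explicit empty comment takes the additive path, matching the "including \"\"" wording in the parameter description.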

CredentialDB(db_path, *, passphrase)

SQLite-backed store for provider credentials, SSH keys, and phantom tokens.

The on-disk file is always SQLCipher-encrypted. The constructor shown below takes passphrase as a required keyword argument and does not resolve one itself, so any walk of the runtime resolution chain (keyring → credentials.passphrase) has to happen in the caller before the DB is opened. An empty or None passphrase raises NoPassphraseError, a stale plaintext file raises PlaintextDBFoundError, and a file that cannot be decrypted raises WrongPassphraseError. These exceptions are meant to be diagnostic: operator-facing remediation (which CLI verb to run, which doc page to read) is the caller's job, since library code shouldn't bake one frontend's verbs into its exception text.

Source code in src/terok_sandbox/vault/store/db.py
def __init__(self, db_path: Path, *, passphrase: str) -> None:
    if not passphrase:
        raise NoPassphraseError(f"no SQLCipher passphrase available for {db_path}")
    db_path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = _open_connection(db_path, passphrase)
    # Set by ``transaction()`` so write methods know whether to
    # commit themselves or defer to the outer scope.  Bool is fine
    # — ``BEGIN IMMEDIATE`` rejects nested calls, so the flag never
    # needs to count.
    self._in_outer_tx: bool = False
    try:
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA foreign_keys=ON")
        ensure_credentials_schema(self._conn)
        migrate_credential_db_schema(self._conn)
    except _DB_ERRORS as exc:
        self._conn.close()
        if _looks_like_plaintext_db(db_path):
            raise PlaintextDBFoundError(
                f"{db_path} is a legacy plaintext sqlite DB — run "
                "`terok-sandbox credentials encrypt-db` to migrate it.\n"
                "  The migration path is deprecated in 0.8.0 and will be "
                "removed in 0.9.0; run it before upgrading past 0.8.x."
            ) from exc
        raise WrongPassphraseError(
            f"could not decrypt {db_path} — wrong passphrase, or the DB was"
            " created with a different key"
        ) from exc

transaction()

Run the body in an explicit BEGIN IMMEDIATE transaction.

Take the write lock up front so callers can compose read-then-write sequences and trust the whole thing serialises against concurrent writers. Every mutating method on this class (credentials, SSH keys, phantom tokens) consults the self._in_outer_tx flag this context manager sets and skips its own per-call commit — so the API contract is "any composition of write methods inside with db.transaction(): is atomic", with no kwarg plumbing at the call site.

On exit: COMMIT on clean exit, ROLLBACK on any BaseException (KeyboardInterrupt / SystemExit included — leaving a half-written %scope keypair around would be worse than a re-mint on retry).

Source code in src/terok_sandbox/vault/store/db.py
@contextlib.contextmanager
def transaction(self) -> Iterator[Any]:
    """Run the body in an explicit ``BEGIN IMMEDIATE`` transaction.

    Take the write lock up front so callers can compose
    read-then-write sequences and trust the whole thing serialises
    against concurrent writers.  Every mutating method on this
    class (credentials, SSH keys, phantom tokens) consults the
    ``self._in_outer_tx`` flag this context manager sets and skips
    its own per-call commit — so the API contract is "any
    composition of write methods inside ``with db.transaction():``
    is atomic", with no kwarg plumbing at the call site.

    On exit: ``COMMIT`` on clean exit, ``ROLLBACK`` on any
    ``BaseException`` (``KeyboardInterrupt`` / ``SystemExit``
    included — leaving a half-written ``%scope`` keypair around
    would be worse than a re-mint on retry).
    """
    self._conn.execute("BEGIN IMMEDIATE")
    self._in_outer_tx = True
    try:
        yield self._conn
    except BaseException:
        self._conn.execute("ROLLBACK")
        raise
    else:
        self._conn.execute("COMMIT")
    finally:
        self._in_outer_tx = False
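
The flag-based contract described above can be tried with the standard sqlite3 module. This is a minimal sketch, not the real class: TinyStore, put and keys are hypothetical names, and opening the connection with isolation_level=None is an assumption, since the real connection setup is not shown here.

```python
import contextlib
import sqlite3
from collections.abc import Iterator


class TinyStore:
    """Hypothetical store whose writes commit themselves unless wrapped."""

    def __init__(self) -> None:
        # isolation_level=None: sqlite3 opens no implicit transactions,
        # so BEGIN / COMMIT / ROLLBACK are entirely under our control.
        self._conn = sqlite3.connect(":memory:", isolation_level=None)
        self._conn.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT)")
        self._in_outer_tx = False

    @contextlib.contextmanager
    def transaction(self) -> Iterator[None]:
        self._conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
        self._in_outer_tx = True
        try:
            yield
        except BaseException:
            self._conn.execute("ROLLBACK")
            raise
        else:
            self._conn.execute("COMMIT")
        finally:
            self._in_outer_tx = False

    def put(self, k: str, v: str) -> None:
        self._conn.execute("INSERT OR REPLACE INTO kv VALUES (?, ?)", (k, v))
        if not self._in_outer_tx:
            self._conn.commit()  # standalone call: commit immediately

    def keys(self) -> list[str]:
        return [r[0] for r in self._conn.execute("SELECT k FROM kv ORDER BY k")]


store = TinyStore()
store.put("a", "1")
try:
    with store.transaction():
        store.put("b", "2")
        store.put("c", "3")
        raise RuntimeError("simulated failure")  # both writes roll back
except RuntimeError:
    pass
keys_after_failure = store.keys()
```

Because put() defers to the outer scope, neither "b" nor "c" survives the failed block, while the earlier standalone write of "a" does.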

store_credential(credential_set, provider, data)

Insert or replace a credential entry.

Source code in src/terok_sandbox/vault/store/db.py
def store_credential(self, credential_set: str, provider: str, data: dict) -> None:
    """Insert or replace a credential entry."""
    self._conn.execute(
        "INSERT OR REPLACE INTO credentials (credential_set, provider, data) VALUES (?, ?, ?)",
        (credential_set, provider, json.dumps(data)),
    )
    if not self._in_outer_tx:
        self._conn.commit()

load_credential(credential_set, provider)

Return the credential dict, or None if not found.

Source code in src/terok_sandbox/vault/store/db.py
def load_credential(self, credential_set: str, provider: str) -> dict | None:
    """Return the credential dict, or ``None`` if not found."""
    row = self._conn.execute(
        "SELECT data FROM credentials WHERE credential_set = ? AND provider = ?",
        (credential_set, provider),
    ).fetchone()
    return json.loads(row[0]) if row else None

list_credentials(credential_set)

Return provider names that have stored credentials.

Source code in src/terok_sandbox/vault/store/db.py
def list_credentials(self, credential_set: str) -> list[str]:
    """Return provider names that have stored credentials."""
    rows = self._conn.execute(
        "SELECT provider FROM credentials WHERE credential_set = ? ORDER BY provider",
        (credential_set,),
    ).fetchall()
    return [r[0] for r in rows]

list_credential_sets()

Return distinct credential-set names with at least one stored credential.

Source code in src/terok_sandbox/vault/store/db.py
def list_credential_sets(self) -> list[str]:
    """Return distinct credential-set names with at least one stored credential."""
    rows = self._conn.execute(
        "SELECT DISTINCT credential_set FROM credentials ORDER BY credential_set"
    ).fetchall()
    return [r[0] for r in rows]

delete_credential(credential_set, provider)

Remove a credential entry (idempotent).

Source code in src/terok_sandbox/vault/store/db.py
def delete_credential(self, credential_set: str, provider: str) -> None:
    """Remove a credential entry (idempotent)."""
    self._conn.execute(
        "DELETE FROM credentials WHERE credential_set = ? AND provider = ?",
        (credential_set, provider),
    )
    if not self._in_outer_tx:
        self._conn.commit()

store_ssh_key(key_type, private_der, public_blob, comment, fingerprint)

Register a keypair, dedup-by-fingerprint; return the ssh_keys.id.

When a row with the same fingerprint already exists the stored bytes and comment are left untouched (the caller is re-asserting an already-known key, which is expected on repeat ssh-import).

Auto-commits unless called inside a transaction() scope — in which case the outer block owns the commit.

Source code in src/terok_sandbox/vault/store/db.py
def store_ssh_key(
    self,
    key_type: str,
    private_der: bytes,
    public_blob: bytes,
    comment: str,
    fingerprint: str,
) -> int:
    """Register a keypair, dedup-by-fingerprint; return the ``ssh_keys.id``.

    When a row with the same fingerprint already exists the stored bytes
    and comment are left untouched (the caller is re-asserting an
    already-known key, which is expected on repeat ``ssh-import``).

    Auto-commits unless called inside a
    [`transaction()`][terok_sandbox.vault.store.db.CredentialDB.transaction]
    scope — in which case the outer block owns the commit.
    """
    self._conn.execute(
        "INSERT OR IGNORE INTO ssh_keys"
        " (key_type, private_der, public_blob, comment, fingerprint)"
        " VALUES (?, ?, ?, ?, ?)",
        (key_type, private_der, public_blob, comment, fingerprint),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    row = self._conn.execute(
        "SELECT id FROM ssh_keys WHERE fingerprint = ?",
        (fingerprint,),
    ).fetchone()
    return row[0]
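
The dedup-by-fingerprint behaviour relies on INSERT OR IGNORE followed by a lookup. A minimal sqlite3 sketch follows; the reduced table layout and the UNIQUE constraint on fingerprint are assumptions (the real schema is not shown here), and store_key is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ssh_keys ("
    " id INTEGER PRIMARY KEY,"
    " comment TEXT NOT NULL,"
    " fingerprint TEXT NOT NULL UNIQUE)"  # assumed constraint
)


def store_key(comment: str, fingerprint: str) -> int:
    """Insert the key unless its fingerprint is already known; return its id."""
    conn.execute(
        "INSERT OR IGNORE INTO ssh_keys (comment, fingerprint) VALUES (?, ?)",
        (comment, fingerprint),
    )
    conn.commit()
    row = conn.execute(
        "SELECT id FROM ssh_keys WHERE fingerprint = ?", (fingerprint,)
    ).fetchone()
    return row[0]


first = store_key("laptop", "SHA256:aaaa")
again = store_key("renamed", "SHA256:aaaa")  # same fingerprint, new comment
other = store_key("server", "SHA256:bbbb")
stored_comment = conn.execute(
    "SELECT comment FROM ssh_keys WHERE id = ?", (first,)
).fetchone()[0]
```

The second call returns the id of the existing row and leaves its comment untouched, which is the re-import case the description mentions.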

get_ssh_key_by_fingerprint(fingerprint)

Look up a key by fingerprint; returns metadata only.

Source code in src/terok_sandbox/vault/store/db.py
def get_ssh_key_by_fingerprint(self, fingerprint: str) -> SSHKeyRow | None:
    """Look up a key by fingerprint; returns metadata only."""
    row = self._conn.execute(
        "SELECT id, key_type, fingerprint, comment, created_at"
        " FROM ssh_keys WHERE fingerprint = ?",
        (fingerprint,),
    ).fetchone()
    return SSHKeyRow(*row) if row else None

set_ssh_key_comment(fingerprint, comment)

Update the comment of the key with fingerprint.

Returns True if a row was updated, False if the fingerprint is unknown. The comment is validated by the same safety helper that gates import_ssh_keypair — control characters and overlong strings raise UnsafeCommentError so the storage-entry-point invariant holds for this path too.

The new comment surfaces to subsequent ssh-add -L queries from the container because the signer resolves keys fresh from the DB on every request.

Source code in src/terok_sandbox/vault/store/db.py
def set_ssh_key_comment(self, fingerprint: str, comment: str) -> bool:
    """Update the comment of the key with *fingerprint*.

    Returns ``True`` if a row was updated, ``False`` if the fingerprint
    is unknown.  The comment is validated by the same safety helper
    that gates ``import_ssh_keypair`` — control characters and
    overlong strings raise
    [`UnsafeCommentError`][terok_sandbox.vault.store.db.UnsafeCommentError]
    so the storage-entry-point invariant holds for this path too.

    The new comment surfaces to subsequent ``ssh-add -L`` queries from
    the container because the signer resolves keys fresh from the DB
    on every request.
    """
    _require_safe_comment(comment)
    cur = self._conn.execute(
        "UPDATE ssh_keys SET comment = ? WHERE fingerprint = ?",
        (comment, fingerprint),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    return bool(cur.rowcount)

assign_ssh_key(scope, key_id, *, allow_infra=False)

Grant scope access to key_id (idempotent).

Rejects unsafe scope names with InvalidScopeName — the value is later embedded in per-scope Unix-socket paths, so traversal-like strings (../, /) must not be persisted.

By default also rejects %-prefixed infrastructure scopes so callers driven by user input can't write to sandbox-reserved names (%host for the krun host-side keypair, future %name slots). Sandbox internals that legitimately provision infrastructure scopes pass allow_infra=True.

Auto-commits unless called inside a transaction() scope — in which case the outer block owns the commit.

Source code in src/terok_sandbox/vault/store/db.py
def assign_ssh_key(self, scope: str, key_id: int, *, allow_infra: bool = False) -> None:
    """Grant *scope* access to *key_id* (idempotent).

    Rejects unsafe scope names with [`InvalidScopeName`][terok_sandbox.vault.store.db.InvalidScopeName] — the
    value is later embedded in per-scope Unix-socket paths, so
    traversal-like strings (``../``, ``/``) must not be persisted.

    By default also rejects ``%``-prefixed infrastructure scopes so
    callers driven by user input can't write to sandbox-reserved
    names (``%host`` for the krun host-side keypair, future
    ``%name`` slots).  Sandbox internals that legitimately provision
    infrastructure scopes pass ``allow_infra=True``.

    Auto-commits unless called inside a
    [`transaction()`][terok_sandbox.vault.store.db.CredentialDB.transaction]
    scope — in which case the outer block owns the commit.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)
    self._conn.execute(
        "INSERT OR IGNORE INTO ssh_key_assignments (scope, key_id) VALUES (?, ?)",
        (scope, key_id),
    )
    if not self._in_outer_tx:
        self._conn.commit()

unassign_ssh_key(scope, key_id, *, allow_infra=False)

Revoke scope's access to key_id; drop the key row if orphaned.

Refuses %-prefixed infrastructure scopes by default — pair with allow_infra=True for sandbox internals that need to decommission a reserved scope.

Source code in src/terok_sandbox/vault/store/db.py
def unassign_ssh_key(self, scope: str, key_id: int, *, allow_infra: bool = False) -> None:
    """Revoke *scope*'s access to *key_id*; drop the key row if orphaned.

    Refuses ``%``-prefixed infrastructure scopes by default — pair
    with ``allow_infra=True`` for sandbox internals that need to
    decommission a reserved scope.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)
    cur = self._conn.execute(
        "DELETE FROM ssh_key_assignments WHERE scope = ? AND key_id = ?",
        (scope, key_id),
    )
    if cur.rowcount:
        self._conn.execute(
            "DELETE FROM ssh_keys WHERE id = ? AND NOT EXISTS ("
            "  SELECT 1 FROM ssh_key_assignments WHERE key_id = ?"
            ")",
            (key_id, key_id),
        )
    if not self._in_outer_tx:
        self._conn.commit()
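
The "drop the key row if orphaned" step matters when one key is shared between scopes. The sketch below reproduces the two DELETE statements against a reduced, assumed schema; unassign and key_count are hypothetical helpers and scope validation is left out.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ssh_keys (id INTEGER PRIMARY KEY, fingerprint TEXT);
    CREATE TABLE ssh_key_assignments (
        scope TEXT NOT NULL,
        key_id INTEGER NOT NULL,
        PRIMARY KEY (scope, key_id)
    );
    INSERT INTO ssh_keys (id, fingerprint) VALUES (1, 'SHA256:shared');
    INSERT INTO ssh_key_assignments VALUES ('alpha', 1), ('beta', 1);
    """
)


def unassign(scope: str, key_id: int) -> None:
    cur = conn.execute(
        "DELETE FROM ssh_key_assignments WHERE scope = ? AND key_id = ?",
        (scope, key_id),
    )
    if cur.rowcount:
        # Remove the key itself only when no scope references it any more.
        conn.execute(
            "DELETE FROM ssh_keys WHERE id = ? AND NOT EXISTS ("
            " SELECT 1 FROM ssh_key_assignments WHERE key_id = ?)",
            (key_id, key_id),
        )
    conn.commit()


def key_count() -> int:
    return conn.execute("SELECT count(*) FROM ssh_keys").fetchone()[0]


unassign("alpha", 1)
after_first = key_count()   # 'beta' still references the key
unassign("beta", 1)
after_second = key_count()  # last reference gone, row dropped
```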

replace_ssh_keys_for_scope(scope, *, keep_key_id, allow_infra=False)

Atomically make keep_key_id the scope's sole assigned key.

Wraps the "assign new + revoke every other" sequence in a single SQLite transaction so two concurrent init(force=True) calls can't both leave their own keys assigned — whichever transaction commits last wins the scope, and exactly one primary survives. Orphaned ssh_keys rows for revoked keys are cleaned up in the same step via unassign_ssh_key semantics.

Refuses %-prefixed infrastructure scopes by default; sandbox internals provisioning infra keys pass allow_infra=True.

Source code in src/terok_sandbox/vault/store/db.py
def replace_ssh_keys_for_scope(
    self, scope: str, *, keep_key_id: int, allow_infra: bool = False
) -> None:
    """Atomically make *keep_key_id* the scope's sole assigned key.

    Wraps the "assign new + revoke every other" sequence in a single
    SQLite transaction so two concurrent ``init(force=True)`` calls
    can't both leave their own keys assigned — whichever transaction
    commits last wins the scope, and exactly one primary survives.
    Orphaned ``ssh_keys`` rows for revoked keys are cleaned up in the
    same step via ``unassign_ssh_key`` semantics.

    Refuses ``%``-prefixed infrastructure scopes by default; sandbox
    internals provisioning infra keys pass ``allow_infra=True``.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)

    def _body() -> None:
        self._conn.execute(
            "INSERT OR IGNORE INTO ssh_key_assignments (scope, key_id) VALUES (?, ?)",
            (scope, keep_key_id),
        )
        stale_ids = [
            r[0]
            for r in self._conn.execute(
                "SELECT key_id FROM ssh_key_assignments WHERE scope = ? AND key_id != ?",
                (scope, keep_key_id),
            ).fetchall()
        ]
        if stale_ids:
            # ``placeholders`` is a fixed-length string of ``?`` marks,
            # never user input — the variadic IN() clause is the reason
            # we build the SQL with f-string instead of plain params.
            placeholders = ",".join("?" * len(stale_ids))
            self._conn.execute(
                f"DELETE FROM ssh_key_assignments"  # nosec B608
                f" WHERE scope = ? AND key_id IN ({placeholders})",
                (scope, *stale_ids),
            )
            self._conn.execute(
                f"DELETE FROM ssh_keys WHERE id IN ({placeholders})"  # nosec B608
                f" AND NOT EXISTS ("
                f"  SELECT 1 FROM ssh_key_assignments WHERE key_id = ssh_keys.id"
                f")",
                tuple(stale_ids),
            )

    # Same ``_in_outer_tx`` pattern as the rest of the write methods.
    # ``with self._conn:`` is the sqlite3 connection's own auto-commit
    # context — it would clobber the outer ``BEGIN IMMEDIATE`` that
    # ``transaction()`` started.  When inside an outer scope, run the
    # body raw and let the outer block own the commit; standalone
    # callers still get the self-contained connection-managed
    # transaction they used to.
    if self._in_outer_tx:
        _body()
    else:
        with self._conn:
            _body()
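
The variadic IN clause and the standalone "with conn:" path can be sketched as follows. The table layout is an assumption, keep_only and assigned are hypothetical helpers, and the sketch omits both scope validation and the orphaned ssh_keys cleanup to stay short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ssh_key_assignments ("
    " scope TEXT NOT NULL, key_id INTEGER NOT NULL,"
    " PRIMARY KEY (scope, key_id))"
)
conn.executemany(
    "INSERT INTO ssh_key_assignments VALUES (?, ?)",
    [("alpha", 1), ("alpha", 2), ("beta", 2)],
)
conn.commit()


def keep_only(scope: str, keep_key_id: int) -> list[int]:
    """Make keep_key_id the only key assigned to scope; return revoked ids."""
    with conn:  # commits on success, rolls back if the body raises
        conn.execute(
            "INSERT OR IGNORE INTO ssh_key_assignments (scope, key_id) VALUES (?, ?)",
            (scope, keep_key_id),
        )
        stale_ids = [
            r[0]
            for r in conn.execute(
                "SELECT key_id FROM ssh_key_assignments"
                " WHERE scope = ? AND key_id != ? ORDER BY key_id",
                (scope, keep_key_id),
            )
        ]
        if stale_ids:
            # One "?" per stale id; the ids themselves stay bound parameters.
            placeholders = ",".join("?" * len(stale_ids))
            conn.execute(
                "DELETE FROM ssh_key_assignments"
                f" WHERE scope = ? AND key_id IN ({placeholders})",
                (scope, *stale_ids),
            )
    return stale_ids


def assigned(scope: str) -> list[int]:
    rows = conn.execute(
        "SELECT key_id FROM ssh_key_assignments WHERE scope = ? ORDER BY key_id",
        (scope,),
    )
    return [r[0] for r in rows]


revoked = keep_only("alpha", 3)
```

Only the number of placeholders is formatted into the SQL text, so the statement stays parameterised even though its shape depends on the data.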

unassign_all_ssh_keys(scope, *, allow_infra=False)

Revoke every key currently assigned to scope. Returns count removed.

Refuses %-prefixed infrastructure scopes by default — pair with allow_infra=True for sandbox internals.

Source code in src/terok_sandbox/vault/store/db.py
def unassign_all_ssh_keys(self, scope: str, *, allow_infra: bool = False) -> int:
    """Revoke every key currently assigned to *scope*.  Returns count removed.

    Refuses ``%``-prefixed infrastructure scopes by default — pair
    with ``allow_infra=True`` for sandbox internals.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)
    key_ids = [
        r[0]
        for r in self._conn.execute(
            "SELECT key_id FROM ssh_key_assignments WHERE scope = ?",
            (scope,),
        ).fetchall()
    ]
    for kid in key_ids:
        self.unassign_ssh_key(scope, kid, allow_infra=allow_infra)
    return len(key_ids)

list_ssh_keys_for_scope(scope)

Return metadata rows for every key assigned to scope.

Ordered by assigned_at with k.id as a secondary key so two assignments inside the same SQLite-second (datetime('now') has 1-second resolution) sort by insert order rather than implementation-defined order. Callers that do rows[-1] to pick "the most recently assigned" get a deterministic answer even under sub-second concurrency.

Source code in src/terok_sandbox/vault/store/db.py
def list_ssh_keys_for_scope(self, scope: str) -> list[SSHKeyRow]:
    """Return metadata rows for every key assigned to *scope*.

    Ordered by ``assigned_at`` with ``k.id`` as a secondary key so
    two assignments inside the same SQLite-second (``datetime('now')``
    has 1-second resolution) sort by insert order rather than
    implementation-defined order.  Callers that do ``rows[-1]`` to
    pick "the most recently assigned" get a deterministic answer
    even under sub-second concurrency.
    """
    rows = self._conn.execute(
        "SELECT k.id, k.key_type, k.fingerprint, k.comment, k.created_at"
        " FROM ssh_keys k"
        " JOIN ssh_key_assignments a ON a.key_id = k.id"
        " WHERE a.scope = ?"
        " ORDER BY a.assigned_at, k.id",
        (scope,),
    ).fetchall()
    return [SSHKeyRow(*r) for r in rows]
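
The effect of the secondary sort key is easiest to see with two assignments that share a timestamp. A minimal sketch with an assumed, reduced schema and hand-written timestamps:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ssh_keys (id INTEGER PRIMARY KEY, comment TEXT);
    CREATE TABLE ssh_key_assignments (scope TEXT, key_id INTEGER, assigned_at TEXT);
    INSERT INTO ssh_keys VALUES (1, 'first'), (2, 'second'), (3, 'third');
    -- Keys 2 and 3 were assigned within the same second.
    INSERT INTO ssh_key_assignments VALUES
        ('alpha', 3, '2024-01-01 00:00:05'),
        ('alpha', 1, '2024-01-01 00:00:01'),
        ('alpha', 2, '2024-01-01 00:00:05');
    """
)

rows = conn.execute(
    "SELECT k.id, k.comment FROM ssh_keys k"
    " JOIN ssh_key_assignments a ON a.key_id = k.id"
    " WHERE a.scope = ?"
    " ORDER BY a.assigned_at, k.id",
    ("alpha",),
).fetchall()
ordered_ids = [r[0] for r in rows]
latest = rows[-1]  # deterministic even though keys 2 and 3 tie on assigned_at
```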

load_ssh_keys_for_scope(scope)

Return full records (with raw bytes) for every key assigned to scope.

Same deterministic ordering as list_ssh_keys_for_scope: assigned_at first, then k.id as the sub-second tiebreak.

Source code in src/terok_sandbox/vault/store/db.py
def load_ssh_keys_for_scope(self, scope: str) -> list[SSHKeyRecord]:
    """Return full records (with raw bytes) for every key assigned to *scope*.

    Same deterministic ordering as
    [`list_ssh_keys_for_scope`][terok_sandbox.vault.store.db.CredentialDB.list_ssh_keys_for_scope]
    — ``assigned_at`` first, then ``k.id`` as the sub-second tiebreak.
    """
    rows = self._conn.execute(
        "SELECT k.id, k.key_type, k.private_der, k.public_blob,"
        " k.comment, k.fingerprint"
        " FROM ssh_keys k"
        " JOIN ssh_key_assignments a ON a.key_id = k.id"
        " WHERE a.scope = ?"
        " ORDER BY a.assigned_at, k.id",
        (scope,),
    ).fetchall()
    return [SSHKeyRecord(*r) for r in rows]

list_scopes_with_ssh_keys()

Return every scope that currently has at least one assigned key.

Source code in src/terok_sandbox/vault/store/db.py
def list_scopes_with_ssh_keys(self) -> list[str]:
    """Return every scope that currently has at least one assigned key."""
    rows = self._conn.execute(
        "SELECT DISTINCT scope FROM ssh_key_assignments ORDER BY scope",
    ).fetchall()
    return [r[0] for r in rows]

count_ssh_keys()

Return the number of distinct keypairs stored in the DB.

Counts ssh_keys rows (deduplicated by fingerprint) rather than ssh_key_assignments rows — a single key shared across scopes is one stored key, not N. Surfaces to TUI/CLI status consumers so they can show a count without opening the DB themselves.

Source code in src/terok_sandbox/vault/store/db.py
def count_ssh_keys(self) -> int:
    """Return the number of distinct keypairs stored in the DB.

    Counts ``ssh_keys`` rows (deduplicated by fingerprint) rather
    than ``ssh_key_assignments`` rows — a single key shared across
    scopes is one stored key, not N.  Surfaces to TUI/CLI status
    consumers so they can show a count without opening the DB
    themselves.
    """
    row = self._conn.execute("SELECT count(*) FROM ssh_keys").fetchone()
    return row[0] if row else 0

create_token(scope, subject, credential_set, provider)

Mint a phantom token bound to (scope, subject, credential_set, provider).

subject is an opaque caller-supplied correlation label — the sandbox stores it verbatim and never interprets its contents. Today terok puts the orchestrator's task id there; the sandbox treats the value as a string.

Token format: terok-p-<32 hex chars>.

Source code in src/terok_sandbox/vault/store/db.py
def create_token(self, scope: str, subject: str, credential_set: str, provider: str) -> str:
    """Mint a phantom token bound to ``(scope, subject, credential_set, provider)``.

    ``subject`` is an opaque caller-supplied correlation label — the
    sandbox stores it verbatim and never interprets its contents.
    Today terok puts the orchestrator's task id there; the sandbox
    treats the value as a string.

    Token format: ``terok-p-<32 hex chars>``.
    """
    token = f"terok-p-{secrets.token_hex(16)}"
    self._conn.execute(
        "INSERT INTO proxy_tokens (token, scope, subject, credential_set, provider)"
        " VALUES (?, ?, ?, ?, ?)",
        (token, scope, subject, credential_set, provider),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    return token
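
The token format can be checked independently of the database. In this sketch mint_token mirrors the f-string in the listing above, while TOKEN_RE and looks_like_phantom_token are hypothetical additions for illustration.

```python
import re
import secrets

# "terok-p-" followed by exactly 32 lowercase hex characters.
TOKEN_RE = re.compile(r"terok-p-[0-9a-f]{32}")


def mint_token() -> str:
    # token_hex(16) draws 16 random bytes and renders them as 32 hex characters.
    return f"terok-p-{secrets.token_hex(16)}"


def looks_like_phantom_token(value: str) -> bool:
    return TOKEN_RE.fullmatch(value) is not None


token = mint_token()
```

A shape check like this only confirms the format; whether a token is valid is still decided by a lookup in the store.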

lookup_token(token)

Return {scope, subject, credential_set, provider} or None.

Source code in src/terok_sandbox/vault/store/db.py
def lookup_token(self, token: str) -> dict | None:
    """Return ``{scope, subject, credential_set, provider}`` or ``None``."""
    row = self._conn.execute(
        "SELECT scope, subject, credential_set, provider FROM proxy_tokens WHERE token = ?",
        (token,),
    ).fetchone()
    if row is None:
        return None
    return {
        "scope": row[0],
        "subject": row[1],
        "credential_set": row[2],
        "provider": row[3],
    }

list_tokens()

Return every proxy-token row as a list of dicts.

Read-only inventory for operator-facing CLI inspection (terok vault list --include-tokens). The raw token value is included so the operator can cross-reference what's actually mounted into containers; callers MUST mask it before display.

Source code in src/terok_sandbox/vault/store/db.py
def list_tokens(self) -> list[dict]:
    """Return every proxy-token row as a list of dicts.

    Read-only inventory for operator-facing CLI inspection
    (``terok vault list --include-tokens``).  The raw token value
    is included so the operator can cross-reference what's actually
    mounted into containers; callers MUST mask it before display.
    """
    rows = self._conn.execute(
        "SELECT token, scope, subject, credential_set, provider"
        " FROM proxy_tokens ORDER BY scope, subject, provider, token"
    ).fetchall()
    return [
        {
            "token": r[0],
            "scope": r[1],
            "subject": r[2],
            "credential_set": r[3],
            "provider": r[4],
        }
        for r in rows
    ]
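
Since callers must mask the raw token before display, one possible masking helper is sketched below. mask_token and its "keep the last four characters" policy are assumptions for illustration, not part of the documented API.

```python
def mask_token(token: str, visible: int = 4) -> str:
    """Keep the 'terok-p-' prefix and the last *visible* characters."""
    prefix = "terok-p-"
    has_prefix = token.startswith(prefix)
    body = token[len(prefix):] if has_prefix else token
    shown_prefix = prefix if has_prefix else ""
    if len(body) <= visible:
        # Too short to reveal anything safely: mask everything.
        return shown_prefix + "*" * len(body)
    return shown_prefix + "*" * (len(body) - visible) + body[-visible:]


# Rows shaped like the dicts returned by list_tokens(); the values are made up.
rows = [{"token": "terok-p-" + "0123456789abcdef" * 2, "scope": "alpha"}]
display = [{**r, "token": mask_token(r["token"])} for r in rows]
```

Building a new list of dicts keeps the unmasked rows available for cross-referencing while only the masked copies reach the screen.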

revoke_tokens(scope, subject)

Revoke every phantom token bound to (scope, subject).

Returns the number of rows removed. The sandbox makes no claim about what subject identifies; callers (the orchestrator) pass whatever opaque label they used at create_token time.

Source code in src/terok_sandbox/vault/store/db.py
def revoke_tokens(self, scope: str, subject: str) -> int:
    """Revoke every phantom token bound to ``(scope, subject)``.

    Returns the number of rows removed.  The sandbox makes no claim
    about what ``subject`` identifies; callers (the orchestrator) pass
    whatever opaque label they used at
    [`create_token`][terok_sandbox.vault.store.db.CredentialDB.create_token]
    time.
    """
    cur = self._conn.execute(
        "DELETE FROM proxy_tokens WHERE scope = ? AND subject = ?",
        (scope, subject),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    return cur.rowcount

close()

Close the database connection.

Source code in src/terok_sandbox/vault/store/db.py
def close(self) -> None:
    """Close the database connection."""
    self._conn.close()

__del__()

Best-effort close on garbage collection.

Source code in src/terok_sandbox/vault/store/db.py
def __del__(self) -> None:
    """Best-effort close on garbage collection."""
    try:
        self._conn.close()
    except Exception:  # noqa: BLE001  # nosec B110 — best-effort __del__ close on GC
        pass

NoPassphraseError

Bases: RuntimeError

No SQLCipher passphrase resolved — the DB cannot be opened.

WrongPassphraseError

Bases: RuntimeError

SQLCipher could not decrypt the DB — passphrase doesn't match its encryption key.

RecoveryStatus(acknowledged, source) dataclass

Combined marker + resolved-source view for the recovery-key warning surfaces.

Returned by RecoveryStatus.load so sickbay / doctor / TUI / post-launch CLI all paint the same picture of "is the operator one reboot away from losing their vault?".

acknowledged instance-attribute

True iff the zero-byte marker file is present.

source instance-attribute

The resolver tier that currently unlocks the chain, or None if the vault is locked.

session_only property

True iff the passphrase lives only in the tmpfs session-unlock file.

That tier dies on the next reboot — without an off-host copy the vault becomes unrecoverable the moment the machine restarts. Severity should escalate accordingly on every surface that renders this status.

urgent property

True iff unacknowledged AND session-only (one reboot away from loss).
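
The two derived properties combine as a small truth table. The sketch below assumes that source is a plain string and that the tmpfs tier is labelled "session-file"; StatusSketch and SESSION_SOURCE are hypothetical names, and the real type of source is not shown here.

```python
from dataclasses import dataclass
from typing import Optional

SESSION_SOURCE = "session-file"  # assumed label for the tmpfs session tier


@dataclass(frozen=True)
class StatusSketch:
    acknowledged: bool
    source: Optional[str]

    @property
    def session_only(self) -> bool:
        # True when the only tier holding the passphrase dies on reboot.
        return self.source == SESSION_SOURCE

    @property
    def urgent(self) -> bool:
        # Unacknowledged and session-only: one reboot away from loss.
        return not self.acknowledged and self.session_only


at_risk = StatusSketch(acknowledged=False, source=SESSION_SOURCE)
confirmed = StatusSketch(acknowledged=True, source=SESSION_SOURCE)
persistent = StatusSketch(acknowledged=False, source="keyring")
locked = StatusSketch(acknowledged=False, source=None)
```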

load(cfg=None) classmethod

Resolve marker + passphrase source for cfg (defaults if None).

Single seam for every "recovery key unconfirmed" surface — doctor, sickbay, TUI pill, post-task-launch CLI footer. Walking the resolver chain to find the source is cheap (no DB open, just tier knobs) and bundling it with the marker check here means no caller has to repeat the "is this session-only?" lookup.

Source code in src/terok_sandbox/vault/store/recovery.py
@classmethod
def load(cls, cfg: SandboxConfig | None = None) -> RecoveryStatus:
    """Resolve marker + passphrase source for *cfg* (defaults if ``None``).

    Single seam for every "recovery key unconfirmed" surface —
    doctor, sickbay, TUI pill, post-task-launch CLI footer.
    Walking the resolver chain to find the source is cheap (no DB
    open, just tier knobs) and bundling it with the marker check
    here means no caller has to repeat the "is this session-only?"
    lookup.
    """
    from ...config import SandboxConfig  # noqa: PLC0415
    from .encryption import NoPassphraseError, WrongPassphraseError  # noqa: PLC0415

    cfg = cfg or SandboxConfig()
    try:
        _passphrase, source = cfg.resolve_passphrase_with_source()
    except (NoPassphraseError, WrongPassphraseError):
        source = None
    return cls(
        acknowledged=acknowledged(cfg.vault_recovery_marker_file),
        source=source,
    )

is_acknowledged(cfg=None) staticmethod

Cheap marker-only check (no passphrase resolution).

The vault's resolver tiers (systemd-creds, keyring, session-file) are all bound to this machine, account, or boot, so a hardware failure or TPM transplant strands the vault unless an off-host copy of the passphrase exists. This check is what surfaces the "unconfirmed recovery key" warning in sickbay / doctor / the TUI pill: presence of a zero-byte marker file at vault_recovery_marker_file means the operator has acknowledged saving the recovery key at some point. Absence (or an unreadable marker) reports False; the warning is conservative by design.

Source code in src/terok_sandbox/vault/store/recovery.py
@staticmethod
def is_acknowledged(cfg: SandboxConfig | None = None) -> bool:
    """Cheap marker-only check (no passphrase resolution).

    The vault's resolver tiers (systemd-creds, keyring,
    session-file) are all bound to *this* machine, account, or
    boot — a hardware failure or TPM transplant strands the vault
    without an off-host copy of the passphrase.  This check is
    what surfaces the "unconfirmed recovery key" warning in
    sickbay / doctor / the TUI pill: presence of a zero-byte
    marker file at
    [`vault_recovery_marker_file`][terok_sandbox.SandboxConfig.vault_recovery_marker_file]
    means the operator has acknowledged at some point.  Absence
    (or an unreadable marker) reports ``False`` — the warning is
    conservative by design.
    """
    from ...config import SandboxConfig  # noqa: PLC0415

    cfg = cfg or SandboxConfig()
    return acknowledged(cfg.vault_recovery_marker_file)

acknowledge(cfg=None) staticmethod

Mark the recovery key as saved (writes the zero-byte sidecar marker).

Always succeeds — the marker is independent of the passphrase resolver, so a locked vault doesn't block acknowledgement. Idempotent; safe to call on an already-acknowledged vault.

Source code in src/terok_sandbox/vault/store/recovery.py
@staticmethod
def acknowledge(cfg: SandboxConfig | None = None) -> None:
    """Mark the recovery key as saved (writes the zero-byte sidecar marker).

    Always succeeds — the marker is independent of the passphrase
    resolver, so a locked vault doesn't block acknowledgement.
    Idempotent; safe to call on an already-acknowledged vault.
    """
    from ...config import SandboxConfig  # noqa: PLC0415

    cfg = cfg or SandboxConfig()
    acknowledge(cfg.vault_recovery_marker_file)
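
The marker semantics described for is_acknowledged and acknowledge (zero-byte file, idempotent write, unreadable counts as False) might look roughly like the sketch below. The helper names and the marker path are hypothetical; the module-level acknowledged and acknowledge helpers that the real methods call are not shown in this section.

```python
import tempfile
from pathlib import Path


def acknowledge_marker(marker: Path) -> None:
    """Create the zero-byte marker; calling it again is a no-op."""
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.touch(exist_ok=True)


def marker_acknowledged(marker: Path) -> bool:
    """Report True only when the marker can be seen; errors count as False."""
    try:
        return marker.is_file()
    except OSError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    marker = Path(tmp) / "vault" / "recovery-acknowledged"  # hypothetical path
    before = marker_acknowledged(marker)
    acknowledge_marker(marker)
    acknowledge_marker(marker)  # idempotent: the second call changes nothing
    after = marker_acknowledged(marker)
    size = marker.stat().st_size
```

Because the marker never touches the passphrase resolver, this path works the same whether the vault is locked or unlocked.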

bold(text)

Return text wrapped in ANSI bold when supports_color is true.

Source code in src/terok_sandbox/_stage.py
def bold(text: str) -> str:
    """Return *text* wrapped in ANSI bold when [`supports_color`][terok_sandbox._stage.supports_color] is true."""
    return _color(text, "1")

red(text)

Return text wrapped in ANSI red for failure banners when colour is on.

Source code in src/terok_sandbox/_stage.py
def red(text: str) -> str:
    """Return *text* wrapped in ANSI red for failure banners when colour is on."""
    return _color(text, "31")

stage_line(label)

Return a StageLine context manager for progressive rendering.

Thin factory so the call site reads with stage_line("Vault") as s: rather than the class name.

Source code in src/terok_sandbox/_stage.py
def stage_line(label: str) -> StageLine:
    """Return a [`StageLine`][terok_sandbox._stage.StageLine] context manager for progressive rendering.

    Thin factory so the call site reads ``with stage_line("Vault") as
    s:`` rather than the class name.
    """
    return StageLine(label)

yellow(text)

Return text wrapped in ANSI yellow for warning banners when colour is on.

Source code in src/terok_sandbox/_stage.py
def yellow(text: str) -> str:
    """Return *text* wrapped in ANSI yellow for warning banners when colour is on."""
    return _color(text, "33")
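
For orientation, the shared helper behind bold, red, and yellow could be sketched as follows. The bodies of _color and supports_color are not shown in this section, so both functions below are assumptions: the TTY plus NO_COLOR policy is a common convention, not a confirmed detail of this package. Only the SGR codes ("1", "31", "33") come from the listings above.

```python
import os
import sys


def supports_color_sketch(stream=sys.stdout) -> bool:
    """Assumed policy: colour only on a TTY and when NO_COLOR is unset."""
    return hasattr(stream, "isatty") and stream.isatty() and "NO_COLOR" not in os.environ


def color_sketch(text: str, code: str, *, enabled: bool) -> str:
    """Wrap *text* in an SGR sequence when *enabled*, else return it unchanged."""
    if not enabled:
        return text
    return f"\x1b[{code}m{text}\x1b[0m"


# Typical call shape: decide once per stream, then wrap.
banner = color_sketch("vault locked", "31", enabled=supports_color_sketch())
```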

check_apparmor_status()

Evaluate whether the dnsmasq AppArmor addendum is needed or installed.

File-based and unprivileged: an AppArmor-enabled host with dnsmasq and a stock dnsmasq profile but no terok addendum is PROFILE_MISSING; everything else is NOT_APPLICABLE or OK.

Source code in src/terok_sandbox/_util/_apparmor.py
def check_status() -> AppArmorCheckResult:
    """Evaluate whether the dnsmasq AppArmor addendum is needed or installed.

    File-based and unprivileged: an AppArmor-enabled host with dnsmasq and
    a stock dnsmasq profile but no terok addendum is ``PROFILE_MISSING``;
    everything else is ``NOT_APPLICABLE`` or ``OK``.
    """
    if not is_apparmor_enabled() or shutil.which("dnsmasq") is None:
        return AppArmorCheckResult(AppArmorStatus.NOT_APPLICABLE)
    profile = _dnsmasq_profile()
    if profile is None:
        return AppArmorCheckResult(AppArmorStatus.NOT_APPLICABLE)
    if _addendum_installed(profile):
        return AppArmorCheckResult(AppArmorStatus.OK)
    return AppArmorCheckResult(AppArmorStatus.PROFILE_MISSING)

apparmor_install_command(state_root)

Return the sudo bash <script> <state_root> installer invocation.

state_root is the sandbox-live root whose tasks/*/*/shield tree the rendered profile must permit. The caller supplies it because the script runs under sudo and cannot resolve the operator's home.

Source code in src/terok_sandbox/_util/_apparmor.py
def install_command(state_root: Path) -> str:
    """Return the ``sudo bash <script> <state_root>`` installer invocation.

    *state_root* is the sandbox-live root whose ``tasks/*/*/shield`` tree
    the rendered profile must permit.  The caller supplies it because the
    script runs under ``sudo`` and cannot resolve the operator's home.
    """
    return f"sudo bash {install_script_path()} {state_root}"

apparmor_install_script() cached

Return the path to the bundled install_profile.sh AppArmor installer.

Installation is delegated to this short, inspectable shell script — run with sudo bash <path> <state_root> — so it can be cat-ed and audited before the privilege escalation.

Source code in src/terok_sandbox/_util/_apparmor.py
@lru_cache(maxsize=1)
def install_script_path() -> Path:
    """Return the path to the bundled ``install_profile.sh`` AppArmor installer.

    Installation is delegated to this short, inspectable shell script —
    run with ``sudo bash <path> <state_root>`` — so it can be ``cat``-ed
    and audited before the privilege escalation.
    """
    return Path(str(_resource_files("terok_sandbox.resources.apparmor") / "install_profile.sh"))

check_selinux_status(*, services_mode)

Evaluate SELinux readiness for socket-transport services.

services_mode is the caller's configured transport (tcp or socket) — passed in rather than read from sandbox config so the helper stays free of cross-package config plumbing. Consumers (terok setup, terok sickbay) call terok_sandbox.config.services_mode themselves.

Source code in src/terok_sandbox/_util/_selinux.py
def check_status(*, services_mode: str) -> SelinuxCheckResult:
    """Evaluate SELinux readiness for socket-transport services.

    *services_mode* is the caller's configured transport (``tcp`` or
    ``socket``) — passed in rather than read from sandbox config so the
    helper stays free of cross-package config plumbing.  Consumers
    (``terok setup``, ``terok sickbay``) call
    [`terok_sandbox.config.services_mode`][terok_sandbox.config.services_mode] themselves.
    """
    if services_mode != "socket":
        return SelinuxCheckResult(SelinuxStatus.NOT_APPLICABLE_TCP_MODE)
    if not is_selinux_enforcing():
        return SelinuxCheckResult(SelinuxStatus.NOT_APPLICABLE_PERMISSIVE)
    if not is_policy_installed():
        return SelinuxCheckResult(
            SelinuxStatus.POLICY_MISSING,
            missing_policy_tools=tuple(missing_policy_tools()),
        )
    if not is_libselinux_available():
        return SelinuxCheckResult(SelinuxStatus.LIBSELINUX_MISSING)
    if is_supervisor_socket_rule_loaded() is False:
        # Type is present but the supervisor's container_runtime_t rule
        # isn't — a pre-supervisor (v1.0) policy still loaded.  Re-running
        # the installer rebuilds it; surface the same tool prerequisites.
        return SelinuxCheckResult(
            SelinuxStatus.POLICY_OUTDATED,
            missing_policy_tools=tuple(missing_policy_tools()),
        )
    return SelinuxCheckResult(SelinuxStatus.OK)
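
The ordering of the checks matters, so it may help to see the ladder as a pure function over already-probed facts. This is a sketch with hypothetical names; it also assumes that the supervisor-rule probe can return None for "could not determine", which is what the `is False` comparison in the listing suggests but does not prove.

```python
from __future__ import annotations

from enum import Enum, auto


class Status(Enum):
    NOT_APPLICABLE_TCP_MODE = auto()
    NOT_APPLICABLE_PERMISSIVE = auto()
    POLICY_MISSING = auto()
    LIBSELINUX_MISSING = auto()
    POLICY_OUTDATED = auto()
    OK = auto()


def classify(
    *,
    services_mode: str,
    enforcing: bool,
    policy_installed: bool,
    libselinux: bool,
    rule_loaded: bool | None,
) -> Status:
    """Mirror the check order: transport, enforcement, policy, library, rule."""
    if services_mode != "socket":
        return Status.NOT_APPLICABLE_TCP_MODE
    if not enforcing:
        return Status.NOT_APPLICABLE_PERMISSIVE
    if not policy_installed:
        return Status.POLICY_MISSING
    if not libselinux:
        return Status.LIBSELINUX_MISSING
    if rule_loaded is False:
        # Only a definite "not loaded" flags the policy as outdated;
        # an unknown (None) answer falls through to OK.
        return Status.POLICY_OUTDATED
    return Status.OK
```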

selinux_install_command()

Return the full sudo bash <path> shell command for the installer.

Single source for the command string so the setup hint, the sickbay check, and any future caller all render the same invocation.

Source code in src/terok_sandbox/_util/_selinux.py
def install_command() -> str:
    """Return the full ``sudo bash <path>`` shell command for the installer.

    Single source for the command string so the setup hint, the sickbay
    check, and any future caller all render the same invocation.
    """
    return f"sudo bash {install_script_path()}"

selinux_install_script() cached

Return the path to the bundled install_policy.sh installer.

Installation is delegated to this short, inspectable shell script — which users run with sudo bash <path> — rather than a Python wrapper. Running Python as root imports a large dependency graph; a dedicated shell script can be cat-ed and audited in seconds before the privilege escalation.

Source code in src/terok_sandbox/_util/_selinux.py
@lru_cache(maxsize=1)
def install_script_path() -> Path:
    """Return the path to the bundled ``install_policy.sh`` installer.

    Installation is delegated to this short, inspectable shell script —
    which users run with ``sudo bash <path>`` — rather than a Python
    wrapper.  Running Python as root imports a large dependency graph;
    a dedicated shell script can be ``cat``-ed and audited in seconds
    before the privilege escalation.
    """
    return Path(str(_resource_files("terok_sandbox.resources.selinux") / "install_policy.sh"))

yaml_update_section(path, section, updates)

Merge updates into data[section] at path, preserving comments.

Source code in src/terok_sandbox/_yaml.py
def update_section(path: Path, section: str, updates: dict[str, Any]) -> None:
    """Merge *updates* into ``data[section]`` at *path*, preserving comments."""
    yaml = YAML(typ="rt")
    yaml.preserve_quotes = True
    if path.exists():
        existing = yaml.load(path.read_text(encoding="utf-8")) or {}
    else:
        path.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
        existing = {}
    if not isinstance(existing, dict):
        raise ValueError(
            f"{path} top-level is {type(existing).__name__}, expected a mapping;"
            " refusing to silently overwrite — fix or move aside the file by hand"
        )
    # Deliberately not ``setdefault``: it returns whatever already sits
    # at the key, and a stale scalar written by a previous schema
    # version would explode on ``.update``.
    if not isinstance(existing.get(section), dict):
        existing[section] = {}
    existing[section].update(updates)
    buf = StringIO()
    yaml.dump(existing, buf)
    write_secret_text(path, buf.getvalue())
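
The merge step on its own, stripped of YAML loading and file writing, can be sketched on plain dictionaries. The function name merge_section is hypothetical; the logic follows the listing above, including the replacement of a non-mapping section value.

```python
from typing import Any


def merge_section(data: Any, section: str, updates: dict) -> dict:
    """Merge *updates* into ``data[section]``, replacing a non-mapping section."""
    if not isinstance(data, dict):
        raise ValueError(f"top-level is {type(data).__name__}, expected a mapping")
    if not isinstance(data.get(section), dict):
        # Missing key or stale scalar: start from an empty mapping.
        data[section] = {}
    data[section].update(updates)
    return data


stale = merge_section({"credentials": "legacy"}, "credentials", {"use_keyring": True})
merged = merge_section({"credentials": {"a": 1}, "ssh": {"x": 1}}, "credentials", {"b": 2})
```

Other sections are left untouched, and a stale scalar under the target section is discarded rather than raising.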

sandbox_uninstall(*, no_shield=False, cfg=None)

Tear down the stack in reverse install order.

Losing supervisor hooks mid-flight is recoverable, but losing shield hooks while containers are live is the most disruptive — shield goes last so live containers stay firewalled as long as possible.

Best-effort across phases: a failing phase reports the error and the next phase runs anyway, so a partial-install teardown still removes what it can instead of leaving orphans behind. Exits non-zero only after every phase has had its attempt.

The git gate has no host-side install, so there is no gate uninstall phase — the legacy sweep removes any pre-supervisor gate units.

Source code in src/terok_sandbox/commands/sandbox.py
def _handle_sandbox_uninstall(
    *,
    no_shield: bool = False,
    cfg: SandboxConfig | None = None,
) -> None:
    """Tear down the stack in reverse install order.

    Losing supervisor hooks mid-flight is recoverable, but losing shield
    hooks while containers are live is the most disruptive — shield goes
    last so live containers stay firewalled as long as possible.

    Best-effort across phases: a failing phase reports the error and
    the next phase runs anyway, so a partial-install teardown still
    removes what it can instead of leaving orphans behind.  Exits
    non-zero only after every phase has had its attempt.

    The git gate has no host-side install, so there is no gate uninstall
    phase — the legacy sweep removes any pre-supervisor gate units.
    """
    from ..setup_stamp import clear_stamp

    # ``cfg`` is accepted for handler-dispatch uniformity but unused: the
    # teardown phases discover their own paths and take no config.
    del cfg

    print("Services:")

    failed = False
    # Supervisor hooks come down first so a slow uninstall on lower
    # layers can't surprise-fire a still-installed OCI hook.
    failed |= not run_supervisor_uninstall_phase()
    if not no_shield:
        failed |= not run_shield_uninstall_phase()
    # Legacy-install sweep also runs at uninstall so a host that's
    # being decommissioned doesn't leave pre-supervisor systemd units
    # behind for a future operator to puzzle over.
    failed |= not run_legacy_install_cleanup_phase()

    if clear_stamp():
        print("→ setup stamp removed")
    if failed:
        raise SystemExit(1)
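
The best-effort behaviour rests on `failed |= not phase()`, which always evaluates the phase call. A small sketch with stand-in phases (the names and the make_phase helper are hypothetical) shows that a failing first phase does not stop the later ones:

```python
from typing import Callable, Iterable, List


def run_all_phases(phases: Iterable[Callable[[], bool]]) -> bool:
    """Run every phase; return True if any of them reported failure."""
    failed = False
    for phase in phases:
        # ``|=`` evaluates its right-hand side unconditionally, so a failed
        # phase never short-circuits the ones that follow.
        failed |= not phase()
    return failed


calls: List[str] = []


def make_phase(name: str, ok: bool) -> Callable[[], bool]:
    def phase() -> bool:
        calls.append(name)
        return ok

    return phase


any_failed = run_all_phases(
    [make_phase("supervisor", False), make_phase("shield", True), make_phase("legacy", True)]
)
```

Writing the accumulation as `failed = failed or not phase()` would skip the remaining phases after the first failure, which is exactly what the handler avoids.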

handle_vault_seal(*, cfg=None, key='auto')

Seal the credentials-DB passphrase into a systemd-creds credential.

Adds the systemd-creds tier to the resolution chain: machine-bound (TPM2 + host key, or either alone), survives reboot, no OS keyring required. After sealing, every new supervisor resolves the passphrase via systemd-creds decrypt on start — no operator interaction needed at boot, no plaintext-on-disk.

Requires an already-resolvable passphrase — typically from a fresh vault unlock in the current session.

Source code in src/terok_sandbox/commands/vault.py
def handle_vault_seal(*, cfg: SandboxConfig | None = None, key: str = "auto") -> None:
    """Seal the credentials-DB passphrase into a systemd-creds credential.

    Adds the systemd-creds tier to the resolution chain: machine-bound
    (TPM2 + host key, or either alone), survives reboot, no OS
    keyring required.  After sealing, every new supervisor resolves the
    passphrase via ``systemd-creds decrypt`` on start — no operator
    interaction needed at boot, no plaintext-on-disk.

    Requires an already-resolvable passphrase — typically from a fresh
    ``vault unlock`` in the current session.
    """
    from ..vault.store import systemd_creds
    from ..vault.store.encryption import WrongPassphraseError

    if cfg is None:
        cfg = SandboxConfig()

    if not systemd_creds.is_available():
        raise SystemExit(
            "systemd-creds unavailable: needs systemd ≥ 257 with the Varlink"
            " io.systemd.Credentials interface (Fedora ≥ 42, Debian ≥ 13)"
        )

    key_mode = _SEAL_KEY_MODES.get(key)
    if key_mode is None:
        choices = ", ".join(sorted(_SEAL_KEY_MODES))
        raise SystemExit(f"unknown --key value: {key!r} (expected one of: {choices})")

    # A prompt here would accept a freshly-typed value and seal *that*,
    # leaving the next chain walk holding a key that doesn't open the DB.
    try:
        passphrase = cfg.resolve_passphrase()
    except WrongPassphraseError as exc:
        raise SystemExit(f"cannot seal: {exc}") from exc
    if passphrase is None:
        raise SystemExit("no current passphrase to seal — run `terok-sandbox vault unlock` first")

    try:
        systemd_creds.seal(passphrase, cfg.vault_systemd_creds_file, key_mode=key_mode)
    except RuntimeError as exc:
        # ``tpm2`` requested on a TPM-less host surfaces as a CalledProcessError
        # bubbled to RuntimeError — pass it through with the hint attached.
        raise SystemExit(str(exc)) from exc

    print(f"→ sealed passphrase to {cfg.vault_systemd_creds_file} (--with-key={key_mode})")
    print(
        "  the resolution chain will pick this up the next time a supervisor"
        " starts; no restart required"
    )

handle_vault_to_keyring(*, cfg=None)

Move the current passphrase from its current tier into the OS keyring.

Resolves the passphrase via the chain (or prompts as a last resort), writes it to the keyring, flips credentials.use_keyring to true in config.yml, clears any plaintext credentials.passphrase / credentials.passphrase_command wiring, and removes the session-file and sealed systemd-creds copies.

The validate-before-destroy ordering is deliberate: if the keyring write fails, the source tier is still intact.

Source code in src/terok_sandbox/commands/vault.py
def handle_vault_to_keyring(*, cfg: SandboxConfig | None = None) -> None:
    """Move the current passphrase from its current tier into the OS keyring.

    Resolves the passphrase via the chain (or prompts as a last resort),
    writes it to the keyring, flips ``credentials.use_keyring`` to true
    in ``config.yml``, clears any plaintext ``credentials.passphrase`` /
    ``credentials.passphrase_command`` wiring, and removes the
    session-file and sealed systemd-creds copies.

    The validate-before-destroy ordering is deliberate: if the keyring
    write fails, the source tier is still intact.
    """
    from .. import config as _config
    from ..vault.store.encryption import (
        WrongPassphraseError,
        store_passphrase_in_keyring,
    )

    if cfg is None:
        cfg = SandboxConfig()

    try:
        passphrase, source = cfg.resolve_passphrase_with_source(prompt_on_tty=True)
    except WrongPassphraseError as exc:
        raise SystemExit(f"cannot move to keyring: {exc}") from exc

    if not passphrase:
        raise SystemExit("no current passphrase resolvable; run `terok-sandbox vault unlock` first")
    if source == "keyring":
        print("→ passphrase is already in the keyring; nothing to do")
        return

    if not store_passphrase_in_keyring(passphrase):
        raise SystemExit("OS keyring is unreachable or denied; aborting (nothing was changed)")
    print(f"→ stored passphrase in keyring (was: {source})")

    # Switch the config's tier wiring atomically: flip use_keyring on,
    # drop the plaintext + helper fallbacks so the chain can't re-resolve
    # via a stale lower tier.
    from ..paths import config_file_paths

    user_config = next((p for label, p in config_file_paths() if label == "user"), None)
    if user_config is not None:
        # nosec: B105 — clearing config keys to None, not hardcoding secrets
        updates = {  # nosec: B105
            "use_keyring": True,
            "passphrase": None,  # nosec: B105
            "passphrase_command": None,  # nosec: B105
        }
        _yaml_update_section(user_config, "credentials", updates)
        _config._credentials_section.cache_clear()
        print(f"→ updated {user_config} (use_keyring: true, plaintext fields cleared)")

    # Remove the old tier's persistent copy.  The session file is removed
    # because the chain prefers it over the keyring; sealed systemd-creds
    # likewise outranks the keyring in the resolution order.
    for stale in (cfg.vault_passphrase_file, cfg.vault_systemd_creds_file):
        if stale.exists():
            stale.unlink()
            print(f"→ removed {sanitize_tty(str(stale))}")
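
The validate-before-destroy ordering can be illustrated in isolation: write to the destination first, and remove the old copies only once that write has succeeded. Everything below is a sketch; move_secret, the fake keyring, and the file name are hypothetical stand-ins, not the package's API.

```python
import tempfile
from pathlib import Path
from typing import Callable, Dict, Iterable


def move_secret(secret: str, store: Callable[[str], bool], stale: Iterable[Path]) -> bool:
    """Write to the new tier first; delete old copies only after that succeeds."""
    if not store(secret):
        return False  # nothing was changed, the source tier is intact
    for path in stale:
        if path.exists():
            path.unlink()
    return True


keyring: Dict[str, str] = {}


def fake_keyring_store(value: str) -> bool:
    keyring["pw"] = value
    return True


with tempfile.TemporaryDirectory() as tmp:
    old_copy = Path(tmp) / "session-passphrase"  # hypothetical file name
    old_copy.write_text("s3cret", encoding="utf-8")

    failed_move = move_secret("s3cret", lambda _value: False, [old_copy])
    survived_failure = old_copy.exists()

    ok_move = move_secret("s3cret", fake_keyring_store, [old_copy])
    removed_after_success = not old_copy.exists()
```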

gate_use_personal_ssh_default()

Resolve the host gate's ssh.use_personal global default.

Reads the ssh: section from the shared config.yml, validates via RawSSHSection, and returns the bool. An unset section, a missing key, or a malformed value collapses to False — the safe historical default ("terok never touches your real keys").

Higher layers compose this with project-level and per-invocation overrides; the resolution chain ends up:

CLI ``--use-personal-ssh``     (highest)
project ``project.yml`` ssh
global ``config.yml`` ssh      ← THIS function
False                          (default)

Lives in sandbox because the consumer (_git_env_with_ssh) is here too — same package owns the schema and the reader.

Source code in src/terok_sandbox/config_schema.py
def gate_use_personal_ssh_default() -> bool:
    """Resolve the host gate's ``ssh.use_personal`` global default.

    Reads the ``ssh:`` section from the shared ``config.yml``, validates
    via [`RawSSHSection`][terok_sandbox.config_schema.RawSSHSection], and returns the bool.  An unset section,
    a missing key, or a malformed value collapses to ``False`` — the
    safe historical default ("terok never touches your real keys").

    Higher layers compose this with project-level and per-invocation
    overrides; the resolution chain ends up:

        CLI ``--use-personal-ssh``     (highest)
        project ``project.yml`` ssh
        global ``config.yml`` ssh      ← THIS function
        False                          (default)

    Lives in sandbox because the consumer
    (`_git_env_with_ssh`) is here too —
    same package owns the schema and the reader.
    """
    from .paths import read_config_section

    raw = read_config_section("ssh")
    if not raw:
        return False
    try:
        section = RawSSHSection.model_validate(raw)
    except Exception:  # noqa: BLE001 — malformed config falls back to safe default
        return False
    return bool(section.use_personal)
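
How a higher layer might compose this global default with the other two levels of the chain is sketched below. The function and its parameters are hypothetical; the only thing taken from the docstring is the precedence order (CLI, then project, then global, then False).

```python
from __future__ import annotations


def resolve_use_personal_ssh(
    cli: bool | None,
    project: bool | None,
    global_default: bool,
) -> bool:
    """Return the first explicitly set value, walking from highest priority down."""
    for candidate in (cli, project):
        if candidate is not None:
            return candidate
    # ``global_default`` is already False when config.yml says nothing.
    return global_default
```

Using None for "not set" keeps an explicit False at a higher level from being overridden by a True further down the chain.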

sandbox_doctor_checks(*, token_broker_port=None, ssh_signer_port=None, desired_shield_state=None)

Return sandbox-level health checks for in-container diagnostics.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| token_broker_port | int \| None | Token broker TCP port (skip check if None). | None |
| ssh_signer_port | int \| None | SSH signer TCP port (skip check if None). | None |
| desired_shield_state | str \| None | Expected shield state from shield_desired_state file ("up", "down", "disengaged", or None to skip). | None |

Returns:

| Type | Description |
|---|---|
| list[DoctorCheck] | List of DoctorCheck instances ready for orchestration. |

Source code in src/terok_sandbox/doctor.py
def sandbox_doctor_checks(
    *,
    token_broker_port: int | None = None,
    ssh_signer_port: int | None = None,
    desired_shield_state: str | None = None,
) -> list[DoctorCheck]:
    """Return sandbox-level health checks for in-container diagnostics.

    Args:
        token_broker_port: Token broker TCP port (skip check if ``None``).
        ssh_signer_port: SSH signer TCP port (skip check if ``None``).
        desired_shield_state: Expected shield state from ``shield_desired_state``
            file (``"up"``, ``"down"``, ``"disengaged"``, or ``None`` to skip).

    Returns:
        List of [`DoctorCheck`][terok_sandbox.doctor.DoctorCheck] instances ready for orchestration.
    """
    checks: list[DoctorCheck] = [
        _make_vault_unlocked_check(),
        _make_plaintext_passphrase_warning_check(),
    ]
    if token_broker_port is not None:
        checks.append(_make_token_broker_check(token_broker_port))
    if ssh_signer_port is not None:
        checks.append(_make_ssh_signer_check(ssh_signer_port))
    checks.append(_make_shield_check(desired_shield_state))
    return checks

is_ssh_url(url)

Return True for SSH-scheme git URLs.

Accepts the two forms git itself accepts:

  • ssh://[user@]host[:port]/path — explicit URL scheme.
  • [user@]host:path — scp-style shorthand. The user part is optional (git@github.com:foo.git, deploy@host:repo.git, bare github.com:foo.git).

Shared with terok-main: both the gate's env builder and callers that branch on "does this project use SSH?" (e.g. deploy-key prompts, gate-sync fallback hints) must agree on one definition.

Source code in src/terok_sandbox/gate/mirror.py
def is_ssh_url(url: str | None) -> bool:
    """Return ``True`` for SSH-scheme git URLs.

    Accepts the two forms git itself accepts:

    - ``ssh://[user@]host[:port]/path`` — explicit URL scheme.
    - ``[user@]host:path`` — scp-style shorthand.  The user part is
      optional (``git@github.com:foo.git``, ``deploy@host:repo.git``,
      bare ``github.com:foo.git``).

    Shared with terok-main: both the gate's env builder and callers that
    branch on "does this project use SSH?" (e.g. deploy-key prompts,
    gate-sync fallback hints) must agree on one definition.
    """
    if not url:
        return False
    candidate = url.strip()
    lowered = candidate.lower()
    if lowered.startswith("ssh://"):
        return True
    if "://" in candidate:
        return False
    return bool(_SCP_SSH_RE.match(candidate))
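
The listing relies on _SCP_SSH_RE, whose pattern is not shown here. The sketch below substitutes an assumed pattern (optional user, host, colon, non-empty path) to make the three decision steps concrete. It is not the package's regex and deliberately ignores edge cases such as Windows drive paths.

```python
from __future__ import annotations

import re

# Hypothetical stand-in for _SCP_SSH_RE: optional "user@", a host, a colon, a path.
SCP_STYLE = re.compile(r"^(?:[^@/\s:]+@)?[A-Za-z0-9][A-Za-z0-9.-]*:(?!//)\S+$")


def looks_like_ssh_url(url: str | None) -> bool:
    """Same three steps as the listing: ssh:// scheme, other scheme, scp shorthand."""
    if not url:
        return False
    candidate = url.strip()
    if candidate.lower().startswith("ssh://"):
        return True
    if "://" in candidate:
        return False  # some other explicit scheme (https, git, file, ...)
    return bool(SCP_STYLE.match(candidate))
```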

mint_gate_token()

Generate a fresh 128-bit hex gate token.

Uses secrets.token_hex(16) for cryptographic randomness. The supervisor validates this single token directly via _SingleTokenStore, so there is nothing to persist.

Source code in src/terok_sandbox/gate/tokens.py
def mint_gate_token() -> str:
    """Generate a fresh 128-bit hex gate token.

    Uses ``secrets.token_hex(16)`` for cryptographic randomness.  The
    supervisor validates this single token directly via
    `_SingleTokenStore`,
    so there is nothing to persist.
    """
    return f"terok-g-{secrets.token_hex(16)}"
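
The resulting token is the fixed terok-g- prefix followed by 32 hex characters (16 random bytes). The sketch below reproduces that shape and adds a constant-time comparison; the comparison is an assumption about what a single-token validator could do, since _SingleTokenStore itself is not shown.

```python
import hmac
import secrets

TOKEN_PREFIX = "terok-g-"


def mint_token() -> str:
    """Prefix plus 16 random bytes rendered as 32 hex characters."""
    return f"{TOKEN_PREFIX}{secrets.token_hex(16)}"


def token_matches(expected: str, presented: str) -> bool:
    """Constant-time comparison (an assumption; the real store is not shown)."""
    return hmac.compare_digest(expected.encode(), presented.encode())


token = mint_token()
other = mint_token()
```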

check_environment(cfg=None)

Probe the podman environment with no task context.

Returns a synthetic EnvironmentCheck when shield_bypass is set; otherwise constructs a throwaway ShieldManager bound to a temp directory and delegates to its check_environment. Kept as a free function because the setup CLI runs before any task directory exists.

Source code in src/terok_sandbox/integrations/shield.py
def check_environment(cfg: SandboxConfig | None = None) -> EnvironmentCheck:
    """Probe the podman environment with no task context.

    Returns a synthetic [`EnvironmentCheck`][terok_shield.EnvironmentCheck]
    when ``shield_bypass`` is set; otherwise constructs a throwaway
    [`ShieldManager`][terok_sandbox.integrations.shield.ShieldManager]
    bound to a temp directory and delegates to its
    [`check_environment`][terok_sandbox.integrations.shield.ShieldManager.check_environment].
    Kept as a free function because the setup CLI runs before any
    task directory exists.
    """
    with tempfile.TemporaryDirectory() as tmp:
        return ShieldManager(Path(tmp), cfg).check_environment()

allocate_per_container_resources(cfg, container)

Compute per-container paths + (for TCP mode) ports.

Both transport modes get a per-container directory under cfg.runtime_dir/run/<container> (mode 0700) that the caller bind-mounts at /run/terok/ inside the container. In TCP mode, three free ports (token broker, SSH signer, gate) are claimed via bind(0) + getsockname + close so each container gets its own set instead of fighting over the singleton from cfg.

The narrow window between bind(0)'s close and the supervisor's re-bind on the same port is an EADDRINUSE-loud failure mode, not silent breakage.

Source code in src/terok_sandbox/launch.py
def allocate_per_container_resources(cfg: SandboxConfig, container: str) -> PerContainerResources:
    """Compute per-container paths + (for TCP mode) ports.

    Both transport modes get a per-container directory under
    ``cfg.runtime_dir/run/<container>`` (mode 0700) that the caller
    bind-mounts at ``/run/terok/`` inside the container.  In TCP mode,
    three free ports are claimed via ``bind(0)`` + ``getsockname`` +
    close so each container gets its own set instead of fighting
    over the singleton from ``cfg``.

    The narrow window between ``bind(0)``'s close and the supervisor's
    re-bind on the same port is an EADDRINUSE-loud failure mode, not
    silent breakage.
    """
    container_runtime_dir = cfg.runtime_dir / "run" / container
    container_runtime_dir.mkdir(parents=True, exist_ok=True)
    container_runtime_dir.chmod(0o700)

    if cfg.services_mode != "tcp":
        return PerContainerResources(
            container_runtime_dir=container_runtime_dir,
            token_broker_port=None,
            ssh_signer_port=None,
            gate_port=None,
        )

    # Allocate all three ports against open sockets *simultaneously* —
    # consecutive ``bind(0)`` + close pairs can legitimately hand back
    # the same port (the kernel is free to reuse the just-freed slot
    # before the next call) and that would crash one of the services on
    # startup with ``EADDRINUSE``.
    broker_port, signer_port, gate_port = _pick_free_tcp_ports(3)
    return PerContainerResources(
        container_runtime_dir=container_runtime_dir,
        token_broker_port=broker_port,
        ssh_signer_port=signer_port,
        gate_port=gate_port,
    )
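
The body of _pick_free_tcp_ports is not shown in this section. One way to get distinct ports, consistent with the comment about holding the sockets open simultaneously, is sketched below; the function name, the loopback default, and the use of ExitStack are assumptions of this example.

```python
from __future__ import annotations

import socket
from contextlib import ExitStack


def pick_free_tcp_ports(count: int, host: str = "127.0.0.1") -> list[int]:
    """Return *count* distinct free ports by holding all probe sockets open at once."""
    ports: list[int] = []
    with ExitStack() as stack:
        for _ in range(count):
            sock = stack.enter_context(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
            sock.bind((host, 0))  # port 0 asks the kernel for any free port
            ports.append(sock.getsockname()[1])
    # All sockets are closed here; the ports are free again but not reserved.
    return ports


ports = pick_free_tcp_ports(3)
```

Because every probe socket stays bound until the last one has been assigned, the kernel cannot hand the same port out twice within one call. The race with other processes after the sockets close remains, which is the EADDRINUSE window the docstring describes.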

claim_port(service_key, preferred=None, *, explicit=False)

Claim one port via the default registry.

Source code in src/terok_sandbox/port_registry.py
def claim_port(
    service_key: str,
    preferred: int | None = None,
    *,
    explicit: bool = False,
) -> int:
    """Claim one port via the default registry."""
    return _default.claim(service_key, preferred, explicit=explicit)

release_port(service_key)

Release a previously claimed port via the default registry.

Source code in src/terok_sandbox/port_registry.py
def release_port(service_key: str) -> None:
    """Release a previously claimed port via the default registry."""
    _default.release(service_key)

check_gpu_available()

Return True when a CDI spec declares the nvidia.com/gpu kind.

Wizards call this to decide whether to offer the NVIDIA base image; the on-launch check_gpu_error path is the authoritative one and stays in place. Any failure (missing podman, missing CDI dirs, unreadable spec) collapses to False so callers can treat this as a pure yes/no signal.

Source code in src/terok_sandbox/runtime/podman.py
def check_gpu_available() -> bool:
    """Return ``True`` when a CDI spec declares the ``nvidia.com/gpu`` kind.

    Wizards call this to decide whether to offer the NVIDIA base image;
    the on-launch [`check_gpu_error`][terok_sandbox.runtime.podman.check_gpu_error]
    path is the authoritative one and stays in place.  Any failure
    (missing podman, missing CDI dirs, unreadable spec) collapses to
    ``False`` so callers can treat this as a pure yes/no signal.
    """
    return any(_NVIDIA_GPU_KIND in _safe_read(p) for p in _cdi_spec_paths())
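
A rough sketch of the same "any readable spec mentions the kind" test follows. The helpers _cdi_spec_paths and _safe_read are not shown above, so the versions here are assumptions: this one scans the conventional CDI directories /etc/cdi and /var/run/cdi directly, whereas the real helper may discover its directories differently.

```python
import tempfile
from pathlib import Path
from typing import Iterable, Iterator

NVIDIA_GPU_KIND = "nvidia.com/gpu"
DEFAULT_CDI_DIRS = (Path("/etc/cdi"), Path("/var/run/cdi"))  # conventional locations


def safe_read(path: Path) -> str:
    """Return the file's text, or an empty string on any read failure."""
    try:
        return path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""


def spec_paths(cdi_dirs: Iterable[Path]) -> Iterator[Path]:
    """Yield spec files from each directory, skipping missing or unreadable ones."""
    for directory in cdi_dirs:
        try:
            entries = sorted(directory.iterdir())
        except OSError:
            continue
        for entry in entries:
            if entry.suffix in {".yaml", ".yml", ".json"}:
                yield entry


def gpu_spec_present(cdi_dirs: Iterable[Path] = DEFAULT_CDI_DIRS) -> bool:
    return any(NVIDIA_GPU_KIND in safe_read(p) for p in spec_paths(cdi_dirs))


with tempfile.TemporaryDirectory() as tmp:
    spec = Path(tmp) / "nvidia.yaml"
    spec.write_text('cdiVersion: "0.5.0"\nkind: nvidia.com/gpu\n', encoding="utf-8")
    found = gpu_spec_present([Path(tmp)])
    missing = gpu_spec_present([Path(tmp) / "absent"])
```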

podman_port_resolver(*, guest_port=DEFAULT_GUEST_SSHD_PORT, host=DEFAULT_SSH_HOST)

Return a resolver that reads the forwarded host port via podman port.

The orchestrator launches the container with -p <reserved>:22; podman already records that mapping in its own metadata, so this resolver just asks for it back — no terok-private annotation in the middle. podman port <name> <guest_port>/tcp emits a single <host_ip>:<host_port> line per matching mapping, which is exactly what we need.

The resolved host is overridden to host (loopback by default) so the SSH connect goes through 127.0.0.1 even when pasta bound the forward to 0.0.0.0; trusting whatever podman reports would open the door to reaching the guest via a routable interface.
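
The parsing and host-override step can be sketched separately from the subprocess call. The function below is a hypothetical helper, not the resolver's actual code: it takes the first line of `podman port` output, keeps only the port, and substitutes the caller's host.

```python
from __future__ import annotations


def endpoint_from_port_output(output: str, host: str = "127.0.0.1") -> tuple[str, int]:
    """Parse the first ``<host_ip>:<host_port>`` line and pin the host to *host*."""
    lines = output.strip().splitlines()
    if not lines:
        raise RuntimeError("no port mapping reported")
    # Split on the last colon so bracketed IPv6 hosts such as "[::]:40022" work too.
    _reported_host, sep, port_text = lines[0].strip().rpartition(":")
    if not sep or not port_text.isdigit():
        raise RuntimeError(f"unparseable podman port output: {lines[0]!r}")
    # The reported host is discarded on purpose: connect via *host* only.
    return host, int(port_text)
```

Discarding the reported address means a forward bound to 0.0.0.0 is still reached through loopback, matching the rationale given above.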

Source code in src/terok_sandbox/runtime/krun_transport.py
def podman_port_resolver(
    *,
    guest_port: int = DEFAULT_GUEST_SSHD_PORT,
    host: str = DEFAULT_SSH_HOST,
) -> Callable[[Container], TcpEndpoint]:
    """Return a resolver that reads the forwarded host port via ``podman port``.

    The orchestrator launches the container with ``-p <reserved>:22``;
    podman already records that mapping in its own metadata, so this
    resolver just asks for it back — no terok-private annotation in the
    middle.  ``podman port <name> <guest_port>/tcp`` emits a single
    ``<host_ip>:<host_port>`` line per matching mapping, which is
    exactly what we need.

    The resolved host is overridden to *host* (loopback by default) so
    the SSH connect goes through ``127.0.0.1`` even when pasta bound
    the forward to ``0.0.0.0``; trusting whatever podman reports would
    open the door to reaching the guest via a routable interface.
    """

    def _resolve(container: Container) -> TcpEndpoint:
        # ``--`` ends podman's own option parsing, so a container handle
        # carrying a leading-dash name can't be reinterpreted as a flag.
        argv = ["podman", "port", "--", container.name, f"{guest_port}/tcp"]
        # A short timeout keeps the resolver from blocking forever on a
        # wedged podman (daemon trouble, NFS-backed storage stall):
        # ``podman port`` is a metadata read, so 5 s is generous.  Raise
        # ``RuntimeError`` for every failure mode so callers see one
        # exception type across "no mapping", "unparseable output", and
        # "podman didn't answer".
        try:
            out = subprocess.check_output(  # nosec B603 B607 — argv built from fixed verbs + caller-controlled scope/container names — binary PATH lookup is the cross-distro contract
                argv,
                text=True,
                timeout=_RESOLVER_PORT_TIMEOUT_S,
            ).strip()
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(
                f"podman port failed for container {container.name!r}: {exc} — "
                f"no ``-p HOST:{guest_port}`` mapping at launch?"
            ) from exc
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError(
                f"podman port timed out after {_RESOLVER_PORT_TIMEOUT_S}s "
                f"resolving forwarded port for container {container.name!r} — "
                "podman daemon stuck or storage backend stalled"
            ) from exc
        if not out:
            raise RuntimeError(
                f"container {container.name!r} has no {guest_port}/tcp port mapping — "
                f"the orchestrator must launch with ``-p HOST:{guest_port}``"
            )
        # Take the first mapping line; podman emits one per binding (it
        # would only emit several if the operator added extra ``-p`` for
        # the same guest port).  ``rpartition`` lets the host-ip side
        # contain colons (IPv6 literals) without us having to special-case.
        first_line = out.splitlines()[0]
        _, sep, port_str = first_line.rpartition(":")
        if not sep:
            raise RuntimeError(
                f"container {container.name!r} podman-port output {first_line!r} "
                f"doesn't look like ``<host>:<port>``"
            )
        try:
            port = int(port_str)
        except ValueError as exc:
            raise RuntimeError(
                f"container {container.name!r} podman-port output {first_line!r} "
                f"has non-integer port: {port_str!r}"
            ) from exc
        # ``TcpEndpoint.__post_init__`` does the range check.
        try:
            return TcpEndpoint(port=port, host=host)
        except ValueError as exc:
            raise RuntimeError(
                f"container {container.name!r} has invalid forwarded port {port}: {exc}"
            ) from exc

    return _resolve
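
The parsing step can be tried in isolation. The following is a minimal sketch, not the module's code: `parse_podman_port_line` is a hypothetical helper name, and the range check is inlined here as an assumption about what `TcpEndpoint.__post_init__` enforces. It shows why `rpartition(":")` copes with IPv6 host literals and why the reported host is discarded in favour of a pinned loopback address.

```python
def parse_podman_port_line(line: str, *, host: str = "127.0.0.1") -> tuple[str, int]:
    """Parse one ``<host_ip>:<host_port>`` line and pin the host (hypothetical helper)."""
    # Split on the LAST colon so IPv6 literals such as "[::]" stay on the host side.
    _, sep, port_str = line.strip().rpartition(":")
    if not sep:
        raise RuntimeError(f"{line!r} doesn't look like <host>:<port>")
    try:
        port = int(port_str)
    except ValueError as exc:
        raise RuntimeError(f"non-integer port in {line!r}: {port_str!r}") from exc
    # Assumed range check; the real code delegates this to TcpEndpoint.
    if not 1 <= port <= 65535:
        raise RuntimeError(f"port {port} is out of range")
    # The host reported by podman is ignored on purpose: always connect via *host*.
    return host, port


if __name__ == "__main__":
    print(parse_podman_port_line("0.0.0.0:40022"))
    print(parse_podman_port_line("[::]:40022"))
```

Every failure surfaces as `RuntimeError`, matching the single-exception-type contract described in the source comments.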

installed_versions()

Return {package: version} for every tracked package present in the install.

Missing packages are silently dropped — a standalone terok-sandbox install doesn't have terok available, and that's fine. The invariant we check on the read side is that every package the stamp knows about is also installed (and at the right version).

Source code in src/terok_sandbox/setup_stamp.py
def installed_versions() -> dict[str, str]:
    """Return ``{package: version}`` for every tracked package present in the install.

    Missing packages are silently dropped — a standalone ``terok-sandbox``
    install doesn't have ``terok`` available, and that's fine.  The
    invariant we check on the read side is that every package the *stamp*
    knows about is also installed (and at the right version).
    """
    out: dict[str, str] = {}
    for pkg in _TRACKED_PACKAGES:
        with contextlib.suppress(PackageNotFoundError):
            out[pkg] = _meta_version(pkg)
    return out
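
As an illustration of the "missing packages are silently dropped" behaviour, here is a small sketch built on the standard library's `importlib.metadata`. The `TRACKED` tuple and the function name are assumptions for the example; the real list lives in the module's private `_TRACKED_PACKAGES`.

```python
import contextlib
from importlib.metadata import PackageNotFoundError, version

# Hypothetical tracked set: one name that may exist, one that certainly does not.
TRACKED = ("pip", "surely-not-installed-pkg-xyz")


def installed_versions_sketch(tracked: tuple[str, ...] = TRACKED) -> dict[str, str]:
    """Return {package: version} for the tracked packages that are installed."""
    out: dict[str, str] = {}
    for pkg in tracked:
        # A missing distribution raises PackageNotFoundError; suppressing it
        # means the package simply does not appear in the result.
        with contextlib.suppress(PackageNotFoundError):
            out[pkg] = version(pkg)
    return out


if __name__ == "__main__":
    print(installed_versions_sketch())
```

The result only ever contains packages that are present, so a caller comparing it with a stamp can tell "not installed" apart from "installed at a different version".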

needs_setup()

Compare the on-disk stamp against currently-installed package versions.

See SetupVerdict for the five possible outcomes. Designed to be cheap enough to call on every TUI startup.

Source code in src/terok_sandbox/setup_stamp.py
def needs_setup() -> SetupVerdict:
    """Compare the on-disk stamp against currently-installed package versions.

    See [`SetupVerdict`][terok_sandbox.setup_stamp.SetupVerdict] for the five possible outcomes.  Designed
    to be cheap enough to call on every TUI startup.
    """
    path = stamp_path()
    if not path.exists():
        return SetupVerdict.FIRST_RUN
    if not path.is_file():
        # Something at the stamp location, but not a regular file — a
        # directory or device left there by a misbehaving sync tool.
        # That's not "user hasn't run setup" (FIRST_RUN); it's a corrupt
        # state the next ``write_stamp`` would also fail on.
        return SetupVerdict.STAMP_CORRUPT

    try:
        stamped = read_stamp(path)
    except (OSError, json.JSONDecodeError, ValueError):
        return SetupVerdict.STAMP_CORRUPT

    installed = installed_versions()
    return _compare(stamped, installed)

read_stamp(path)

Parse the stamp file, returning the packages mapping.

Raises ValueError if the schema version doesn't match — a schema bump should be handled explicitly, not silently coerced.

Source code in src/terok_sandbox/setup_stamp.py
def read_stamp(path: Path) -> dict[str, str]:
    """Parse the stamp file, returning the ``packages`` mapping.

    Raises [`ValueError`][ValueError] if the schema version doesn't match — a
    schema bump should be handled explicitly, not silently coerced.
    """
    raw = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(raw, dict):
        raise ValueError(f"stamp root is not an object: {type(raw).__name__}")
    if raw.get("version") != _STAMP_SCHEMA_VERSION:
        raise ValueError(f"unsupported stamp schema version: {raw.get('version')!r}")
    pkgs = raw.get("packages")
    if not isinstance(pkgs, dict):
        raise ValueError(f"stamp packages is not an object: {type(pkgs).__name__}")
    # Coerce values to str so a malformed stamp with an int can't sneak past.
    return {str(k): str(v) for k, v in pkgs.items()}
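
To make the expected file shape concrete, the sketch below writes and reads a stamp with the `version` and `packages` keys that the parser checks. The schema version value (`1`), the file name, and the package versions are placeholders chosen for the example, not values taken from the module.

```python
import json
import tempfile
from pathlib import Path

SCHEMA_VERSION = 1  # assumed value; the real constant is _STAMP_SCHEMA_VERSION


def read_stamp_sketch(path: Path) -> dict[str, str]:
    """Parse a stamp file and return its packages mapping with string values."""
    raw = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(raw, dict):
        raise ValueError(f"stamp root is not an object: {type(raw).__name__}")
    if raw.get("version") != SCHEMA_VERSION:
        raise ValueError(f"unsupported stamp schema version: {raw.get('version')!r}")
    pkgs = raw.get("packages")
    if not isinstance(pkgs, dict):
        raise ValueError(f"stamp packages is not an object: {type(pkgs).__name__}")
    # Coerce to str so a stray integer version cannot leak through.
    return {str(k): str(v) for k, v in pkgs.items()}


def write_demo_stamp(directory: Path, payload: object) -> Path:
    """Write *payload* as JSON to a demo stamp file and return its path."""
    path = directory / "demo-stamp.json"  # hypothetical file name
    path.write_text(json.dumps(payload), encoding="utf-8")
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        good = {"version": SCHEMA_VERSION, "packages": {"terok-sandbox": "0.0.0"}}
        print(read_stamp_sketch(write_demo_stamp(Path(tmp), good)))
```

A stamp whose `version` differs, or whose root or `packages` value is not an object, raises `ValueError`, which is one of the exception types `needs_setup` maps to `STAMP_CORRUPT`.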

stamp_path()

Return the canonical on-disk location of the setup stamp.

Honours the umbrella paths.root resolver so a user who relocates the state tree (paths.root: /virt/terok in config.yml) sees the stamp move with it — same place every package would look.

Source code in src/terok_sandbox/setup_stamp.py
def stamp_path() -> Path:
    """Return the canonical on-disk location of the setup stamp.

    Honours the umbrella ``paths.root`` resolver so a user who relocates
    the state tree (``paths.root: /virt/terok`` in ``config.yml``) sees
    the stamp move with it — same place every package would look.
    """
    return namespace_state_dir() / _STAMP_FILENAME

ensure_infra_keypair(scope, *, db, comment=None, key_type='ed25519')

Load or generate the %scope infrastructure keypair.

The single place sandbox-internal callers go for the load-or-mint dance:

  1. If scope already has an assigned key, re-serialise it as OpenSSH PEM + render the public line and return.
  2. Otherwise mint a fresh keypair, persist it under scope with assign_ssh_key(..., allow_infra=True), and return the same shape.

Only accepts %-prefixed scopes (the infrastructure form the DB-layer safe-scope validator recognises) — user scopes go through the normal ssh init / import_ssh_keypair paths.

The load-or-mint sequence runs inside a single db.transaction() so two concurrent callers can't both observe "empty" and both proceed to mint.

Trust model: the returned private_pem is plaintext key material; possession of an unlocked CredentialDB is already operator-equivalent in this design, so callers with a DB handle can read any infra key. Callers MUST NOT log, serialise, or otherwise persist private_pem outside the intended consumer (e.g. ssh -i file or in-process signer).

The keypair material is intended for sandbox-owned services that need a stable host-side identity (krun %host, future infrastructure slots); user-controlled code never goes through this helper.

Parameters:

  • scope (str, required): "%name" infrastructure scope. Validated structurally by the DB layer and refused here if it doesn't start with %.
  • db (CredentialDB, required): Open CredentialDB; the caller manages its lifetime.
  • comment (str | None, default None): Comment to embed in the public line on fresh generation. Ignored when the keypair already exists (existing comment is preserved). Defaults to "terok-infra:<scope>".
  • key_type (str, default 'ed25519'): "ed25519" (default) or "rsa".

Returns:

  • InfraKeypair: An InfraKeypair with the OpenSSH PEM private + public line.

Source code in src/terok_sandbox/vault/ssh/keypair.py
def ensure_infra_keypair(
    scope: str,
    *,
    db: CredentialDB,
    comment: str | None = None,
    key_type: str = "ed25519",
) -> InfraKeypair:
    """Load or generate the ``%scope`` infrastructure keypair.

    The single place sandbox-internal callers go for the load-or-mint
    dance:

    1. If *scope* already has an assigned key, re-serialise it as
       OpenSSH PEM + render the public line and return.
    2. Otherwise mint a fresh keypair, persist it under *scope* with
       ``assign_ssh_key(..., allow_infra=True)``, and return the same
       shape.

    Only accepts ``%``-prefixed scopes (the infrastructure form the
    DB-layer safe-scope validator recognises) — user scopes go through
    the normal ``ssh init`` / [`import_ssh_keypair`][terok_sandbox.vault.ssh.keypair.import_ssh_keypair]
    paths.

    The load-or-mint sequence runs inside a single
    [`db.transaction()`][terok_sandbox.vault.store.db.CredentialDB.transaction]
    so two concurrent callers can't both observe "empty" and both
    proceed to mint.  Trust model: the returned ``private_pem`` is
    plaintext key material; possession of an unlocked
    [`CredentialDB`][terok_sandbox.vault.store.db.CredentialDB] is
    already operator-equivalent in this design, so callers with a DB
    handle can read any infra key.  Callers MUST NOT log, serialise,
    or otherwise persist ``private_pem`` outside the intended
    consumer (e.g. ``ssh -i`` file or in-process signer).  The keypair material is intended
    for sandbox-owned services that need a stable host-side identity
    (krun ``%host``, future infrastructure slots); user-controlled
    code never goes through this helper.

    Args:
        scope: ``"%name"`` infrastructure scope.  Validated structurally
            by the DB layer and refused here if it doesn't start with
            ``%``.
        db: Open [`CredentialDB`][terok_sandbox.vault.store.db.CredentialDB]
            — caller manages the lifetime.
        comment: Comment to embed in the public line on fresh
            generation.  Ignored when the keypair already exists
            (existing comment is preserved).  Defaults to
            ``"terok-infra:<scope>"``.
        key_type: ``"ed25519"`` (default) or ``"rsa"``.

    Returns:
        An [`InfraKeypair`][terok_sandbox.vault.ssh.keypair.InfraKeypair]
        with the OpenSSH PEM private + public line.
    """
    if not scope.startswith("%"):
        raise ValueError(
            f"ensure_infra_keypair: scope {scope!r} must start with '%' "
            "(infrastructure-reserved form); user scopes use ssh init or "
            "import_ssh_keypair instead"
        )

    # Wrap the entire check-mint-assign in a single SQLite transaction
    # so two concurrent callers can't both observe "empty" and both
    # proceed to mint a separate key for the same scope.  The
    # re-check inside the transaction is what closes the race window.
    with db.transaction():
        existing = db.load_ssh_keys_for_scope(scope)
        if existing:
            # ``load_ssh_keys_for_scope`` orders by ``assigned_at``
            # ascending, so the last element is the most recently
            # assigned key.  Prefer it: if an additive rotation ever
            # leaves more than one key under the scope, returning the
            # oldest would silently resurrect the rotated-out material.
            record = existing[-1]
            return InfraKeypair(
                scope=scope,
                private_pem=openssh_pem_of(record.private_der),
                public_line=public_line_of(record),
                fingerprint=record.fingerprint,
                key_type=record.key_type,
                created=False,
            )

        keypair = generate_keypair(
            key_type,
            comment=comment if comment is not None else f"terok-infra:{scope}",
        )
        key_id = db.store_ssh_key(
            key_type=keypair.key_type,
            private_der=keypair.private_der,
            public_blob=keypair.public_blob,
            comment=keypair.comment,
            fingerprint=keypair.fingerprint,
        )
        db.assign_ssh_key(scope, key_id, allow_infra=True)
        return InfraKeypair(
            scope=scope,
            private_pem=openssh_pem_of(keypair.private_der),
            public_line=keypair.public_line,
            fingerprint=keypair.fingerprint,
            key_type=keypair.key_type,
            created=True,
        )
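
The load-or-mint pattern can be sketched with the standard library's `sqlite3` alone. This is an illustration under stated assumptions, not the `CredentialDB` API: the `keys` table, `open_demo_db`, and `load_or_mint` are hypothetical, and a random hex token stands in for real key material. `BEGIN IMMEDIATE` takes the write lock before the lookup, which is one way to keep two callers from both seeing "empty".

```python
import secrets
import sqlite3


def open_demo_db() -> sqlite3.Connection:
    """Open an in-memory DB in autocommit mode with a hypothetical ``keys`` table."""
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE keys (id INTEGER PRIMARY KEY AUTOINCREMENT, scope TEXT, material TEXT)"
    )
    return conn


def load_or_mint(conn: sqlite3.Connection, scope: str) -> tuple[str, bool]:
    """Return (material, created) for *scope*, minting only when nothing exists."""
    if not scope.startswith("%"):
        raise ValueError(f"scope {scope!r} must start with '%'")
    # Take the write lock up front so the check and the insert are one unit.
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            "SELECT material FROM keys WHERE scope = ? ORDER BY id ASC", (scope,)
        ).fetchall()
        if rows:
            # Oldest first, so the last row is the most recently assigned key.
            result = (rows[-1][0], False)
        else:
            material = secrets.token_hex(16)  # placeholder, not a real keypair
            conn.execute(
                "INSERT INTO keys (scope, material) VALUES (?, ?)", (scope, material)
            )
            result = (material, True)
    except Exception:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")
        return result


if __name__ == "__main__":
    db = open_demo_db()
    print(load_or_mint(db, "%host")[1])  # first call mints
    print(load_or_mint(db, "%host")[1])  # second call loads
```

The second call returns the same material with `created` set to `False`, mirroring how the `created` field of `InfraKeypair` distinguishes a fresh mint from a load.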

public_line_of(record)

Render record as the one-line OpenSSH public key form.

Format: <algo> <base64-blob> <comment> — matches what ssh-keygen writes to .pub files and what a remote's deploy-key field expects. Callers that rendered this inline now go through this single helper so the algo-name mapping lives in one place.

Source code in src/terok_sandbox/vault/ssh/keypair.py
def public_line_of(record: SSHKeyRecord) -> str:
    """Render *record* as the one-line OpenSSH public key form.

    Format: ``<algo> <base64-blob> <comment>`` — matches what
    ``ssh-keygen`` writes to ``.pub`` files and what a remote's deploy-key
    field expects.  Callers that rendered this inline now go through this
    single helper so the algo-name mapping lives in one place.
    """
    algo = _algo_name(record.key_type)
    b64 = base64.b64encode(record.public_blob).decode("ascii")
    return f"{algo} {b64} {record.comment}".rstrip()
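
A short, self-contained sketch of the rendering follows. The `KeyRecord` dataclass and the `_ALGO_NAMES` table are stand-ins assumed for this example (the real code uses `SSHKeyRecord` and a private `_algo_name` helper); `ssh-ed25519` and `ssh-rsa` are the standard OpenSSH algorithm names.

```python
import base64
from dataclasses import dataclass

# Assumed mapping from stored key type to the OpenSSH algorithm name.
_ALGO_NAMES = {"ed25519": "ssh-ed25519", "rsa": "ssh-rsa"}


@dataclass(frozen=True)
class KeyRecord:
    """Hypothetical stand-in carrying only the fields the renderer needs."""

    key_type: str
    public_blob: bytes
    comment: str


def public_line_sketch(record: KeyRecord) -> str:
    """Render ``<algo> <base64-blob> <comment>`` on a single line."""
    algo = _ALGO_NAMES[record.key_type]
    b64 = base64.b64encode(record.public_blob).decode("ascii")
    # rstrip() drops the trailing space left behind by an empty comment.
    return f"{algo} {b64} {record.comment}".rstrip()


if __name__ == "__main__":
    # The blob here is three arbitrary bytes, not a real public key.
    print(public_line_sketch(KeyRecord("ed25519", b"\x00\x01\x02", "demo@host")))
```

The trailing `rstrip()` matters for records with an empty comment, which would otherwise end in a stray space.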

systemd_creds_has_tpm2()

Return True when the host has a TPM2 device usable by systemd-creds.

Mirrors systemd-creds has-tpm2's exit code. A preference probe, not a precondition: a missing TPM doesn't break the tier — host-key sealing still works in --user mode — so callers use this to choose between TPM2 and host-key, not to gate availability.

Source code in src/terok_sandbox/vault/store/systemd_creds.py
def has_tpm2() -> bool:
    """Return ``True`` when the host has a TPM2 device usable by systemd-creds.

    Mirrors ``systemd-creds has-tpm2``'s exit code.  A *preference*
    probe, not a precondition: a missing TPM doesn't break the tier —
    host-key sealing still works in ``--user`` mode — so callers use
    this to choose between TPM2 and host-key, not to gate availability.
    """
    if not is_available():
        return False
    try:
        result = subprocess.run(  # nosec: B603 — absolute path, fixed argv, no user input
            [_require_exe(), "has-tpm2"],
            capture_output=True,
            timeout=_PROBE_TIMEOUT,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0
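
The same probe shape can be written without the module's private helpers. In this sketch, `shutil.which` stands in for `is_available()` / `_require_exe()` and the 5-second default timeout is an assumption; only the `systemd-creds has-tpm2` invocation and the collapse of every failure to `False` come from the source.

```python
import shutil
import subprocess


def has_tpm2_sketch(timeout: float = 5.0) -> bool:
    """Return True only when ``systemd-creds has-tpm2`` exits with status 0."""
    exe = shutil.which("systemd-creds")
    if exe is None:
        # No systemd-creds on PATH: treat as "no TPM2 available".
        return False
    try:
        result = subprocess.run(
            [exe, "has-tpm2"],
            capture_output=True,
            timeout=timeout,
            check=False,  # a non-zero exit is an answer, not an error
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0


if __name__ == "__main__":
    tier = "tpm2" if has_tpm2_sketch() else "host-key"
    print(f"preferred sealing tier: {tier}")
```

Because the probe never raises, a caller can use it directly in a conditional to pick between the TPM2 and host-key tiers, as the docstring describes.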