Skip to content

terok_sandbox

terok_sandbox

terok-sandbox: hardened Podman container runner with gate and shield integration.

Public API for standalone use and integration with terok.

The primary configuration type is :class:SandboxConfig:

>>> from terok_sandbox import SandboxConfig
>>> cfg = SandboxConfig(gate_port=9418)

READY_MARKER = '>> init complete' module-attribute

Default log line emitted by init-ssh-and-repo.sh when the container is ready.

CommandDef(name, help='', handler=None, args=(), group='') dataclass

Definition of a sandbox subcommand.

Attributes:

Name Type Description
name str

Subcommand name (e.g. "gate start").

help str

One-line help string.

handler Callable[..., None] | None

Callable implementing the command.

args tuple[ArgDef, ...]

Argument definitions.

group str

Command group (e.g. "gate", "shield").

SandboxConfig(state_dir=_state_root(), runtime_dir=_runtime_root(), config_dir=_config_root(), credentials_dir=_credentials_root(), gate_port=9418, proxy_port=18731, ssh_agent_port=18732, shield_profiles=('dev-standard',), shield_audit=True, shield_bypass=False) dataclass

Immutable configuration for the sandbox layer.

All paths default to the XDG/FHS-resolved values from :mod:paths. Override individual fields when constructing from terok's global config or when using terok-sandbox standalone.

state_dir = field(default_factory=_state_root) class-attribute instance-attribute

Writable state root (tokens, gate repos, task data).

runtime_dir = field(default_factory=_runtime_root) class-attribute instance-attribute

Transient runtime directory (PID files, sockets).

config_dir = field(default_factory=_config_root) class-attribute instance-attribute

Sandbox-scoped configuration root.

Note: shield profiles are resolved by :attr:shield_profiles_dir via :func:~terok_sandbox.paths.umbrella_config_root, not from this directory.

credentials_dir = field(default_factory=_credentials_root) class-attribute instance-attribute

Shared credentials directory (DB, routes, env mounts).

gate_port = 9418 class-attribute instance-attribute

HTTP port for the gate server.

proxy_port = 18731 class-attribute instance-attribute

TCP port for the credential proxy (container access).

ssh_agent_port = 18732 class-attribute instance-attribute

TCP port for the SSH agent proxy (container access).

shield_profiles = ('dev-standard',) class-attribute instance-attribute

Shield egress firewall profile names.

shield_audit = True class-attribute instance-attribute

Whether shield audit logging is enabled.

shield_bypass = False class-attribute instance-attribute

DANGEROUS: when True, the egress firewall is completely disabled.

gate_base_path property

Return the gate server's repo base path.

token_file_path property

Return the path to the gate token file.

pid_file_path property

Return the PID file path for the managed gate daemon.

shield_profiles_dir property

Return the directory for terok-managed shield profiles.

proxy_db_path property

Return the path to the credential proxy sqlite3 database.

proxy_socket_path property

Return the Unix socket path for the credential proxy.

proxy_pid_file_path property

Return the PID file path for the managed credential proxy daemon.

proxy_routes_path property

Return the path to the proxy route configuration JSON.

ssh_keys_dir property

Return the base directory for per-project SSH keys.

ssh_keys_json_path property

Return the path to the SSH key mapping JSON.

CredentialDB(db_path)

SQLite-backed credential store and phantom token registry.

Parameters:

Name Type Description Default
db_path Path

Path to the sqlite3 database file. Parent directories are created automatically.

required
Source code in src/terok_sandbox/credential_db.py
def __init__(self, db_path: Path) -> None:
    """Open the credential database at *db_path*.

    Creates the parent directory of *db_path* if it is missing, opens a
    ``sqlite3`` connection with ``DEFERRED`` isolation, switches the
    journal to WAL, turns on foreign-key enforcement and finally calls
    ``_create_tables()``.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(db_path), isolation_level="DEFERRED")
    # Stored before the pragmas run so the handle is reachable even if a
    # later step raises.
    self._conn = connection
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    self._create_tables()

store_credential(credential_set, provider, data)

Insert or replace a credential entry.

Source code in src/terok_sandbox/credential_db.py
def store_credential(self, credential_set: str, provider: str, data: dict) -> None:
    """Insert or replace a credential entry.

    *data* is serialised with ``json.dumps`` and written with
    ``INSERT OR REPLACE`` under the ``(credential_set, provider)`` pair.

    The write runs inside the connection's context manager: it is
    committed on success and rolled back if the statement raises, so a
    failed write does not leave a transaction open on the shared
    connection.

    Raises:
        TypeError: If *data* cannot be serialised to JSON.
        sqlite3.Error: If the statement fails; the transaction is rolled
            back before the exception propagates.
    """
    payload = json.dumps(data)
    with self._conn:
        self._conn.execute(
            "INSERT OR REPLACE INTO credentials (credential_set, provider, data) VALUES (?, ?, ?)",
            (credential_set, provider, payload),
        )

load_credential(credential_set, provider)

Return the credential dict, or None if not found.

Source code in src/terok_sandbox/credential_db.py
def load_credential(self, credential_set: str, provider: str) -> dict | None:
    """Return the credential dict, or ``None`` if not found."""
    row = self._conn.execute(
        "SELECT data FROM credentials WHERE credential_set = ? AND provider = ?",
        (credential_set, provider),
    ).fetchone()
    return json.loads(row[0]) if row else None

list_credentials(credential_set)

Return provider names that have stored credentials.

Source code in src/terok_sandbox/credential_db.py
def list_credentials(self, credential_set: str) -> list[str]:
    """List the providers stored under *credential_set*.

    The names come back in the order produced by the query's
    ``ORDER BY provider``; an unknown set yields an empty list.
    """
    cursor = self._conn.execute(
        "SELECT provider FROM credentials WHERE credential_set = ? ORDER BY provider",
        (credential_set,),
    )
    return [provider for (provider,) in cursor]

delete_credential(credential_set, provider)

Remove a credential entry (idempotent).

Source code in src/terok_sandbox/credential_db.py
def delete_credential(self, credential_set: str, provider: str) -> None:
    """Remove a credential entry (idempotent).

    Deleting a pair that is not stored is a no-op.  The delete runs inside
    the connection's context manager: it is committed on success and
    rolled back if the statement raises, so a failed delete does not
    leave a transaction open on the shared connection.

    Raises:
        sqlite3.Error: If the statement fails; the transaction is rolled
            back before the exception propagates.
    """
    with self._conn:
        self._conn.execute(
            "DELETE FROM credentials WHERE credential_set = ? AND provider = ?",
            (credential_set, provider),
        )

create_proxy_token(project, task, credential_set, provider)

Create a per-task, per-provider phantom token.

Token format: terok-p-<32 hex chars>.

Source code in src/terok_sandbox/credential_db.py
def create_proxy_token(
    self, project: str, task: str, credential_set: str, provider: str
) -> str:
    """Create a per-task, per-provider phantom token.

    Token format: ``terok-p-<32 hex chars>`` (16 random bytes from
    ``secrets.token_hex``).  The token is recorded in ``proxy_tokens``
    together with *project*, *task*, *credential_set* and *provider*.

    The insert runs inside the connection's context manager: it is
    committed on success and rolled back if the statement raises, so a
    failed insert does not leave a transaction open on the shared
    connection.

    Returns:
        The newly created token string.

    Raises:
        sqlite3.Error: If the insert fails; the transaction is rolled
            back before the exception propagates.
    """
    token = f"terok-p-{secrets.token_hex(16)}"
    with self._conn:
        self._conn.execute(
            "INSERT INTO proxy_tokens (token, project, task, credential_set, provider)"
            " VALUES (?, ?, ?, ?, ?)",
            (token, project, task, credential_set, provider),
        )
    return token

lookup_proxy_token(token)

Return {project, task, credential_set, provider} or None.

Source code in src/terok_sandbox/credential_db.py
def lookup_proxy_token(self, token: str) -> dict | None:
    """Return ``{project, task, credential_set, provider}`` or ``None``."""
    row = self._conn.execute(
        "SELECT project, task, credential_set, provider FROM proxy_tokens WHERE token = ?",
        (token,),
    ).fetchone()
    if row is None:
        return None
    return {"project": row[0], "task": row[1], "credential_set": row[2], "provider": row[3]}

revoke_proxy_tokens(project, task)

Revoke all tokens for a project/task pair. Returns count revoked.

Source code in src/terok_sandbox/credential_db.py
def revoke_proxy_tokens(self, project: str, task: str) -> int:
    """Revoke all tokens for a project/task pair.

    The delete runs inside the connection's context manager: it is
    committed on success and rolled back if the statement raises, so a
    failed revoke does not leave a transaction open on the shared
    connection.

    Returns:
        The number of tokens removed (``0`` when none matched).

    Raises:
        sqlite3.Error: If the statement fails; the transaction is rolled
            back before the exception propagates.
    """
    with self._conn:
        cur = self._conn.execute(
            "DELETE FROM proxy_tokens WHERE project = ? AND task = ?",
            (project, task),
        )
    return cur.rowcount

close()

Close the database connection.

Source code in src/terok_sandbox/credential_db.py
def close(self) -> None:
    """Shut down the underlying sqlite3 connection."""
    connection = self._conn
    connection.close()

__del__()

Best-effort close on garbage collection.

Source code in src/terok_sandbox/credential_db.py
def __del__(self) -> None:
    """Close the connection on garbage collection, ignoring any failure.

    Every ``Exception`` is swallowed on purpose, including the
    ``AttributeError`` raised when ``_conn`` was never assigned.
    """
    try:
        connection = self._conn
        connection.close()
    except Exception:  # noqa: BLE001
        return

CredentialProxyStatus(mode, running, healthy, socket_path, db_path, routes_path, routes_configured, credentials_stored) dataclass

Current state of the credential proxy.

mode instance-attribute

"systemd", "daemon", or "none".

running instance-attribute

Whether the proxy is active (systemd socket listening or daemon alive).

healthy instance-attribute

Whether the proxy responded to an HTTP health check.

socket_path instance-attribute

Configured Unix socket path.

db_path instance-attribute

Configured credential database path.

routes_path instance-attribute

Configured proxy routes JSON path.

routes_configured instance-attribute

Number of routes in routes.json (0 if missing or invalid).

credentials_stored instance-attribute

Provider names with stored credentials.

CheckVerdict(severity, detail, fixable=False) dataclass

Result of evaluating a single health check probe.

severity instance-attribute

"ok", "warn", or "error".

detail instance-attribute

Human-readable explanation.

fixable = False class-attribute instance-attribute

Whether fix_cmd should be offered to the operator.

DoctorCheck(category, label, probe_cmd, evaluate, fix_cmd=None, fix_description='', host_side=False) dataclass

A single health check to run inside (or against) a container.

The probe_cmd is executed via podman exec <cname> ... by the orchestrator. The evaluate callable interprets the result. If fix_cmd is set, the orchestrator may offer it when the check fails with fixable=True.

Dual execution modes:

  • Container mode (host_side=False): the orchestrator runs probe_cmd via podman exec and passes the result to evaluate. The standalone doctor command runs the same probe_cmd directly via subprocess on the host.
  • Host-side mode (host_side=True): the orchestrator bypasses probe_cmd entirely and performs the check via Python APIs (e.g. make_shield), then passes resolved state to evaluate. The standalone doctor command calls evaluate(0, "", "") and the function performs the check itself or reports a neutral result.

category instance-attribute

Grouping key: "bridge", "env", "mount", "network", "shield", "git".

label instance-attribute

Human-readable check name shown in output.

probe_cmd instance-attribute

Shell command to run inside the container via podman exec.

evaluate instance-attribute

(returncode, stdout, stderr) → CheckVerdict.

fix_cmd = None class-attribute instance-attribute

Optional remediation command for podman exec.

fix_description = '' class-attribute instance-attribute

Shown to the operator before applying the fix.

host_side = False class-attribute instance-attribute

If True, the check runs on the host (not via podman exec). The orchestrator calls evaluate(0, "", "") and the evaluate function performs the host-side check itself.

GateServerStatus(mode, running, port) dataclass

Current state of the gate server.

mode instance-attribute

"systemd", "daemon", or "none".

running instance-attribute

Whether the server is currently reachable.

port instance-attribute

Configured port.

GateStalenessInfo(branch, gate_head, upstream_head, is_stale, commits_behind, commits_ahead, last_checked, error) dataclass

Result of comparing gate vs upstream.

GitGate(*, project_id, gate_path, upstream_url=None, default_branch=None, ssh_host_dir=None, ssh_key_name=None, validate_gate_fn=None)

Repository + Gateway for a host-side git gate mirror.

Manages the bare git mirror that containers clone from. Provides operations for initial creation, incremental sync from upstream, selective branch fetching, and staleness detection.

Constructor takes plain parameters — no terok-specific types.

Initialise with plain parameters.

Parameters

  • project_id: Identifier for this gate's owner.
  • gate_path: Path to the bare git mirror on the host.
  • upstream_url: Git upstream URL to sync from.
  • default_branch: Branch name used for staleness comparisons.
  • ssh_host_dir: Explicit SSH directory for git operations. When None, falls back to SandboxConfig().ssh_keys_dir / project_id.
  • ssh_key_name: Explicit SSH key filename.
  • validate_gate_fn: Optional callback (project_id) -> None that validates no other project uses the same gate with a different upstream. Injected by the orchestration layer; omitted for standalone use.

Source code in src/terok_sandbox/git_gate.py
def __init__(
    self,
    *,
    project_id: str,
    gate_path: Path | str,
    upstream_url: str | None = None,
    default_branch: str | None = None,
    ssh_host_dir: Path | str | None = None,
    ssh_key_name: str | None = None,
    validate_gate_fn: Callable[[str], None] | None = None,
) -> None:
    """Record the gate settings; nothing is touched on disk here.

    Parameters
    ----------
    project_id:
        Identifier for this gate's owner.
    gate_path:
        Location of the bare git mirror on the host; stored as a ``Path``.
    upstream_url:
        Git upstream URL to sync from.
    default_branch:
        Branch name used for staleness comparisons.
    ssh_host_dir:
        Explicit SSH directory for git operations.  A falsy value
        (``None`` or an empty string) is stored as ``None``, in which case
        the documented fallback is
        ``SandboxConfig().ssh_keys_dir / project_id``.
    ssh_key_name:
        Explicit SSH key filename.
    validate_gate_fn:
        Optional callback ``(project_id) -> None`` that validates no other
        project uses the same gate with a different upstream.  Injected by
        the orchestration layer; omitted for standalone use.
    """
    # Path-like inputs are normalised first ...
    host_dir = None
    if ssh_host_dir:
        host_dir = Path(ssh_host_dir)
    self._gate_path = Path(gate_path)
    self._ssh_host_dir = host_dir

    # ... everything else is stored verbatim.
    self._project_id = project_id
    self._upstream_url = upstream_url
    self._default_branch = default_branch
    self._ssh_key_name = ssh_key_name
    self._validate_gate_fn = validate_gate_fn

sync(branches=None, force_reinit=False)

Sync the host-side git mirror gate.

  • Uses SSH configuration via GIT_SSH_COMMAND.
  • If gate doesn't exist (or force_reinit), performs a fresh git clone --mirror.
  • Always runs the sync logic afterward for consistent side effects.

Returns:

Type Description
GateSyncResult

Dict with keys: path, upstream_url, created (bool), success, updated_branches, errors.

Source code in src/terok_sandbox/git_gate.py
def sync(
    self,
    branches: list[str] | None = None,
    force_reinit: bool = False,
) -> GateSyncResult:
    """Bring the host-side git mirror gate up to date with upstream.

    - Git runs with the environment returned by ``_ssh_env()``.
    - When the gate directory is missing, or *force_reinit* is set, the
      mirror is (re)created through ``_clone_gate_mirror``.
    - ``sync_branches`` always runs afterwards, including right after a
      fresh clone.

    Returns:
        Dict with keys: path, upstream_url, created (bool), success,
        updated_branches, errors.

    Raises:
        SystemExit: If no upstream URL is configured.
    """
    if not self._upstream_url:
        raise SystemExit("Project has no git.upstream_url configured")

    self._validate_gate()

    gate_dir = self._gate_path
    needs_clone = not gate_dir.exists()
    gate_dir.parent.mkdir(parents=True, exist_ok=True)

    env = self._ssh_env()
    if force_reinit and not needs_clone:
        try:
            if gate_dir.is_dir():
                shutil.rmtree(gate_dir)
        except Exception as exc:
            logger.warning(f"Failed to remove gate dir {gate_dir}: {exc}")
        # A clone is attempted even when the removal above failed.
        needs_clone = True

    if needs_clone:
        _clone_gate_mirror(self._upstream_url, gate_dir, env)

    outcome = self.sync_branches(branches)
    summary = {
        "path": str(gate_dir),
        "upstream_url": self._upstream_url,
        "created": needs_clone,
    }
    for key in ("success", "updated_branches", "errors"):
        summary[key] = outcome[key]
    return summary

sync_branches(branches=None)

Sync specific branches in the gate from upstream.

Parameters:

Name Type Description Default
branches list[str] | None

List of branches to sync (default: all via remote update)

None

Returns:

Type Description
BranchSyncResult

Dict with keys: success, updated_branches, errors

Source code in src/terok_sandbox/git_gate.py
def sync_branches(self, branches: list[str] | None = None) -> BranchSyncResult:
    """Fetch upstream changes into the gate and report the outcome.

    The fetch is always ``git remote update --prune`` on the gate
    directory, bounded by a 120 second timeout.  *branches* does not
    restrict what is fetched; it is only echoed back in the result.

    Args:
        branches: Branch names to report as updated on success.  When
            ``None`` or empty, ``["all"]`` is reported instead.

    Returns:
        Dict with keys: success, updated_branches, errors.  Failures
        (missing gate, non-zero git exit, timeout, any other exception)
        are reported through ``errors`` rather than raised, except for
        whatever ``_validate_gate()`` itself raises.
    """
    gate_dir = self._gate_path

    if not gate_dir.exists():
        return {"success": False, "updated_branches": [], "errors": ["Gate not initialized"]}

    self._validate_gate()

    env = self._ssh_env()
    errors: list[str] = []
    updated: list[str] = []

    try:
        cmd = ["git", "-C", str(gate_dir), "remote", "update", "--prune"]
        result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=120)

        if result.returncode != 0:
            errors.append(f"remote update failed: {result.stderr}")
        else:
            # Copy so the result never aliases the caller's list.
            updated = list(branches) if branches else ["all"]

    except subprocess.TimeoutExpired:
        errors.append("Sync timed out")
    except Exception as e:
        errors.append(str(e))

    return {"success": not errors, "updated_branches": updated, "errors": errors}

compare_vs_upstream(branch=None)

Compare gate HEAD vs upstream HEAD for a branch.

Parameters:

Name Type Description Default
branch str | None

Branch to compare (default: configured default_branch)

None

Returns:

Type Description
GateStalenessInfo

GateStalenessInfo with comparison results

Source code in src/terok_sandbox/git_gate.py
def compare_vs_upstream(self, branch: str | None = None) -> GateStalenessInfo:
    """Check whether the gate's head for a branch matches upstream's.

    Args:
        branch: Branch to compare (default: configured default_branch)

    Returns:
        GateStalenessInfo with comparison results.  When the comparison
        cannot be made (no branch, gate head unavailable, no upstream
        URL, upstream head unavailable) ``error`` carries the reason,
        ``is_stale`` is ``False`` and both commit counts are ``None``.
        When the heads match, both counts are ``0``.
    """
    target = branch or self._default_branch
    checked_at = datetime.now().isoformat()

    def _report(error, *, name, gate=None, upstream=None, stale=False, behind=None, ahead=None):
        # Single construction point; every outcome shares the timestamp.
        return GateStalenessInfo(
            branch=name,
            gate_head=gate,
            upstream_head=upstream,
            is_stale=stale,
            commits_behind=behind,
            commits_ahead=ahead,
            last_checked=checked_at,
            error=error,
        )

    if not target:
        return _report("No branch configured", name=None)

    env = self._ssh_env()

    # Gate side
    gate_head = _get_gate_branch_head(self._gate_path, target, env)
    if gate_head is None:
        return _report("Gate not initialized", name=target)

    # Upstream side
    if not self._upstream_url:
        return _report("No upstream URL configured", name=target, gate=gate_head)

    upstream_info = _get_upstream_head(self._upstream_url, target, env)
    if upstream_info is None:
        return _report("Could not reach upstream", name=target, gate=gate_head)

    upstream_head = upstream_info["commit_hash"]
    if gate_head == upstream_head:
        return _report(
            None, name=target, gate=gate_head, upstream=upstream_head, behind=0, ahead=0
        )

    # Heads differ: count commits in each direction (gate -> upstream for
    # "behind", upstream -> gate for "ahead").
    behind = _count_commits_range(self._gate_path, gate_head, upstream_head, env)
    ahead = _count_commits_range(self._gate_path, upstream_head, gate_head, env)
    return _report(
        None,
        name=target,
        gate=gate_head,
        upstream=upstream_head,
        stale=True,
        behind=behind,
        ahead=ahead,
    )

last_commit()

Get information about the last commit on the configured branch.

Returns None if the gate doesn't exist or is not accessible.

Source code in src/terok_sandbox/git_gate.py
def last_commit(self) -> CommitInfo | None:
    """Describe the newest commit in the gate.

    ``refs/heads/<default_branch>`` is tried first when a default branch
    is configured, falling back to ``HEAD`` if that lookup fails; without
    a default branch only ``HEAD`` is queried.

    Returns a dict with ``commit_hash``, ``commit_date``,
    ``commit_author`` and ``commit_message``, or ``None`` if the gate
    directory is missing, git fails, the output cannot be parsed, or any
    exception occurs.
    """
    try:
        gate_dir = self._gate_path

        if not (gate_dir.exists() and gate_dir.is_dir()):
            return None

        env = self._ssh_env()

        # Candidate revisions, most specific first.
        revisions = ["HEAD"]
        if self._default_branch:
            revisions.insert(0, f"refs/heads/{self._default_branch}")

        for rev in revisions:
            result = subprocess.run(
                [
                    "git",
                    "-C",
                    str(gate_dir),
                    "log",
                    "-1",
                    rev,
                    "--pretty=format:%H%x00%ad%x00%an%x00%s",
                    "--date=iso",
                ],
                capture_output=True,
                text=True,
                env=env,
            )
            if result.returncode == 0:
                break
        else:
            return None

        # NUL-separated: hash, date, author, subject.
        fields = result.stdout.strip().split("\x00", 3)
        if len(fields) != 4:
            return None
        labels = ("commit_hash", "commit_date", "commit_author", "commit_message")
        return dict(zip(labels, fields))

    except Exception:
        return None

GpuConfigError(message, *, hint=_CDI_HINT)

Bases: RuntimeError

CDI/NVIDIA misconfiguration detected during container launch.

Store the CDI hint alongside the standard error message.

Source code in src/terok_sandbox/runtime.py
def __init__(self, message: str, *, hint: str = _CDI_HINT) -> None:
    """Initialise the error with *message* and remember the CDI *hint*.

    *message* is passed to the base-class constructor; *hint* is exposed
    as the ``hint`` attribute.
    """
    super().__init__(message)
    self.hint = hint

LifecycleHooks(pre_start=None, post_start=None, post_ready=None, post_stop=None) dataclass

Optional callbacks fired at container lifecycle transitions.

All slots are None by default. Sandbox.run() fires pre_start before podman run and post_start after a successful launch. post_ready and post_stop are available for callers to invoke at the appropriate time (e.g. after log streaming or container exit).

pre_start = None class-attribute instance-attribute

Fired before podman run.

post_start = None class-attribute instance-attribute

Fired after a successful podman run.

post_ready = None class-attribute instance-attribute

Fired when the container reports ready (caller responsibility).

post_stop = None class-attribute instance-attribute

Fired after the container exits (caller responsibility).

RunSpec(container_name, image, env, volumes, command, task_dir, gpu_enabled=False, extra_args=(), unrestricted=True) dataclass

Everything needed for a single podman run invocation.

container_name instance-attribute

Unique container name.

image instance-attribute

Image tag to run (e.g. terok-l1-cli:ubuntu-24.04).

env instance-attribute

Environment variables injected into the container.

volumes instance-attribute

Volume mount strings (host:container[:opts]).

command instance-attribute

Command to execute inside the container.

task_dir instance-attribute

Host-side task directory (for shield state, logs, etc.).

gpu_enabled = False class-attribute instance-attribute

Whether to pass GPU device args to podman.

extra_args = () class-attribute instance-attribute

Additional podman run arguments (e.g. port publishing).

unrestricted = True class-attribute instance-attribute

When False, adds --security-opt no-new-privileges.

Sandbox(config=None)

Stateless facade composing sandbox primitives.

All methods delegate to the module-level functions in this package, passing the stored :class:SandboxConfig. The existing function-level API remains the canonical interface — this class is a convenience for callers that manage a config instance.

Source code in src/terok_sandbox/sandbox.py
def __init__(self, config: SandboxConfig | None = None) -> None:
    """Keep *config*, or a default ``SandboxConfig()`` when none is given."""
    if not config:
        config = SandboxConfig()
    self._cfg = config

config property

Return the sandbox configuration.

ensure_gate()

Verify the gate server is running; raise SystemExit if not.

Source code in src/terok_sandbox/sandbox.py
def ensure_gate(self) -> None:
    """Check that the gate server is running; ``SystemExit`` is raised if not.

    Thin wrapper: hands the stored config to
    ``gate_server.ensure_server_reachable``.
    """
    # Resolved at call time, not at module import.
    from .gate_server import ensure_server_reachable as _check_reachable

    _check_reachable(self._cfg)

create_token(project_id, task_id)

Create a task-scoped gate access token.

Source code in src/terok_sandbox/sandbox.py
def create_token(self, project_id: str, task_id: str) -> str:
    """Issue a gate access token scoped to *task_id* within *project_id*.

    Thin wrapper around ``gate_tokens.create_token`` using the stored
    config.
    """
    from .gate_tokens import create_token as _issue_token

    # NOTE(review): the helper receives (task_id, project_id, cfg), i.e. the
    # reverse of this method's parameter order.  Presumably that matches
    # the helper's signature; confirm against gate_tokens.create_token.
    token = _issue_token(task_id, project_id, self._cfg)
    return token

gate_url(repo_path, token)

Build an HTTP URL for gate access to repo_path.

Source code in src/terok_sandbox/sandbox.py
def gate_url(self, repo_path: Path, token: str) -> str:
    """Compose the HTTP URL a container uses to reach *repo_path* via the gate.

    The URL embeds *token* as the user-info part, targets
    ``host.containers.internal`` on the configured gate port, and uses
    *repo_path* relative to the gate base path (POSIX separators) as the
    URL path.

    Raises:
        ValueError: If *repo_path* is not located under the gate base path.
    """
    cfg = self._cfg
    port = cfg.gate_port
    relative = repo_path.relative_to(cfg.gate_base_path).as_posix()
    return f"http://{token}@host.containers.internal:{port}/{relative}"

gate_status()

Return the current gate server status.

Source code in src/terok_sandbox/sandbox.py
def gate_status(self) -> GateServerStatus:
    """Report the gate server's current status for the stored config.

    Thin wrapper around ``gate_server.get_server_status``.
    """
    from .gate_server import get_server_status as _query_status

    status = _query_status(self._cfg)
    return status

pre_start_args(container, task_dir)

Return extra podman args for shield integration.

Source code in src/terok_sandbox/sandbox.py
def pre_start_args(self, container: str, task_dir: Path) -> list[str]:
    """Produce the extra podman arguments needed for shield integration.

    Thin wrapper around ``shield.pre_start`` for *container* and
    *task_dir*, using the stored config.
    """
    from .shield import pre_start as _shield_pre_start

    extra_args = _shield_pre_start(container, task_dir, self._cfg)
    return extra_args

shield_down(container, task_dir)

Remove shield rules for a container (allow all egress).

Source code in src/terok_sandbox/sandbox.py
def shield_down(self, container: str, task_dir: Path) -> None:
    """Drop the shield rules for *container*, allowing all egress.

    Thin wrapper around ``shield.down`` using the stored config.
    """
    from .shield import down as _shield_down

    _shield_down(container, task_dir, cfg=self._cfg)

run(spec, *, hooks=None)

Launch a detached container from spec.

Assembles and executes the podman run command, handling user namespace mapping, shield or bypass networking, GPU device args, environment and volume injection, CDI error detection, and lifecycle hook callbacks.

Fires hooks.pre_start before podman run and hooks.post_start after a successful launch. Raises :class:~.runtime.GpuConfigError when the launch fails due to NVIDIA CDI misconfiguration.

Source code in src/terok_sandbox/sandbox.py
def run(self, spec: RunSpec, *, hooks: LifecycleHooks | None = None) -> None:
    """Launch a detached container from *spec*.

    Assembles and executes the ``podman run`` command, handling user
    namespace mapping, shield or bypass networking, GPU device args,
    environment and volume injection, CDI error detection, and lifecycle
    hook callbacks.

    The command line is built in this order: ``podman run -d``, userns
    args, ``--security-opt no-new-privileges`` (only when
    ``spec.unrestricted`` is false), network args (shield or bypass), GPU
    args, ``spec.extra_args``, ``-v`` volumes, ``-e`` environment,
    ``--name``/``-w /workspace``/image, and finally ``spec.command``.
    The command is printed with environment values redacted before it runs.

    Fires *hooks.pre_start* before ``podman run`` and *hooks.post_start*
    after a successful launch.

    Raises:
        SystemExit: If ``podman`` is not installed, if the launch fails,
            or if shield setup itself exits (propagated unchanged).
        GpuConfigError: When the launch fails due to NVIDIA CDI
            misconfiguration (see :class:`~.runtime.GpuConfigError`).
    """
    from .runtime import (
        bypass_network_args,
        check_gpu_error,
        gpu_run_args,
        podman_userns_args,
        redact_env_args,
    )

    cmd: list[str] = ["podman", "run", "-d"]
    cmd += podman_userns_args()

    if not spec.unrestricted:
        cmd += ["--security-opt", "no-new-privileges"]

    if self._cfg.shield_bypass:
        print("\n!! SHIELD BYPASSED — egress firewall DISABLED (shield_bypass is set) !!\n")
        cmd += bypass_network_args(self._cfg.gate_port)
    else:
        try:
            from .shield import pre_start

            cmd += pre_start(spec.container_name, spec.task_dir, self._cfg)
        except SystemExit:
            raise  # ShieldNeedsSetup; let the caller handle it
        except OSError as exc:  # FileNotFoundError is a subclass of OSError
            import warnings

            # Deliberately fail-open: the container still starts, but the
            # operator is warned that egress is not filtered.
            warnings.warn(
                f"Shield setup failed ({exc}) — container will have unfiltered egress",
                stacklevel=2,
            )

    cmd += gpu_run_args(enabled=spec.gpu_enabled)

    if spec.extra_args:
        cmd += list(spec.extra_args)
    for vol in spec.volumes:
        cmd += ["-v", vol]
    for k, v in spec.env.items():
        cmd += ["-e", f"{k}={v}"]

    cmd += ["--name", spec.container_name, "-w", "/workspace", spec.image]
    cmd += list(spec.command)

    print("$", shlex.join(redact_env_args(cmd)))

    if hooks and hooks.pre_start:
        hooks.pre_start()

    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except FileNotFoundError as exc:
        # Explicit cause, matching the CalledProcessError handler below.
        raise SystemExit("podman not found; please install podman") from exc
    except subprocess.CalledProcessError as exc:
        # Raises GpuConfigError for CDI failures; otherwise falls through.
        check_gpu_error(exc)
        stderr = (exc.stderr or b"").decode(errors="replace")
        msg = f"Container launch failed:\n{stderr.strip()}" if stderr else str(exc)
        raise SystemExit(msg) from exc

    if hooks and hooks.post_start:
        hooks.post_start()

stream_logs(container, *, timeout=None, ready_check=None)

Stream container logs until ready_check matches or timeout.

Source code in src/terok_sandbox/sandbox.py
def stream_logs(
    self,
    container: str,
    *,
    timeout: float | None = None,
    ready_check: Callable[[str], bool] | None = None,
) -> bool:
    """Follow *container*'s logs until it reports ready or *timeout* elapses.

    When *ready_check* is omitted, a line counts as "ready" if it contains
    ``READY_MARKER``.  The work is delegated to
    ``runtime.stream_initial_logs``, whose result is returned.
    """
    from .runtime import stream_initial_logs as _follow_logs

    def _contains_marker(line: str) -> bool:
        return READY_MARKER in line

    predicate = ready_check if ready_check else _contains_marker
    return _follow_logs(container, timeout, predicate)

wait_for_exit(container, timeout=None)

Block until container exits; return exit code.

Source code in src/terok_sandbox/sandbox.py
def wait_for_exit(self, container: str, timeout: float | None = None) -> int:
    """Wait for *container* to exit and hand back its exit code.

    Thin wrapper around ``runtime.wait_for_exit``.
    """
    from .runtime import wait_for_exit as _await_exit

    exit_code = _await_exit(container, timeout)
    return exit_code

stop(containers)

Best-effort stop and remove containers.

Source code in src/terok_sandbox/sandbox.py
def stop(self, containers: list[str]) -> None:
    """Stop and remove *containers* on a best-effort basis.

    Thin wrapper around ``runtime.stop_task_containers``.
    """
    from .runtime import stop_task_containers as _stop_all

    _stop_all(containers)

init_ssh(project_id)

Create an SSH manager for project_id.

Source code in src/terok_sandbox/sandbox.py
def init_ssh(self, project_id: str) -> SSHManager:
    """Create an SSH manager for *project_id*.

    The manager is pointed at ``<ssh_keys_dir>/<project_id>`` resolved
    from this sandbox's own configuration.  Without an explicit directory
    ``SSHManager`` resolves the location from a freshly constructed
    default ``SandboxConfig()``, which would ignore any overrides carried
    by the config stored on this facade.  With a default config the
    resulting path is the same as that fallback.
    """
    from .ssh import SSHManager

    return SSHManager(
        project_id=project_id,
        ssh_host_dir=self._cfg.ssh_keys_dir / project_id,
    )

SSHManager(*, project_id, ssh_host_dir=None, ssh_key_name=None, ssh_config_template=None)

SSH keypair generation and config directory management.

Handles the full SSH setup lifecycle: directory creation, keypair generation (ed25519 or RSA), config file rendering from templates, and permission hardening. Keys are stored under ssh_keys_dir/<project> and used by the credential proxy's SSH agent for container access.

Initialize with plain parameters.

Parameters

  • project_id: Identifier used for key naming and directory layout.
  • ssh_host_dir: Explicit SSH directory (overrides default <ssh_keys_dir>/<id>).
  • ssh_key_name: Explicit key filename (overrides derived id_<type>_<id>).
  • ssh_config_template: Path to a user-provided SSH config template file.

Source code in src/terok_sandbox/ssh.py
def __init__(
    self,
    *,
    project_id: str,
    ssh_host_dir: Path | str | None = None,
    ssh_key_name: str | None = None,
    ssh_config_template: Path | str | None = None,
) -> None:
    """Initialize with plain parameters.

    Parameters
    ----------
    project_id:
        Identifier used for key naming and directory layout.
    ssh_host_dir:
        Explicit SSH directory (overrides default ``<ssh_keys_dir>/<id>``).
    ssh_key_name:
        Explicit key filename (overrides derived ``id_<type>_<id>``).
    ssh_config_template:
        Path to a user-provided SSH config template file.
    """
    self._project_id = project_id
    self._ssh_host_dir = Path(ssh_host_dir) if ssh_host_dir else None
    self._ssh_key_name = ssh_key_name
    self._ssh_config_template = Path(ssh_config_template) if ssh_config_template else None

key_name property

Return the effective SSH key name.

init(key_type='ed25519', key_name=None, force=False)

Initialize the SSH directory and generate a keypair.

Location resolution
  • If ssh_host_dir was provided, use that path.
  • Otherwise: <ssh_keys_dir>/<project_id>

Key name defaults to id_<type>_<project_id> (e.g. id_ed25519_proj).

Source code in src/terok_sandbox/ssh.py
def init(
    self,
    key_type: str = "ed25519",
    key_name: str | None = None,
    force: bool = False,
) -> SSHInitResult:
    """Prepare the SSH directory, keypair and config file for this project.

    Location resolution:
      - If *ssh_host_dir* was provided, use that path.
      - Otherwise: ``<ssh_keys_dir>/<project_id>``

    Key name defaults to ``id_<type>_<project_id>`` (e.g. ``id_ed25519_proj``).

    With *force* set, the keypair and the config file are regenerated even
    when they already exist.  Raises ``SystemExit`` for an unsupported
    *key_type*, an invalid or reserved key name, a pre-existing artifact
    that is not a regular file, or a failure to harden permissions.
    """
    if key_type != "ed25519" and key_type != "rsa":
        raise SystemExit("Unsupported --key-type. Use 'ed25519' or 'rsa'.")

    # Only fall back to the configured keys directory when no explicit
    # directory was given at construction time.
    base_dir = self._ssh_host_dir
    if not base_dir:
        base_dir = SandboxConfig().ssh_keys_dir / self._project_id
    ssh_dir = Path(base_dir).expanduser().resolve()
    ensure_dir_writable(ssh_dir, "SSH host dir")

    if not key_name:
        key_name = effective_ssh_key_name(
            self._project_id, ssh_key_name=self._ssh_key_name, key_type=key_type
        )

    # The key name must be a bare filename that cannot shadow the other
    # files living in the SSH directory.
    reserved = {"config", "known_hosts", "authorized_keys"}
    as_path = Path(key_name)
    looks_like_path = (
        as_path.is_absolute()
        or ".." in as_path.parts
        or "/" in key_name
        or "\\" in key_name
    )
    if looks_like_path:
        raise SystemExit(
            f"Invalid SSH key name {key_name!r}: must be a plain filename, "
            "not an absolute path or traversal sequence"
        )
    if key_name.lower() in reserved:
        raise SystemExit(
            f"Invalid SSH key name {key_name!r}: collides with reserved "
            f"filename (reserved: {', '.join(sorted(reserved))})"
        )

    private_key = ssh_dir / key_name
    public_key = ssh_dir / f"{key_name}.pub"
    config_file = ssh_dir / "config"

    # Anything already present at an artifact path must be a plain regular
    # file; symlinks and other file types are rejected outright.
    for artifact in (private_key, public_key, config_file):
        if artifact.is_symlink():
            found = "a symlink"
        elif artifact.exists() and not artifact.is_file():
            found = "a non-regular file"
        else:
            continue
        raise SystemExit(
            f"Refusing to use {artifact}: expected a regular file but found "
            f"{found}. "
            "Remove it manually and retry."
        )

    # Regenerate when forced or when either half of the keypair is missing.
    if force or not (private_key.exists() and public_key.exists()):
        self._generate_keypair(key_type, private_key, public_key, self._project_id)

    if force or not config_file.exists():
        self._render_config(
            config_file, key_name, private_key, self._project_id, self._ssh_config_template
        )

    try:
        _harden_permissions(ssh_dir, private_key, public_key, config_file)
    except OSError as err:
        raise SystemExit(f"Failed to set SSH directory permissions on {ssh_dir}: {err}") from err

    _print_init_summary(ssh_dir, private_key, public_key, config_file)
    return SSHInitResult(
        dir=str(ssh_dir),
        private_key=str(private_key),
        public_key=str(public_key),
        config_path=str(config_file),
        key_name=key_name,
    )

ensure_proxy_reachable(cfg=None)

Verify the credential proxy is running and its TCP ports are up.

For systemd socket activation the service may not have started yet (e.g. after a fresh boot). This function triggers a start via systemctl --user start and waits for the HTTP and SSH agent TCP ports to become reachable via /-/health and raw TCP probes.

For daemon mode the /-/health endpoint is probed on the TCP port.

Raises SystemExit with an actionable message if the proxy is unreachable. Called before task creation when credential proxy is enabled.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def ensure_proxy_reachable(cfg: SandboxConfig | None = None) -> None:
    """Verify the credential proxy is running and its TCP ports are up.

    For **systemd** socket activation the service may not have started yet
    (e.g. after a fresh boot).  This function triggers a start via
    ``systemctl --user start`` and waits for the HTTP and SSH agent TCP
    ports to become reachable via ``/-/health`` and raw TCP probes.

    For **daemon** mode the ``/-/health`` endpoint is probed on the TCP port.

    Raises ``SystemExit`` with an actionable message if the proxy is
    unreachable.  Called before task creation when credential proxy is enabled.
    """
    c = _cfg(cfg)

    # Ask systemd once and reuse the answer: each query spawns a
    # ``systemctl`` subprocess, and a single snapshot keeps the two
    # decisions below consistent with each other.
    socket_active = is_socket_active()

    if not socket_active and not is_daemon_running(cfg):
        hint = (
            "  terokctl credentials install   (systemd socket activation)\n"
            "  terokctl credentials start      (manual daemon)"
        )
        raise SystemExit(
            "Credential proxy is not reachable.\n"
            "\n"
            "The credential proxy injects real API credentials into container\n"
            "requests without exposing secrets to the container filesystem.\n"
            "\n"
            f"Start it with:\n{hint}\n"
            f"\n"
            f"Socket: {c.proxy_socket_path}\n"
            f"DB:     {c.proxy_db_path}\n"
        )

    # Systemd socket activation: the socket unit is active but the service
    # may be idle.  Explicitly start the service so the TCP ports come up.
    if socket_active:
        try:
            subprocess.run(
                ["systemctl", "--user", "start", _SERVICE_UNIT],
                check=False,
                timeout=10,
            )
        except (OSError, subprocess.TimeoutExpired):
            # The start request is best-effort (``check=False``).  A hung or
            # vanished ``systemctl`` must not surface as a raw traceback;
            # the probes below decide and report with an actionable message.
            pass

    if not _wait_for_ready(c.proxy_port):
        raise SystemExit(
            f"Credential proxy service started but TCP port {c.proxy_port} "
            "is not reachable. Check: journalctl --user -u terok-credential-proxy"
        )

    if not _wait_for_tcp_port(c.ssh_agent_port):
        raise SystemExit(
            f"Credential proxy service started but SSH agent port {c.ssh_agent_port} "
            "is not reachable. Check: journalctl --user -u terok-credential-proxy"
        )

get_proxy_port(cfg=None)

Return the configured credential proxy TCP port.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def get_proxy_port(cfg: SandboxConfig | None = None) -> int:
    """Look up the TCP port configured for the credential proxy."""
    resolved = _cfg(cfg)
    return resolved.proxy_port

get_proxy_status(cfg=None)

Return the current credential proxy status.

Populates route count from the routes JSON (0 if missing/invalid) and credential provider names from the database (empty if DB doesn't exist).

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def get_proxy_status(cfg: SandboxConfig | None = None) -> CredentialProxyStatus:
    """Return the current credential proxy status.

    Populates route count from the routes JSON (0 if missing/invalid) and
    credential provider names from the database (empty if DB doesn't exist
    or cannot be read; the latter is logged as a warning).
    """
    import json

    c = _cfg(cfg)

    routes_count = 0
    if c.proxy_routes_path.is_file():
        try:
            routes_count = len(json.loads(c.proxy_routes_path.read_text()))
        except (ValueError, TypeError, OSError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError; TypeError is what len() raises when the
            # top-level JSON value is a scalar or null.  Every such file
            # counts as "invalid" and is reported as zero routes.
            routes_count = 0

    creds: tuple[str, ...] = ()
    if c.proxy_db_path.is_file():
        try:
            from .credential_db import CredentialDB

            db = CredentialDB(c.proxy_db_path)
            try:
                creds = tuple(db.list_credentials("default"))
            finally:
                db.close()
        except Exception as exc:  # noqa: BLE001
            # Status reporting must not fail because the DB is unreadable.
            log_warning(f"Failed to read credential DB for status: {exc}")

    # Systemd takes precedence: when units are installed, report mode="systemd"
    # even if the socket is inactive — the daemon's running state is ignored so
    # operators see the correct activation path and don't get mixed signals.
    if is_socket_installed():
        mode = "systemd"
        running = is_service_active()
        healthy = _probe_proxy(c.proxy_port) if running else False
    elif is_daemon_running(cfg):
        mode = "daemon"
        running = True
        healthy = _probe_proxy(c.proxy_port)
    else:
        mode = "none"
        running = False
        healthy = False

    return CredentialProxyStatus(
        mode=mode,
        running=running,
        healthy=healthy,
        socket_path=c.proxy_socket_path,
        db_path=c.proxy_db_path,
        routes_path=c.proxy_routes_path,
        routes_configured=routes_count,
        credentials_stored=creds,
    )

get_ssh_agent_port(cfg=None)

Return the configured SSH agent proxy TCP port.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def get_ssh_agent_port(cfg: SandboxConfig | None = None) -> int:
    """Look up the TCP port configured for the SSH agent proxy."""
    resolved = _cfg(cfg)
    return resolved.ssh_agent_port

install_proxy_systemd(cfg=None)

Render and install systemd socket+service units, then enable+start the socket.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def install_systemd_units(cfg: SandboxConfig | None = None) -> None:
    """Write the proxy's socket and service units and bring the socket up.

    Templates are rendered from the ``resources/systemd`` directory next to
    the ``terok_sandbox.credential_proxy`` package and written into the
    systemd unit directory.  The socket is then enabled and started, and
    finally restarted so an already-active socket picks up the new
    configuration.  Raises ``SystemExit`` when a template is missing.
    """
    import terok_sandbox.credential_proxy

    from ._util import render_template

    settings = _cfg(cfg)
    target_dir = _systemd_unit_dir()
    target_dir.mkdir(parents=True, exist_ok=True)

    package_dir = Path(terok_sandbox.credential_proxy.__file__).resolve().parent
    templates_dir = package_dir / "resources" / "systemd"

    # Placeholder values substituted into the unit templates.
    substitutions = dict(
        SOCKET_PATH=str(settings.proxy_socket_path),
        DB_PATH=str(settings.proxy_db_path),
        ROUTES_PATH=str(settings.proxy_routes_path),
        PORT=str(settings.proxy_port),
        SSH_AGENT_PORT=str(settings.ssh_agent_port),
        SSH_KEYS_FILE=str(settings.ssh_keys_json_path),
        BIN=shlex.join(_proxy_exec_prefix()),
        UNIT_VERSION=str(_UNIT_VERSION),
    )

    for unit_name in (_SOCKET_UNIT, _SERVICE_UNIT):
        source = templates_dir / unit_name
        if not source.is_file():
            raise SystemExit(f"Missing systemd template: {source}")
        rendered = render_template(source, substitutions)
        (target_dir / unit_name).write_text(rendered, encoding="utf-8")

    settings.proxy_socket_path.parent.mkdir(parents=True, exist_ok=True)

    # Reload, enable+start, then restart: the final restart applies updated
    # unit configuration if the socket was already active.
    systemctl_steps = (
        ["daemon-reload"],
        ["enable", "--now", _SOCKET_UNIT],
        ["restart", _SOCKET_UNIT],
    )
    for step in systemctl_steps:
        subprocess.run(["systemctl", "--user", *step], check=True, timeout=10)

is_proxy_running(cfg=None)

Check whether the managed proxy daemon is alive via its PID file.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def is_daemon_running(cfg: SandboxConfig | None = None) -> bool:
    """Check whether the managed proxy daemon is alive via its PID file.

    Returns ``False`` when the PID file is missing, unreadable or malformed,
    when the recorded PID does not belong to a managed proxy, or when the
    process cannot be signalled.
    """
    pidfile = _pid_file(cfg)
    if not pidfile.is_file():
        return False
    try:
        pid = int(pidfile.read_text().strip())
        if not _is_managed_proxy(pid, cfg):
            return False
        os.kill(pid, 0)  # signal 0 = existence check
    except (ValueError, OSError):
        # OSError covers ProcessLookupError and PermissionError from
        # os.kill() as well as the PID file disappearing (or becoming
        # unreadable) between the is_file() check and read_text().
        return False
    return True

is_proxy_service_active()

Check whether the terok-credential-proxy.service unit is active.

Unlike :func:is_socket_active, this tells whether the proxy daemon itself is running (TCP ports bound), not just whether the socket is listening. Does not trigger socket activation.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def is_service_active() -> bool:
    """Report whether the ``terok-credential-proxy.service`` unit is active.

    Unlike :func:`is_socket_active`, this tells whether the proxy daemon
    itself is running (TCP ports bound), not just whether the socket is
    listening.  Does not trigger socket activation.

    Returns ``False`` when ``systemctl`` is missing or does not answer
    within five seconds.
    """
    query = ["systemctl", "--user", "is-active", _SERVICE_UNIT]
    try:
        completed = subprocess.run(query, capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    state = completed.stdout.strip()
    return state == "active"

is_proxy_socket_active()

Check whether the terok-credential-proxy.socket unit is active.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def is_socket_active() -> bool:
    """Report whether the ``terok-credential-proxy.socket`` unit is active.

    Returns ``False`` when ``systemctl`` is missing or does not answer
    within five seconds.
    """
    query = ["systemctl", "--user", "is-active", _SOCKET_UNIT]
    try:
        completed = subprocess.run(query, capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    state = completed.stdout.strip()
    return state == "active"

is_proxy_socket_installed()

Check whether the terok-credential-proxy.socket unit file exists.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def is_socket_installed() -> bool:
    """Report whether the ``terok-credential-proxy.socket`` unit file is on disk."""
    unit_path = _systemd_unit_dir() / _SOCKET_UNIT
    return unit_path.is_file()

is_proxy_systemd_available()

Check whether the systemd user session is reachable.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def is_systemd_available() -> bool:
    """Report whether the systemd user session can be reached.

    Runs ``systemctl --user is-system-running`` and treats exit codes 0 and
    1 as "available".  A missing ``systemctl`` binary or a probe that takes
    longer than five seconds counts as unavailable.
    """
    probe = ["systemctl", "--user", "is-system-running"]
    try:
        outcome = subprocess.run(probe, capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    return outcome.returncode == 0 or outcome.returncode == 1

start_proxy(cfg=None)

Start the credential proxy as a background daemon.

The proxy listens on a Unix socket and reads credentials from a sqlite3 database. A routes JSON file must exist at the configured path (generated by terok-agent from the YAML registry).

Writes a PID file to runtime_root() / "credential-proxy.pid".

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def start_daemon(cfg: SandboxConfig | None = None) -> None:
    """Start the credential proxy as a background daemon.

    The proxy listens on a Unix socket and reads credentials from a
    sqlite3 database.  A routes JSON file must exist at the configured
    path (generated by terok-agent from the YAML registry); an empty one
    is created here when it is missing, and likewise for the SSH keys JSON.

    Writes a PID file to ``runtime_root() / "credential-proxy.pid"``.

    Raises ``SystemExit`` when the process exits during startup (its stderr
    is included in the message) or when it does not become ready in time.
    """
    c = _cfg(cfg)
    sock_path = c.proxy_socket_path
    db_path = c.proxy_db_path
    routes_path = c.proxy_routes_path
    pidfile = _pid_file(cfg)

    # Make sure every directory the daemon writes into exists up front.
    sock_path.parent.mkdir(parents=True, exist_ok=True)
    pidfile.parent.mkdir(parents=True, exist_ok=True)

    routes_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Mode "x" creates the file only if absent, so an existing routes
        # file is never overwritten.
        with routes_path.open("x", encoding="utf-8") as f:
            f.write("{}\n")
        import logging

        logging.getLogger(__name__).info(
            "Created empty routes file: %s — add routes via 'terokctl auth <provider>'",
            routes_path,
        )
    except FileExistsError:
        pass

    ssh_keys_path = c.ssh_keys_json_path
    ssh_keys_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Same create-if-absent seeding for the SSH keys file.
        with ssh_keys_path.open("x", encoding="utf-8") as f:
            f.write("{}\n")
    except FileExistsError:
        pass

    log_file = c.state_dir / "proxy" / "credential-proxy.log"
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # Full daemon command line: launcher prefix plus one flag per setting.
    cmd = [
        *_proxy_exec_prefix(),
        f"--socket-path={sock_path}",
        f"--db-path={db_path}",
        f"--routes-file={routes_path}",
        f"--pid-file={pidfile}",
        f"--port={c.proxy_port}",
        f"--ssh-agent-port={c.ssh_agent_port}",
        f"--ssh-keys-file={ssh_keys_path}",
        f"--log-file={log_file}",
        "--log-level=DEBUG",
    ]

    # Fork into background so the proxy survives shell exit.
    # The server writes its own PID file via --pid-file.
    # stderr=PIPE only for the startup-failure detection window; the pipe is
    # closed immediately after so the daemon's stderr does not block on a full buffer.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        start_new_session=True,
    )

    # Poll the /-/health endpoint until the server is actually ready.
    if _wait_for_ready(c.proxy_port):
        # Close our end of the pipe — the daemon logs to the log file, not stderr.
        proc.stderr.close()
        return

    # Timed out — check whether the process crashed or is just slow.
    ret = proc.poll()
    if ret is not None:
        # The process already exited: surface its exit code and whatever it
        # wrote to stderr.
        stderr = (proc.stderr.read() or b"").decode(errors="replace").strip()
        msg = f"Credential proxy failed to start (exit {ret})"
        if stderr:
            msg += f":\n{stderr}"
        raise SystemExit(msg)
    # Still alive but not answering health probes; the process is left
    # running and the caller is pointed at the logs.
    # NOTE(review): the "5 s" in the message presumably mirrors the timeout
    # inside _wait_for_ready — confirm they stay in sync.
    proc.stderr.close()
    raise SystemExit(
        "Credential proxy process started but did not become ready within 5 s.\n"
        f"Check logs or try: curl http://127.0.0.1:{c.proxy_port}{_HEALTH_PATH}"
    )

stop_proxy(cfg=None)

Stop the managed proxy daemon by sending SIGTERM.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def stop_daemon(cfg: SandboxConfig | None = None) -> None:
    """Stop the managed proxy daemon by sending SIGTERM.

    Does nothing when no PID file exists.  The signal is sent only when the
    recorded PID belongs to a managed proxy; the PID file is removed in
    every case so a stale file does not linger.
    """
    pidfile = _pid_file(cfg)
    if not pidfile.is_file():
        return
    try:
        pid = int(pidfile.read_text().strip())
        if _is_managed_proxy(pid, cfg):
            os.kill(pid, signal.SIGTERM)
    except (ValueError, OSError):
        # Best effort: a malformed PID, an already-gone process, a process
        # we may not signal, or a PID file that vanished mid-read all mean
        # there is nothing (more) to stop.
        pass
    finally:
        # missing_ok avoids the check-then-unlink race when the daemon
        # removes its own PID file while shutting down.
        pidfile.unlink(missing_ok=True)

uninstall_proxy_systemd(cfg=None)

Disable+stop the socket and remove unit files.

Source code in src/terok_sandbox/credential_proxy_lifecycle.py
def uninstall_systemd_units(cfg: SandboxConfig | None = None) -> None:  # noqa: ARG001
    """Tear down the proxy's systemd integration.

    Disables and stops the socket unit, deletes the socket and service unit
    files if present, then asks systemd to reload.  The ``systemctl`` calls
    are not checked for success.  *cfg* is accepted but not used.
    """
    systemctl = ["systemctl", "--user"]
    units_root = _systemd_unit_dir()

    subprocess.run([*systemctl, "disable", "--now", _SOCKET_UNIT], check=False, timeout=10)

    for installed in (units_root / _SOCKET_UNIT, units_root / _SERVICE_UNIT):
        if installed.is_file():
            installed.unlink()

    subprocess.run([*systemctl, "daemon-reload"], check=False, timeout=10)

sandbox_doctor_checks(*, proxy_port=None, ssh_agent_port=None, desired_shield_state=None)

Return sandbox-level health checks for in-container diagnostics.

Parameters:

Name Type Description Default
proxy_port int | None

Credential proxy TCP port (skip check if None).

None
ssh_agent_port int | None

SSH agent TCP port (skip check if None).

None
desired_shield_state str | None

Expected shield state from shield_desired_state file ("up", "down", "down_all", or None to skip).

None

Returns:

Type Description
list[DoctorCheck]

List of :class:DoctorCheck instances ready for orchestration.

Source code in src/terok_sandbox/doctor.py
def sandbox_doctor_checks(
    *,
    proxy_port: int | None = None,
    ssh_agent_port: int | None = None,
    desired_shield_state: str | None = None,
) -> list[DoctorCheck]:
    """Assemble the sandbox-level health checks for in-container diagnostics.

    Args:
        proxy_port: Credential proxy TCP port (skip check if ``None``).
        ssh_agent_port: SSH agent TCP port (skip check if ``None``).
        desired_shield_state: Expected shield state from ``shield_desired_state``
            file (``"up"``, ``"down"``, ``"down_all"``, or ``None`` to skip).

    Returns:
        List of :class:`DoctorCheck` instances ready for orchestration, in
        the order proxy, SSH agent, shield.  The shield check is always
        included.
    """
    # Port-based checks are optional; each entry pairs a port with the
    # factory that builds its check.
    port_checks = (
        (proxy_port, _make_proxy_check),
        (ssh_agent_port, _make_ssh_agent_check),
    )
    checks: list[DoctorCheck] = [
        factory(port) for port, factory in port_checks if port is not None
    ]
    checks.append(_make_shield_check(desired_shield_state))
    return checks

check_units_outdated(cfg=None)

Return a warning string if installed systemd units are stale, else None.

Checks both the unit version stamp and the baked --base-path against the current configuration. Useful for gate-server status and sickbay to surface upgrade hints without blocking task creation (that's ensure_server_reachable's job).

Source code in src/terok_sandbox/gate_server.py
def check_units_outdated(cfg: SandboxConfig | None = None) -> str | None:
    """Describe why the installed systemd units are stale, or return ``None``.

    Two things are compared against the current state: the unit version
    stamp and the baked ``--base-path``.  Intended for ``gate-server status``
    and ``sickbay``, which surface upgrade hints without blocking task
    creation (that's ``ensure_server_reachable``'s job).  Returns ``None``
    straight away when the socket unit is not installed.
    """
    if not is_socket_installed():
        return None

    installed = _installed_unit_version()
    if installed is not None and not installed < _UNIT_VERSION:
        # Version stamp is current; the base path is the remaining check.
        return _base_path_diverged(cfg)

    label = f"v{installed}" if installed is not None else "unversioned"
    return f"Systemd units are outdated (installed {label}, expected v{_UNIT_VERSION})."

ensure_server_reachable(cfg=None)

Verify the gate server is running and configured correctly.

Raises SystemExit if the server is down, systemd units are outdated, or the installed base path diverges from the current configuration. Called before task creation to fail early with an actionable message.

Source code in src/terok_sandbox/gate_server.py
def ensure_server_reachable(cfg: SandboxConfig | None = None) -> None:
    """Fail early unless the gate server is up and correctly configured.

    Raises ``SystemExit`` if the server is down, systemd units are outdated,
    or the installed base path diverges from the current configuration.
    Called before task creation so the user gets an actionable message.
    The unit version and base-path checks apply only in systemd mode.
    """
    status = get_server_status(cfg)

    if not status.running:
        explanation = (
            "Gate server is not running.\n"
            "\n"
            "The gate server serves git repos to task containers over the network,\n"
            "replacing the previous volume-mount approach.\n"
            "\n"
        )
        if is_systemd_available():
            advice = "Recommended: install and start the systemd socket.\n"
        else:
            advice = "Start the gate daemon.\n"
        raise SystemExit(explanation + advice)

    if status.mode != "systemd":
        return

    installed = _installed_unit_version()
    if installed is None or installed < _UNIT_VERSION:
        label = f"v{installed}" if installed is not None else "unversioned"
        raise SystemExit(
            "Gate server systemd units are outdated "
            f"(installed {label}, expected v{_UNIT_VERSION})."
        )

    divergence = _base_path_diverged(cfg)
    if divergence:
        raise SystemExit(divergence)

get_gate_base_path(cfg=None)

Return the gate base path (public API).

Source code in src/terok_sandbox/gate_server.py
def get_gate_base_path(cfg: SandboxConfig | None = None) -> Path:
    """Public accessor for the gate base path (wraps ``_get_gate_base_path``)."""
    base_path = _get_gate_base_path(cfg)
    return base_path

get_gate_server_port(cfg=None)

Return the configured gate server port.

Source code in src/terok_sandbox/gate_server.py
def get_gate_server_port(cfg: SandboxConfig | None = None) -> int:
    """Public accessor for the configured gate server port (wraps ``_get_port``)."""
    port = _get_port(cfg)
    return port

get_server_status(cfg=None)

Return the current gate server status.

Source code in src/terok_sandbox/gate_server.py
def get_server_status(cfg: SandboxConfig | None = None) -> GateServerStatus:
    """Work out how (and whether) the gate server is currently running.

    An installed and active systemd socket wins.  Otherwise a live managed
    daemon is reported as ``"daemon"``.  If neither is running, the mode is
    ``"systemd"`` when the socket unit is installed and ``"none"`` when it
    is not.
    """
    port = _get_port(cfg)
    socket_installed = is_socket_installed()

    if socket_installed and is_socket_active():
        return GateServerStatus(mode="systemd", running=True, port=port)

    # Socket missing or inactive — the daemon fallback may still be serving.
    if is_daemon_running(cfg):
        return GateServerStatus(mode="daemon", running=True, port=port)

    idle_mode = "systemd" if socket_installed else "none"
    return GateServerStatus(mode=idle_mode, running=False, port=port)

install_systemd_units(cfg=None)

Render and install systemd socket+service units, then enable+start the socket.

Source code in src/terok_sandbox/gate_server.py
def install_systemd_units(cfg: SandboxConfig | None = None) -> None:
    """Write the gate's socket and service units and bring the socket up.

    The ``terok-gate`` binary is located on ``PATH`` and baked into the
    rendered units together with the port, base path, token file and unit
    version.  Raises ``SystemExit`` when the binary or a unit template
    cannot be found.
    """
    import shutil

    import terok_sandbox.gate

    from ._util import render_template
    from .gate_tokens import token_file_path

    gate_bin = shutil.which("terok-gate")
    if not gate_bin:
        raise SystemExit(
            "Cannot find 'terok-gate' on PATH.\n"
            "Ensure terok-sandbox is installed (pip/pipx/poetry) and the binary is accessible."
        )

    target_dir = _systemd_unit_dir()
    target_dir.mkdir(parents=True, exist_ok=True)

    package_dir = Path(terok_sandbox.gate.__file__).resolve().parent
    templates_dir = package_dir / "resources" / "systemd"

    # Placeholder values substituted into the unit templates.
    substitutions = dict(
        PORT=str(_get_port(cfg)),
        GATE_BASE_PATH=str(_get_gate_base_path(cfg)),
        TOKEN_FILE=str(token_file_path(cfg)),
        UNIT_VERSION=str(_UNIT_VERSION),
        TEROK_GATE_BIN=gate_bin,
    )

    for unit_name in (_SOCKET_UNIT, "terok-gate@.service"):
        source = templates_dir / unit_name
        if not source.is_file():
            raise SystemExit(f"Missing systemd template: {source}")
        rendered = render_template(source, substitutions)
        (target_dir / unit_name).write_text(rendered, encoding="utf-8")

    # Make systemd pick up the new files, then enable and start the socket.
    for step in (["daemon-reload"], ["enable", "--now", _SOCKET_UNIT]):
        subprocess.run(["systemctl", "--user", *step], check=True, timeout=10)

is_daemon_running(cfg=None)

Check whether the managed daemon process is alive via its PID file.

Source code in src/terok_sandbox/gate_server.py
def is_daemon_running(cfg: SandboxConfig | None = None) -> bool:
    """Check whether the managed daemon process is alive via its PID file.

    Returns ``False`` when the PID file is missing, unreadable or malformed,
    when the recorded PID does not belong to a managed server, or when the
    process cannot be signalled.
    """
    pidfile = _pid_file(cfg)
    if not pidfile.is_file():
        return False
    try:
        pid = int(pidfile.read_text().strip())
        if not _is_managed_server(pid):
            return False
        os.kill(pid, 0)  # signal 0 = existence check
    except (ValueError, OSError):
        # OSError covers ProcessLookupError and PermissionError from
        # os.kill() as well as the PID file disappearing (or becoming
        # unreadable) between the is_file() check and read_text().
        return False
    return True

is_systemd_available()

Check whether systemctl --user is usable.

Uses is-system-running which returns well-defined exit codes: 0 = running, 1 = degraded/starting/stopping — both mean systemd is present. Any other code (or missing binary) means unavailable.

Source code in src/terok_sandbox/gate_server.py
def is_systemd_available() -> bool:
    """Report whether ``systemctl --user`` is usable.

    Relies on ``is-system-running``, whose exit codes are well defined:
    0 = running, 1 = degraded/starting/stopping.  Either one means a user
    session is present.  Any other code, a missing binary, or a probe that
    exceeds five seconds means unavailable.
    """
    probe = ["systemctl", "--user", "is-system-running"]
    try:
        outcome = subprocess.run(probe, capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    # "running" (0), "degraded" (1), "starting" (1), "stopping" (1)
    # all indicate a usable user session.
    return outcome.returncode == 0 or outcome.returncode == 1

start_daemon(port=None, cfg=None)

Start a terok-gate daemon process (non-systemd fallback).

Writes a PID file to runtime_root() / "gate-server.pid". If TEROK_GATE_ADMIN_TOKEN is set in the environment, it is forwarded to the daemon for host-level access to all repos.

Source code in src/terok_sandbox/gate_server.py
def start_daemon(port: int | None = None, cfg: SandboxConfig | None = None) -> None:
    """Start a ``terok-gate`` daemon process (non-systemd fallback).

    Writes a PID file to ``runtime_root() / "gate-server.pid"``.
    If ``TEROK_GATE_ADMIN_TOKEN`` is set in the environment, it is
    forwarded to the daemon for host-level access to all repos.
    If ``TEROK_GATE_BIND`` is set, it is forwarded as ``--bind``.

    A falsy *port* (``None`` or ``0``) selects the configured port.
    Raises ``subprocess.CalledProcessError`` when ``terok-gate`` exits
    non-zero and ``subprocess.TimeoutExpired`` when it does not return
    within ten seconds.
    """
    from .gate_tokens import token_file_path

    effective_port = port or _get_port(cfg)
    gate_base = _get_gate_base_path(cfg)
    gate_base.mkdir(parents=True, exist_ok=True)
    pidfile = _pid_file(cfg)
    pidfile.parent.mkdir(parents=True, exist_ok=True)

    cmd = [
        "terok-gate",
        f"--base-path={gate_base}",
        # Resolve the token file from the same *cfg* as every other path so
        # the daemon validates against the file that create_token(cfg) and
        # the systemd units use.
        f"--token-file={token_file_path(cfg)}",
        f"--port={effective_port}",
        "--detach",
        f"--pid-file={pidfile}",
    ]
    admin_token = os.environ.get("TEROK_GATE_ADMIN_TOKEN")
    if admin_token:
        # NOTE(review): the token travels on the command line and is
        # therefore visible in process listings — consider an env var or
        # file hand-off if terok-gate supports one.
        cmd.append(f"--admin-token={admin_token}")
    bind_addr = os.environ.get("TEROK_GATE_BIND")
    if bind_addr:
        cmd.append(f"--bind={bind_addr}")

    subprocess.run(cmd, check=True, timeout=10)

stop_daemon(cfg=None)

Stop the managed daemon by reading the PID file and sending SIGTERM.

Source code in src/terok_sandbox/gate_server.py
def stop_daemon(cfg: SandboxConfig | None = None) -> None:
    """Stop the managed daemon by reading the PID file and sending SIGTERM.

    Does nothing when no PID file exists.  The signal is sent only when the
    recorded PID belongs to a managed server; the PID file is removed in
    every case so a stale file does not linger.
    """
    pidfile = _pid_file(cfg)
    if not pidfile.is_file():
        return
    try:
        pid = int(pidfile.read_text().strip())
        if _is_managed_server(pid):
            os.kill(pid, signal.SIGTERM)
    except (ValueError, OSError):
        # Best effort: a malformed PID, an already-gone process, a process
        # we may not signal, or a PID file that vanished mid-read all mean
        # there is nothing (more) to stop.
        pass
    finally:
        # missing_ok avoids the check-then-unlink race when the daemon
        # removes its own PID file while shutting down.
        pidfile.unlink(missing_ok=True)

uninstall_systemd_units(cfg=None)

Disable+stop the socket and remove unit files.

Source code in src/terok_sandbox/gate_server.py
def uninstall_systemd_units(cfg: SandboxConfig | None = None) -> None:  # noqa: ARG001
    """Tear down the gate's systemd integration.

    Disables and stops the socket unit, reloads systemd, deletes the socket
    and ``terok-gate@.service`` unit files if present, then reloads once
    more.  The ``systemctl`` calls are not checked for success.  *cfg* is
    accepted but not used.
    """
    units_root = _systemd_unit_dir()

    def _systemctl(*args: str) -> None:
        """Run one unchecked ``systemctl --user`` command."""
        subprocess.run(["systemctl", "--user", *args], check=False, timeout=10)

    _systemctl("disable", "--now", _SOCKET_UNIT)
    _systemctl("daemon-reload")

    for unit_name in (_SOCKET_UNIT, "terok-gate@.service"):
        candidate = units_root / unit_name
        if candidate.is_file():
            candidate.unlink()

    _systemctl("daemon-reload")

create_token(project_id, task_id, cfg=None)

Generate a 128-bit hex token, persist atomically, and return it.

Uses secrets.token_hex(16) for cryptographic randomness. Atomic write via tempfile + os.replace().

Source code in src/terok_sandbox/gate_tokens.py
def create_token(project_id: str, task_id: str, cfg: SandboxConfig | None = None) -> str:
    """Mint a new gate token for a project/task pair and persist it.

    The token is ``terok-g-`` followed by 32 hex digits (128 bits) from
    ``secrets.token_hex(16)``.  The token store is read, extended and
    written back while holding ``_token_lock`` on the token file path.

    Args:
        project_id: Project the token is bound to.
        task_id: Task the token is bound to.
        cfg: Sandbox configuration forwarded to ``token_file_path``.

    Returns:
        The newly created token string.
    """
    store_path = token_file_path(cfg)
    new_token = "terok-g-" + secrets.token_hex(16)
    with _token_lock(store_path):
        registry = _read_tokens(store_path)
        registry[new_token] = {"project": project_id, "task": task_id}
        _write_tokens(store_path, registry)
    return new_token

revoke_token_for_task(project_id, task_id, cfg=None)

Remove all tokens for the given project+task pair. Idempotent.

Source code in src/terok_sandbox/gate_tokens.py
def revoke_token_for_task(project_id: str, task_id: str, cfg: SandboxConfig | None = None) -> None:
    """Drop every stored token bound to the given project+task pair.

    Idempotent: when no token matches, the token file is left untouched
    (nothing is rewritten).

    Args:
        project_id: Project whose tokens should be revoked.
        task_id: Task whose tokens should be revoked.
        cfg: Sandbox configuration forwarded to ``token_file_path``.
    """
    store_path = token_file_path(cfg)
    wanted = (project_id, task_id)
    with _token_lock(store_path):
        registry = _read_tokens(store_path)
        # Collect first: the dict must not change size while iterating it.
        doomed = [
            token
            for token, meta in registry.items()
            if (meta.get("project"), meta.get("task")) == wanted
        ]
        if doomed:
            for token in doomed:
                registry.pop(token)
            _write_tokens(store_path, registry)

credentials_root()

Shared credentials directory used by all terok ecosystem packages.

Priority: TEROK_CREDENTIALS_DIR → /var/lib/terok/credentials (root) → XDG data dir.

Source code in src/terok_sandbox/paths.py
def credentials_root() -> Path:
    """Locate the credentials directory shared by all terok ecosystem packages.

    Resolution order:

    1. ``TEROK_CREDENTIALS_DIR`` (with ``~`` expanded), when set and non-empty.
    2. ``/var/lib/<umbrella>/<cred-subdir>`` when ``_is_root()`` is true.
    3. The ``_user_data_dir`` helper, when it is available.
    4. ``$XDG_DATA_HOME/<umbrella>/<cred-subdir>``.
    5. ``~/.local/share/<umbrella>/<cred-subdir>``.
    """
    override = os.getenv("TEROK_CREDENTIALS_DIR")
    if override:
        return Path(override).expanduser()

    if _is_root():
        base = Path("/var/lib") / _UMBRELLA
    elif _user_data_dir is not None:
        base = Path(_user_data_dir(_UMBRELLA))
    elif xdg_home := os.getenv("XDG_DATA_HOME"):
        base = Path(xdg_home) / _UMBRELLA
    else:
        base = Path.home() / ".local" / "share" / _UMBRELLA
    return base / _CRED_SUBDIR

umbrella_config_root()

Return the top-level terok config root (umbrella, not sandbox-scoped).

Used for cross-package paths like shield profiles that live under the shared ~/.config/terok/ umbrella rather than under any single package's config directory.

Source code in src/terok_sandbox/paths.py
def umbrella_config_root() -> Path:
    """Return the top-level terok config root (umbrella, not sandbox-scoped).

    Used for cross-package paths like shield profiles that live under
    the shared ``~/.config/terok/`` umbrella rather than under any single
    package's config directory.

    Resolution order:

    1. ``TEROK_CONFIG_DIR`` (with ``~`` expanded), when set and non-empty.
    2. ``/etc/<umbrella>`` when ``_is_root()`` is true.
    3. The ``_user_config_dir`` helper, when it is available.
    4. ``$XDG_CONFIG_HOME/<umbrella>``, mirroring the ``XDG_DATA_HOME``
       fallback in :func:`credentials_root`.
    5. ``~/.config/<umbrella>``.
    """
    env = os.getenv("TEROK_CONFIG_DIR")
    if env:
        return Path(env).expanduser()
    if _is_root():
        return Path("/etc") / _UMBRELLA
    if _user_config_dir is not None:
        return Path(_user_config_dir(_UMBRELLA))
    # Without the helper, still honour the XDG override so the result does
    # not depend on whether the optional dependency is installed.
    xdg = os.getenv("XDG_CONFIG_HOME")
    if xdg:
        return Path(xdg) / _UMBRELLA
    return Path.home() / ".config" / _UMBRELLA

bypass_network_args(gate_port)

Return podman network args for running without shield.

Replicates the networking that terok-shield's OCI hook normally provides (allowing the container to reach host.containers.internal for the gate server) but without nftables rules, annotations, or cap-drops.

This is a dangerous fallback for environments where shield can't run. All egress is unfiltered.

Source code in src/terok_sandbox/runtime.py
def bypass_network_args(gate_port: int) -> list[str]:
    """Build the podman network flags used when shield is bypassed.

    Gives the container a route to ``host.containers.internal`` (where the
    gate server listens), like terok-shield's OCI hook would, but with no
    nftables rules, annotations, or cap-drops.

    This is a **dangerous fallback** for environments where shield can't
    run: all egress is unfiltered.

    Args:
        gate_port: Gate server port; not consulted by this implementation.

    Returns:
        An empty list when running as root, otherwise ``--network`` and
        ``--add-host`` flags for slirp4netns or pasta, depending on the
        detected rootless network mode.
    """
    if os.geteuid() == 0:
        return []

    if _detect_rootless_network_mode() == "slirp4netns":
        network = "slirp4netns:allow_host_loopback=true"
        host_ip = _SLIRP_GATEWAY
    else:
        network = f"pasta:--map-host-loopback,{_PASTA_HOST_LOOPBACK_MAP}"
        host_ip = _PASTA_HOST_LOOPBACK_MAP

    return ["--network", network, "--add-host", f"host.containers.internal:{host_ip}"]

find_free_port(host='127.0.0.1')

Find and return a free TCP port on host.

Releases the socket immediately — there is a small race window before the caller binds the port. This is the standard approach when passing a port number to an external process (e.g. podman run -p).

Source code in src/terok_sandbox/runtime.py
def find_free_port(host: str = "127.0.0.1") -> int:
    """Pick a currently free TCP port on *host* and return its number.

    The probing socket is closed before returning, so another process may
    grab the port before the caller binds it.  That small race is the
    accepted trade-off when the port number has to be handed to an
    external process (e.g. ``podman run -p``).
    """
    probe, port = reserve_free_port(host)
    with probe:  # closes the reservation socket on exit
        return port

get_container_state(cname)

Return container state ('running', 'exited', ...) or None if not found.

Source code in src/terok_sandbox/runtime.py
def get_container_state(cname: str) -> str | None:
    """Return container state ('running', 'exited', ...) or ``None`` if not found."""
    try:
        out = subprocess.check_output(
            ["podman", "inspect", "-f", "{{.State.Status}}", cname],
            stderr=subprocess.DEVNULL,
            text=True,
        ).strip()
        return out.lower() if out else None
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None

get_project_container_states(name_prefix)

Return {container_name: state} for all containers matching name_prefix.

Uses a single podman ps -a call with a name filter instead of per-container podman inspect calls. Returns an empty dict when podman is unavailable.

Source code in src/terok_sandbox/runtime.py
def get_project_container_states(name_prefix: str) -> dict[str, str]:
    """Map container names starting with ``<name_prefix>-`` to their state.

    Issues one ``podman ps -a`` with a name filter rather than one
    ``podman inspect`` per container.  States are lower-cased.

    Returns:
        ``{container_name: state}``; empty when podman is missing or the
        command fails.
    """
    listing_cmd = [
        "podman",
        "ps",
        "-a",
        "--filter",
        f"name=^{name_prefix}-",
        "--format",
        "{{.Names}} {{.State}}",
        "--no-trunc",
    ]
    try:
        listing = subprocess.check_output(listing_cmd, stderr=subprocess.DEVNULL, text=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return {}

    states: dict[str, str] = {}
    for row in listing.strip().splitlines():
        fields = row.split(None, 1)
        if len(fields) != 2:
            # Skip rows that lack either the name or the state column.
            continue
        container, status = fields
        states[container] = status.lower()
    return states

gpu_run_args(*, enabled=False)

Return additional podman run args to enable NVIDIA GPU passthrough.

The caller is responsible for determining whether GPUs are enabled (e.g. by reading project configuration). This function only maps the boolean flag to the appropriate podman CLI arguments.

Source code in src/terok_sandbox/runtime.py
def gpu_run_args(*, enabled: bool = False) -> list[str]:
    """Translate the GPU flag into ``podman run`` args for NVIDIA passthrough.

    Deciding *whether* GPUs should be enabled (e.g. from project
    configuration) is the caller's job; this only maps the boolean to CLI
    arguments.

    Returns:
        A new list: empty when *enabled* is falsy, otherwise the CDI
        ``--device`` flag plus the two ``NVIDIA_*`` environment variables.
    """
    if enabled:
        args = ["--device", "nvidia.com/gpu=all"]
        for assignment in ("NVIDIA_VISIBLE_DEVICES=all", "NVIDIA_DRIVER_CAPABILITIES=all"):
            args += ["-e", assignment]
        return args
    return []

is_container_running(cname)

Return True if the named container is currently running.

Source code in src/terok_sandbox/runtime.py
def is_container_running(cname: str) -> bool:
    """Report whether ``podman inspect`` says the named container is running.

    Returns ``False`` when the inspect call fails or podman is not
    installed.
    """
    inspect_cmd = ["podman", "inspect", "-f", "{{.State.Running}}", cname]
    try:
        raw = subprocess.check_output(inspect_cmd, stderr=subprocess.DEVNULL, text=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
    return raw.strip().lower() == "true"

podman_userns_args()

Return user namespace args for rootless podman so UID 1000 maps correctly.

Maps the host user to container UID/GID 1000, the conventional non-root dev user in terok container images.

Source code in src/terok_sandbox/runtime.py
def podman_userns_args() -> list[str]:
    """Return the user-namespace flag needed by rootless podman.

    Rootless runs map the host user onto container UID/GID 1000, the
    conventional non-root ``dev`` user in terok container images.  When
    the effective UID is 0 no flag is needed and the list is empty.
    """
    rootless = os.geteuid() != 0
    return ["--userns=keep-id:uid=1000,gid=1000"] if rootless else []

redact_env_args(cmd)

Return a copy of cmd with sensitive -e KEY=VALUE args redacted.

Handles the two-arg form (-e KEY=VALUE) produced by :meth:~.sandbox.Sandbox.run. Does not handle --env, -e=KEY=VALUE, or --env=KEY=VALUE — callers passing sensitive values via extra_args must pre-redact them.

Source code in src/terok_sandbox/runtime.py
def redact_env_args(cmd: list[str]) -> list[str]:
    """Copy *cmd*, masking the values of sensitive ``-e KEY=VALUE`` pairs.

    Only the two-argument form (``-e`` followed by ``KEY=VALUE``), as
    produced by :meth:`~.sandbox.Sandbox.run`, is recognised.  ``--env``,
    ``-e=KEY=VALUE`` and ``--env=KEY=VALUE`` pass through untouched, so
    callers supplying secrets via ``extra_args`` must pre-redact them.

    A key is masked when it matches ``_SENSITIVE_KEY_RE`` or is listed in
    ``_ALWAYS_REDACT_KEYS``; its value becomes ``<redacted>``.
    """
    sanitized: list[str] = []
    is_env_value = False
    for arg in cmd:
        # The argument right after a "-e" flag is its KEY=VALUE payload; a
        # payload is never itself treated as a new "-e" flag.
        current_is_value, is_env_value = is_env_value, (not is_env_value and arg == "-e")
        if current_is_value:
            key = arg.partition("=")[0]
            if _SENSITIVE_KEY_RE.search(key) or key in _ALWAYS_REDACT_KEYS:
                arg = f"{key}=<redacted>"
        sanitized.append(arg)
    return sanitized

reserve_free_port(host='127.0.0.1')

Reserve a TCP port on host and return (socket, port).

The socket stays open — the caller holds the reservation until they close it (typically right before binding the actual service). Useful for Python-native servers that can accept a pre-bound socket.

Source code in src/terok_sandbox/runtime.py
def reserve_free_port(host: str = "127.0.0.1") -> tuple[socket.socket, int]:
    """Bind an ephemeral TCP port on *host* and hand back ``(socket, port)``.

    The returned socket is still open and bound, so the port stays
    reserved until the caller closes it (typically right before the real
    service binds).  Handy for Python-native servers that can take over a
    pre-bound socket.  If binding fails the socket is closed before the
    error propagates.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, 0))  # port 0: let the OS choose a free port
        port = sock.getsockname()[1]
    except BaseException:
        sock.close()
        raise
    return sock, port

stop_task_containers(container_names)

Best-effort podman rm -f of the given containers.

Ignores all errors so that task deletion succeeds even when podman is absent or the containers are already gone.

Source code in src/terok_sandbox/runtime.py
def stop_task_containers(container_names: list[str]) -> None:
    """Force-remove each named container with ``podman rm -f``, best effort.

    Every failure (podman missing, container already gone, timeout, ...)
    is logged at debug level and otherwise ignored so that task deletion
    can always proceed.
    """
    devnull = subprocess.DEVNULL
    for name in container_names:
        removal_cmd = ["podman", "rm", "-f", name]
        try:
            log_debug(f"stop_containers: podman rm -f {name} (start)")
            subprocess.run(
                removal_cmd,
                check=False,
                stdout=devnull,
                stderr=devnull,
                timeout=120,
            )
            log_debug(f"stop_containers: podman rm -f {name} (done)")
        except Exception as err:
            log_debug(f"stop_containers: podman rm -f {name} failed: {err}")

stream_initial_logs(container_name, timeout_sec, ready_check)

Stream logs until ready marker is seen or timeout.

Returns True if the ready marker was found, False on timeout.

Source code in src/terok_sandbox/runtime.py
def stream_initial_logs(
    container_name: str,
    timeout_sec: float | None,
    ready_check: Callable[[str], bool],
) -> bool:
    """Stream logs until ready marker is seen or timeout.

    Runs ``podman logs -f`` in a worker thread, echoes every non-empty log
    line to stdout and passes it to *ready_check*.  The ``podman logs``
    child is always terminated, reaped and its pipe closed before the
    worker exits, including when *ready_check* raises.

    Args:
        container_name: Container whose logs are followed.
        timeout_sec: Maximum time to wait in seconds; ``None`` waits until
            the marker is seen or the log stream ends.
        ready_check: Predicate called with each stripped, non-empty log
            line; a truthy result ends streaming.

    Returns:
        ``True`` if *ready_check* accepted a line, ``False`` on timeout,
        when the log stream ends first, or when following the logs fails.
    """
    import select
    import sys
    import threading
    import time

    holder: list[bool] = [False]
    stop_event = threading.Event()
    proc_holder: list[subprocess.Popen | None] = [None]

    def _emit(raw_line: bytes) -> bool:
        """Echo one log line and report whether it satisfies *ready_check*."""
        line = raw_line.decode("utf-8", errors="replace").strip()
        if not line:
            return False
        print(line, file=sys.stdout, flush=True)
        return bool(ready_check(line))

    def _reap(proc: subprocess.Popen) -> None:
        """Stop *proc* if still running, collect its status, close its pipe."""
        if proc.poll() is None:
            proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        if proc.stdout is not None:
            proc.stdout.close()

    def _stream_logs() -> None:
        """Follow container logs in a thread, setting *holder[0]* on ready."""
        proc: subprocess.Popen | None = None
        try:
            proc = subprocess.Popen(
                ["podman", "logs", "-f", container_name],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
            proc_holder[0] = proc
            # Monotonic clock: the deadline must not move with wall-clock changes.
            deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
            buf = b""

            while not stop_event.is_set():
                if deadline is not None and time.monotonic() >= deadline:
                    break
                if proc.poll() is not None:
                    # Child exited: collect whatever is still in the pipe.
                    buf += proc.stdout.read() or b""
                    break
                try:
                    # Short select timeout keeps the loop responsive to
                    # stop_event and the deadline.
                    ready, _, _ = select.select([proc.stdout], [], [], 0.2)
                    if not ready:
                        continue
                    chunk = proc.stdout.read1(4096) if hasattr(proc.stdout, "read1") else b""
                    if not chunk:
                        continue
                    buf += chunk
                except Exception as exc:
                    log_warning(f"_stream_initial_logs read error: {exc}")
                    break

                while b"\n" in buf:
                    raw_line, buf = buf.split(b"\n", 1)
                    if _emit(raw_line):
                        holder[0] = True
                        return

            # Leftover output may hold several lines (e.g. the container
            # exited quickly); check them one by one like the live stream.
            for raw_line in buf.split(b"\n"):
                if _emit(raw_line):
                    holder[0] = True
                    break
        except Exception as exc:
            log_warning(f"_stream_initial_logs error: {exc}")
        finally:
            if proc is not None:
                try:
                    _reap(proc)
                except Exception as exc:
                    log_warning(f"_stream_initial_logs error: {exc}")

    stream_thread = threading.Thread(target=_stream_logs)
    stream_thread.start()
    stream_thread.join(timeout_sec)

    if stream_thread.is_alive():
        # Worker overran the timeout: ask it to stop and unblock any read
        # by terminating the log follower.
        stop_event.set()
        proc = proc_holder[0]
        if proc is not None:
            proc.terminate()
        stream_thread.join(timeout=5)

    return holder[0]

wait_for_exit(cname, timeout_sec=None)

Wait for a container to exit and return its exit code.

Returns 124 on timeout, 1 if podman is not found.

Source code in src/terok_sandbox/runtime.py
def wait_for_exit(cname: str, timeout_sec: float | None = None) -> int:
    """Wait for a container to exit and return its exit code.

    Returns 124 on timeout, 1 if podman is not found.
    """
    try:
        proc = subprocess.run(
            ["podman", "wait", cname],
            check=False,
            capture_output=True,
            timeout=timeout_sec,
        )
        stdout = proc.stdout.decode().strip() if isinstance(proc.stdout, bytes) else proc.stdout
        if stdout:
            return int(stdout)
        return proc.returncode
    except subprocess.TimeoutExpired:
        return 124
    except (FileNotFoundError, ValueError):
        return 1

check_environment(cfg=None)

Check the podman environment for shield compatibility.

Returns a synthetic :class:EnvironmentCheck with bypass info when the dangerous bypass override is active.

Source code in src/terok_sandbox/shield.py
def check_environment(cfg: SandboxConfig | None = None) -> EnvironmentCheck:
    """Probe the podman environment for shield compatibility.

    With the dangerous bypass override active, no probing happens and a
    synthetic :class:`EnvironmentCheck` (``ok=False``, ``health="bypass"``)
    is returned.  Otherwise the check is delegated to a shield built on a
    throwaway temporary state directory.
    """
    if not _cfg(cfg).shield_bypass:
        with tempfile.TemporaryDirectory() as scratch:
            return make_shield(Path(scratch), cfg).check_environment()

    return EnvironmentCheck(
        ok=False,
        health="bypass",
        issues=["bypass_firewall_no_protection is set — egress firewall disabled"],
    )

down(container, task_dir, *, allow_all=False, cfg=None)

Set shield to bypass mode (allow egress) for a running container.

When allow_all is True, also permits private-range (RFC 1918) traffic.

Source code in src/terok_sandbox/shield.py
def down(
    container: str, task_dir: Path, *, allow_all: bool = False, cfg: SandboxConfig | None = None
) -> None:
    """Switch a running container's shield to bypass mode (egress allowed).

    With *allow_all* set, private-range (RFC 1918) traffic is permitted as
    well.  No-op when the configuration has shield bypass enabled.
    """
    if not _cfg(cfg).shield_bypass:
        make_shield(task_dir, cfg).down(container, allow_all=allow_all)

make_shield(task_dir, cfg=None)

Construct a per-task :class:Shield from sandbox configuration.

Builds a :class:ShieldConfig with state_dir scoped to task_dir.

Source code in src/terok_sandbox/shield.py
def make_shield(task_dir: Path, cfg: SandboxConfig | None = None) -> Shield:
    """Build the per-task :class:`Shield` described by the sandbox config.

    The resulting :class:`ShieldConfig` runs in hook mode, keeps its state
    under ``<task_dir>/shield``, and opens the gate, proxy and SSH-agent
    ports as loopback ports.
    """
    sandbox_cfg = _cfg(cfg)
    loopback = (sandbox_cfg.gate_port, sandbox_cfg.proxy_port, sandbox_cfg.ssh_agent_port)
    return Shield(
        ShieldConfig(
            state_dir=task_dir / "shield",
            mode=ShieldMode.HOOK,
            default_profiles=sandbox_cfg.shield_profiles,
            loopback_ports=loopback,
            audit_enabled=sandbox_cfg.shield_audit,
            profiles_dir=sandbox_cfg.shield_profiles_dir,
        )
    )

pre_start(container, task_dir, cfg=None)

Return extra podman run args for egress firewalling.

Returns an empty list (no firewall args) when the dangerous bypass_firewall_no_protection override is active.

Raises :class:SystemExit with setup instructions when the podman environment requires one-time hook installation.

Source code in src/terok_sandbox/shield.py
def pre_start(container: str, task_dir: Path, cfg: SandboxConfig | None = None) -> list[str]:
    """Produce the extra ``podman run`` args that enable egress firewalling.

    When the dangerous ``bypass_firewall_no_protection`` override is
    active, a warning is emitted and no firewall args are returned.

    Raises:
        SystemExit: Carrying the setup instructions when the podman
            environment still needs the one-time hook installation.
    """
    if not _cfg(cfg).shield_bypass:
        try:
            return make_shield(task_dir, cfg).pre_start(container)
        except ShieldNeedsSetup as exc:
            raise SystemExit(str(exc)) from None

    # stacklevel=2 attributes the warning to pre_start's caller.
    warnings.warn(_BYPASS_WARNING, stacklevel=2)
    return []

run_setup(*, root=False, user=False)

Install global OCI hooks for shield egress firewalling.

Global hooks are required on all podman versions to survive container stop/start cycles (terok-shield#122).

Raises :class:SystemExit when neither --root nor --user is given.

Source code in src/terok_sandbox/shield.py
def run_setup(*, root: bool = False, user: bool = False) -> None:
    """Install the global OCI hooks used for shield egress firewalling.

    Global hooks are required on all podman versions to survive
    container stop/start cycles (terok-shield#122).  The installation
    target follows *root*: system-wide when true, user-local otherwise.

    Raises:
        SystemExit: With a usage hint when neither ``--root`` nor
            ``--user`` is given.
    """
    if root or user:
        setup_hooks_direct(root=root)
        return

    usage = (
        "Specify --root (system-wide, uses sudo) or --user (user-local).\n"
        "  shield setup --root   # /etc/containers/oci/hooks.d\n"
        "  shield setup --user   # ~/.local/share/containers/oci/hooks.d"
    )
    raise SystemExit(usage)

setup_hooks_direct(*, root=False)

Install global hooks via the terok-shield Python API (no subprocess).

Suitable for TUI callers that need direct control. Installs hooks to the system directory (with sudo) when root is True, otherwise to the user directory.

Source code in src/terok_sandbox/shield.py
def setup_hooks_direct(*, root: bool = False) -> None:
    """Install global hooks through the terok-shield Python API (no subprocess).

    Meant for TUI callers that need direct control.  With *root* the hooks
    go to the system hooks directory using sudo; otherwise they go to the
    user hooks directory, which is then also registered via
    ``ensure_containers_conf_hooks_dir``.
    """
    if not root:
        user_dir = Path(USER_HOOKS_DIR).expanduser()
        setup_global_hooks(user_dir)
        ensure_containers_conf_hooks_dir(user_dir)
        return

    setup_global_hooks(system_hooks_dir(), use_sudo=True)

state(container, task_dir, cfg=None)

Return the live shield state for a running container.

Queries actual nft state even when bypass is set, because containers started before bypass was enabled may still have active rules.

Source code in src/terok_sandbox/shield.py
def state(container: str, task_dir: Path, cfg: SandboxConfig | None = None) -> ShieldState:
    """Report the live shield state of a running container.

    The bypass setting is intentionally not consulted here: a container
    started *before* bypass was enabled may still have active rules, so
    the shield is always queried.
    """
    shield = make_shield(task_dir, cfg)
    return shield.state(container)

status(cfg=None)

Return shield status dict from the sandbox configuration.

Source code in src/terok_sandbox/shield.py
def status(cfg: SandboxConfig | None = None) -> dict:
    """Summarise the shield-related sandbox configuration as a dict.

    Returns:
        A dict with ``mode``, ``profiles`` and ``audit_enabled``; the key
        ``bypass_firewall_no_protection`` is added (as ``True``) only when
        bypass is enabled.
    """
    settings = _cfg(cfg)
    summary: dict = {"mode": "hook"}
    summary["profiles"] = list(settings.shield_profiles)
    summary["audit_enabled"] = settings.shield_audit
    if settings.shield_bypass:
        summary["bypass_firewall_no_protection"] = True
    return summary

up(container, task_dir, cfg=None)

Set shield to deny-all mode for a running container.

Source code in src/terok_sandbox/shield.py
def up(container: str, task_dir: Path, cfg: SandboxConfig | None = None) -> None:
    """Switch a running container's shield to deny-all mode.

    No-op when the configuration has shield bypass enabled.
    """
    if not _cfg(cfg).shield_bypass:
        make_shield(task_dir, cfg).up(container)

generate_keypair(key_type, priv_path, pub_path, comment)

Generate an SSH keypair via ssh-keygen.

Removes any stale half-existing files first, then invokes ssh-keygen with the given comment embedded in the public key.

Source code in src/terok_sandbox/ssh.py
def generate_keypair(key_type: str, priv_path: Path, pub_path: Path, comment: str) -> None:
    """Generate an SSH keypair via ``ssh-keygen``.

    Removes any stale half-existing files first, then invokes
    ``ssh-keygen`` with an empty passphrase and the given *comment*
    embedded in the public key.

    Args:
        key_type: Key algorithm passed to ``ssh-keygen -t``.
        priv_path: Destination of the private key (``ssh-keygen -f``).
        pub_path: Public key path; only used here to clear a stale file.
            NOTE(review): ``ssh-keygen`` writes ``<priv_path>.pub`` itself,
            so this is presumably that same path — confirm with callers.
        comment: Comment passed to ``ssh-keygen -C``.

    Raises:
        SystemExit: When ``ssh-keygen`` is not installed or exits with a
            non-zero status; the original error is attached as
            ``__cause__``.
    """
    for p in (priv_path, pub_path):
        p.unlink(missing_ok=True)

    cmd = [
        "ssh-keygen",
        "-t",
        key_type,
        "-f",
        str(priv_path),
        "-N",
        "",  # empty passphrase
        "-C",
        comment,
    ]
    try:
        subprocess.run(cmd, check=True)
    except FileNotFoundError as exc:
        raise SystemExit("ssh-keygen not found. Please install OpenSSH client tools.") from exc
    except subprocess.CalledProcessError as exc:
        raise SystemExit(f"ssh-keygen failed: {exc}") from exc

update_ssh_keys_json(keys_json_path, project_id, result)

Update the SSH key mapping JSON with a project's key paths.

The JSON file maps project IDs to their SSH key file paths, similar to how routes.json maps provider names to proxy routes. The credential proxy's SSH agent handler reads this file to locate the private key for signing requests.

Key management rules (keyed by private_key path):

  • No existing entry: write a one-element list holding the new entry (simple case).
  • Same private_key path: replace in-place (idempotent re-run of ssh-init).
  • Different private_key path: expand to / append to a list, so a project can hold multiple independent SSH keys (e.g. GitHub + GitLab).

Uses fcntl.flock to prevent concurrent ssh-init invocations from corrupting the file.

Source code in src/terok_sandbox/ssh.py
def update_ssh_keys_json(keys_json_path: Path, project_id: str, result: SSHInitResult) -> None:
    """Update the SSH key mapping JSON with a project's key paths.

    The JSON file maps project IDs to their SSH key file paths, similar
    to how ``routes.json`` maps provider names to proxy routes.  The
    credential proxy's SSH agent handler reads this file to locate the
    private key for signing requests.

    Each project's value is written as a list of
    ``{"private_key": ..., "public_key": ...}`` entries.  Key management
    rules (keyed by ``private_key`` path):

    - **No existing entry**: write a one-element list.
    - **Same private_key path**: replace in-place (idempotent re-run of ``ssh-init``).
    - **Different private_key path**: append, so a project can hold
      multiple independent SSH keys (e.g. GitHub + GitLab).
    - **Existing single-dict value**: wrapped into a list first so the
      stored key is kept rather than dropped.

    Any other non-list value stored for the project is replaced.

    Uses ``fcntl.flock`` around the whole read-modify-write to prevent
    concurrent ``ssh-init`` invocations from corrupting the file.

    Args:
        keys_json_path: Path of the mapping file; created (mode ``0o600``)
            together with its parent directories when missing.
        project_id: Project whose entry is updated.
        result: Mapping providing the ``private_key`` and ``public_key``
            paths to record.

    Raises:
        json.JSONDecodeError: If the existing file content is not valid JSON.
    """
    new_entry: dict[str, str] = {
        "private_key": result["private_key"],
        "public_key": result["public_key"],
    }
    keys_json_path.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(str(keys_json_path), os.O_RDWR | os.O_CREAT, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        chunks: list[bytes] = []
        while chunk := os.read(fd, 8192):
            chunks.append(chunk)
        raw = b"".join(chunks)
        mapping: dict = json.loads(raw) if raw.strip() else {}

        existing = mapping.get(project_id) or []
        if isinstance(existing, dict):
            # Single-dict form: expand to a list instead of losing the key.
            existing = [existing]
        entries: list[dict[str, str]] = existing if isinstance(existing, list) else []

        for i, entry in enumerate(entries):
            if isinstance(entry, dict) and entry.get("private_key") == new_entry["private_key"]:
                entries[i] = new_entry  # same path — idempotent update
                break
        else:
            entries.append(new_entry)  # new path — append
        mapping[project_id] = entries

        data = (json.dumps(mapping, indent=2) + "\n").encode("utf-8")
        os.lseek(fd, 0, os.SEEK_SET)
        os.ftruncate(fd, 0)
        # os.write may write fewer bytes than requested; loop until done.
        written = 0
        while written < len(data):
            written += os.write(fd, data[written:])
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)