Skip to content

api

api

Public library API — the one stable import boundary for presentation layers.

The CLI and TUI import everything domain-, type-, or config-related from this package rather than reaching into terok.lib.* internals — that keeps the consumer surface narrow and lets internals refactor freely.

The package is split into focused sub-modules (catalogs of re-exports from the appropriate adapter):

  • vault — vault status, daemon lifecycle, VaultManager
  • gate — gate-server lifecycle and status
  • shield — shield wrappers and the shield CLI registry
  • agents — providers, ACP, image build, instructions
  • clearance — Notification and the clearance CLI registry
  • setup — first-run setup, env check, sickbay primitives, uninstall
  • task — task lifecycle, runners, metadata, display tables
  • project — project entities, lifecycle, panic, SSH

This module owns the cross-cutting bits: the Config snapshot, the runtime peek get_container_state, a small number of shared sandbox types (SandboxConfig and the CLI CommandDef / CommandTree), and the ANSI helpers bold/red/yellow/stage_line.

For backward compatibility every sub-module's exports are also bound on this package (so existing from terok.lib.api import foo lines keep working) — see __all__ at the bottom.

Pure utilities (util.emoji, util.yaml, util.ansi, util.net, ui_utils.terminal) and core.version stay importable directly — they are genuinely cross-cutting and have no domain coupling worth funnelling.

AGENTS_QUESTION = Question(key='agents', kind='multichoice', prompt='Select agents to install', help="Which AI coding agents to bake into this project's image, overriding the global default. Pick 'All agents' to inherit future additions, or enumerate specific agents to freeze the set.", required=True, choices_loader=_load_agent_choices, validate=_validate_agents) module-attribute

QUESTIONS = (Question(key='security_class', kind='choice', prompt='Select security mode', choices=(tuple(SECURITY_CLASSES)), required=True), Question(key='base', kind='choice', prompt='Select base image', choices=(tuple(BASES)), required=True), Question(key='project_id', kind='text', prompt='Project ID', required=True, transform=_slugify_project_id, validate=_validate_project_id, placeholder='lowercase; letters, digits, hyphens, underscores'), Question(key='upstream_url', kind='text', prompt='Upstream git URL', help='Leave empty for a local-only project (no remote).', placeholder='git@github.com:org/repo.git or https://…', default_visible=True), Question(key='default_branch', kind='text', prompt='Default branch', help="Leave empty to use the remote's default (or ``main`` when no remote).", placeholder='main', default_visible=True), Question(key='user_snippet', kind='editor', prompt='Custom image snippet', help='Optional Dockerfile fragment appended to the project image. Use for extra packages, env vars, or setup commands.', default_visible=True)) module-attribute

CONTAINER_MODES = ('cli', 'web', 'run', 'toad') module-attribute

All valid container mode suffixes used in container naming.

GPU_DISPLAY = {True: ProjectBadge(emoji='🎮', label='GPU'), False: ProjectBadge(emoji='💿', label='CPU')} module-attribute

SECURITY_CLASS_DISPLAY = {'gatekeeping': ProjectBadge(emoji='🚪', label='gate'), 'online': ProjectBadge(emoji='🌐', label='online')} module-attribute

STATUS_DISPLAY = {'running': StatusInfo(label='running', emoji='🟢', color='green'), 'init': StatusInfo(label='init', emoji='🟡', color='yellow'), 'starting': StatusInfo(label='starting', emoji='⏳', color='yellow'), 'stopped': StatusInfo(label='stopped', emoji='🔴', color='red'), 'completed': StatusInfo(label='completed', emoji='✅', color='green'), 'failed': StatusInfo(label='failed', emoji='❌', color='red'), 'created': StatusInfo(label='created', emoji='🆕', color='yellow'), 'not found': StatusInfo(label='not found', emoji='❓', color='yellow'), 'deleting': StatusInfo(label='deleting', emoji='🧹', color='yellow')} module-attribute

__all__ = ['Config', 'get_config', 'make_sandbox_config', 'set_experimental', 'get_container_state', 'SandboxConfig', 'CommandDef', 'CommandTree', 'bold', 'red', 'yellow', 'stage_line', 'ACPEndpointStatus', 'AGENT_PROVIDERS', 'AUTH_PROVIDERS', 'AgentRunner', 'BuildError', 'DEFAULT_BASE_IMAGE', 'AgentRoster', 'Authenticator', 'EXECUTOR_COMMANDS', 'ExecutorConfigView', 'ImageBuilder', 'KrunHost', 'PROVIDER_NAMES', 'SharedMountStorageInfo', 'TaskStorageInfo', 'acp_socket_is_live', 'authenticate', 'build_images', 'bundled_default_instructions', 'ensure_sandbox_ready', 'generate_dockerfiles', 'get_provider', 'installed_agents', 'installed_agents_for_project', 'parse_md_agent', 'resolve_agent_config', 'resolve_instructions', 'CLEARANCE_COMMANDS', 'CLEARANCE_HUB_UNIT_NAME', 'CLEARANCE_NOTIFIER_UNIT_NAME', 'CallbackNotifier', 'EventSubscriber', 'Notification', 'HubService', 'NotifierService', 'check_clearance_units_outdated', 'clearance_outdated_summary', 'read_installed_notifier_unit_version', 'read_installed_unit_version', 'GateAuthNotConfigured', 'GateServerManager', 'GateServerStatus', 'GateStalenessInfo', 'make_git_gate', 'AGENTS_QUESTION', 'BrokenProject', 'DeleteProjectResult', 'Project', 'ProjectConfig', 'QUESTIONS', 'Question', 'cleanup_images', 'delete_project', 'derive_project', 'discover_projects', 'execute_panic', 'find_orphaned_images', 'find_projects_sharing_gate', 'format_panic_report', 'get_project', 'list_images', 'list_projects', 'load_project', 'panic_stop_containers', 'project_image_exists', 'remove_images', 'render_project_yaml', 'require_project_exists', 'set_project_image_agents', 'summarize_ssh_init', 'validate_answer', 'write_project_yaml', 'EnvironmentCheck', 'SERVICES_TCP_OPTOUT_YAML', 'SelinuxStatus', 'SetupVerdict', 'check_environment', 'check_selinux_status', 'is_ssh_url', 'namespace_state_dir', 'needs_setup', 'public_line_of', 'resolve_container_state_dir', 'sandbox_uninstall', 'selinux_install_command', 'selinux_install_script', 
'systemd_creds_has_tpm2', 'yaml_update_section', 'ArgDef', 'ExecError', 'RecoveryStatus', 'SHIELD_COMMANDS', 'ShieldCommandDef', 'ShieldHooks', 'ShieldManager', 'installed_versions', 'read_stamp', 'stamp_path', 'CONTAINER_MODES', 'GPU_DISPLAY', 'HeadlessRunRequest', 'LogViewOptions', 'ModeInfo', 'SECURITY_CLASS_DISPLAY', 'STATUS_DISPLAY', 'StatusInfo', 'Task', 'TaskDeleteResult', 'TaskMeta', 'agent_config_dir', 'container_name', 'effective_status', 'generate_task_name', 'get_all_task_states', 'get_login_command', 'get_task_meta', 'get_tasks', 'get_workspace_git_diff', 'has_gpu', 'mark_task_deleting', 'mode_info', 'sanitize_task_name', 'task_archive_list', 'task_archive_logs', 'task_delete', 'task_followup_headless', 'task_list', 'task_login', 'task_logs', 'task_new', 'task_rename', 'task_restart', 'task_run_cli', 'task_run_headless', 'task_run_toad', 'task_status', 'task_stop', 'validate_task_name', 'wait_for_container_exit', 'NoPassphraseError', 'VaultManager', 'VaultStatus', 'WrongPassphraseError', 'handle_vault_seal', 'handle_vault_to_keyring', 'vault_db'] module-attribute

BrokenProject(id, config_path, error) dataclass

A project directory whose project.yml failed to load.

Carries just enough context for the TUI to render a row and show the validation error in the details pane, without forcing callers to re-run the failing load_project to rediscover the message.

id instance-attribute

config_path instance-attribute

error instance-attribute

DeleteProjectResult

Bases: TypedDict

Result of a project deletion.

deleted instance-attribute

skipped instance-attribute

archive instance-attribute

Project(config)

Rich project object — DDD Aggregate Root.

The primary domain object that callers interact with. Wraps a ProjectConfig value object and exposes all project-scoped operations through a natural OOP interface:

project = get_project("myproj")
task = project.create_task(name="fix-bug")
task.run_cli()
task.stop()
project.gate.sync()

Identity is based on project.id — two Project instances with the same ID compare equal and hash identically, so they work correctly in sets and dicts.

Subsystem access (gate, ssh, agents) uses lazy initialization: the service objects are created on first property access rather than at construction time. This avoids unnecessary I/O when only a subset of functionality is needed. Uses __slots__ for memory efficiency; cached_property is not available because it requires __dict__.

Obtain via get_project or list_projects.

Initialize with a resolved project configuration.

Source code in src/terok/lib/domain/project.py
def __init__(self, config: ProjectConfig) -> None:
    """Store the resolved configuration and leave every subsystem unset."""
    # Subsystem slots start empty; nothing is constructed here.
    self._agents: AgentManager | None = None
    self._ssh: SSHManager | None = None
    self._gate: GitGate | None = None
    self._config = config

__slots__ = ('_config', '_gate', '_ssh', '_agents') class-attribute instance-attribute

id property

Return the project ID.

config property

Return the underlying configuration value object.

security_class property

Return the project's security class ('online' or 'gatekeeping').

tasks property

All tasks in this project — convenience for unfiltered iteration.

gate property

Return the project-scoped git gate manager (lazy-initialized).

ssh property

Return the project-scoped SSH manager (lazy-initialized).

needs_ssh_key_registration property

Return True when the upstream is SSH-scheme so a deploy key must be added.

Shared predicate used by the CLI pause helper and the TUI wizard's mid-flow "continue" gate — keeps the rule (SSH URLs need registration, HTTPS and no-upstream projects don't) in one place.

agents property

Return the project-scoped agent configuration manager (lazy-initialized).

__eq__(other)

Two projects are equal iff they share the same ID.

Source code in src/terok/lib/domain/project.py
def __eq__(self, other: object) -> bool:
    """Compare by project ID; anything that is not a ``Project`` is unequal."""
    if not isinstance(other, Project):
        return False
    return self.id == other.id

__hash__()

Hash by project ID for use in sets and dicts.

Source code in src/terok/lib/domain/project.py
def __hash__(self) -> int:
    """Derive the hash from the project ID, matching ``__eq__``."""
    project_id = self.id
    return hash(project_id)

create_task(*, name=None)

Create a new task and return a rich Task entity.

Source code in src/terok/lib/domain/project.py
def create_task(self, *, name: str | None = None) -> Task:
    """Register a new task under this project and wrap it in a ``Task``.

    The task is created via ``task_new``; its metadata is then read back
    with ``get_task_meta`` and paired with this project's configuration.
    """
    project_id = self._config.id
    new_id = task_new(project_id, name=name)
    return Task(self._config, get_task_meta(project_id, new_id))

get_task(task_id)

Return a rich Task entity for an existing task.

Source code in src/terok/lib/domain/project.py
def get_task(self, task_id: str) -> Task:
    """Look up the metadata for *task_id* and wrap it in a ``Task``."""
    task_meta = get_task_meta(self._config.id, task_id)
    return Task(self._config, task_meta)

list_tasks(*, status=None, mode=None)

Return all tasks, optionally filtered by status or mode.

Source code in src/terok/lib/domain/project.py
def list_tasks(self, *, status: str | None = None, mode: str | None = None) -> list[Task]:
    """List this project's tasks, narrowed by *mode* and/or *status* when given.

    The mode filter runs first.  Each remaining metadata record then has
    its ``container_state`` set from ``get_all_task_states`` before the
    status filter is applied, so the status filter sees the hydrated state.
    """
    project_id = self._config.id
    candidates = get_tasks(project_id)
    if mode:
        candidates = [meta for meta in candidates if meta.mode == mode]

    # Attach the live container state to every surviving record.
    states = get_all_task_states(project_id, candidates)
    for meta in candidates:
        meta.container_state = states.get(meta.task_id)

    if status:
        candidates = [meta for meta in candidates if meta.status == status]
    return [Task(self._config, meta) for meta in candidates]

acp_endpoints()

Return one ACPEndpoint per running task.

Cheap discovery surface — walks running tasks, classifies each endpoint as active (daemon up, socket bound), ready (task running with at least one authed agent, daemon would spawn on first connect), or unsupported (no agents authed for this task's image; connect would fail).

No probing, no socket traffic — one credential-DB read for the whole listing, one Sandbox instance shared across tasks, and image-label lookups memoised by image-id (most tasks share an image). terok acp list and the TUI panel share this entry point.

Source code in src/terok/lib/domain/project.py
def acp_endpoints(self) -> list[ACPEndpoint]:
    """Build one [`ACPEndpoint`][terok.lib.domain.project.ACPEndpoint] for each running task.

    Classification per task:

    * ``ACTIVE`` — the task's ACP socket path exists; the bound agent is
      read for these endpoints only.
    * ``READY`` — no socket yet, but ``_task_has_any_authed_agent``
      reports at least one authenticated agent for the task.
    * ``UNSUPPORTED`` — neither of the above.

    The authenticated-agent set, the ``Sandbox`` instance and the label
    cache are created once and shared across all tasks in the listing.
    Returns an empty list when no task is running.
    """
    running_tasks = self.list_tasks(status="running")
    if not running_tasks:
        return []
    from terok.lib.integrations.sandbox import Sandbox

    project_id = self._config.id
    # Shared across the loop: one auth lookup, one Sandbox, one label cache.
    authenticated = set(list_authenticated_agents())
    shared_sandbox = Sandbox(config=make_sandbox_config())
    labels_by_image: dict[str, set[str]] = {}

    endpoints: list[ACPEndpoint] = []
    for task in running_tasks:
        socket_path = acp_socket_path(project_id, task.id)
        if socket_path.exists():
            bound_agent = _read_bound_agent(project_id, task.id)
            endpoint_status = ACPEndpointStatus.ACTIVE
        else:
            bound_agent = None
            has_agent = _task_has_any_authed_agent(
                project_id,
                task,
                authenticated,
                sandbox=shared_sandbox,
                label_cache=labels_by_image,
            )
            if has_agent:
                endpoint_status = ACPEndpointStatus.READY
            else:
                endpoint_status = ACPEndpointStatus.UNSUPPORTED
        endpoint = ACPEndpoint(
            project_id=project_id,
            task_id=task.id,
            socket_path=socket_path,
            status=endpoint_status,
            bound_agent=bound_agent,
        )
        endpoints.append(endpoint)
    return endpoints

run_headless(request)

Create and run a headless task atomically. Returns the Task.

Source code in src/terok/lib/domain/project.py
def run_headless(self, request: HeadlessRunRequest) -> Task:
    """Launch a headless run from *request* and return the resulting ``Task``.

    The task is started via ``task_run_headless``; its metadata is then
    looked up under this project's ID.
    """
    # NOTE(review): the metadata lookup uses this project's id, not
    # request.project_id — presumably callers keep the two equal; confirm.
    started_id = task_run_headless(request)
    return Task(self._config, get_task_meta(self._config.id, started_id))

followup_headless(task_id, prompt, follow=True)

Send a follow-up prompt to a completed headless task.

Source code in src/terok/lib/domain/project.py
def followup_headless(self, task_id: str, prompt: str, follow: bool = True) -> None:
    """Forward a follow-up *prompt* for *task_id* to the headless task runner."""
    from ..orchestration.task_runners import task_followup_headless

    project_id = self._config.id
    task_followup_headless(project_id, task_id, prompt, follow=follow)

delete()

Delete the project and all associated data.

Source code in src/terok/lib/domain/project.py
def delete(self) -> DeleteProjectResult:
    """Remove this project via ``delete_project`` and return its result."""
    outcome = delete_project(self._config.id)
    return outcome

generate_dockerfiles()

Render and write Dockerfiles for this project.

Source code in src/terok/lib/domain/project.py
def generate_dockerfiles(self) -> None:
    """Delegate Dockerfile generation for this project to the module-level helper."""
    project_id = self._config.id
    generate_dockerfiles(project_id)

build_images(*, include_dev=False, refresh_agents=False, full=False)

Build container images for this project.

Source code in src/terok/lib/domain/project.py
def build_images(
    self, *, include_dev: bool = False, refresh_agents: bool = False, full: bool = False
) -> None:
    """Delegate the image build for this project to the module-level helper.

    *full* is passed on under the helper's ``full_rebuild`` keyword; the
    other two flags keep their names.
    """
    build_options = {
        "include_dev": include_dev,
        "refresh_agents": refresh_agents,
        "full_rebuild": full,
    }
    build_images(self._config.id, **build_options)

state(*, gate_commit_provider=None)

Return the project's infrastructure state snapshot.

gate_commit_provider is an optional callable that, given a project id, returns the last gate commit dict (or None). Used by the TUI to inject the live gate manager's last_commit lookup without reaching for it from inside the helper.

Source code in src/terok/lib/domain/project.py
def state(self, *, gate_commit_provider: Callable[[str], dict | None] | None = None) -> dict:
    """Return the infrastructure state snapshot for this project.

    *gate_commit_provider*, when supplied, is a callable taking a project
    id and returning the last gate commit dict (or ``None``).  It is
    forwarded unchanged to ``get_project_state`` together with this
    project's configuration.
    """
    from .project_state import get_project_state

    snapshot = get_project_state(
        self._config.id,
        gate_commit_provider=gate_commit_provider,
        project=self._config,
    )
    return snapshot

storage_detail()

Return a detailed view of this project's on-disk footprint.

Source code in src/terok/lib/domain/project.py
def storage_detail(self) -> ProjectDetail:
    """Fetch the storage detail for this project from the storage helper."""
    from .storage import get_project_storage_detail

    detail = get_project_storage_detail(self._config.id)
    return detail

provision_ssh_key(*, key_type='ed25519', comment=None, force=False)

Mint a vault-backed keypair and bind it to this project's scope.

Opens a fresh SSHManager via the context-manager form so the credential DB closes after init, then assigns the new key_id to the project scope. Rendering the result is the caller's job — see summarize_ssh_init.

Source code in src/terok/lib/domain/project.py
def provision_ssh_key(
    self,
    *,
    key_type: str = "ed25519",
    comment: str | None = None,
    force: bool = False,
) -> SSHInitResult:
    """Create a keypair through the SSH manager and bind it to this project.

    A manager from ``make_ssh_manager`` is used as a context manager for
    the ``init`` call only.  After the ``with`` block exits, the
    ``key_id`` from the init result is registered on this project via
    ``register_ssh_key``.  The init result is returned as-is; presenting
    it is left to the caller — see
    [`summarize_ssh_init`][terok.lib.domain.ssh.summarize_ssh_init].
    """
    with make_ssh_manager(self._config) as manager:
        init_result = manager.init(key_type=key_type, comment=comment, force=force)
    # Registration happens only after the manager context has exited.
    new_key_id = init_result["key_id"]
    self.register_ssh_key(new_key_id)
    return init_result

register_ssh_key(key_id)

Bind an already-minted key_id to this project (idempotent).

Source code in src/terok/lib/domain/project.py
def register_ssh_key(self, key_id: int) -> None:
    """Assign the existing *key_id* to this project's ID in the vault DB."""
    project_id = self._config.id
    with vault_db() as vault:
        vault.assign_ssh_key(project_id, key_id)

pause_for_ssh_key_registration_if_needed()

Pause so the user can register the deploy key — only for SSH upstreams.

Source code in src/terok/lib/domain/project.py
def pause_for_ssh_key_registration_if_needed(self) -> None:
    """Block on Enter so the user can register the deploy key, when one is needed.

    Does nothing unless ``needs_ssh_key_registration`` is true.
    Otherwise prints a banner to stdout and waits on ``input``.
    """
    if not self.needs_ssh_key_registration:
        return
    rule = "=" * 60
    print("\n" + rule)
    print("ACTION REQUIRED: Add the public key shown above as a")
    print("deploy key (or to your SSH keys) on the git remote.")
    print(rule)
    input("Press Enter once the key is registered... ")

list_presets()

Return available presets for this project.

Source code in src/terok/lib/domain/project.py
def list_presets(self) -> list[PresetInfo]:
    """Delegate the preset listing for this project to the module-level helper."""
    presets = list_presets(self._config.id)
    return presets

__repr__()

Return a developer-friendly string representation.

Source code in src/terok/lib/domain/project.py
def __repr__(self) -> str:
    """Render the project ID and security class for debugging output."""
    ident = self.id
    security = self.security_class
    return f"Project(id={ident!r}, security={security!r})"

ProjectConfig

Bases: BaseModel

Resolved project configuration loaded from project.yml.

Pure value object — holds configuration fields with no behavior beyond computed paths. The rich domain object Project wraps this and provides behavior.

model_config = ConfigDict(frozen=True) class-attribute instance-attribute

id instance-attribute

security_class instance-attribute

isolation = 'shared' class-attribute instance-attribute

upstream_url instance-attribute

default_branch instance-attribute

root instance-attribute

tasks_root instance-attribute

gate_path instance-attribute

gate_enabled = True class-attribute instance-attribute

staging_root instance-attribute

ssh_use_personal = False class-attribute instance-attribute

Opt in to the user's ~/.ssh keys for host-side gate-sync (default off).

expose_external_remote = False class-attribute instance-attribute

human_name = None class-attribute instance-attribute

human_email = None class-attribute instance-attribute

git_authorship = 'agent-human' class-attribute instance-attribute

upstream_polling_enabled = True class-attribute instance-attribute

upstream_polling_interval_minutes = 5 class-attribute instance-attribute

auto_sync_enabled = False class-attribute instance-attribute

auto_sync_branches = Field(default_factory=list) class-attribute instance-attribute

default_agent = None class-attribute instance-attribute

default_login = None class-attribute instance-attribute

agent_config = Field(default_factory=dict) class-attribute instance-attribute

shutdown_timeout = 10 class-attribute instance-attribute

memory = None class-attribute instance-attribute

Podman --memory value from run.memory in project.yml.

cpus = None class-attribute instance-attribute

Podman --cpus value from run.cpus in project.yml.

nested_containers = False class-attribute instance-attribute

Project runs podman/docker inside its container (see run.nested_containers).

runtime = None class-attribute instance-attribute

OCI runtime selector from run.runtime.

None (default) means "use the global default", which itself falls through to "crun" — the OCI runtime podman drives by default on every supported distro. "krun" selects KVM-microVM isolation; gated on the global experimental: true flag at runtime selection time so a typo never silently boots the experimental backend.

Sizing reuses the standard memory / cpus knobs — podman writes them into the OCI spec and the runtime reads them there; no krun-specific knob.

timezone = None class-attribute instance-attribute

IANA timezone for task containers (from run.timezone).

None lets terok-executor fall back to the host's timezone; pass an explicit string ("UTC", "Europe/Prague") to override — including to pin containers to UTC for reproducible runs.

task_name_categories = None class-attribute instance-attribute

shield_drop_on_task_run = True class-attribute instance-attribute

shield_on_task_restart = 'retain' class-attribute instance-attribute

hook_pre_start = None class-attribute instance-attribute

hook_post_start = None class-attribute instance-attribute

hook_post_ready = None class-attribute instance-attribute

hook_post_stop = None class-attribute instance-attribute

base_image = 'ubuntu:24.04' class-attribute instance-attribute

family = None class-attribute instance-attribute

Package family override for L0/L1 builds.

None lets terok-executor auto-detect from base_image; set explicitly when the auto-detect allowlist doesn't recognise the image (rocky, alma, suse, …).

agents = 'all' class-attribute instance-attribute

Comma-separated roster entries to install in L1 (or "all").

snippet_inline = None class-attribute instance-attribute

snippet_file = None class-attribute instance-attribute

shared_dir = None class-attribute instance-attribute

is_sealed property

Whether this project uses sealed isolation (zero bind mounts).

presets_dir property

Directory for preset config files for this project.

Question(key, kind, prompt, help='', choices=(), choices_loader=None, required=False, transform=None, validate=None, placeholder='', default_visible=False) dataclass

One wizard prompt — what to ask, how to validate, what shape the answer takes.

The presenter decides the visual treatment (numbered menu vs radio buttons, input() vs Textual Input, $EDITOR vs TextArea); the declaration here drives everything else.

key instance-attribute

Name of this field in the collected-values dict.

kind instance-attribute

Shape of the input — drives which widget / prompt style a presenter uses.

prompt instance-attribute

Short one-line question, used as both CLI prompt and TUI label.

help = '' class-attribute instance-attribute

Longer explanation, rendered next to the input in the TUI; unused in CLI.

choices = () class-attribute instance-attribute

Static (value, label) pairs for kind in {"choice", "multichoice"}.

choices_loader = None class-attribute instance-attribute

Runtime resolver for choices that aren't known at import time.

Set this when the option set lives in a sibling wheel (e.g. the agent roster) and can drift between releases. When set, takes precedence over choices.

required = False class-attribute instance-attribute

Reject empty answers with "<prompt> is required."

transform = None class-attribute instance-attribute

Optional normalisation applied before validation (e.g. str.lower).

validate = None class-attribute instance-attribute

Optional validator returning an error string or None when accepted.

placeholder = '' class-attribute instance-attribute

Hint string, rendered inside the Textual Input; unused in CLI.

default_visible = False class-attribute instance-attribute

When True, CLI prompt shows "(optional)" to telegraph "Enter is fine".

resolve_choices()

Return the effective option list — runtime loader wins over static.

Called by both presenters whenever they need to render or validate a choice / multichoice question. The loader is expected to be cheap; the executor's roster lookup is itself lru_cache'd so repeated calls are free.

Source code in src/terok/lib/domain/wizards/new_project.py
def resolve_choices(self) -> tuple[tuple[str, str], ...]:
    """Return the options to present for this question.

    When ``choices_loader`` is set, its return value is used and the
    static ``choices`` are ignored; otherwise the static ``choices`` are
    returned.  The loader is called on every invocation.
    """
    loader = self.choices_loader
    if loader is None:
        return self.choices
    return loader()

HeadlessRunRequest(project_id, prompt, config_path=None, model=None, max_turns=None, timeout=None, follow=True, agents=None, preset=None, name=None, provider=None, instructions=None, unrestricted=None) dataclass

Groups all parameters for a headless (autopilot) agent run.

project_id instance-attribute

prompt instance-attribute

config_path = None class-attribute instance-attribute

model = None class-attribute instance-attribute

max_turns = None class-attribute instance-attribute

timeout = None class-attribute instance-attribute

follow = True class-attribute instance-attribute

agents = None class-attribute instance-attribute

preset = None class-attribute instance-attribute

name = None class-attribute instance-attribute

provider = None class-attribute instance-attribute

instructions = None class-attribute instance-attribute

unrestricted = None class-attribute instance-attribute

LogViewOptions(follow=False, raw=False, tail=None, streaming=True) dataclass

Display options for task log viewing.

follow = False class-attribute instance-attribute

Follow live output (-f).

raw = False class-attribute instance-attribute

Bypass formatting, show raw podman output.

tail = None class-attribute instance-attribute

Show only the last N lines.

streaming = True class-attribute instance-attribute

Enable partial streaming (typewriter effect) for supported formatters.

ModeInfo(emoji, label) dataclass

Display attributes for a task mode.

emoji instance-attribute

label instance-attribute

StatusInfo(label, emoji, color) dataclass

Display attributes for a task effective status.

label instance-attribute

emoji instance-attribute

color instance-attribute

Task(config, meta)

Rich task entity — DDD Entity with identity and lifecycle behavior.

Each task has a unique identity within its project, defined by the tuple (project_id, task_id). Two Task instances are equal iff they share this identity, regardless of metadata differences.

Obtained via get_task, create_task, or list_tasks. Delegates lifecycle operations to the underlying task service functions in tasks and task_runners.

Initialize with project config and task metadata snapshot.

Source code in src/terok/lib/domain/task.py
def __init__(self, config: ProjectConfig, meta: TaskMeta) -> None:
    """Keep references to the owning project's config and the task's metadata."""
    self._meta = meta
    self._config = config

__slots__ = ('_config', '_meta') class-attribute instance-attribute

id property

Return the task's ID.

name property

Return the task's human-readable name.

mode property

Return the task's mode ('cli', 'run', 'toad') or None.

status property

Return the effective status computed from container state + metadata.

container_state property

Live container state — running / exited / … or None.

Hydrated when the task is loaded; reflects what podman inspect reported at construction time. Refresh by reloading the parent Project view.

meta property

Return the underlying metadata value object.

__eq__(other)

Two tasks are equal iff they belong to the same project and share the same ID.

Source code in src/terok/lib/domain/task.py
def __eq__(self, other: object) -> bool:
    """Compare by ``(project_id, task_id)``; any non-``Task`` is unequal."""
    if not isinstance(other, Task):
        return False
    same_project = self._config.id == other._config.id
    return same_project and self.id == other.id

__hash__()

Hash by (project_id, task_id) for use in sets and dicts.

Source code in src/terok/lib/domain/task.py
def __hash__(self) -> int:
    """Derive the hash from the ``(project_id, task_id)`` pair, matching ``__eq__``."""
    identity = (self._config.id, self.id)
    return hash(identity)

run_cli(*, agents=None, preset=None)

Launch a CLI-mode task container.

Source code in src/terok/lib/domain/task.py
def run_cli(self, *, agents: list[str] | None = None, preset: str | None = None) -> None:
    """Start this task through ``task_run_cli``, forwarding *agents* and *preset*."""
    project_id = self._config.id
    task_run_cli(project_id, self.id, agents=agents, preset=preset)

stop(*, timeout=None)

Gracefully stop the task container.

Source code in src/terok/lib/domain/task.py
def stop(self, *, timeout: int | None = None) -> None:
    """Stop this task through ``task_stop``, forwarding *timeout* unchanged."""
    project_id = self._config.id
    task_stop(project_id, self.id, timeout=timeout)

restart()

Restart the task container.

Source code in src/terok/lib/domain/task.py
def restart(self) -> None:
    """Restart this task through ``task_restart``."""
    project_id = self._config.id
    task_restart(project_id, self.id)

delete()

Delete the task (workspace, metadata, containers).

Source code in src/terok/lib/domain/task.py
def delete(self) -> None:
    """Remove this task through ``task_delete``."""
    project_id = self._config.id
    task_delete(project_id, self.id)

rename(new_name)

Rename the task.

Source code in src/terok/lib/domain/task.py
def rename(self, new_name: str) -> None:
    """Give this task the name *new_name* through ``task_rename``."""
    project_id = self._config.id
    task_rename(project_id, self.id, new_name)

followup(prompt, follow=True)

Send a follow-up prompt to a completed headless task.

Source code in src/terok/lib/domain/task.py
def followup(self, prompt: str, follow: bool = True) -> None:
    """Forward a follow-up *prompt* for this task to ``task_followup_headless``."""
    project_id = self._config.id
    task_followup_headless(project_id, self.id, prompt, follow=follow)

logs(options=None)

View task logs.

Source code in src/terok/lib/domain/task.py
def logs(self, options: LogViewOptions | None = None) -> None:
    """Show this task's logs.

    When *options* is omitted (or falsy), a default ``LogViewOptions()`` is
    used instead.
    """
    project_id, task_id = self._config.id, self.id
    view_options = options if options else LogViewOptions()
    task_logs(project_id, task_id, view_options)

login()

Open an interactive shell in the task container.

Source code in src/terok/lib/domain/task.py
def login(self) -> None:
    """Start an interactive shell inside this task's container."""
    project_id, task_id = self._config.id, self.id
    task_login(project_id, task_id)

get_login_command()

Return the podman exec command for login.

Source code in src/terok/lib/domain/task.py
def get_login_command(self) -> list[str]:
    """Build the podman exec command used to log in to this task.

    Delegates to the module-level ``get_login_command`` function.
    """
    project_id, task_id = self._config.id, self.id
    command = get_login_command(project_id, task_id)
    return command

get_workspace_diff(against='HEAD')

Get git diff from the task's workspace.

Source code in src/terok/lib/domain/task.py
def get_workspace_diff(self, against: str = "HEAD") -> str | None:
    """Return the git diff of this task's workspace relative to *against*.

    The result of ``get_workspace_git_diff`` is returned as-is, so it may
    be ``None``.
    """
    project_id, task_id = self._config.id, self.id
    diff = get_workspace_git_diff(project_id, task_id, against=against)
    return diff

show_status()

Print live task status with container state diagnostics.

CLI-flavoured: writes to stdout. Domain callers that want a structured snapshot should read :attr:meta directly.

Source code in src/terok/lib/domain/task.py
def show_status(self) -> None:
    """Print this task's live status, including container state diagnostics.

    Intended for CLI use: the output goes to stdout.  Domain code that
    needs a structured snapshot should read :attr:`meta` instead.
    """
    # Imported lazily, inside the method rather than at module top level.
    from ..orchestration.tasks import task_status

    project_id, task_id = self._config.id, self.id
    task_status(project_id, task_id)

wait_for_exit(*, timeout=7200)

Wait for this task's container to exit; record exit code in metadata.

Returns (exit_code, error_message). See wait_for_container_exit for the underlying timeout semantics.

Source code in src/terok/lib/domain/task.py
def wait_for_exit(self, *, timeout: int = 7200) -> tuple[int | None, str | None]:
    """Block until this task's container exits; the exit code is recorded in metadata.

    The result is an ``(exit_code, error_message)`` pair.  Timeout handling
    is described on
    [`wait_for_container_exit`][terok.lib.orchestration.tasks.lifecycle.wait_for_container_exit].

    Raises:
        RuntimeError: If the task has no mode, i.e. it was never started.
    """
    from ..core.task_state import container_name
    from ..orchestration.tasks import wait_for_container_exit

    mode = self._meta.mode
    if not mode:
        raise RuntimeError(f"Task {self.id} has no mode — never started")

    project_id = self._config.id
    cname = container_name(project_id, mode, self.id)
    return wait_for_container_exit(cname, project_id, self.id, timeout=timeout)

capture_logs()

Capture this task's container logs to disk; return the log path.

Returns None if the task has no mode (never started) or the executor reports a capture failure.

Source code in src/terok/lib/domain/task.py
def capture_logs(self) -> Path | None:
    """Write this task's container logs to disk and return the log path.

    ``None`` is returned when the task has no mode (it was never started)
    or when the executor reports that the capture failed.
    """
    from ..orchestration.tasks import capture_task_logs

    mode = self._meta.mode
    if mode:
        return capture_task_logs(self._config, self.id, mode)
    return None

image_is_old()

Return whether the task's container image is outdated.

Compares the running container's image build-context hash against the project's current expected hash. Returns None when the comparison is indeterminate (image deleted, container not running, or mode is not cli) so callers can distinguish "definitely current" from "can't tell".

Source code in src/terok/lib/domain/task.py
def image_is_old(self) -> bool | None:
    """Report whether this task's container image is out of date.

    The build-context hash of the running container's image is compared
    with the hash the project currently expects.  ``None`` signals an
    indeterminate comparison (image deleted, container not running, or
    mode other than ``cli``), letting callers tell "definitely current"
    apart from "can't tell".
    """
    from .project_state import is_task_image_old

    project_id = self._config.id
    return is_task_image_old(project_id, self._meta)

doctor(*, fix=False, reporter=None, label_prefix='')

Run every layered in-container health check against this task.

Convenience wrapper around ContainerDoctor.run — see that method for the full streaming / fix / label-prefix semantics.

Source code in src/terok/lib/domain/task.py
def doctor(
    self,
    *,
    fix: bool = False,
    reporter: object | None = None,
    label_prefix: str = "",
) -> list[tuple[str, str, str]]:
    """Run all layered in-container health checks for this task.

    Thin wrapper over
    [`ContainerDoctor.run`][terok.lib.orchestration.container_doctor.ContainerDoctor.run];
    the streaming, fix, and label-prefix semantics are documented there.
    All three keyword arguments are forwarded unchanged.
    """
    from ..orchestration.container_doctor import ContainerDoctor

    container_doctor = ContainerDoctor(self._config.id, self.id)
    return container_doctor.run(
        fix=fix,
        reporter=reporter,  # type: ignore[arg-type]
        label_prefix=label_prefix,
    )

__repr__()

Return a developer-friendly string representation.

Source code in src/terok/lib/domain/task.py
def __repr__(self) -> str:
    """Return a debugging representation showing the task's id, name, and mode."""
    text = f"Task(id={self.id!r}, name={self.name!r}, mode={self.mode!r})"
    return text

TaskDeleteResult(task_id, warnings) dataclass

Outcome of a task deletion — always completes, collects warnings.

The task is considered deleted regardless (metadata and workspace removed), but individual cleanup steps may fail. warnings carries human-readable descriptions of any steps that did not complete cleanly.

task_id instance-attribute

warnings instance-attribute

TaskMeta(container_state=None, exit_code=None, deleting=False, initialized=False, starting=False, *, task_id, project_id='', mode, workspace, web_port, web_token=None, backend=None, preset=None, name='', provider=None, unrestricted=None, work_status=None, work_message=None, shield_state=None, created_at=None) dataclass

Bases: TaskState

Lightweight metadata snapshot for a single task.

Inherits lifecycle fields (container_state, exit_code, deleting, initialized) from TaskState.

task_id instance-attribute

project_id = '' class-attribute instance-attribute

Project the task belongs to.

Carried in the meta JSON so consumers that don't already know the project (e.g. terok-shield's host-side dossier resolver, which only has the meta-path pointer) can render full project/task identity without re-deriving it from the on-disk path. Empty string for meta files written before this field was introduced; the lifecycle backfills it on the next mutation.

mode instance-attribute

workspace instance-attribute

web_port instance-attribute

web_token = None class-attribute instance-attribute

backend = None class-attribute instance-attribute

preset = None class-attribute instance-attribute

name = '' class-attribute instance-attribute

provider = None class-attribute instance-attribute

unrestricted = None class-attribute instance-attribute

work_status = None class-attribute instance-attribute

work_message = None class-attribute instance-attribute

shield_state = None class-attribute instance-attribute

created_at = None class-attribute instance-attribute

status property

Compute effective status from live container state + metadata.

load(project_id, task_id) classmethod

Load a TaskMeta from disk, with live container state hydrated.

Raises SystemExit if the task metadata file is not found. Equivalent to the free-fn get_task_meta, kept as the canonical entry point so callers reach the class through its own factory.

Source code in src/terok/lib/orchestration/tasks/query.py
@classmethod
def load(cls, project_id: str, task_id: str) -> TaskMeta:
    """Read a task's metadata from disk, hydrated with live container state.

    This is the class's own factory; it delegates to the free function
    [`get_task_meta`][terok.lib.orchestration.tasks.query.get_task_meta]
    and returns its result unchanged.

    Raises:
        SystemExit: If the task metadata file is not found.
    """
    meta = get_task_meta(project_id, task_id)
    return meta

Config(config_root, core_state_dir, runtime_dir, archive_dir, vault_dir, user_projects_dir, global_config_path, public_host, shield_bypass_firewall_no_protection, tui_default_tmux, tui_external_editor, shield_security_hint) dataclass

Snapshot of the global config values consumers read at startup.

Bundles the paths, feature flags, and presentation hints that the TUI (and CLI) previously pulled from a dozen scattered getters in terok.lib.core.config and terok.lib.core.paths. Capture once with get_config; pass the result around.

Side-effecting helpers — set_experimental, make_sandbox_config — stay as functions on the api module; they don't belong on a frozen value object.

config_root instance-attribute

core_state_dir instance-attribute

runtime_dir instance-attribute

archive_dir instance-attribute

vault_dir instance-attribute

user_projects_dir instance-attribute

global_config_path instance-attribute

public_host instance-attribute

shield_bypass_firewall_no_protection instance-attribute

tui_default_tmux instance-attribute

tui_external_editor instance-attribute

shield_security_hint instance-attribute

authenticate(provider, project_id=None)

Run the auth flow for provider, host-wide by default.

When project_id is given, the project's L2 CLI image is reused — the escape hatch for users who want project-scoped credentials or happen to have a project image handy. When omitted, terok resolves an L1 image (shared across projects that build on the same base) and offers to build one if none exists — the "fresh install, no project yet" path.

Image resolution is deferred: the executor only invokes the resolver after the user has chosen the OAuth path from the OAuth-vs-API-key prompt, so picking API key never triggers an L1 build. Vault storage is provider-scoped in both modes, so switching from a per-project auth to a host-wide one later (or vice versa) does not duplicate or overwrite credentials.

Source code in src/terok/lib/domain/auth.py
def authenticate(provider: str, project_id: str | None = None) -> None:
    """Run the authentication flow for *provider* (host-wide unless a project is given).

    With a *project_id*, the project's L2 CLI image is reused — useful for
    project-scoped credentials or when a project image already exists.
    Without one, terok resolves an L1 image (shared by every project on the
    same base) and offers to build it if missing — the "fresh install, no
    project yet" path.

    Image resolution is **deferred** in the host-wide case: a callable is
    handed to the executor, which only invokes it once the user has picked
    the OAuth path at the OAuth-vs-API-key prompt, so choosing API key never
    triggers an L1 build.  Vault storage is provider-scoped in both modes,
    so moving between per-project and host-wide auth later does not
    duplicate or overwrite credentials.
    """
    from ..core.config import (
        is_claude_oauth_exposed,
        is_codex_oauth_exposed,
        is_oauth_enabled_for,
        sandbox_live_mounts_dir,
    )

    # Each exposure getter is consulted only for its own provider.
    claude_exposed = provider == "claude" and is_claude_oauth_exposed()
    codex_exposed = provider == "codex" and is_codex_oauth_exposed()
    expose = claude_exposed or codex_exposed

    image: str | Callable[[], str]
    if project_id is not None:
        image = project_cli_image(project_id)
    else:
        image = lambda: _resolve_host_auth_image(provider)  # noqa: E731 — lazy by design

    # The roster says which auth modes a provider supports, but terok's
    # config may switch the OAuth path off (experimental flag plus the
    # per-provider ``allow_oauth``).  The listing screen already applies
    # that filter; hand the same gate to the executor's auth flow so the
    # per-provider prompt agrees with it.
    authenticator = Authenticator(provider)
    authenticator.run(
        project_id,
        mounts_dir=sandbox_live_mounts_dir(),
        image=image,
        expose_token=expose,
        oauth_enabled=is_oauth_enabled_for(provider),
    )

build_images(project_id, include_dev=False, refresh_agents=False, full_rebuild=False, agents=None)

Shim around ProjectImage.build.

Source code in src/terok/lib/orchestration/image.py
def build_images(
    project_id: str,
    include_dev: bool = False,
    refresh_agents: bool = False,
    full_rebuild: bool = False,
    agents: str | None = None,
) -> None:
    """Load the project's image and call [`ProjectImage.build`][terok.lib.orchestration.image.ProjectImage.build].

    Every option is forwarded to ``build`` as a keyword argument.
    """
    project_image = ProjectImage.load(project_id)
    project_image.build(
        include_dev=include_dev,
        refresh_agents=refresh_agents,
        full_rebuild=full_rebuild,
        agents=agents,
    )

generate_dockerfiles(project_id, *, family=None)

Shim around ProjectImage.generate_dockerfiles.

Source code in src/terok/lib/orchestration/image.py
def generate_dockerfiles(project_id: str, *, family: str | None = None) -> None:
    """Load the project's image and call [`ProjectImage.generate_dockerfiles`][terok.lib.orchestration.image.ProjectImage.generate_dockerfiles].

    When *family* is given, it overrides the loaded image's ``family``
    attribute before the Dockerfiles are generated.
    """
    project_image = ProjectImage.load(project_id)
    if family is not None:
        # Set via ``object.__setattr__`` rather than plain assignment —
        # presumably because ProjectImage is frozen; confirm against the class.
        object.__setattr__(project_image, "family", family)
    project_image.generate_dockerfiles()

installed_agents(image_tag) cached

Return the set of agent names baked into image_tag.

Reads the ai.terok.agents OCI label written by terok-executor's L1 build (a sorted comma-separated list). Result is cached per image tag, since the label is fixed for the life of an image.

When the image is not present locally, or the label is missing (e.g. a legacy image built before selectable agents), returns an empty set — callers treat empty as "unknown / unrestricted" so older images keep working.

Source code in src/terok/lib/core/images.py
@lru_cache(maxsize=64)
def installed_agents(image_tag: str) -> frozenset[str]:
    """Return the names of the agents baked into *image_tag*.

    The names come from the ``ai.terok.agents`` OCI label that
    terok-executor's L1 build writes as a sorted comma-separated list.
    Results are cached per image tag, because the label does not change
    during an image's lifetime.

    An empty set is returned when the image is not present locally or the
    label is missing (e.g. a legacy image that predates selectable
    agents); callers read empty as "unknown / unrestricted" so older
    images keep working.
    """
    # Reading image labels does not depend on the OCI runtime: podman's
    # image store is the same whichever runtime later boots the image.
    labels = PodmanRuntime().image(image_tag).labels()
    raw = (labels.get(AGENTS_LABEL) or "").strip()
    if not raw:
        return frozenset()
    stripped = (part.strip() for part in raw.split(","))
    return frozenset(name for name in stripped if name)

installed_agents_for_project(project)

Return the agents installed in project's L1 image.

Convenience over installed_agents for the very common installed_agents(agent_cli_image(project.base_image)) pattern.

Source code in src/terok/lib/core/images.py
def installed_agents_for_project(project: ProjectConfig) -> frozenset[str]:
    """Return the agents installed in the L1 image of *project*.

    Shorthand for the frequent
    ``installed_agents(agent_cli_image(project.base_image))`` call; see
    [`installed_agents`][terok.lib.core.images.installed_agents].
    """
    l1_image = agent_cli_image(project.base_image)
    return installed_agents(l1_image)

resolve_agent_config(project_id, *, agent_config=None, project_root=None, preset=None, cli_overrides=None)

Build config stack and return the merged agent config dict.

Convenience wrapper around build_agent_config_stack for callers that only need the final resolved dict (e.g. task runners).

Source code in src/terok/lib/orchestration/agent_config.py
def resolve_agent_config(
    project_id: str,
    *,
    agent_config: dict[str, Any] | None = None,
    project_root: Path | None = None,
    preset: str | None = None,
    cli_overrides: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the config stack and return the merged agent config dict.

    Shortcut over [`build_agent_config_stack`][terok.lib.orchestration.agent_config.build_agent_config_stack]
    for callers (e.g. task runners) that only want the final resolved
    dict rather than the stack itself.
    """
    stack = build_agent_config_stack(
        project_id,
        agent_config=agent_config,
        project_root=project_root,
        preset=preset,
        cli_overrides=cli_overrides,
    )
    return stack.resolve()

check_clearance_units_outdated()

Drift summary across hub + notifier — shim around outdated_summary.

Source code in src/terok/lib/api/clearance.py
def check_clearance_units_outdated() -> str | None:
    """Return the drift summary covering hub and notifier (shim over ``outdated_summary``)."""
    summary = clearance_outdated_summary()
    return summary

read_installed_notifier_unit_version()

Notifier unit version — shim around NotifierService.installed_version.

Source code in src/terok/lib/api/clearance.py
def read_installed_notifier_unit_version() -> int | None:
    """Return the notifier unit version (shim over ``NotifierService.installed_version``)."""
    version = NotifierService.installed_version()
    return version

read_installed_unit_version()

Hub unit version — shim around HubService.installed_version.

Source code in src/terok/lib/api/clearance.py
def read_installed_unit_version() -> int | None:
    """Return the hub unit version (shim over ``HubService.installed_version``)."""
    version = HubService.installed_version()
    return version

make_git_gate(config, *, use_personal_ssh=None)

Construct a GitGate from a ProjectConfig (adapter factory).

Injects validate_gate_upstream_match as the gate validation callback. The use_personal_ssh flag is resolved in order of precedence: a per-invocation override (e.g. terok gate-sync --use-personal-ssh), then the per-project YAML setting (ssh.use_personal), then the default False.

Source code in src/terok/lib/domain/project.py
def make_git_gate(config: ProjectConfig, *, use_personal_ssh: bool | None = None) -> GitGate:
    """Build a `GitGate` from a [`ProjectConfig`][terok.cli.commands.sickbay.ProjectConfig] (adapter factory).

    ``validate_gate_upstream_match`` is injected as the gate validation
    callback.  The personal-SSH setting is chosen by precedence: an
    explicit *use_personal_ssh* argument (e.g. from
    ``terok gate-sync --use-personal-ssh``) wins; otherwise the project's
    ``ssh.use_personal`` YAML value (``config.ssh_use_personal``) applies,
    which in turn defaults to ``False``.
    """
    if use_personal_ssh is None:
        effective = config.ssh_use_personal
    else:
        effective = use_personal_ssh

    return GitGate(
        scope=config.id,
        gate_path=config.gate_path,
        upstream_url=config.upstream_url,
        default_branch=config.default_branch,
        use_personal_ssh=effective,
        validate_gate_fn=validate_gate_upstream_match,
        clone_cache_base=make_sandbox_config().clone_cache_base_path,
    )

cleanup_images(dry_run=False)

Find and remove orphaned terok images in one shot.

Thin convenience over find_orphaned_images + remove_images for callers that don't need to inspect the list first.

Parameters:

Name Type Description Default
dry_run bool

If True, only report what would be removed without removing.

False

Returns:

Type Description
CleanupResult

CleanupResult with lists of removed and failed image display names.

Source code in src/terok/lib/domain/image_cleanup.py
def cleanup_images(dry_run: bool = False) -> CleanupResult:
    """Discover orphaned terok images and remove them in a single call.

    Chains [`find_orphaned_images`][terok.lib.domain.image_cleanup.find_orphaned_images]
    into [`remove_images`][terok.lib.domain.image_cleanup.remove_images] for
    callers that have no need to look at the list beforehand.

    Args:
        dry_run: If True, only report what would be removed without removing.

    Returns:
        CleanupResult with lists of removed and failed image display names.
    """
    orphans = find_orphaned_images()
    return remove_images(orphans, dry_run=dry_run)

delete_project(project_id)

Delete a project and all its associated data.

Removes task workspaces, task metadata, build artifacts, SSH credentials, the git gate (if not shared with other projects), and the project config directory.

Source code in src/terok/lib/domain/project.py
def delete_project(project_id: str) -> DeleteProjectResult:
    """Delete a project and all its associated data.

    The project is archived first; nothing is deleted unless that archive
    succeeds.  Afterwards this removes task workspaces, task metadata,
    build artifacts, SSH credentials, the git gate (if not shared with
    other projects), the staging root (when configured), and the project
    config directory.

    Args:
        project_id: ID of the project to delete.

    Returns:
        A ``DeleteProjectResult`` carrying the list of deleted items, the
        list of skipped items, and the path of the archive taken beforehand.

    Raises:
        SystemExit: If archiving fails (``_archive_project`` returns ``None``);
            deletion is aborted to prevent data loss.
    """
    # Safety net: archive before anything is removed, and refuse to go on
    # without one.
    archive_path = _archive_project(project_id)
    if archive_path is None:
        raise SystemExit(
            f"Project archiving failed for '{project_id}'; aborting deletion to prevent data loss."
        )

    project = load_project(project_id)
    pid = project.id
    # Human-readable records of what was removed and what was left in place.
    deleted: list[str] = []
    skipped: list[str] = []

    # 1. Stop + remove all tasks
    for task in get_tasks(pid):
        try:
            task_delete(pid, task.task_id)
        except Exception as exc:
            # Best-effort: a task that fails to delete is logged and the
            # sweep continues with the remaining tasks.
            _logger.warning("Failed to delete task %s: %s", task.task_id, exc)

    # 2. Remove tasks root (may be user-configured path)
    _rmtree_managed(project.tasks_root, "Tasks root", deleted, skipped)

    # 3-4. Remove state dir, build artifacts, and any remaining task archives
    for d in (core_state_dir() / "projects" / pid, build_dir() / pid, archive_dir() / pid):
        if d.is_dir():
            shutil.rmtree(d)
            deleted.append(str(d))

    # 5. SSH credentials — unassign from vault; orphan keys cascade-delete.
    _unassign_vault_ssh_keys(pid, deleted, skipped)

    # 6. Git gate (skip if shared with other projects)
    sharing = find_projects_sharing_gate(project.gate_path, exclude_project=pid)
    if sharing:
        # Another project still points at this gate: record the skip instead
        # of removing it.
        names = ", ".join(p for p, _ in sharing)
        skipped.append(f"Gate {project.gate_path} shared with: {names}")
    else:
        _rmtree_managed(project.gate_path, "Gate", deleted, skipped)

    # 7. Staging root (gatekeeping mode, may be user-configured path)
    if project.staging_root:
        _rmtree_managed(project.staging_root, "Staging root", deleted, skipped)

    # 8. Project config directory
    if project.root.is_dir():
        shutil.rmtree(project.root)
        deleted.append(str(project.root))

    return DeleteProjectResult(deleted=deleted, skipped=skipped, archive=archive_path)

derive_project(source_id, new_id)

Copy source_id's gate mirror and vault SSH assignments under new_id.

Source code in src/terok/lib/domain/project.py
def derive_project(source_id: str, new_id: str) -> Project:
    """Copy the gate mirror and vault SSH assignments of *source_id* to *new_id*.

    Returns the newly derived project, loaded from disk and wrapped as a
    ``Project``.
    """
    _derive_project(source_id, new_id)
    _share_ssh_key_assignments(source_id, new_id)
    derived_config = load_project(new_id)
    return Project(derived_config)

discover_projects()

Load every project on disk, splitting successes from config-level failures.

The broken list lets the TUI render damaged projects alongside healthy ones (issue #565) — silently hiding them turns "project vanished" into a mystery. _parse_project_yaml wraps every config error (bad YAML, schema drift, filesystem issues) in SystemExit with a human-readable message; anything else propagates as a genuine bug.

Source code in src/terok/lib/core/projects.py
def discover_projects() -> tuple[list[ProjectConfig], list[BrokenProject]]:
    """Load every on-disk project, separating successes from config-level failures.

    Returning the broken ones lets the TUI show damaged projects next to
    healthy ones (issue #565) rather than hiding them, which would turn
    "project vanished" into a mystery.  ``_parse_project_yaml`` wraps every
    config error (bad YAML, schema drift, filesystem issues) in
    ``SystemExit`` with a human-readable message; only that exception is
    caught here, so anything else propagates as a genuine bug.

    Projects are visited in sorted ID order.
    """
    paths_by_id = _discover_project_paths()
    loaded: list[ProjectConfig] = []
    failures: list[BrokenProject] = []
    for project_id in sorted(paths_by_id):
        try:
            config = load_project(project_id)
        except SystemExit as exc:
            message = _sanitize_for_tty(str(exc))
            failures.append(
                BrokenProject(id=project_id, config_path=paths_by_id[project_id], error=message)
            )
        else:
            loaded.append(config)
    return loaded, failures

execute_panic(*, stop_containers=False)

Execute the full panic sequence.

Discovers every running container, then raises shields, stops vault and gate — all in parallel. If stop_containers, also kills the containers afterwards (SIGKILL; they are not removed).

Source code in src/terok/lib/domain/panic.py
def execute_panic(
    *,
    stop_containers: bool = False,
) -> PanicResult:
    """Run the complete panic sequence and return its outcome.

    Every running container is discovered first; shields are then raised
    and the vault and gate stopped — all in parallel.  With
    *stop_containers* set, the containers are additionally killed
    afterwards (SIGKILL; they are not removed).
    """
    outcome = PanicResult()
    targets = _discover_targets()
    outcome.total_running = len(targets)
    outcome.shield_bypassed = get_shield_bypass_firewall_no_protection()

    # Phase 1: lockdown, then record the panic lock.
    _phase1_lockdown(outcome, targets)
    _write_panic_lock()

    # Phase 2 is optional and pointless without any discovered containers.
    if not (stop_containers and targets):
        return outcome

    stopped, stop_errors = _stop_containers(targets)
    outcome.containers_stopped = stopped
    outcome.container_stop_errors = stop_errors
    return outcome

find_orphaned_images()

Find terok images that are orphaned and safe to remove.

Orphaned images include:

- Dangling images (<none>:<none>) from terok layer rebuilds
- L2 project images whose project no longer exists in the config

Source code in src/terok/lib/domain/image_cleanup.py
def find_orphaned_images() -> list[ImageInfo]:
    """Collect the terok images that are orphaned and safe to remove.

    Two kinds of image qualify:
    - Dangling images (``<none>:<none>``) from terok layer rebuilds
    - L2 project images whose project no longer exists in the config

    The result lists dangling images first, then orphaned L2 images, with
    duplicates (by image ID) dropped after their first occurrence.
    """
    known_ids = _known_project_ids()

    # Dangling images descended from terok base layers.
    dangling = _find_dangling_terok_images()

    # L2 images of vanished projects; left empty when project discovery
    # failed (``known_ids`` is ``None``).
    orphaned_l2: list[ImageInfo] = []
    if known_ids is not None:
        for candidate in list_images():
            if (
                _is_terok_l2_image(candidate.repository, candidate.tag)
                and candidate.project_key not in known_ids
                and _is_terok_built_image(candidate.image_id)
            ):
                orphaned_l2.append(candidate)

    # Merge both groups; dict insertion order keeps first-seen order while
    # ``setdefault`` ignores later images with an already-seen ID.
    unique: dict[str, ImageInfo] = {}
    for img in (*dangling, *orphaned_l2):
        unique.setdefault(img.image_id, img)
    return list(unique.values())

find_projects_sharing_gate(gate_path, exclude_project=None)

Find all projects configured to use the same gate path.

Parameters:

Name Type Description Default
gate_path Path

The gate path to check for

required
exclude_project str | None

Project ID to exclude from results (usually the current project)

None

Returns:

Type Description
list[tuple[str, str | None]]

List of (project_id, upstream_url) tuples for projects sharing this gate

Source code in src/terok/lib/domain/project.py
def find_projects_sharing_gate(
    gate_path: Path, exclude_project: str | None = None
) -> list[tuple[str, str | None]]:
    """Return the projects whose configured gate path equals *gate_path*.

    Paths are compared after ``Path.resolve()`` on both sides.

    Args:
        gate_path: The gate path to check for
        exclude_project: Project ID to exclude from results (usually the current project)

    Returns:
        List of (project_id, upstream_url) tuples for projects sharing this gate
    """
    from ..core.projects import list_projects as _list_projects

    target = gate_path.resolve()
    sharing: list[tuple[str, str | None]] = []
    for candidate in _list_projects():
        if candidate.id == exclude_project:
            continue
        if candidate.gate_path.resolve() == target:
            sharing.append((candidate.id, candidate.upstream_url))
    return sharing

format_panic_report(result)

Format a human-readable summary of the panic result.

Source code in src/terok/lib/domain/panic.py
def format_panic_report(result: PanicResult) -> str:
    """Render *result* as a newline-joined, human-readable panic summary.

    The container count, shield, vault, and gate lines are always present.
    A "Containers killed" line is added when any containers were stopped,
    and an "Errors:" section follows when ``result.has_errors`` is set.
    """
    report = [
        f"Containers found: {result.total_running}",
        _format_shield_status(result),
        f"Vault: {'locked + stopped' if result.vault_stopped else 'FAILED'}",
        f"Gate:  {'stopped' if result.gate_stopped else 'FAILED'}",
    ]

    killed = result.containers_stopped
    if killed:
        report.append(f"Containers killed: {len(killed)}")

    if result.has_errors:
        # A blank line separates the error section from the summary.
        report.append("")
        report.append("Errors:")
        report.extend(_format_errors(result))

    return "\n".join(report)

get_project(project_id)

Load a project by ID and return a rich Project aggregate.

Source code in src/terok/lib/domain/project.py
def get_project(project_id: str) -> Project:
    """Load the project *project_id* and wrap it in a rich [`Project`][terok.lib.domain.project.Project] aggregate."""
    config = load_project(project_id)
    return Project(config)

list_images(project_id=None)

List terok-managed images, optionally filtered by project.

Parameters:

Name Type Description Default
project_id str | None

If given, only show images for this project.

None

Returns:

Type Description
list[ImageInfo]

List of ImageInfo objects for matching images.

Source code in src/terok/lib/domain/image_cleanup.py
def list_images(project_id: str | None = None) -> list[ImageInfo]:
    """Return the terok-managed images, optionally narrowed to one project.

    Repository names are matched with any ``localhost/`` prefix removed.
    When a project is given, only L2 images are filtered by it; L0/L1
    images are always included.

    Args:
        project_id: If given, only show images for this project.

    Returns:
        List of ImageInfo objects for matching images.
    """
    matches: list[ImageInfo] = []
    for candidate in PodmanRuntime().images():
        repo = candidate.repository.removeprefix("localhost/")
        if not _is_terok_image(repo, candidate.tag):
            continue
        # An L2 image that belongs to some other project is filtered out.
        foreign_l2 = (
            project_id is not None
            and _is_terok_l2_image(repo, candidate.tag)
            and repo != project_id
        )
        if foreign_l2:
            continue
        matches.append(ImageInfo.from_image(candidate))
    return matches

list_projects()

Return all known projects as rich Project aggregates.

Source code in src/terok/lib/domain/project.py
def list_projects() -> list[Project]:
    """Return every known project wrapped as a rich [`Project`][terok.lib.domain.project.Project] aggregate."""
    projects: list[Project] = []
    for config in _list_projects():
        projects.append(Project(config))
    return projects

load_project(project_id)

Load and return a fully resolved ProjectConfig from project_id.

Source code in src/terok/lib/core/projects.py
def load_project(project_id: str) -> ProjectConfig:
    """Load and return a fully resolved [`ProjectConfig`][terok.cli.commands.sickbay.ProjectConfig] from *project_id*.

    The project's config file is parsed, then the git identity is resolved
    through a ``ConfigStack`` layered as git-global → terok-global →
    project.yml before the final config object is built.

    Raises:
        SystemExit: If the project root has no config file, or if building
            the config fails validation.  In the validation case the message
            names the identity sources that contributed data, and the
            original ``ValidationError`` is chained as the cause.
    """
    root = _find_project_root(project_id)
    cfg_path = root / _PROJECT_YML
    if not cfg_path.is_file():
        raise SystemExit(f"Missing {_PROJECT_YML} in {root}")

    raw = _parse_project_yaml(cfg_path)

    # Git identity resolved via ConfigStack: git-global → terok-global → project.yml
    git_dict = raw.git.model_dump(exclude_none=True)
    identity_stack = ConfigStack()
    identity_stack.push(ConfigScope("git-global", None, _git_global_identity()))
    identity_stack.push(ConfigScope("terok-global", None, _validated_global_git_section()))
    identity_stack.push(ConfigScope("project", cfg_path, git_dict))
    identity = identity_stack.resolve()

    try:
        return _build_project_config(raw, identity, root, project_id)
    except ValidationError as exc:
        # Identity values come from merged sources (git config, global config,
        # project.yml).  Include provenance in the error so the user knows
        # where to look.  Only scopes that actually carried data are listed.
        sources = ", ".join(s.level for s in identity_stack.scopes if s.data)
        # Chain explicitly so the validation error is recorded as the cause
        # rather than as an incidental "during handling" context.
        raise SystemExit(
            _format_validation_error(exc, cfg_path) + f"\n  (git identity merged from: {sources})"
        ) from exc

panic_stop_containers()

Discover and SIGKILL all running containers (Phase 2 standalone).

Source code in src/terok/lib/domain/panic.py
def panic_stop_containers() -> tuple[list[str], list[tuple[str, str]]]:
    """Discover all running containers and SIGKILL them (standalone Phase 2).

    Returns whatever ``_stop_containers`` reports for the discovered targets.
    """
    targets = _discover_targets()
    return _stop_containers(targets)

project_image_exists(project_id)

Return True when the project's L2 CLI image is present locally.

Source code in src/terok/lib/domain/project.py
def project_image_exists(project_id: str) -> bool:
    """Tell whether the project's L2 CLI image is available locally.

    Returns:
        ``True`` when ``image_exists`` confirms the image named by
        ``project_cli_image(project_id)``.
    """
    cli_image = project_cli_image(project_id)
    return image_exists(cli_image)

remove_images(images, *, dry_run=False)

Remove a pre-computed set of images (or just report under dry_run).

Split out from cleanup_images so the CLI can present the orphan list, prompt for confirmation, and only then act — without paying the discovery cost twice.

Parameters:

Name Type Description Default
images Iterable[ImageInfo]

The images to remove. Iterated once.

required
dry_run bool

If True, only report names without invoking the runtime.

False

Returns:

Type Description
CleanupResult

CleanupResult with lists of removed and failed image display names.

Source code in src/terok/lib/domain/image_cleanup.py
def remove_images(images: Iterable[ImageInfo], *, dry_run: bool = False) -> CleanupResult:
    """Delete an already-discovered set of images, or only report them under *dry_run*.

    This is the acting half of
    [`cleanup_images`][terok.lib.domain.image_cleanup.cleanup_images]: the CLI
    can show the orphan list, ask for confirmation and then call this without
    running discovery a second time.

    Args:
        images: The images to remove.  Iterated once.
        dry_run: If True, only report names without invoking the runtime.

    Returns:
        CleanupResult with lists of removed and failed image display names.
    """
    succeeded: list[str] = []
    errored: list[str] = []
    for image in images:
        if not dry_run:
            try:
                was_removed = PodmanRuntime().image(image.image_id).remove()
                (succeeded if was_removed else errored).append(image.full_name)
            except Exception as exc:  # noqa: BLE001 — one bad image shouldn't abort the sweep
                from ..util.logging_utils import log_warning

                log_warning(f"Image cleanup failed for {image.full_name}: {exc}")
                errored.append(image.full_name)
        else:
            # Dry run: list the image as "removed" without touching the runtime.
            succeeded.append(image.full_name)
    return CleanupResult(removed=succeeded, failed=errored, dry_run=dry_run)

render_project_yaml(values)

Render project.yml without writing it — used by the TUI review screen.

Source code in src/terok/lib/domain/wizards/new_project.py
def render_project_yaml(values: dict) -> str:
    """Produce the ``project.yml`` text in memory — the TUI review screen shows it before any write.

    Args:
        values: Wizard answers.  All keys except ``agents`` are required.

    Returns:
        The rendered template as a string.
    """
    # Template variable name → wizard answer key, in template order.
    required_fields = (
        ("PROJECT_ID", "project_id"),
        ("UPSTREAM_URL", "upstream_url"),
        ("DEFAULT_BRANCH", "default_branch"),
        ("USER_SNIPPET", "user_snippet"),
        ("SECURITY_CLASS", "security_class"),
        ("BASE", "base"),
    )
    context = {var_name: values[answer_key] for var_name, answer_key in required_fields}
    context["BASE_IMAGE"] = BASE_IMAGES[values["base"]]
    # Empty string suppresses the ``agents:`` line via the
    # template's ``{% if AGENTS %}`` gate — the project then
    # inherits the global default written by ``terok agents set``.
    context["AGENTS"] = values.get("agents", "")

    with resources.as_file(_TEMPLATE_DIR / _TEMPLATE_NAME) as template_path:
        # ``StrictUndefined`` upgrades silent ``{{TYPO}}`` to a hard
        # error; ``autoescape=False`` because YAML output would be
        # corrupted by HTML escaping.  The wizard's template uses
        # ``{% if %}`` blocks and the ``| indent`` filter — Jinja2
        # control flow, not just ``{{VAR}}`` substitution.
        loader = jinja2.FileSystemLoader(str(template_path.parent))
        env = jinja2.Environment(  # nosec B701 — see comment above  # noqa: S701
            loader=loader,
            keep_trailing_newline=True,
            undefined=jinja2.StrictUndefined,
            autoescape=False,
        )
        template = env.get_template(template_path.name)
        return template.render(**context)

require_project_exists(project_id)

Raise SystemExit unless project_id names a known project.

Cheap stat-based check — no YAML parse, no pydantic validation. Use this in CLI entry points that want to fail before any user-visible side effect (interactive prompt, status print, image build offer). The downstream load_project call still catches malformed YAML.

Source code in src/terok/lib/core/projects.py
def require_project_exists(project_id: str) -> None:
    """Abort with [`SystemExit`][SystemExit] when *project_id* is not a known project.

    This is a cheap, stat-based lookup: nothing is parsed as YAML and no
    pydantic validation runs.  CLI entry points call it to fail before any
    user-visible side effect (interactive prompt, status print, image build
    offer).  Malformed YAML is still caught later by
    [`load_project`][terok.lib.core.projects.load_project].
    """
    # Only the lookup's failure matters here; the resolved root is discarded.
    _find_project_root(project_id)

set_project_image_agents(project_id, selection)

Write selection into the project's project.yml under image.agents.

Caller validates selection up-front; on success returns the project.yml path written.

Source code in src/terok/lib/core/projects.py
def set_project_image_agents(project_id: str, selection: str) -> Path:
    """Store *selection* as ``image.agents`` in the project's ``project.yml``.

    No validation happens here — the caller is expected to have checked
    *selection* already.

    Returns:
        Path of the ``project.yml`` that was updated.
    """
    from terok.lib.integrations.sandbox import yaml_update_section

    project_yml = _find_project_root(project_id) / _PROJECT_YML
    image_patch = {"agents": selection}
    yaml_update_section(project_yml, "image", image_patch)
    return project_yml

summarize_ssh_init(result)

Render an ssh-init result for the terminal.

Source code in src/terok/lib/domain/ssh.py
def summarize_ssh_init(result: SSHInitResult) -> None:
    """Print a human-readable summary of an ``ssh-init`` result to stdout.

    Reads ``key_id``, ``key_type``, ``fingerprint``, ``comment`` and
    ``public_line`` from *result*, one printed line each.
    """

    def _report_lines():
        # Lazily yielded so each field is looked up only when its line is
        # about to be printed.
        yield f"  id:          {result['key_id']}"
        yield f"  type:        {result['key_type']}"
        yield f"  fingerprint: {result['fingerprint']}"
        yield f"  comment:     {result['comment']}"
        yield "Public key (register as a deploy key on the remote):"
        yield f"  {result['public_line']}"

    for line in _report_lines():
        print(line)

validate_answer(question, raw)

Normalise and validate a raw answer for question.

Returns (value, error_or_None) — the normalised value and an error message if the answer was rejected. Both presenters call this so validation semantics stay identical regardless of UI.

Normalisation, in order:

  1. Strip surrounding whitespace (copy-paste leftovers, accidental trailing spaces). All-whitespace input is indistinguishable from empty for the required/optional check.
  2. Apply question.transform if set (e.g. str.lower).
  3. Enforce the required flag against the final value.
  4. For kind="choice", the value must be one of the declared slugs — defensive against presenter bugs that might submit a label, index, or free-form typo.
  5. Run question.validate for field-specific rules.
Source code in src/terok/lib/domain/wizards/new_project.py
def validate_answer(question: Question, raw: str) -> tuple[str, str | None]:
    """Clean up a raw answer for *question* and check it.

    The result is ``(value, error_or_None)``: the normalised value plus an
    error message when the answer is rejected.  Every presenter goes
    through this function, so all UIs share the same validation rules.

    Steps, in order:

    1. Surrounding whitespace is stripped, so an all-whitespace answer
       counts as empty for the required/optional check.
    2. ``question.transform`` is applied when set (e.g. ``str.lower``).
    3. A required question rejects an empty final value.
    4. For ``kind="choice"``, a non-empty value must be one of the declared
       slugs — a guard against presenters submitting a label, index, or
       free-form typo.
    5. ``question.validate`` runs last for field-specific rules.
    """
    cleaned = raw.strip()
    transform = question.transform
    if transform:
        cleaned = transform(cleaned)

    def _first_problem() -> str | None:
        """Return the first rule violation for ``cleaned``, or ``None``."""
        if question.required and not cleaned:
            return f"{question.prompt} is required."
        if question.kind == "choice" and cleaned:
            slugs = {slug for slug, _label in question.resolve_choices()}
            if cleaned not in slugs:
                allowed = ", ".join(sorted(slugs))
                return f"{question.prompt} must be one of: {allowed}"
        validator = question.validate
        if not validator:
            return None
        # A falsy validator result means "accepted".
        return validator(cleaned) or None

    return cleaned, _first_problem()

write_project_yaml(project_id, rendered, *, overwrite=False)

Write rendered YAML to <user_projects_dir>/<project_id>/project.yml.

The TUI reviews YAML in a TextArea before writing, so this is the write half of generate_config — kept separate so the TUI can pass tweaked content without re-rendering the template.

Source code in src/terok/lib/domain/wizards/new_project.py
def write_project_yaml(project_id: str, rendered: str, *, overwrite: bool = False) -> Path:
    """Save *rendered* YAML as ``<user_projects_dir>/<project_id>/project.yml``.

    This is the write half of
    [`generate_config`][terok.lib.domain.wizards.new_project.generate_config].
    It is separate because the TUI lets the user review the YAML in a
    ``TextArea`` and may pass edited content without re-rendering the template.

    An existing file is left untouched unless *overwrite* is true.

    Returns:
        Path of the ``project.yml`` file, whether or not it was written.
    """
    target_dir = user_projects_dir() / project_id
    ensure_dir_writable(target_dir, "Project")
    target = target_dir / "project.yml"
    already_present = target.exists()
    if overwrite or not already_present:
        target.write_text(rendered, encoding="utf-8")
    return target

agent_config_dir(project_id, task_id)

Host path of the agent-config dir bind-mounted at CONTAINER_TEROK_CONFIG.

Source code in src/terok/lib/orchestration/tasks/meta.py
def agent_config_dir(project_id: str, task_id: str) -> Path:
    """Return the host-side agent-config directory that is bind-mounted at `CONTAINER_TEROK_CONFIG`.

    Both identifiers go through ``_reject_unsafe_id`` before the path is built.
    """
    for candidate, label in ((project_id, "project_id"), (task_id, "task_id")):
        _reject_unsafe_id(candidate, label)
    tasks_root = load_project(project_id).tasks_root
    return tasks_root / str(task_id) / "agent-config"

container_name(project_id, mode, task_id)

Return the canonical container name for a task.

Source code in src/terok/lib/core/task_state.py
def container_name(project_id: str, mode: str, task_id: str) -> str:
    """Build the canonical container name for a task.

    Returns:
        The three parts joined with hyphens: ``<project_id>-<mode>-<task_id>``.
    """
    return "{}-{}-{}".format(project_id, mode, task_id)

effective_status(task)

Compute the display status from task lifecycle state.

Reads the following fields from a TaskState instance:

  • container_state (str | None): live podman state, or None
  • exit_code (int | None): process exit code, or None
  • deleting (bool): persisted to YAML before deletion starts
  • initialized (bool): True once ready_at is persisted to YAML

Returns one of: "deleting", "running", "init", "starting", "stopped", "completed", "failed", "created", "not found".

Source code in src/terok/lib/core/task_state.py
def effective_status(task: TaskState) -> str:
    """Derive the status shown to the user from a task's lifecycle fields.

    Fields read from the ``TaskState`` instance:

    - ``deleting`` (bool): persisted to YAML before deletion starts
    - ``container_state`` (str | None): live podman state, or None
    - ``initialized`` (bool): True once ``ready_at`` is persisted to YAML
    - ``starting`` (bool): consulted only while no container exists
    - ``exit_code`` (int | None): process exit code, or None

    Returns one of: ``"deleting"``, ``"running"``, ``"init"``,
    ``"starting"``, ``"stopped"``, ``"completed"``, ``"failed"``,
    ``"created"``, ``"not found"``.
    """
    # Deletion takes precedence over everything else.
    if task.deleting:
        return "deleting"

    state = task.container_state

    if state is None:
        # No container yet — ``starting`` fills the launch-worker gap
        # before podman has created the container.
        if task.starting:
            return "starting"
        if not task.initialized:
            return "created"
        return _exit_code_status(task.exit_code) or "not found"

    if state == "running":
        # A running container shows ``init`` until the task is initialized.
        if task.initialized:
            return "running"
        return "init"

    # Any other container state: use the exit-code status, else "stopped".
    return _exit_code_status(task.exit_code) or "stopped"

generate_task_name(project_id=None)

Generate a random human-readable task name (e.g. talented-toucan).

When project_id is given, name categories are resolved from config: project tasks.name_categories → global tasks.name_categories → deterministic 3-category selection based on project ID hash.

Source code in src/terok/lib/orchestration/tasks/naming.py
def generate_task_name(project_id: str | None = None) -> str:
    """Return a random, human-readable task name such as ``talented-toucan``.

    With a *project_id*, the name categories come from config, resolved as
    project ``tasks.name_categories`` → global ``tasks.name_categories``
    → deterministic 3-category selection based on project ID hash.
    Without one, no category restriction is passed to the generator.
    """
    import namer

    if project_id:
        categories = _resolve_name_categories(project_id)
    else:
        categories = None
    return namer.generate(separator="-", category=categories)

get_all_task_states(project_id, tasks)

Map each task to its live container state via a single batch query.

Parameters:

Name Type Description Default
project_id str

The project whose containers to query.

required
tasks list[TaskMeta]

List of TaskMeta instances (must have task_id and mode).

required

Returns:

Type Description
dict[str, str | None]

{task_id: container_state_or_None} dict.

Source code in src/terok/lib/orchestration/tasks/query.py
def get_all_task_states(
    project_id: str,
    tasks: list[TaskMeta],
) -> dict[str, str | None]:
    """Look up the live container state of every task with one batch query.

    Args:
        project_id: The project whose containers to query.
        tasks: List of ``TaskMeta`` instances (must have ``task_id`` and ``mode``).

    Returns:
        ``{task_id: container_state_or_None}`` dict.  A task without a mode
        maps to ``None``, as does one whose container is not in the query result.
    """
    # Use PodmanRuntime directly: this is a `podman ps` enumeration that
    # doesn't differ across OCI runtimes.
    from terok.lib.integrations.sandbox import PodmanRuntime

    states_by_container = PodmanRuntime().container_states(project_id)

    def _live_state(task):
        """State of *task*'s container, or ``None`` when it has no mode."""
        if not task.mode:
            return None
        return states_by_container.get(container_name(project_id, task.mode, str(task.task_id)))

    return {str(task.task_id): _live_state(task) for task in tasks}

get_login_command(project_id, task_id)

Return the podman exec command to log into a task container.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def get_login_command(project_id: str, task_id: str) -> list[str]:
    """Build the podman exec command that logs into a task container.

    The project is loaded from *project_id* and handed to ``_get_login_command``.
    """
    project = load_project(project_id)
    return _get_login_command(project, task_id)

get_task_meta(project_id, task_id)

Return metadata for a single task with live container state.

Hydrates container_state from the running container so that TaskMeta.status reflects current reality rather than stale YAML. Raises SystemExit if the task metadata file is not found.

Source code in src/terok/lib/orchestration/tasks/query.py
def get_task_meta(project_id: str, task_id: str) -> TaskMeta:
    """Return metadata for a single task with live container state.

    Hydrates ``container_state`` from the running container so that
    ``TaskMeta.status`` reflects current reality rather than stale YAML.
    Raises ``SystemExit`` if the task metadata file is not found.

    Args:
        project_id: Project that owns the task; also the fallback when the
            stored record has a missing or empty ``project_id``.
        task_id: Task to look up; also the fallback when the stored record
            has no ``task_id``.

    Returns:
        A ``TaskMeta`` built from the stored record.  ``container_state`` is
        the live probe result (``None`` when the task has no mode or the
        probe raised); ``work_status`` / ``work_message`` come from the
        task's ``agent-config`` directory and stay ``None`` if reading fails.
    """
    meta_dir = tasks_meta_dir(project_id)
    raw = read_task_meta(meta_dir, task_id)
    if raw is None:
        raise SystemExit(f"Unknown task {task_id}")
    mode = raw.get("mode")
    # Fall back to the caller-known task_id rather than a blank identity
    # when the on-disk record predates the field.
    tid = str(raw.get("task_id") or task_id)
    # Hydrate live container state only for tasks that have actually been started
    # (state probe is runtime-agnostic — see ``get_task_container_state``).
    live_state: str | None = None
    if mode is not None:
        # Best-effort: any failure in the probe (including the lazy import)
        # leaves ``live_state`` as None instead of aborting the lookup.
        try:
            from terok.lib.integrations.sandbox import PodmanRuntime

            # The container name uses the caller's ``task_id``, not ``tid``.
            cname = container_name(project_id, mode, task_id)
            live_state = PodmanRuntime().container(cname).state
        except Exception:
            pass
    # Hydrate work status from agent-config (same logic as _get_tasks)
    ws_status: str | None = None
    ws_message: str | None = None
    # ``tid`` is only falsy when the stored task_id is falsy and the
    # ``task_id`` argument is an empty string.
    if tid:
        # ``load_project`` sits outside the try below, so its failures propagate.
        project = load_project(project_id)
        try:
            agent_cfg = project.tasks_root / tid / "agent-config"
            ws = read_work_status(agent_cfg)
            ws_status = ws.status
            ws_message = ws.message
        except Exception:  # noqa: BLE001 — best-effort; agent-config may not exist yet
            pass
    return TaskMeta(
        task_id=tid,
        # ``or`` (not the dict default) so a migrated record carrying an
        # empty string still falls back to the path-derived project_id.
        project_id=raw.get("project_id") or project_id,
        mode=mode,
        workspace=raw.get("workspace", ""),
        web_port=raw.get("web_port"),
        web_token=raw.get("web_token"),
        backend=raw.get("backend"),
        container_state=live_state,
        exit_code=raw.get("exit_code"),
        deleting=bool(raw.get("deleting")),
        initialized=_is_initialized(raw),
        preset=raw.get("preset"),
        # NOTE(review): unlike the ``.get`` lookups around it, this is a hard
        # lookup — a record without "name" raises KeyError.  Confirm intended.
        name=raw["name"],
        provider=raw.get("provider"),
        unrestricted=raw.get("unrestricted"),
        work_status=ws_status,
        work_message=ws_message,
        created_at=raw.get("created_at"),
    )

get_tasks(project_id, reverse=False)

Return all task metadata for project_id, sorted by task ID.

Source code in src/terok/lib/orchestration/tasks/query.py
def get_tasks(project_id: str, reverse: bool = False) -> list[TaskMeta]:
    """List the metadata of every task in *project_id*, sorted by task ID.

    Args:
        project_id: Project whose tasks to list.
        reverse: Forwarded unchanged to ``_get_tasks``.
    """
    tasks = _get_tasks(project_id, reverse=reverse)
    return tasks

get_workspace_git_diff(project_id, task_id, against='HEAD')

Get git diff from a task's workspace via container exec.

Runs git diff inside the task container rather than on the host, so that even poisoned git hooks only execute within the container sandbox.

Parameters:

Name Type Description Default
project_id str

The project ID

required
task_id str

The task ID

required
against str

What to diff against ("HEAD" or "PREV")

'HEAD'

Returns:

Type Description
str | None

The git diff output as a string, or None if the diff could not be obtained

Source code in src/terok/lib/orchestration/tasks/query.py
def get_workspace_git_diff(project_id: str, task_id: str, against: str = "HEAD") -> str | None:
    """Fetch a git diff of a task's workspace by exec-ing into its container.

    ``git diff`` runs **inside** the task container rather than on the host,
    so that even poisoned git hooks only execute within the container sandbox.

    Args:
        project_id: The project ID
        task_id: The task ID
        against: What to diff against (``"HEAD"`` or ``"PREV"``); any value
            other than ``"PREV"`` is handled like ``"HEAD"``.

    Returns:
        The git diff output as a string, or ``None`` if failed — which
        includes an unknown task, a task without a mode, and any exception
        or ``SystemExit`` raised along the way.
    """
    try:
        load_project(project_id)  # validate project exists
        record = read_task_meta(tasks_meta_dir(project_id), task_id)
        mode = record.get("mode") if record is not None else None
        if not mode:
            return None

        # "PREV" compares the previous commit with HEAD; otherwise diff against HEAD.
        refs = ("HEAD~1", "HEAD") if against == "PREV" else ("HEAD",)
        return container_git_diff(project_id, task_id, mode, *refs)
    except (Exception, SystemExit):
        return None

has_gpu(project)

True when the project's project.yml opts into GPU passthrough.

Accepts any object with a root attribute pointing to the project directory (typically a Project instance). Returns False on any I/O or parse error.

Source code in src/terok/lib/core/task_state.py
def has_gpu(project: Any) -> bool:
    """True when the project's ``project.yml`` opts into GPU passthrough.

    Accepts any object with a ``root`` attribute pointing to the project
    directory (typically a ``Project`` instance).  Returns ``False`` on
    any I/O or parse error, and also when the file parses but does not
    have the expected mapping shape.

    Args:
        project: Object whose ``root`` attribute supports ``root / "project.yml"``.

    Returns:
        ``True`` when ``run.gpus`` is boolean ``True`` or the string
        ``"all"`` (case-insensitive); ``False`` in every other case.
    """
    from collections.abc import Mapping

    root = getattr(project, "root", None)
    if root is None:
        return False
    try:
        cfg = _yaml_load((root / "project.yml").read_text()) or {}
    except (OSError, TypeError, AttributeError, YAMLError):
        return False
    # Valid YAML can still be the wrong shape (a top-level list or scalar,
    # or ``run:`` set to a plain string).  Treat that as "no GPU" rather
    # than letting ``.get`` raise AttributeError.
    if not isinstance(cfg, Mapping):
        return False
    run_section = cfg.get("run") or {}
    if not isinstance(run_section, Mapping):
        return False
    gpus = run_section.get("gpus")
    if isinstance(gpus, str):
        return gpus.lower() == "all"
    if isinstance(gpus, bool):
        return gpus
    return False

mark_task_deleting(project_id, task_id)

Persist deleting: true to the task's metadata file.

Source code in src/terok/lib/orchestration/tasks/meta.py
def mark_task_deleting(project_id: str, task_id: str) -> None:
    """Record ``deleting: true`` in the task's metadata file.

    Does nothing when the task has no metadata.  Any exception is logged
    through ``_log_debug`` and swallowed, so this never raises ``Exception``.
    """
    try:
        meta_root = tasks_meta_dir(project_id)
        record = read_task_meta(meta_root, task_id)
        if record is not None:
            record["deleting"] = True
            write_task_meta(dossier_path(meta_root, task_id), record)
    except Exception as err:
        _log_debug(f"mark_task_deleting: failed project_id={project_id} task_id={task_id}: {err}")

mode_info(mode)

Return the display info for a task mode string.

Source code in src/terok/lib/core/task_display.py
def mode_info(mode: str | None) -> ModeInfo:
    """Look up the display info for a task mode string.

    Non-string modes, and modes with no (truthy) entry in ``MODE_DISPLAY``,
    resolve to the ``MODE_DISPLAY[None]`` entry.
    """
    lookup_key = mode if isinstance(mode, str) else None
    return MODE_DISPLAY.get(lookup_key) or MODE_DISPLAY[None]

sanitize_task_name(raw)

Sanitize a raw task name into a slug-style identifier.

Strips whitespace, lowercases, replaces spaces with hyphens, removes characters outside [a-z0-9_-], collapses consecutive hyphens, strips trailing hyphens, and truncates to TASK_NAME_MAX_LEN. Returns None if the result is empty.

Leading hyphens are preserved so callers can detect and reject them (a name starting with - looks like a CLI flag).

Source code in src/terok/lib/orchestration/tasks/naming.py
def sanitize_task_name(raw: str | None) -> str | None:
    """Sanitize a raw task name into a slug-style identifier.

    Strips whitespace, lowercases, replaces spaces with hyphens,
    removes characters outside ``[a-z0-9_-]``, collapses consecutive
    hyphens, strips trailing hyphens, and truncates to
    ``TASK_NAME_MAX_LEN``.  Returns ``None`` if the result is empty.

    Leading hyphens are preserved so callers can detect and reject them
    (a name starting with ``-`` looks like a CLI flag).
    """
    if raw is None:
        return None
    name = raw.strip().lower()
    name = name.replace(" ", "-")
    name = re.sub(r"[^a-z0-9_-]", "", name)
    name = re.sub(r"-{2,}", "-", name)
    name = name.rstrip("-")
    name = name[:TASK_NAME_MAX_LEN]
    return name or None

task_archive_list(project_id)

Print archived tasks for project_id.

Source code in src/terok/lib/orchestration/tasks/archive.py
def task_archive_list(project_id: str) -> None:
    """Print one line per archived task of *project_id*.

    Each line shows the archive timestamp, task ID and name, followed by
    the mode and exit code in brackets when they are set.
    """
    entries = list_archived_tasks(project_id)
    if not entries:
        print("No archived tasks found")
        return
    for item in entries:
        # (include?, text) pairs; only the included ones reach the output.
        tagged = (
            (bool(item.mode), f"mode={item.mode}"),
            (item.exit_code is not None, f"exit={item.exit_code}"),
        )
        tags = [text for include, text in tagged if include]
        suffix = f" [{'; '.join(tags)}]" if tags else ""
        print(f"- {item.archived_at} #{item.task_id}: {item.name}{suffix}")

task_archive_logs(project_id, archive_id)

Return the log file path for an archived task identified by archive_id.

archive_id is matched against archive directory names (prefix match). Returns the log file path if found, or None.

Source code in src/terok/lib/orchestration/tasks/archive.py
def task_archive_logs(project_id: str, archive_id: str) -> Path | None:
    """Locate the container log of the archived task matching *archive_id*.

    *archive_id* is a prefix matched against archive directory names.
    Directories are visited in reverse-sorted order and the first one that
    contains ``logs/container.log`` wins.

    Returns:
        The log file path, or ``None`` when the archive root is missing or
        no matching directory holds a log file.
    """
    archive_root = tasks_archive_dir(project_id)
    if not archive_root.is_dir():
        return None
    entries_desc = sorted(archive_root.iterdir(), reverse=True)
    for candidate in entries_desc:
        if not (candidate.is_dir() and candidate.name.startswith(archive_id)):
            continue
        log_path = candidate / "logs" / "container.log"
        if log_path.is_file():
            return log_path
    return None

task_delete(project_id, task_id)

Delete a task's workspace, metadata, and any associated containers.

Before removal, captures container logs and archives the task metadata and logs to archive/<project_id>/tasks/. Containers are stopped best-effort via podman using the <project.id>-<mode>-<task_id> naming scheme. Returns a TaskDeleteResult so the caller can present any warnings from cleanup steps that failed.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_delete(project_id: str, task_id: str) -> TaskDeleteResult:
    """Remove a task: its workspace, its metadata, and any containers tied to it.

    Container logs are captured first, and the task metadata and logs are
    archived to ``archive/<project_id>/tasks/`` before anything is removed.
    Containers are stopped best-effort via podman using the
    ``<project.id>-<mode>-<task_id>`` naming scheme.

    Returns:
        A [`TaskDeleteResult`][terok.lib.orchestration.tasks.TaskDeleteResult]
        so the caller can present any warnings from cleanup steps that failed.
    """
    project = load_project(project_id)
    return _task_delete(project, task_id)

task_followup_headless(project_id, task_id, prompt, follow=True)

Send a follow-up prompt to a completed/failed headless task.

Replaces prompt.txt with the new prompt (so the agent only sees the current instruction) and archives the previous content to prompt-history.txt. Restarts the stopped container via podman start. Session context is automatically restored for providers that support it:

  • Claude: resumes via --resume <session-id> (captured by a SessionStart hook that writes claude-session.txt).
  • OpenCode / Blablador: resumes via --session <id> (captured by the opencode-session-plugin.mjs plugin that writes the session file on session.created events).
  • Vibe: resumes via --resume <id> (session ID parsed post-run from ~/.vibe/logs/session/ metadata).
  • Codex / Copilot: no session resume support — follow-ups start a fresh session with the new prompt only.

Per-run flags (model, max_turns, timeout) carry forward from the original task_run_headless invocation since podman start re-executes the same container command.

Source code in src/terok/lib/orchestration/task_runners/headless.py
def task_followup_headless(
    project_id: str,
    task_id: str,
    prompt: str,
    follow: bool = True,
) -> None:
    """Send a follow-up prompt to a completed/failed headless task.

    Replaces prompt.txt with the new prompt (so the agent only sees the
    current instruction) and archives the previous content to
    ``prompt-history.txt``.  Restarts the stopped container via
    ``podman start``.  Session context is
    automatically restored for providers that support it:

    - **Claude**: resumes via ``--resume <session-id>`` (captured by a
      ``SessionStart`` hook that writes ``claude-session.txt``).
    - **OpenCode / Blablador**: resumes via ``--session <id>`` (captured by
      the ``opencode-session-plugin.mjs`` plugin that writes the session
      file on ``session.created`` events).
    - **Vibe**: resumes via ``--resume <id>`` (session ID parsed post-run
      from ``~/.vibe/logs/session/`` metadata).
    - **Codex / Copilot**: no session resume support — follow-ups start a
      fresh session with the new prompt only.

    Per-run flags (model, max_turns, timeout) carry forward from the
    original ``task_run_headless`` invocation since ``podman start``
    re-executes the same container command.

    Args:
        project_id: Project that owns the task.
        task_id: The headless task to continue.
        prompt: The new instruction handed to ``_inject_followup_prompt``.
        follow: Forwarded to ``_report_headless_result``; presumably selects
            between following the run and detaching — confirm there.

    Raises:
        SystemExit: If the task's mode is not ``"run"``, if its container is
            still running, or if the container cannot be found.
    """
    from terok.lib.integrations.executor import AGENT_PROVIDERS

    project = load_project(project_id)
    meta, meta_path = load_task_meta(project.id, task_id)

    # Only tasks recorded with mode "run" can receive a follow-up.
    mode = meta.get("mode")
    if mode != "run":
        raise SystemExit(
            f"Task {task_id} is not a headless task (mode={mode!r}). "
            f"Follow-up is only supported for autopilot (mode='run') tasks."
        )

    # The container must exist and must not be running: both a "running"
    # state and a missing container (state None) abort the follow-up.
    cname = container_name(project.id, "run", task_id)
    container_state = _rt.resolve_runtime(project).container(cname).state
    if container_state == "running":
        raise SystemExit(
            f"Container {cname} is still running. "
            f"Wait for it to finish or stop it before sending a follow-up."
        )
    if container_state is None:
        raise SystemExit(
            f"Container {cname} not found. Cannot follow up — the container may have been removed."
        )

    # Resolve provider from task metadata
    # (records without a "provider" key are treated as "claude").
    provider_name = meta.get("provider", "claude")
    resolved = AGENT_PROVIDERS.get(provider_name)
    if resolved is None:
        import warnings

        # An unknown provider is not fatal: warn and continue without the
        # session-resume notice below.
        warnings.warn(
            f"Unknown provider {provider_name!r} in task metadata; session resume check skipped.",
            stacklevel=2,
        )
    # Fall back to the raw provider name when no provider entry was found.
    label = resolved.label if resolved else provider_name

    if resolved and not resolved.supports_session_resume:
        print(
            f"Note: {label} does not support session resume. "
            f"Follow-up will start a fresh session with the new prompt."
        )

    task_dir = project.tasks_root / str(task_id)
    _inject_followup_prompt(
        is_sealed=project.is_sealed,
        cname=cname,
        agent_config_dir=task_dir / "agent-config",
        prompt=prompt,
    )

    # Ensure the vault is reachable before restarting — after a
    # host reboot the systemd socket may be active but the service idle.
    ensure_vault()

    # Restart the existing container (re-runs the original bash command,
    # which reads prompt.txt and session files from the volume)
    _podman_start(project, cname)
    _assert_running(project, cname)
    run_hook(
        "post_start",
        project.hook_post_start,
        project_id=project.id,
        task_id=task_id,
        mode="run",
        cname=cname,
        task_dir=task_dir,
        meta_path=meta_path,
    )
    _apply_shield_policy(project, cname, task_dir, is_restart=True)

    # Clear previous exit_code so effective_status shows "running" until new exit
    meta["exit_code"] = None
    write_task_meta(meta_path, meta)

    _report_headless_result(
        project_id=project.id,
        task_id=task_id,
        cname=cname,
        task_dir=task_dir,
        follow=follow,
        label=label,
        detached_label="Follow-up started (detached).",
    )

task_list(project_id, *, status=None, mode=None, agent=None)

List tasks for a project, optionally filtered by status, mode, or agent preset.

Status is computed live from podman container state + task metadata.

Source code in src/terok/lib/orchestration/tasks/query.py
def task_list(
    project_id: str,
    *,
    status: str | None = None,
    mode: str | None = None,
    agent: str | None = None,
) -> None:
    """Print a project's tasks, optionally narrowed by status, mode, or agent preset.

    Status is computed live from podman container state + task metadata.
    Prints ``No tasks found`` when nothing is left after filtering.
    """
    # Pre-filter by mode/agent before the podman query to reduce work
    selected = [
        task
        for task in get_tasks(project_id)
        if (not mode or task.mode == mode) and (not agent or task.preset == agent)
    ]
    if not selected:
        print("No tasks found")
        return

    # Batch-query podman for all container states in one call
    states = get_all_task_states(project_id, selected)
    for task in selected:
        task.container_state = states.get(task.task_id)

    # Filter by effective status (computed live)
    if status:
        selected = [task for task in selected if effective_status(task) == status]
        if not selected:
            print("No tasks found")
            return

    for task in selected:
        shown_status = effective_status(task)
        # Optional details, listed only when the value is truthy.
        labelled = (("mode", task.mode), ("port", task.web_port), ("work", task.work_status))
        details = [f"{label}={value}" for label, value in labelled if value]
        suffix = f" [{'; '.join(details)}]" if details else ""
        print(f"- {task.task_id}: {task.name} {shown_status}{suffix}")

task_login(project_id, task_id)

Open an interactive shell in a running task container.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_login(project_id: str, task_id: str) -> None:
    """Open an interactive shell inside the running container of a task.

    Loads the project identified by *project_id* and hands it, together
    with *task_id*, to the project-level login helper.
    """
    project = load_project(project_id)
    _task_login(project, task_id)

task_logs(project_id, task_id, options=None)

View formatted logs for a task container.

Works on both running and exited containers (podman logs supports both).

Parameters:

Name Type Description Default
project_id str

The project ID.

required
task_id str

The task ID.

required
options LogViewOptions | None

Display options (follow, raw, tail, streaming).

None
Source code in src/terok/lib/domain/task_logs.py
def task_logs(
    project_id: str,
    task_id: str,
    options: LogViewOptions | None = None,
) -> None:
    """View formatted logs for a task container.

    Works on both running and exited containers (podman logs supports both).
    When the container no longer exists, the persisted ``logs/container.log``
    file under the task directory is shown instead, if it is present.

    Args:
        project_id: The project ID.
        task_id: The task ID.
        options: Display options (follow, raw, tail, streaming).  A default
            ``LogViewOptions()`` is used when ``None``.

    Raises:
        SystemExit: If the task is unknown, has never been run, ``tail`` is
            negative, neither a container nor persisted logs exist, or the
            log process cannot be launched.
    """
    if options is None:
        options = LogViewOptions()
    import select
    import signal

    project = load_project(project_id)
    meta_dir = tasks_meta_dir(project.id)
    meta = read_task_meta(meta_dir, task_id)
    if meta is None:
        raise SystemExit(f"Unknown task {task_id}")

    mode = meta.get("mode")
    if not mode:
        raise SystemExit(
            f"Task {task_id} has never been run (no mode set).\n"
            f"  Start a fresh task: terok task run {project_id}\n"
            f"  Or run this stub:   terokctl task attach {project_id} {task_id} --mode cli"
        )

    # Validate --tail early so both live and persisted paths behave consistently
    if options.tail is not None and options.tail < 0:
        raise SystemExit("--tail must be >= 0")

    cname = container_name(project.id, mode, task_id)

    # Verify container exists (running or exited)
    state = _rt.resolve_runtime(project).container(cname).state
    if state is None:
        # Fall back to persisted log files on the host
        task_dir = project.tasks_root / str(task_id)
        log_file = task_dir / "logs" / "container.log"
        if log_file.is_file():
            _show_persisted_logs(
                log_file,
                tail=options.tail,
                streaming=options.streaming,
                mode=mode,
                provider=meta.get("provider"),
            )
            return
        raise SystemExit(
            f"Container {cname} does not exist and no persisted logs found. "
            f"Run 'terok task restart {project_id} {task_id}' first."
        )

    runner = AgentRunner()

    if options.raw:
        # Raw mode: exec podman directly, no formatting.  os.execvp replaces
        # this process — no executor-layer wrapping is appropriate.
        cmd = _build_raw_logs_cmd(cname, follow=options.follow, tail=options.tail)
        try:
            os.execvp(cmd[0], cmd)
        except FileNotFoundError:
            raise SystemExit("podman not found; please install podman")

    # Formatted mode: pipe through formatter
    provider = meta.get("provider")
    formatter = auto_detect_formatter(mode, streaming=options.streaming, provider=provider)

    try:
        proc = runner.stream_logs_process(cname, follow=options.follow, tail=options.tail)
    except FileNotFoundError:
        raise SystemExit("podman not found; please install podman")
    except OSError as exc:
        raise SystemExit(f"failed to launch podman logs: {exc}")
    if proc.stdout is None or proc.stderr is None:  # the factory uses PIPE for both
        raise SystemExit("podman logs streaming did not provide stdout/stderr pipes")

    # Handle Ctrl+C gracefully
    interrupted = False
    original_sigint = signal.getsignal(signal.SIGINT)

    def _sigint_handler(signum: int, frame: Any) -> None:
        """Set the interrupted flag on Ctrl+C."""
        nonlocal interrupted
        interrupted = True

    signal.signal(signal.SIGINT, _sigint_handler)

    try:
        buf = b""
        while not interrupted:
            if proc.poll() is not None:
                # Process exited — drain remaining output.  The drained bytes
                # are split into lines after the loop.
                remaining = proc.stdout.read()
                if remaining:
                    buf += remaining
                break

            try:
                # Short timeout so the interrupted flag and process exit are
                # re-checked regularly even when no output arrives.
                ready, _, _ = select.select([proc.stdout], [], [], 0.2)
                if not ready:
                    continue
                # read1 lives on BufferedReader, not IO[bytes] — Popen pipes
                # are buffered so the call works at runtime.
                chunk = proc.stdout.read1(4096) if hasattr(proc.stdout, "read1") else b""
                if not chunk:
                    continue
                buf += chunk
            except (OSError, ValueError):
                break

            # Process complete lines
            while b"\n" in buf:
                raw_line, buf = buf.split(b"\n", 1)
                line = raw_line.decode("utf-8", errors="replace")
                formatter.feed_line(line)

        # Feed whatever is still buffered.  The drain-on-exit path above can
        # leave many complete lines in ``buf`` (it breaks out before the
        # per-line loop runs), so split here instead of handing the formatter
        # a single multi-line blob.
        *complete_lines, trailing = buf.split(b"\n")
        for raw_line in complete_lines:
            formatter.feed_line(raw_line.decode("utf-8", errors="replace"))

        # Flush any trailing partial line
        if trailing:
            line = trailing.decode("utf-8", errors="replace")
            if line.strip():
                formatter.feed_line(line)
    finally:
        signal.signal(signal.SIGINT, original_sigint)
        stderr_output = b""
        if proc.poll() is None:
            proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            # terminate() was not honoured in time — force the process down.
            proc.kill()
            proc.wait()
        try:
            stderr_output = proc.stderr.read() or b""
        except (OSError, ValueError):
            pass
        formatter.finish()

    # Report podman errors if process failed and wasn't interrupted
    if not interrupted and proc.returncode and proc.returncode != 0:
        stderr_text = stderr_output.decode("utf-8", errors="replace").strip()
        if stderr_text:
            print(f"Warning: podman logs exited with code {proc.returncode}: {stderr_text}")

    if interrupted:
        print()

task_new(project_id, *, name=None)

Create a new task with a fresh workspace for a project.

Parameters:

Name Type Description Default
project_id str

The project to create the task under.

required
name str | None

Optional human-readable name. Allowed characters are lowercase letters, digits, hyphens, and underscores. If None, a random slug-style name is generated via generate_task_name.

None
Workspace Initialization Protocol:

Each task gets its own workspace directory that persists across container runs. When a container starts, the init script (init-ssh-and-repo.sh) needs to know whether this is:

  1. A NEW task that should be reset to the latest remote HEAD
  2. A RESTARTED task where local changes should be preserved

We use a marker file (.new-task-marker) to signal intent:

  • task_new() creates the marker in the workspace directory
  • init-ssh-and-repo.sh checks for the marker:
      ◦ If the marker exists: reset to origin/HEAD, then delete the marker
      ◦ If there is no marker: fetch only and preserve local state
  • Subsequent container runs on the same task won't see the marker, so local work is preserved

This handles edge cases such as:

  • A stale workspace left behind by an incompletely deleted previous task with the same ID
  • Ensuring that new tasks always start with the latest code

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_new(project_id: str, *, name: str | None = None) -> str:
    """Create a task, with a fresh workspace, under the given project.

    Args:
        project_id: ID of the project that will own the task.
        name: Optional human-readable task name made of lowercase letters,
            digits, hyphens, and underscores.  When ``None``, a random
            slug-style name is produced by
            [`generate_task_name`][terok.lib.orchestration.tasks.generate_task_name].

    Returns:
        The value returned by the project-level task creator.

    Workspace initialization protocol
    ---------------------------------
    A task's workspace directory outlives individual container runs, so the
    container init script (init-ssh-and-repo.sh) must be told which of two
    situations it is in:

    1. a NEW task, which should be reset to the latest remote HEAD
    2. a RESTARTED task, whose local changes should be preserved

    The signal is a marker file (.new-task-marker):

    - task_new() drops the marker into the workspace directory
    - init-ssh-and-repo.sh looks for it:
      - marker present: reset to origin/HEAD, then delete the marker
      - marker absent: fetch only, keep local state
    - later container runs of the same task no longer find the marker,
      so local work survives

    Edge cases covered by this scheme:
    - a stale workspace left by an incompletely deleted task with the same ID
    - making sure new tasks always start from the latest code
    """
    project = load_project(project_id)
    return _task_new(project, name=name)

task_rename(project_id, task_id, new_name)

Rename a task by updating its metadata file.

Sanitizes new_name and writes the result to the task's metadata file. Raises SystemExit if the task is unknown or the sanitized name is invalid.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_rename(project_id: str, task_id: str, new_name: str) -> None:
    """Give a task a new name by rewriting its metadata file.

    *new_name* is sanitized before it is stored.  ``SystemExit`` is raised
    when the task is unknown or the sanitized name turns out to be invalid.
    """
    project = load_project(project_id)
    _task_rename(project, task_id, new_name)

task_restart(project_id, task_id)

Restart a task's container.

Semantics: stop the container if running, then start it. If the container doesn't exist (e.g. because it was deleted out-of-band), raise SystemExit with an actionable pointer to terok task run — "restart" only means restart, not re-run.

Source code in src/terok/lib/orchestration/task_runners/restart.py
def task_restart(project_id: str, task_id: str) -> None:
    """Stop (when running) and then start the existing container of a task.

    "Restart" never re-creates anything: if the container has disappeared
    (for example because it was removed out-of-band), ``SystemExit`` is
    raised with a pointer to ``terok task run``.  ``SystemExit`` is also
    raised for a task that has no mode recorded, i.e. was never run.
    """
    project = load_project(project_id)
    meta, meta_path = load_task_meta(project.id, task_id)

    mode = meta.get("mode")
    if not mode:
        raise SystemExit(f"Task {task_id} has never been run (no mode set)")

    cname = container_name(project.id, mode, task_id)
    state = _rt.resolve_runtime(project).container(cname).state

    print(f"Restarting task {project_id}/{task_id} ({mode})...")
    ensure_vault()

    if state is None:
        # Nothing left to restart — only a fresh ``task run`` can help.  Tasks
        # in "run" mode get the extra arguments a headless run needs.
        headless_hint = ' "<prompt>" --mode headless' if mode == "run" else ""
        raise SystemExit(
            f"Container {cname} no longer exists.  Restart requires a running "
            "or stopped container.  Create a new task with:\n"
            f"  terok task run {project_id}{headless_hint}"
        )

    _validate_restart_preconditions(project, task_id, mode, meta, cname)

    is_running = state == "running"
    if is_running:
        _stop_running_container(project, task_id, mode, cname, meta_path)

    _start_and_report_restart(project, task_id, mode, cname, meta, meta_path)

task_run_cli(project_id, task_id, agents=None, preset=None, unrestricted=None)

Launch a CLI-mode task container and wait for its readiness marker.

Creates (or reattaches to) a detached Podman container for interactive CLI access. After the container reports ready the task metadata is marked running and the user is shown login instructions.

Source code in src/terok/lib/orchestration/task_runners/cli.py
def task_run_cli(
    project_id: str,
    task_id: str,
    agents: list[str] | None = None,
    preset: str | None = None,
    unrestricted: bool | None = None,
) -> None:
    """Start, or resume, the CLI-mode container of a task and print login help.

    Three situations are handled:

    - the container is already running: it is only reported;
    - the container exists but is stopped: it is started again;
    - no container exists: a new detached one is created and its initial
      logs are streamed until the readiness marker shows up or 60 seconds
      have passed.

    In the last two cases the task metadata is updated with the ``cli`` mode
    and a ``ready_at`` timestamp.

    Args:
        project_id: ID of the project owning the task.
        task_id: ID of the task to run.
        agents: Agent selection forwarded to the agent-config preparation.
        preset: Agent preset forwarded to config resolution; recorded in the
            task metadata when set.
        unrestricted: Explicit unrestricted-mode choice.  ``None`` defers to
            the resolved agent config and falls back to unrestricted when
            the config has no value.
    """
    project = load_project(project_id)
    meta, meta_path = load_task_meta(project.id, task_id, "cli")

    cname = container_name(project.id, "cli", task_id)
    existing_state = _rt.resolve_runtime(project).container(cname).state
    task_dir = project.tasks_root / str(task_id)

    # Keyword arguments shared by every lifecycle hook fired below.
    hook_ctx = {
        "project_id": project.id,
        "task_id": task_id,
        "mode": "cli",
        "cname": cname,
        "task_dir": task_dir,
        "meta_path": meta_path,
    }

    # An existing container is reused rather than re-created.
    if existing_state is not None:
        ensure_vault()
        color_enabled = _supports_color()
        if existing_state == "running":
            print(f"Container {_green(cname, color_enabled)} is already running.")
        else:
            # Stopped or exited: bring it back up and refresh the metadata.
            print(f"Starting existing container {_green(cname, color_enabled)}...")
            _podman_start(project, cname)
            _assert_running(project, cname)
            run_hook("post_start", project.hook_post_start, **hook_ctx)
            _apply_shield_policy(project, cname, task_dir, is_restart=True)
            meta["mode"] = "cli"
            meta["ready_at"] = datetime.now(UTC).isoformat()
            write_task_meta(meta_path, meta)
            print("Container started.")
        _print_login_instructions(project.id, task_id, cname, color_enabled)
        return

    env, volumes = build_task_env_and_volumes(project, task_id)

    # Layered agent config: global → project → preset → CLI overrides.
    agent_config_dir = _prepare_agent_config(project, project_id, task_id, agents, preset)
    volumes.append(VolumeSpec(agent_config_dir, CONTAINER_TEROK_CONFIG, sharing=Sharing.PRIVATE))

    # Unrestricted mode precedence: CLI flag → config → default (True).
    if unrestricted is None:
        resolved_cfg = resolve_agent_config(
            project_id,
            agent_config=project.agent_config,
            project_root=project.root,
            preset=preset,
        )
        configured = resolve_provider_value(
            "unrestricted", resolved_cfg, project.default_agent or "claude"
        )
        unrestricted = configured is None or _str_to_bool(configured)
    if unrestricted:
        _apply_unrestricted_env(env)

    # The container runs detached and is deliberately started without --rm:
    # it must survive being stopped so `task restart` can resume it quickly.
    run_hook("pre_start", project.hook_pre_start, **hook_ctx)
    _run_container(
        cname=cname,
        image=project_cli_image(project.id),
        env=env,
        volumes=volumes,
        project=project,
        task_id=task_id,
        task_dir=task_dir,
        # Run init, print the readiness marker watched for below, then idle
        # forever so the container stays alive even without a TTY.
        command=["bash", "-lc", "init-ssh-and-repo.sh && echo __CLI_READY__; tail -f /dev/null"],
    )
    _apply_shield_policy(project, cname, task_dir, is_restart=False)
    run_hook("post_start", project.hook_post_start, **hook_ctx)

    def _is_ready(line: str) -> bool:
        """Return True for a log line carrying either readiness marker."""
        return any(marker in line for marker in ("__CLI_READY__", ">> init complete"))

    # Follow the initial logs until ready (or timeout), then detach.
    _rt.resolve_runtime(project).container(cname).stream_initial_logs(
        ready_check=_is_ready,
        timeout_sec=60.0,
    )

    # The container may have died while its logs were being streamed.
    _assert_running(project, cname)
    run_hook("post_ready", project.hook_post_ready, **hook_ctx)

    meta["mode"] = "cli"
    meta["ready_at"] = datetime.now(UTC).isoformat()
    meta["unrestricted"] = unrestricted
    if preset:
        meta["preset"] = preset
    write_task_meta(meta_path, meta)

    color_enabled = _supports_color()
    print(
        "\nCLI container is running in the background.\n"
        f"- Name:     {_green(cname, color_enabled)}"
    )
    _print_login_instructions(project.id, task_id, cname, color_enabled)
    stop_cmd = _red(f"podman stop {cname}", color_enabled)
    print(f"- To stop:  {stop_cmd}\n")

task_run_headless(request)

Run an agent headlessly (autopilot mode) in a new task container.

Creates a new task, prepares the agent-config directory with the provider's wrapper function and filtered subagents, then launches a detached container that runs init-ssh-and-repo.sh followed by the agent command.

Parameters:

Name Type Description Default
request HeadlessRunRequest

All per-run options bundled in a HeadlessRunRequest.

required

Returns the task_id.

Source code in src/terok/lib/orchestration/task_runners/headless.py
def task_run_headless(request: HeadlessRunRequest) -> str:
    """Run an agent headlessly (autopilot mode) in a new task container.

    Creates a new task, prepares the agent-config directory with the provider's
    wrapper function and filtered subagents, then launches a detached container
    that runs init-ssh-and-repo.sh followed by the agent command.

    The provider, the layered agent config, and the instructions are all
    resolved before the task is created, so an error raised during that
    resolution occurs before this function has created any task.

    Args:
        request: All per-run options bundled in a [`HeadlessRunRequest`][terok.lib.orchestration.task_runners.HeadlessRunRequest].

    Returns:
        The ID of the newly created task.
    """
    # Imported at call time rather than at module level.
    from terok.lib.integrations.executor import (
        CLIOverrides,
        get_provider,
    )

    project = load_project(request.project_id)
    resolved = get_provider(request.provider, default_agent=project.default_agent)
    require_agent_installed(project, resolved.name)

    # Resolve layered agent config (global → project → preset → CLI overrides)
    cli_overrides = _build_cli_overrides(request.config_path)
    effective = resolve_agent_config(
        request.project_id,
        agent_config=project.agent_config,
        project_root=project.root,
        preset=request.preset,
        # A falsy overrides value is passed on as None.
        cli_overrides=cli_overrides or None,
    )

    # Resolve instructions: CLI --instructions overrides config stack
    instr_text = (
        request.instructions
        if request.instructions is not None
        else resolve_instructions(effective, resolved.name, project_root=project.root)
    )

    # Apply provider-aware config resolution with best-effort feature mapping.
    # CLI flags override config values; unsupported features produce warnings
    # or prompt augmentation.
    pcfg = resolved.apply_config(
        effective,
        CLIOverrides(
            model=request.model,
            max_turns=request.max_turns,
            timeout=request.timeout,
            instructions=instr_text,
        ),
    )

    # Print warnings about unsupported features
    for warning in pcfg.warnings:
        print(f"Warning: {warning}")

    # Augment prompt with best-effort feature analogues (e.g. max-turns guidance)
    effective_prompt = request.prompt
    if pcfg.prompt_extra:
        effective_prompt = f"{request.prompt}\n\n{pcfg.prompt_extra}"

    # Create a new task
    task_id = task_new(request.project_id, name=request.name)

    # Collect subagents from resolved config
    subagents = tuple(effective.get("subagents") or ())

    # Prepare agent-config dir with wrapper, agents.json, prompt.txt, instructions.md
    task_dir = project.tasks_root / str(task_id)
    agent_config_dir = prepare_agent_config_dir(
        AgentConfigSpec(
            tasks_root=project.tasks_root,
            task_id=task_id,
            subagents=subagents,
            # None (no explicit selection) is kept distinct from an empty selection.
            selected_agents=tuple(request.agents) if request.agents is not None else None,
            prompt=effective_prompt,
            provider=resolved.name,
            instructions=instr_text,
            default_agent=project.default_agent,
            mounts_base=sandbox_live_mounts_dir(),
        )
    )

    # Resolve unrestricted mode: CLI flag → config → default (True)
    unrestricted = request.unrestricted
    if unrestricted is None:
        cfg_val = resolve_provider_value("unrestricted", effective, resolved.name)
        unrestricted = _str_to_bool(cfg_val) if cfg_val is not None else True

    # Build env and volumes
    env, volumes = build_task_env_and_volumes(project, task_id)

    # Set TEROK_UNRESTRICTED for the wrapper functions inside the container
    if unrestricted:
        _apply_unrestricted_env(env)

    # Mount agent-config dir to /home/dev/.terok
    volumes.append(VolumeSpec(agent_config_dir, CONTAINER_TEROK_CONFIG, sharing=Sharing.PRIVATE))

    # Build headless command via provider registry
    headless_cmd = resolved.build_headless_command(
        timeout=pcfg.timeout,
        model=pcfg.model,
        max_turns=pcfg.max_turns,
    )

    # Build podman command (DETACHED)
    cname = container_name(project.id, "run", task_id)

    # Metadata is loaded before launch so the hooks receive its path.
    meta, meta_path = load_task_meta(project.id, task_id)
    run_hook(
        "pre_start",
        project.hook_pre_start,
        project_id=project.id,
        task_id=task_id,
        mode="run",
        cname=cname,
        task_dir=task_dir,
        meta_path=meta_path,
    )
    _run_container(
        cname=cname,
        image=project_cli_image(project.id),
        env=env,
        volumes=volumes,
        project=project,
        task_id=task_id,
        task_dir=task_dir,
        command=["bash", "-lc", headless_cmd],
    )
    _apply_shield_policy(project, cname, task_dir, is_restart=False)
    run_hook(
        "post_start",
        project.hook_post_start,
        project_id=project.id,
        task_id=task_id,
        mode="run",
        cname=cname,
        task_dir=task_dir,
        meta_path=meta_path,
    )

    # Update task metadata
    meta["mode"] = "run"
    meta["ready_at"] = datetime.now(UTC).isoformat()
    meta["provider"] = resolved.name
    meta["unrestricted"] = unrestricted
    if request.preset:
        meta["preset"] = request.preset
    write_task_meta(meta_path, meta)

    # Report the outcome; request.follow is forwarded to the reporting helper.
    _report_headless_result(
        project_id=project.id,
        task_id=task_id,
        cname=cname,
        task_dir=task_dir,
        follow=request.follow,
        label=resolved.label,
        detached_label=f"Headless {resolved.label} task started (detached).",
    )

    return task_id

task_run_toad(project_id, task_id, agents=None, preset=None, unrestricted=None)

Launch the Toad multi-agent TUI behind Caddy for token-gated browser access.

Same CLI image as interactive tasks, but the container entrypoint is terok-toad-entry: it starts Caddy on the published port, toad on an internal loopback port, and emits TEROK_READY once both are listening. Caddy enforces the per-task token (see _ensure_toad_token) on every request.

Source code in src/terok/lib/orchestration/task_runners/toad.py
def task_run_toad(
    project_id: str,
    task_id: str,
    agents: list[str] | None = None,
    preset: str | None = None,
    unrestricted: bool | None = None,
) -> None:
    """Serve the Toad multi-agent TUI for a task, fronted by Caddy with a per-task token.

    The container uses the same CLI image as interactive tasks but runs
    ``terok-toad-entry``, which starts Caddy on the published port and toad
    on an internal loopback port, then emits ``TEROK_READY`` once both are
    listening.  Caddy checks the per-task token (see `_ensure_toad_token`)
    on every request.

    An existing container is resumed instead of being re-created.  For a new
    container a web port is allocated and the task metadata is written
    before launch.  ``SystemExit(1)`` is raised when the readiness marker is
    never seen or the container is not running afterwards.

    Args:
        project_id: ID of the project owning the task.
        task_id: ID of the task to run.
        agents: Agent selection forwarded to the agent-config preparation.
        preset: Agent preset forwarded to config resolution; recorded in the
            task metadata when set.
        unrestricted: Explicit unrestricted-mode choice.  ``None`` defers to
            the resolved agent config and falls back to unrestricted when
            the config has no value.
    """
    project = load_project(project_id)
    meta, meta_path = load_task_meta(project.id, task_id, "toad")

    cname = container_name(project.id, "toad", task_id)
    existing_state = _rt.resolve_runtime(project).container(cname).state
    pub_host = get_public_host()

    if existing_state is not None:
        _resume_toad_container(
            project=project,
            task_id=task_id,
            cname=cname,
            container_state=existing_state,
            meta=meta,
            meta_path=meta_path,
            pub_host=pub_host,
        )
        return

    # Only a brand-new container gets a freshly allocated port.
    port = assign_web_port(project.id, task_id)
    meta["web_port"] = port

    env, volumes = build_task_env_and_volumes(project, task_id)

    agent_config_dir = _prepare_agent_config(project, project_id, task_id, agents, preset)
    volumes.append(VolumeSpec(agent_config_dir, CONTAINER_TEROK_CONFIG, sharing=Sharing.PRIVATE))

    token = _ensure_toad_token(agent_config_dir)
    meta["web_token"] = token

    env["TOAD_PUBLIC_PORT"] = str(_TOAD_PUBLIC_PORT)
    env["TOAD_INTERNAL_PORT"] = str(_TOAD_INTERNAL_PORT)

    # Unrestricted mode precedence: CLI flag → config → default (True).
    if unrestricted is None:
        resolved_cfg = resolve_agent_config(
            project_id,
            agent_config=project.agent_config,
            project_root=project.root,
            preset=preset,
        )
        configured = resolve_provider_value(
            "unrestricted", resolved_cfg, project.default_agent or "claude"
        )
        unrestricted = configured is None or _str_to_bool(configured)
    if unrestricted:
        _apply_unrestricted_env(env)

    meta["mode"] = "toad"
    meta["unrestricted"] = unrestricted
    if preset:
        meta["preset"] = preset
    write_task_meta(meta_path, meta)

    # Default to LAN exposure; a loopback public host keeps its own address
    # family, because publishing ::1 on 127.0.0.1 would make the printed URL
    # (``http://[::1]:…``) unreachable.
    bind_addr = "0.0.0.0"  # nosec B104
    if pub_host == "::1":
        bind_addr = "[::1]"
    elif pub_host in _LOOPBACK_HOSTS:
        bind_addr = _LOCALHOST

    task_dir = project.tasks_root / str(task_id)
    # ``terok-toad-entry`` (from the caddy/toad roster entries) drives the
    # in-container sequence: Caddy on ``_TOAD_PUBLIC_PORT``, toad on loopback
    # ``_TOAD_INTERNAL_PORT``, a wait for both to bind, then ``TEROK_READY``.
    public_url = f"http://{url_host(pub_host)}:{port}"
    toad_cmd = f"terok-toad-entry --public-url {public_url} /workspace"

    # Keyword arguments shared by every lifecycle hook fired below.
    hook_ctx = {
        "project_id": project.id,
        "task_id": task_id,
        "mode": "toad",
        "cname": cname,
        "web_port": port,
        "task_dir": task_dir,
        "meta_path": meta_path,
    }

    run_hook("pre_start", project.hook_pre_start, **hook_ctx)
    _run_container(
        cname=cname,
        image=project_cli_image(project.id),
        env=env,
        volumes=volumes,
        project=project,
        task_id=task_id,
        task_dir=task_dir,
        extra_args=["-p", f"{bind_addr}:{port}:{_TOAD_PUBLIC_PORT}"],
        command=["bash", "-lc", toad_cmd],
    )
    _apply_shield_policy(project, cname, task_dir, is_restart=False)
    run_hook("post_start", project.hook_post_start, **hook_ctx)

    runtime = _rt.resolve_runtime(project)
    # The supervisor wrapper prints TEROK_READY once both listeners are up.
    saw_ready = runtime.container(cname).stream_initial_logs(
        ready_check=lambda line: "TEROK_READY" in line,
        timeout_sec=None,
    )

    started = saw_ready and runtime.container(cname).running
    if not started:
        print(f"Toad failed to start. Check logs: podman logs {cname}")
        raise SystemExit(1)

    run_hook("post_ready", project.hook_post_ready, **hook_ctx)

    meta["ready_at"] = datetime.now(UTC).isoformat()
    write_task_meta(meta_path, meta)

    color_enabled = _supports_color()
    url = _toad_browser_url(pub_host, port, token)
    name_txt = _green(cname, color_enabled)
    link_txt = _hyperlink(_blue(url, color_enabled), url, enabled=color_enabled)
    logs_txt = _yellow(f"podman logs -f {cname}", color_enabled)
    stop_txt = _red(f"podman stop {cname}", color_enabled)
    print(
        "\n>> Toad is serving."
        f"\n- Name: {name_txt}"
        f"\n- URL:  {link_txt}"
        f"\n- Logs: {logs_txt}"
        f"\n- Stop: {stop_txt}"
    )

task_status(project_id, task_id)

Show live task status with container state diagnostics.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_status(project_id: str, task_id: str) -> None:
    """Print a task's live status together with container diagnostics.

    Combines the stored task metadata with the current container state
    (queried only when the task has a mode) and prints one labelled line per
    available detail.  Raises ``SystemExit`` when the task is unknown.
    """
    project = load_project(project_id)
    meta = read_task_meta(tasks_meta_dir(project.id), task_id)
    if meta is None:
        raise SystemExit(f"Unknown task {task_id}")

    mode = meta.get("mode")
    web_port = meta.get("web_port")
    exit_code = meta.get("exit_code")

    use_color = _supports_color()

    # A task without a mode has no container to look up.
    if mode:
        cname = container_name(project.id, mode, task_id)
        live_state = _rt.resolve_runtime(project).container(cname).state
    else:
        cname = None
        live_state = None

    # effective_status / mode_info operate on a TaskMeta, so assemble one.
    task = TaskMeta(
        task_id=task_id,
        mode=mode,
        workspace=meta.get("workspace", ""),
        web_port=web_port,
        web_token=meta.get("web_token"),
        backend=meta.get("backend"),
        exit_code=exit_code,
        deleting=bool(meta.get("deleting")),
        initialized=_is_initialized(meta),
        container_state=live_state,
        name=meta["name"],
        provider=meta.get("provider"),
        unrestricted=meta.get("unrestricted"),
        created_at=meta.get("created_at"),
    )
    status = effective_status(task)
    display = STATUS_DISPLAY.get(status, STATUS_DISPLAY["created"])

    # Unrecognised colour names fall back to yellow.
    painters = {"green": _green, "yellow": _yellow, "red": _red}
    paint_status = painters.get(display.color, _yellow)
    mode_meta = mode_info(task.mode)
    mode_icon = render_emoji(mode_meta)

    print(f"Task {task_id}:")
    print(f"  Name:            {task.name}")
    print(f"  Status:          {render_emoji(display)} {paint_status(display.label, use_color)}")
    print(f"  Mode:            {mode_icon} {mode_meta.label or 'not set'}")
    if cname:
        print(f"  Container:       {cname}")
    if live_state:
        paint_state = _green if live_state == "running" else _yellow
        print(f"  Container state: {paint_state(live_state, use_color)}")
    elif mode:
        # A mode is recorded, yet no container state was reported.
        print(f"  Container state: {_red('not found', use_color)}")
    if task.unrestricted is not None:
        perm_label = "unrestricted" if task.unrestricted else "restricted"
        print(f"  Permissions:     {perm_label}")
    if exit_code is not None:
        print(f"  Exit code:       {exit_code}")
    if web_port:
        print(f"  Web port:        {web_port}")

    # Work status is read from the task's agent-config directory.
    work = read_work_status(project.tasks_root / task_id / "agent-config")
    if not work.status:
        return
    print(f"  Work status:     {work.status}")
    if work.message:
        print(f"  Work message:    {work.message}")

task_stop(project_id, task_id, *, timeout=None)

Gracefully stop a running task container.

Uses podman stop --time <N> to give the container timeout seconds before SIGKILL. When timeout is None the project's run.shutdown_timeout setting is used (default 10 s).

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_stop(project_id: str, task_id: str, *, timeout: int | None = None) -> None:
    """Stop a running task container gracefully.

    Resolves *project_id* with ``load_project`` and hands the stop request
    to ``_task_stop``.  The container is stopped with
    ``podman stop --time <N>``, i.e. it gets *timeout* seconds to exit
    before SIGKILL; passing ``None`` falls back to the project's
    ``run.shutdown_timeout`` setting (default 10 s).
    """
    project = load_project(project_id)
    _task_stop(project, task_id, timeout=timeout)

validate_task_name(sanitized)

Return an error message if `sanitized` is not a valid task name, else `None`.

A name is invalid if it starts with a hyphen (looks like a CLI flag). Callers should first check for None from sanitize_task_name (which indicates the name was empty after sanitization).

Source code in src/terok/lib/orchestration/tasks/naming.py
def validate_task_name(sanitized: str) -> str | None:
    """Return an error message if *sanitized* is not a valid task name, else ``None``.

    A name is invalid if it starts with a hyphen (looks like a CLI flag).
    Callers should first check for ``None`` from [`sanitize_task_name`][terok.lib.orchestration.tasks.sanitize_task_name]
    (which indicates the name was empty after sanitization).
    """
    if sanitized.startswith("-"):
        return "name must not start with a hyphen"
    return None

wait_for_container_exit(container_name, project_id, task_id, timeout=7200)

Wait for container_name to exit and record its code in task metadata.

Returns (exit_code, error_message). On a successful wait error_message is None and the real exit code is persisted — including a legitimate exit code of 124, which is no longer conflated with the watcher's own timeout. On timeout exit_code is None and the error message describes it.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def wait_for_container_exit(
    container_name: str,
    project_id: str,
    task_id: str,
    timeout: int = 7200,
) -> tuple[int | None, str | None]:
    """Block until *container_name* exits, then store its exit code.

    The result is an ``(exit_code, error_message)`` pair:

    - Successful wait: ``(exit_code, None)``.  The real exit code is
      written to the task metadata via ``update_task_exit_code`` — this
      includes a legitimate exit code of 124, which is no longer
      conflated with the watcher's own timeout.
    - Watcher timeout: ``(None, "Watcher timed out")``.
    - Any other failure while waiting: ``(None, str(error))``.

    Nothing is persisted on the two failure paths.
    """
    from .meta import update_task_exit_code

    try:
        # The runner takes the timeout as a float.
        code = AgentRunner().wait_for_exit(container_name, timeout=float(timeout))
    except TimeoutError:
        return None, "Watcher timed out"
    except Exception as exc:
        return None, str(exc)
    else:
        # Only reached when the wait completed; errors raised while
        # persisting are deliberately not swallowed by the handlers above.
        update_task_exit_code(project_id, task_id, code)
        return code, None

vault_db(*, prompt_on_tty=False)

Open the shared vault CredentialDB and close it on exit.

Routes through SandboxConfig.open_credential_db so the four-tier passphrase resolution chain (session-unlock file → keyring → config fallback → optional prompt) runs. Daemons and background workers leave prompt_on_tty=False so a locked vault fails fast with a clear NoPassphraseError instead of stalling on stdin; CLI front-ends pass True to unlock the interactive last-resort prompt.

Source code in src/terok/lib/domain/vault.py
@contextmanager
def vault_db(*, prompt_on_tty: bool = False) -> Iterator[CredentialDB]:
    """Yield the shared vault ``CredentialDB``, closing it when the block ends.

    The database is opened through ``SandboxConfig.open_credential_db`` so
    the four-tier passphrase resolution chain runs (session-unlock file →
    keyring → config fallback → optional prompt).

    Keep ``prompt_on_tty=False`` in daemons and background workers: a
    locked vault then fails fast with a clear ``NoPassphraseError``
    instead of stalling on stdin.  CLI front-ends pass ``True`` to enable
    the interactive last-resort prompt.

    The handle is closed in a ``finally`` clause, so it is released even
    when the ``with`` body raises.
    """
    from ..core.config import make_sandbox_config

    sandbox_cfg = make_sandbox_config()
    credential_db = sandbox_cfg.open_credential_db(prompt_on_tty=prompt_on_tty)
    try:
        yield credential_db
    finally:
        credential_db.close()

make_sandbox_config()

Construct a SandboxConfig for sandbox operations.

Bridges terok's config layer (env vars → config.yml → XDG defaults) to sandbox's plain dataclass. Sandbox uses its own state_dir default (~/.local/share/terok/sandbox/) — terok no longer overrides it.

Port fields pass None (unresolved — consumers that launch services call cfg.with_resolved_ports() to allocate via sandbox's shared port registry) or an explicit int from config.yml. This factory is side-effect-free, matching sandbox's R2 contract: config inspection and sickbay paths must not silently bind sockets.

This is the single source of truth for config bridging — every SandboxConfig field that terok controls must be set here.

terok's config-equality promise

For every field with a representation in the shared config.yml schema that sandbox/executor own, the cfg this function constructs matches what standalone sandbox would have read from the same file — modulo user-supplied --flag / env overrides. Runtime-only ambient context with no config-file representation may be added additively but must not shadow a schema field. Sandbox/executor accept any SandboxConfig without enforcement; third-party orchestrators choose their own contracts.

Diff terok executor show-config against standalone terok-executor show-config over the same config.yml to verify.

Source code in src/terok/lib/core/config.py
def make_sandbox_config() -> "SandboxConfig":
    """Build the `SandboxConfig` that sandbox operations run with.

    This is the bridge from terok's config layer (env vars → config.yml →
    XDG defaults) to sandbox's plain dataclass, and the **single source
    of truth** for that bridging: every
    [`SandboxConfig`][terok_sandbox.SandboxConfig] field that terok
    controls must be set here.  ``state_dir`` is intentionally left
    alone — sandbox keeps its own default
    (``~/.local/share/terok/sandbox/``) and terok no longer overrides it.

    Port fields carry either an explicit ``int`` from config.yml or
    ``None`` (unresolved).  Consumers that actually launch services call
    ``cfg.with_resolved_ports()`` to allocate through sandbox's shared
    port registry.  The factory itself is side-effect-free, matching
    sandbox's R2 contract: config inspection and sickbay paths must not
    silently bind sockets.

    ## terok's config-equality promise

    For every field with a representation in the shared ``config.yml``
    schema that sandbox/executor own, the cfg built here matches what
    standalone sandbox would have read from the same file — modulo
    user-supplied ``--flag`` / env overrides.  Runtime-only ambient
    context with no config-file representation may be added additively
    but must not shadow a schema field.  Sandbox/executor accept any
    ``SandboxConfig`` without enforcement; third-party orchestrators
    choose their own contracts.

    To verify, diff ``terok executor show-config`` against standalone
    ``terok-executor show-config`` over the same config.yml.
    """
    from terok.lib.integrations.sandbox import SandboxConfig

    # Collected in the same order the fields are resolved from terok's
    # config layer; passed on as keyword arguments below.
    bridged = {
        "vault_dir": vault_dir(),
        "gate_port": get_gate_server_port(),
        "token_broker_port": get_vault_token_broker_port(),
        "ssh_signer_port": get_vault_ssh_signer_port(),
        "shield_bypass": get_shield_bypass_firewall_no_protection(),
        "shield_audit": get_shield_audit(),
        "services_mode": get_services_mode(),
    }
    return SandboxConfig(**bridged)

set_experimental(value)

Enable or disable experimental features globally.

Source code in src/terok/lib/core/config.py
def set_experimental(value: bool) -> None:
    """Switch experimental features on or off for the whole process.

    Rebinds the module-level ``_experimental`` flag to *value*; nothing
    is returned.
    """
    global _experimental  # noqa: PLW0603
    _experimental = value

get_config()

Snapshot the global config into a single Config value.

Source code in src/terok/lib/api/__init__.py
def get_config() -> Config:
    """Capture the current global configuration as one [`Config`][terok.lib.api.Config] value.

    Every field is read at call time from the ``_paths`` and ``_config``
    modules, so the returned object is a point-in-time snapshot.
    """
    # Locations resolved by the paths module.
    locations = {
        "config_root": _paths.config_root(),
        "core_state_dir": _paths.core_state_dir(),
        "runtime_dir": _paths.runtime_dir(),
    }
    # Everything else comes from the config module.
    settings = {
        "archive_dir": _config.archive_dir(),
        "vault_dir": _config.vault_dir(),
        "user_projects_dir": _config.user_projects_dir(),
        "global_config_path": _config.global_config_path(),
        "public_host": _config.get_public_host(),
        "shield_bypass_firewall_no_protection": (
            _config.get_shield_bypass_firewall_no_protection()
        ),
        "tui_default_tmux": _config.get_tui_default_tmux(),
        "tui_external_editor": _config.get_tui_external_editor(),
        "shield_security_hint": _config.SHIELD_SECURITY_HINT,
    }
    return Config(**locations, **settings)

get_container_state(cname)

Return the live podman state for a container, or None if not found.

Thin wrapper for one-shot state lookups. The probe goes through plain PodmanRuntime because container-state reads are runtime-agnostic (podman inspect returns the same shape under every OCI runtime), and the caller may not have project context in scope to resolve the per-project runtime.

Source code in src/terok/lib/api/__init__.py
def get_container_state(cname: str) -> str | None:
    """Look up the live podman state of container *cname*.

    Returns the state string, or ``None`` if the container is not found.

    This is a thin wrapper for one-shot lookups.  It deliberately uses a
    plain ``PodmanRuntime``: container-state reads are runtime-agnostic
    (``podman inspect`` returns the same shape under every OCI runtime),
    and the caller may not have project context in scope to resolve the
    per-project runtime.
    """
    runtime = PodmanRuntime()
    container = runtime.container(cname)
    return container.state