Skip to content

tasks

tasks

Task metadata, lifecycle, and query operations.

This package is the single import surface for task operations — from terok.lib.orchestration.tasks import … resolves every public name below. The implementation is split into focused submodules:

  • meta — the on-disk I/O boundary: the dossier/bookkeeping split-write protocol, path helpers, and the directories task state lives in.
  • identity — task ID generation, validation, and prefix resolution.
  • naming — task name sanitization and random-name generation.
  • query — the TaskMeta read model and the functions that hydrate it from disk + live container state.
  • lifecycle — create, rename, delete (with archive-on-delete), stop, login, and status.
  • archive — reading the immutable snapshots a deletion leaves behind.

Container runner functions (task_run_cli, task_run_headless, task_restart) live in the companion task_runners module. Status computation and domain value objects live in task_state; their presentation tables live in task_display. Log viewing lives in task_logs.

CONTAINER_MODES = ('cli', 'web', 'run', 'toad') module-attribute

All valid container mode suffixes used in container naming.

CONTAINER_TEROK_CONFIG = '/home/dev/.terok' module-attribute

In-container mount point for the per-task agent-config dir.

TASK_NAME_MAX_LEN = 60 module-attribute

Maximum length of a sanitized task name.

__all__ = ['ArchivedTask', 'CONTAINER_MODES', 'CONTAINER_TEROK_CONFIG', 'TASK_NAME_MAX_LEN', 'TaskDeleteResult', 'TaskMeta', 'agent_config_dir', 'capture_task_logs', 'container_name', 'dossier_path', 'generate_task_name', 'get_all_task_states', 'get_login_command', 'get_task_container_state', 'get_task_meta', 'get_tasks', 'get_workspace_git_diff', 'is_task_id', 'iter_task_ids', 'list_archived_tasks', 'load_task_meta', 'lookup_container_by_pt', 'mark_task_deleting', 'meta_path', 'normalize_task_id_input', 'read_task_meta', 'resolve_task_id', 'sanitize_task_name', 'task_archive_list', 'task_archive_logs', 'task_delete', 'task_exists', 'task_list', 'task_login', 'task_new', 'task_rename', 'task_status', 'task_stop', 'tasks_archive_dir', 'tasks_meta_dir', 'update_task_exit_code', 'validate_task_name', 'wait_for_container_exit', 'write_task_meta'] module-attribute

ArchivedTask(archive_dir, archived_at, task_id, name, mode, exit_code) dataclass

Metadata snapshot of an archived (deleted) task.

archive_dir instance-attribute

archived_at instance-attribute

task_id instance-attribute

name instance-attribute

mode instance-attribute

exit_code instance-attribute

TaskDeleteResult(task_id, warnings) dataclass

Outcome of a task deletion — always completes, collects warnings.

The task is considered deleted regardless (metadata and workspace removed), but individual cleanup steps may fail. warnings carries human-readable descriptions of any steps that did not complete cleanly.

task_id instance-attribute

warnings instance-attribute

TaskMeta(container_state=None, exit_code=None, deleting=False, initialized=False, starting=False, *, task_id, project_id='', mode, workspace, web_port, web_token=None, backend=None, preset=None, name='', provider=None, unrestricted=None, work_status=None, work_message=None, shield_state=None, created_at=None) dataclass

Bases: TaskState

Lightweight metadata snapshot for a single task.

Inherits lifecycle fields (container_state, exit_code, deleting, initialized) from TaskState.

task_id instance-attribute

project_id = '' class-attribute instance-attribute

Project the task belongs to.

Carried in the meta JSON so consumers that don't already know the project (e.g. terok-shield's host-side dossier resolver, which only has the meta-path pointer) can render full project/task identity without re-deriving it from the on-disk path. Empty string for pre-this-release meta files; the lifecycle backfills it on the next mutation.

mode instance-attribute

workspace instance-attribute

web_port instance-attribute

web_token = None class-attribute instance-attribute

backend = None class-attribute instance-attribute

preset = None class-attribute instance-attribute

name = '' class-attribute instance-attribute

provider = None class-attribute instance-attribute

unrestricted = None class-attribute instance-attribute

work_status = None class-attribute instance-attribute

work_message = None class-attribute instance-attribute

shield_state = None class-attribute instance-attribute

created_at = None class-attribute instance-attribute

status property

Compute effective status from live container state + metadata.

load(project_id, task_id) classmethod

Load a TaskMeta from disk, with live container state hydrated.

Raises SystemExit if the task metadata file is not found. Equivalent to the free-fn get_task_meta, kept as the canonical entry point so callers reach the class through its own factory.

Source code in src/terok/lib/orchestration/tasks/query.py
@classmethod
def load(cls, project_id: str, task_id: str) -> TaskMeta:
    """Load a TaskMeta from disk, with live container state hydrated.

    Raises ``SystemExit`` if the task metadata file is not found.
    Equivalent to the free-fn
    [`get_task_meta`][terok.lib.orchestration.tasks.query.get_task_meta],
    kept as the canonical entry point so callers reach the class
    through its own factory.
    """
    return get_task_meta(project_id, task_id)

container_name(project_id, mode, task_id)

Return the canonical container name for a task.

Source code in src/terok/lib/core/task_state.py
def container_name(project_id: str, mode: str, task_id: str) -> str:
    """Return the canonical container name for a task."""
    return f"{project_id}-{mode}-{task_id}"

list_archived_tasks(project_id)

Return archived tasks for project_id, sorted newest-first.

Source code in src/terok/lib/orchestration/tasks/archive.py
def list_archived_tasks(project_id: str) -> list[ArchivedTask]:
    """Return archived tasks for *project_id*, sorted newest-first."""
    archive_root = tasks_archive_dir(project_id)
    if not archive_root.is_dir():
        return []
    results: list[ArchivedTask] = []
    for entry in sorted(archive_root.iterdir(), reverse=True):
        if not entry.is_dir():
            continue
        meta = _load_archived_task_meta(entry)
        if meta is None:
            continue
        # Parse archive timestamp from directory name: <timestamp>_<task_id>[_<name>]
        parts = entry.name.split("_", 2)
        archived_at = parts[0] if parts else entry.name
        results.append(
            ArchivedTask(
                archive_dir=entry,
                archived_at=archived_at,
                task_id=str(meta.get("task_id", "")),
                name=meta.get("name", ""),
                mode=meta.get("mode"),
                exit_code=meta.get("exit_code"),
            )
        )
    return results

task_archive_list(project_id)

Print archived tasks for project_id.

Source code in src/terok/lib/orchestration/tasks/archive.py
def task_archive_list(project_id: str) -> None:
    """Print archived tasks for *project_id*."""
    archived = list_archived_tasks(project_id)
    if not archived:
        print("No archived tasks found")
        return
    for a in archived:
        extra = []
        if a.mode:
            extra.append(f"mode={a.mode}")
        if a.exit_code is not None:
            extra.append(f"exit={a.exit_code}")
        extra_s = f" [{'; '.join(extra)}]" if extra else ""
        print(f"- {a.archived_at} #{a.task_id}: {a.name}{extra_s}")

task_archive_logs(project_id, archive_id)

Return the log file path for an archived task identified by archive_id.

archive_id is matched against archive directory names (prefix match). Returns the log file path if found, or None.

Source code in src/terok/lib/orchestration/tasks/archive.py
def task_archive_logs(project_id: str, archive_id: str) -> Path | None:
    """Return the log file path for an archived task identified by *archive_id*.

    *archive_id* is matched against archive directory names (prefix match).
    Returns the log file path if found, or ``None``.
    """
    archive_root = tasks_archive_dir(project_id)
    if not archive_root.is_dir():
        return None
    for entry in sorted(archive_root.iterdir(), reverse=True):
        if entry.is_dir() and entry.name.startswith(archive_id):
            log_file = entry / "logs" / "container.log"
            if log_file.is_file():
                return log_file
    return None

is_task_id(text)

Return True if text is a well-formed task ID.

Source code in src/terok/lib/orchestration/tasks/identity.py
def is_task_id(text: str) -> bool:
    """Return True if *text* is a well-formed task ID."""
    return bool(_TASK_ID_CROCKFORD_4_5_RE.fullmatch(text))

normalize_task_id_input(raw)

Collapse user-input variants to the canonical lowercase form.

Strips hyphens, lowercases, and applies the Crockford I/L → 1, O → 0 substitutions. The result is still subject to _TASK_ID_PREFIX_RE downstream — this only widens what we accept, never what we emit.

Call-site discipline: only call this at user-interactive CLI boundaries — argparse dispatch handlers and argcomplete completers. Internal code paths (lib/*, tui/*, TUI pickers, clearance, anything reading task IDs from disk, OCI annotations, or runtime state) always work with canonical lowercase IDs and must never re-normalise. Leaking this tolerance inward quietly defeats the "we encode only in canonical form" invariant.

Source code in src/terok/lib/orchestration/tasks/identity.py
def normalize_task_id_input(raw: str) -> str:
    """Collapse user-input variants to the canonical lowercase form.

    Strips hyphens, lowercases, and applies the Crockford
    ``I/L → 1``, ``O → 0`` substitutions.  The result is still subject
    to `_TASK_ID_PREFIX_RE` downstream — this only widens what
    we accept, never what we emit.

    **Call-site discipline:** only call this at user-interactive CLI
    boundaries — argparse dispatch handlers and argcomplete completers.
    Internal code paths (``lib/*``, ``tui/*``, TUI pickers, clearance,
    anything reading task IDs from disk, OCI annotations, or runtime
    state) always work with canonical lowercase IDs and must *never*
    re-normalise.  Leaking this tolerance inward quietly defeats the
    "we encode only in canonical form" invariant.
    """
    return raw.replace("-", "").lower().translate(_TASK_ID_INPUT_TRANSLATE)

resolve_task_id(project_id, prefix)

Resolve a (possibly partial) task ID to its full form.

The prefix is first run through normalize_task_id_input, so callers may pass uppercase, hyphenated, or ambiguous-letter variants (K3-V8H, k3v8I) — they collapse to the canonical lowercase form before validation and lookup.

Raises SystemExit with an actionable message on zero or multiple matches.

Source code in src/terok/lib/orchestration/tasks/identity.py
def resolve_task_id(project_id: str, prefix: str) -> str:
    """Resolve a (possibly partial) task ID to its full form.

    The *prefix* is first run through [`normalize_task_id_input`][terok.lib.orchestration.tasks.normalize_task_id_input],
    so callers may pass uppercase, hyphenated, or ambiguous-letter
    variants (``K3-V8H``, ``k3v8I``) — they collapse to the canonical
    lowercase form before validation and lookup.

    Raises ``SystemExit`` with an actionable message on zero or multiple matches.
    """
    # Head position is letter-only, so an ``I/L/O`` there can't be a
    # Crockford body substitution — surface that as a specific error
    # instead of normalising it into a digit and letting downstream
    # mistake it for a legacy-hex prefix.
    stripped = prefix.replace("-", "").lower()
    if stripped[:1] in _TASK_ID_AMBIGUOUS_LETTERS:
        raise SystemExit(
            f"Invalid task ID prefix {prefix!r}; "
            f"task IDs never start with I, L, or O — "
            f"expected one of {_TASK_ID_HEAD_CHARS!r}"
        )
    prefix = stripped.translate(_TASK_ID_INPUT_TRANSLATE)
    _validate_task_id_prefix(prefix)
    meta_dir = tasks_meta_dir(project_id)
    if not meta_dir.is_dir():
        raise SystemExit(f"No tasks found for project {project_id}")
    if task_exists(project_id, prefix):
        return prefix
    matches = [tid for tid in iter_task_ids(meta_dir) if is_task_id(tid) and tid.startswith(prefix)]
    if len(matches) == 1:
        return matches[0]
    if not matches:
        raise SystemExit(f"No task matching '{prefix}' in project {project_id}")
    raise SystemExit(f"Ambiguous task ID '{prefix}' — matches: {', '.join(sorted(matches))}")

capture_task_logs(project, task_id, mode)

Capture container logs to the task's logs/ directory on the host.

Writes stdout/stderr from podman logs to <tasks_root>/<task_id>/logs/container.log. Returns the log file path on success, or None if the container doesn't exist or podman fails.

project may be a ProjectConfig or a project-ID string (the string form loads the config internally for backward compat).

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def capture_task_logs(project: ProjectConfig | str, task_id: str, mode: str) -> Path | None:
    """Capture container logs to the task's ``logs/`` directory on the host.

    Writes stdout/stderr from ``podman logs`` to
    ``<tasks_root>/<task_id>/logs/container.log``.  Returns the log file
    path on success, or ``None`` if the container doesn't exist or podman
    fails.

    *project* may be a [`ProjectConfig`][terok.cli.commands.sickbay.ProjectConfig] or a project-ID string
    (the string form loads the config internally for backward compat).
    """
    if isinstance(project, str):
        project = load_project(project)
    task_dir = project.tasks_root / str(task_id)
    logs_dir = task_dir / "logs"
    ensure_dir(logs_dir)
    log_file = logs_dir / "container.log"

    cname = container_name(project.id, mode, task_id)
    if AgentRunner().capture_logs(cname, log_file, timestamps=True, timeout=60.0):
        return log_file
    return None

get_login_command(project_id, task_id)

Return the podman exec command to log into a task container.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def get_login_command(project_id: str, task_id: str) -> list[str]:
    """Return the podman exec command to log into a task container."""
    return _get_login_command(load_project(project_id), task_id)

task_delete(project_id, task_id)

Delete a task's workspace, metadata, and any associated containers.

Before removal, captures container logs and archives the task metadata and logs to archive/<project_id>/tasks/. Containers are stopped best-effort via podman using the <project.id>-<mode>-<task_id> naming scheme. Returns a TaskDeleteResult so the caller can present any warnings from cleanup steps that failed.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_delete(project_id: str, task_id: str) -> TaskDeleteResult:
    """Delete a task's workspace, metadata, and any associated containers.

    Before removal, captures container logs and archives the task metadata
    and logs to ``archive/<project_id>/tasks/``.  Containers are stopped
    best-effort via podman using the ``<project.id>-<mode>-<task_id>``
    naming scheme.  Returns a [`TaskDeleteResult`][terok.lib.orchestration.tasks.TaskDeleteResult] so the caller can
    present any warnings from cleanup steps that failed.
    """
    return _task_delete(load_project(project_id), task_id)

task_login(project_id, task_id)

Open an interactive shell in a running task container.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_login(project_id: str, task_id: str) -> None:
    """Open an interactive shell in a running task container."""
    _task_login(load_project(project_id), task_id)

task_new(project_id, *, name=None)

Create a new task with a fresh workspace for a project.

Parameters:

Name Type Description Default
project_id str

The project to create the task under.

required
name str | None

Optional human-readable name. Allowed characters are lowercase letters, digits, hyphens, and underscores. If None, a random slug-style name is generated via generate_task_name.

None
Workspace Initialization Protocol:

Each task gets its own workspace directory that persists across container runs. When a container starts, the init script (init-ssh-and-repo.sh) needs to know whether this is:

  1. A NEW task that should be reset to the latest remote HEAD
  2. A RESTARTED task where local changes should be preserved

We use a marker file (.new-task-marker) to signal intent:

  • task_new() creates the marker in the workspace directory
  • init-ssh-and-repo.sh checks for the marker:
  • If marker exists: reset to origin/HEAD, then delete marker
  • If no marker: fetch only, preserve local state
  • Subsequent container runs on the same task won't see the marker, so local work is preserved

This handles edge cases like: - Stale workspace from incompletely deleted previous task with same ID - Ensuring new tasks always start with latest code

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_new(project_id: str, *, name: str | None = None) -> str:
    """Create a new task with a fresh workspace for a project.

    Args:
        project_id: The project to create the task under.
        name: Optional human-readable name.  Allowed characters are
            lowercase letters, digits, hyphens, and underscores.
            If ``None``, a random slug-style name is generated via
            [`generate_task_name`][terok.lib.orchestration.tasks.generate_task_name].

    Workspace Initialization Protocol:
    ----------------------------------
    Each task gets its own workspace directory that persists across container
    runs. When a container starts, the init script (init-ssh-and-repo.sh) needs
    to know whether this is:

    1. A NEW task that should be reset to the latest remote HEAD
    2. A RESTARTED task where local changes should be preserved

    We use a marker file (.new-task-marker) to signal intent:

    - task_new() creates the marker in the workspace directory
    - init-ssh-and-repo.sh checks for the marker:
      - If marker exists: reset to origin/HEAD, then delete marker
      - If no marker: fetch only, preserve local state
    - Subsequent container runs on the same task won't see the marker,
      so local work is preserved

    This handles edge cases like:
    - Stale workspace from incompletely deleted previous task with same ID
    - Ensuring new tasks always start with latest code
    """
    return _task_new(load_project(project_id), name=name)

task_rename(project_id, task_id, new_name)

Rename a task by updating its metadata file.

Sanitizes new_name and writes the result to the task's metadata file. Raises SystemExit if the task is unknown or the sanitized name is invalid.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_rename(project_id: str, task_id: str, new_name: str) -> None:
    """Rename a task by updating its metadata file.

    Sanitizes *new_name* and writes the result to the task's metadata file.
    Raises ``SystemExit`` if the task is unknown or the sanitized name is invalid.
    """
    _task_rename(load_project(project_id), task_id, new_name)

task_status(project_id, task_id)

Show live task status with container state diagnostics.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_status(project_id: str, task_id: str) -> None:
    """Show live task status with container state diagnostics."""
    project = load_project(project_id)
    meta_dir = tasks_meta_dir(project.id)
    meta = read_task_meta(meta_dir, task_id)
    if meta is None:
        raise SystemExit(f"Unknown task {task_id}")

    mode = meta.get("mode")
    web_port = meta.get("web_port")
    exit_code = meta.get("exit_code")

    color_enabled = _supports_color()

    # Query live container state
    cname = None
    cs = None
    if mode:
        cname = container_name(project.id, mode, task_id)
        cs = _rt.resolve_runtime(project).container(cname).state

    # Build TaskMeta for effective_status / mode_emoji computation
    task = TaskMeta(
        task_id=task_id,
        mode=mode,
        workspace=meta.get("workspace", ""),
        web_port=web_port,
        web_token=meta.get("web_token"),
        backend=meta.get("backend"),
        exit_code=exit_code,
        deleting=bool(meta.get("deleting")),
        initialized=_is_initialized(meta),
        container_state=cs,
        name=meta["name"],
        provider=meta.get("provider"),
        unrestricted=meta.get("unrestricted"),
        created_at=meta.get("created_at"),
    )
    status = effective_status(task)
    info = STATUS_DISPLAY.get(status, STATUS_DISPLAY["created"])

    status_color = {"green": _green, "yellow": _yellow, "red": _red}.get(info.color, _yellow)
    m = mode_info(task.mode)
    m_emoji = render_emoji(m)

    print(f"Task {task_id}:")
    print(f"  Name:            {task.name}")
    print(f"  Status:          {render_emoji(info)} {status_color(info.label, color_enabled)}")
    print(f"  Mode:            {m_emoji} {m.label or 'not set'}")
    if cname:
        print(f"  Container:       {cname}")
    if cs:
        state_color = _green if cs == "running" else _yellow
        print(f"  Container state: {state_color(cs, color_enabled)}")
    elif mode:
        print(f"  Container state: {_red('not found', color_enabled)}")
    if task.unrestricted is not None:
        perm_label = "unrestricted" if task.unrestricted else "restricted"
        print(f"  Permissions:     {perm_label}")
    if exit_code is not None:
        print(f"  Exit code:       {exit_code}")
    if web_port:
        print(f"  Web port:        {web_port}")
    # Work status from agent
    tasks_root = project.tasks_root
    agent_cfg = tasks_root / task_id / "agent-config"
    ws = read_work_status(agent_cfg)
    if ws.status:
        print(f"  Work status:     {ws.status}")
        if ws.message:
            print(f"  Work message:    {ws.message}")

task_stop(project_id, task_id, *, timeout=None)

Gracefully stop a running task container.

Uses podman stop --time <N> to give the container timeout seconds before SIGKILL. When timeout is None the project's run.shutdown_timeout setting is used (default 10 s).

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def task_stop(project_id: str, task_id: str, *, timeout: int | None = None) -> None:
    """Gracefully stop a running task container.

    Uses ``podman stop --time <N>`` to give the container *timeout* seconds
    before SIGKILL.  When *timeout* is ``None`` the project's
    ``run.shutdown_timeout`` setting is used (default 10 s).
    """
    _task_stop(load_project(project_id), task_id, timeout=timeout)

wait_for_container_exit(container_name, project_id, task_id, timeout=7200)

Wait for container_name to exit and record its code in task metadata.

Returns (exit_code, error_message). On a successful wait error_message is None and the real exit code is persisted — including a legitimate exit code of 124, which is no longer conflated with the watcher's own timeout. On timeout exit_code is None and the error message describes it.

Source code in src/terok/lib/orchestration/tasks/lifecycle.py
def wait_for_container_exit(
    container_name: str,
    project_id: str,
    task_id: str,
    timeout: int = 7200,
) -> tuple[int | None, str | None]:
    """Wait for *container_name* to exit and record its code in task metadata.

    Returns ``(exit_code, error_message)``.  On a successful wait
    *error_message* is ``None`` and the real exit code is persisted
    — including a legitimate exit code of 124, which is no longer
    conflated with the watcher's own timeout.  On timeout *exit_code*
    is ``None`` and the error message describes it.
    """
    from .meta import update_task_exit_code

    try:
        exit_code = AgentRunner().wait_for_exit(container_name, timeout=float(timeout))
    except TimeoutError:
        return None, "Watcher timed out"
    except Exception as e:
        return None, str(e)

    update_task_exit_code(project_id, task_id, exit_code)
    return exit_code, None

agent_config_dir(project_id, task_id)

Host path of the agent-config dir bind-mounted at CONTAINER_TEROK_CONFIG.

Source code in src/terok/lib/orchestration/tasks/meta.py
def agent_config_dir(project_id: str, task_id: str) -> Path:
    """Host path of the agent-config dir bind-mounted at `CONTAINER_TEROK_CONFIG`."""
    _reject_unsafe_id(project_id, "project_id")
    _reject_unsafe_id(task_id, "task_id")
    return load_project(project_id).tasks_root / str(task_id) / "agent-config"

dossier_path(meta_dir, task_id)

Path to the wire-dossier JSON file — what shield consumers read.

The OCI dossier.meta_path annotation points operators at this file. Companion bookkeeping lives at the _meta.yml sibling.

Source code in src/terok/lib/orchestration/tasks/meta.py
def dossier_path(meta_dir: Path, task_id: str) -> Path:
    """Path to the wire-dossier JSON file — what shield consumers read.

    The OCI ``dossier.meta_path`` annotation points operators at *this*
    file.  Companion bookkeeping lives at the ``_meta.yml`` sibling.
    """
    _reject_unsafe_id(task_id, "task_id")
    return meta_dir / f"{task_id}{_DOSSIER_SUFFIX}"

iter_task_ids(meta_dir)

Yield every task ID with at least one meta file in meta_dir.

Source code in src/terok/lib/orchestration/tasks/meta.py
def iter_task_ids(meta_dir: Path) -> Iterator[str]:
    """Yield every task ID with at least one meta file in *meta_dir*."""
    if not meta_dir.is_dir():
        return
    seen: set[str] = set()
    for path in meta_dir.iterdir():
        if not path.is_file() or path.name.endswith(".tmp"):
            continue
        tid = _task_id_from_filename(path.name)
        if tid and tid not in seen:
            seen.add(tid)
            yield tid

load_task_meta(project_id, task_id, expected_mode=None)

Load task metadata and optionally validate mode.

Returns (meta, dossier_path): the merged in-memory dict (in its internal storage shape — project_id / task_id / …) plus the canonical dossier-file handle for write-back via write_task_meta(dossier_path, meta). Raises SystemExit if the task is unknown or its mode conflicts with expected_mode.

Source code in src/terok/lib/orchestration/tasks/meta.py
def load_task_meta(
    project_id: str, task_id: str, expected_mode: str | None = None
) -> tuple[dict, Path]:
    """Load task metadata and optionally validate mode.

    Returns ``(meta, dossier_path)``: the merged in-memory dict (in
    its internal storage shape — ``project_id`` / ``task_id`` / …)
    plus the canonical dossier-file handle for write-back via
    ``write_task_meta(dossier_path, meta)``.  Raises SystemExit if
    the task is unknown or its mode conflicts with *expected_mode*.
    """
    meta_dir = tasks_meta_dir(project_id)
    meta = read_task_meta(meta_dir, task_id)
    if meta is None:
        raise SystemExit(f"Unknown task {task_id}")
    if expected_mode is not None:
        _check_mode(meta, expected_mode)
    return meta, dossier_path(meta_dir, task_id)

mark_task_deleting(project_id, task_id)

Persist deleting: true to the task's metadata file.

Source code in src/terok/lib/orchestration/tasks/meta.py
def mark_task_deleting(project_id: str, task_id: str) -> None:
    """Persist ``deleting: true`` to the task's metadata file."""
    try:
        meta_dir = tasks_meta_dir(project_id)
        meta = read_task_meta(meta_dir, task_id)
        if meta is None:
            return
        meta["deleting"] = True
        write_task_meta(dossier_path(meta_dir, task_id), meta)
    except Exception as e:
        _log_debug(f"mark_task_deleting: failed project_id={project_id} task_id={task_id}: {e}")

meta_path(meta_dir, task_id)

Path to the orchestrator-bookkeeping YAML — terok-internal state.

Holds everything except the wire-dossier triple. Single consumer (terok itself), so ruamel round-tripping is fine.

Source code in src/terok/lib/orchestration/tasks/meta.py
def meta_path(meta_dir: Path, task_id: str) -> Path:
    """Path to the orchestrator-bookkeeping YAML — terok-internal state.

    Holds everything except the wire-dossier triple.  Single consumer
    (terok itself), so ruamel round-tripping is fine.
    """
    _reject_unsafe_id(task_id, "task_id")
    return meta_dir / f"{task_id}{_META_SUFFIX}"

read_task_meta(meta_dir, task_id)

Compose the orchestrator's logical task-meta dict from the on-disk pair.

Reads the wire-dossier JSON and the internal YAML, translates the JSON's wire keys back to internal storage names, and returns the union. Either file may be absent. Returns None only when neither file is on disk.

Source code in src/terok/lib/orchestration/tasks/meta.py
def read_task_meta(meta_dir: Path, task_id: str) -> dict | None:
    """Compose the orchestrator's logical task-meta dict from the on-disk pair.

    Reads the wire-dossier JSON and the internal YAML, translates the
    JSON's wire keys back to internal storage names, and returns the
    union.  Either file may be absent.  Returns ``None`` only when
    neither file is on disk.
    """
    json_path = dossier_path(meta_dir, task_id)
    yml_path = meta_path(meta_dir, task_id)

    if not (json_path.is_file() or yml_path.is_file()):
        return None

    return _merge_dossier_into(_read_yml_meta(yml_path), _read_json_meta(json_path))

task_exists(project_id, task_id)

Return True if any task-meta file exists for (project_id, task_id).

Source code in src/terok/lib/orchestration/tasks/meta.py
def task_exists(project_id: str, task_id: str) -> bool:
    """Return ``True`` if any task-meta file exists for ``(project_id, task_id)``."""
    meta_dir = tasks_meta_dir(project_id)
    return dossier_path(meta_dir, task_id).is_file() or meta_path(meta_dir, task_id).is_file()

tasks_archive_dir(project_id)

Return the directory containing archived task data for project_id.

Lives under the namespace archive tree (archive/<pid>/tasks/) so operators can find all archived data in one location. On project deletion the entire archive/<pid>/ subtree is bundled into the project snapshot and removed.

Source code in src/terok/lib/orchestration/tasks/meta.py
def tasks_archive_dir(project_id: str) -> Path:
    """Return the directory containing archived task data for *project_id*.

    Lives under the namespace archive tree (``archive/<pid>/tasks/``) so
    operators can find all archived data in one location.  On project
    deletion the entire ``archive/<pid>/`` subtree is bundled into the
    project snapshot and removed.
    """
    _reject_unsafe_id(project_id, "project_id")
    return archive_dir() / project_id / "tasks"

tasks_meta_dir(project_id)

Return the directory containing task metadata files for project_id.

Source code in src/terok/lib/orchestration/tasks/meta.py
def tasks_meta_dir(project_id: str) -> Path:
    """Return the directory containing task metadata files for *project_id*."""
    _reject_unsafe_id(project_id, "project_id")
    return core_state_dir() / "projects" / project_id / "tasks"

update_task_exit_code(project_id, task_id, exit_code)

Update task metadata with exit code and final status.

Parameters:

Name Type Description Default
project_id str

The project ID

required
task_id str

The task ID

required
exit_code int | None

The exit code from the task, or None if unknown/failed

required
Source code in src/terok/lib/orchestration/tasks/meta.py
def update_task_exit_code(project_id: str, task_id: str, exit_code: int | None) -> None:
    """Update task metadata with exit code and final status.

    Args:
        project_id: The project ID
        task_id: The task ID
        exit_code: The exit code from the task, or None if unknown/failed
    """
    meta_dir = tasks_meta_dir(project_id)
    meta = read_task_meta(meta_dir, task_id)
    if meta is None:
        return
    meta["exit_code"] = exit_code
    write_task_meta(dossier_path(meta_dir, task_id), meta)

write_task_meta(dossier_handle, meta)

Atomic split-write: _dossier.json + _meta.yml in one atomic step each.

dossier_handle is a dossier-file handle — what dossier_path returns and what load_task_meta hands back. The companion bookkeeping path is derived by swapping the suffix. Atomic-rename writes (.tmp + os.replace) on each file mean a partial write under EINTR / power loss can leave one stale and the other fresh, but never a half-written file — readers always get a parseable shape.

Source code in src/terok/lib/orchestration/tasks/meta.py
def write_task_meta(dossier_handle: Path, meta: dict) -> None:
    """Atomic split-write: ``_dossier.json`` + ``_meta.yml`` in one atomic step each.

    *dossier_handle* is a dossier-file handle — what
    [`dossier_path`][terok.lib.orchestration.tasks.dossier_path] returns
    and what [`load_task_meta`][terok.lib.orchestration.tasks.load_task_meta]
    hands back.  The companion bookkeeping path is derived by swapping
    the suffix.  Atomic-rename writes (``.tmp`` + ``os.replace``) on each
    file mean a partial write under EINTR / power loss can leave one
    stale and the other fresh, but never a half-written file — readers
    always get a parseable shape.
    """
    meta_dir, task_id = _dossier_handle_to_dir_and_id(dossier_handle)
    meta_dir.mkdir(parents=True, exist_ok=True)

    dossier = {
        wire_key: str(meta[internal_key])
        for internal_key, wire_key in _DOSSIER_TO_WIRE.items()
        if meta.get(internal_key)
    }
    state = {k: v for k, v in meta.items() if k not in _DOSSIER_INTERNAL_KEYS}

    _atomic_write(
        dossier_path(meta_dir, task_id),
        json.dumps(dossier, indent=2, ensure_ascii=False, default=str) + "\n",
    )
    _atomic_write(meta_path(meta_dir, task_id), _yaml_dump(state))

generate_task_name(project_id=None)

Generate a random human-readable task name (e.g. talented-toucan).

When project_id is given, name categories are resolved from config: project tasks.name_categories → global tasks.name_categories → deterministic 3-category selection based on project ID hash.

Source code in src/terok/lib/orchestration/tasks/naming.py
def generate_task_name(project_id: str | None = None) -> str:
    """Generate a random human-readable task name (e.g. ``talented-toucan``).

    When *project_id* is given, name categories are resolved from config:
    project ``tasks.name_categories`` → global ``tasks.name_categories``
    → deterministic 3-category selection based on project ID hash.
    """
    import namer

    categories = _resolve_name_categories(project_id) if project_id else None
    return namer.generate(separator="-", category=categories)

sanitize_task_name(raw)

Sanitize a raw task name into a slug-style identifier.

Strips whitespace, lowercases, replaces spaces with hyphens, removes characters outside [a-z0-9_-], collapses consecutive hyphens, strips trailing hyphens, and truncates to TASK_NAME_MAX_LEN. Returns None if the result is empty.

Leading hyphens are preserved so callers can detect and reject them (a name starting with - looks like a CLI flag).

Source code in src/terok/lib/orchestration/tasks/naming.py
def sanitize_task_name(raw: str | None) -> str | None:
    """Sanitize a raw task name into a slug-style identifier.

    Strips whitespace, lowercases, replaces spaces with hyphens,
    removes characters outside ``[a-z0-9_-]``, collapses consecutive
    hyphens, strips trailing hyphens, and truncates to
    ``TASK_NAME_MAX_LEN``.  Returns ``None`` if the result is empty.

    Leading hyphens are preserved so callers can detect and reject them
    (a name starting with ``-`` looks like a CLI flag).
    """
    if raw is None:
        return None
    name = raw.strip().lower()
    name = name.replace(" ", "-")
    name = re.sub(r"[^a-z0-9_-]", "", name)
    name = re.sub(r"-{2,}", "-", name)
    name = name.rstrip("-")
    name = name[:TASK_NAME_MAX_LEN]
    return name or None

validate_task_name(sanitized)

Return an error message if sanitized is not a valid task name, else None.

A name is invalid if it starts with a hyphen (looks like a CLI flag). Callers should first check for None from sanitize_task_name (which indicates the name was empty after sanitization).

Source code in src/terok/lib/orchestration/tasks/naming.py
def validate_task_name(sanitized: str) -> str | None:
    """Return an error message if *sanitized* is not a valid task name, else ``None``.

    A name is invalid if it starts with a hyphen (looks like a CLI flag).
    Callers should first check for ``None`` from [`sanitize_task_name`][terok.lib.orchestration.tasks.sanitize_task_name]
    (which indicates the name was empty after sanitization).
    """
    if sanitized.startswith("-"):
        return "name must not start with a hyphen"
    return None

get_all_task_states(project_id, tasks)

Map each task to its live container state via a single batch query.

Parameters:

Name Type Description Default
project_id str

The project whose containers to query.

required
tasks list[TaskMeta]

List of TaskMeta instances (must have task_id and mode).

required

Returns:

Type Description
dict[str, str | None]

{task_id: container_state_or_None} dict.

Source code in src/terok/lib/orchestration/tasks/query.py
def get_all_task_states(
    project_id: str,
    tasks: list[TaskMeta],
) -> dict[str, str | None]:
    """Map each task to its live container state via a single batch query.

    Args:
        project_id: The project whose containers to query.
        tasks: List of ``TaskMeta`` instances (must have ``task_id`` and ``mode``).

    Returns:
        ``{task_id: container_state_or_None}`` dict.
    """
    # Use PodmanRuntime directly: this is a `podman ps` enumeration that
    # doesn't differ across OCI runtimes.
    from terok.lib.integrations.sandbox import PodmanRuntime

    container_states = PodmanRuntime().container_states(project_id)
    result: dict[str, str | None] = {}
    for t in tasks:
        if t.mode:
            cname = container_name(project_id, t.mode, str(t.task_id))
            result[str(t.task_id)] = container_states.get(cname)
        else:
            result[str(t.task_id)] = None
    return result

get_task_container_state(project_id, task_id, mode)

Get actual container state for a task (TUI helper).

Container state queries are runtime-agnostic — podman inspect returns the same shape regardless of OCI runtime — so this short cuts to PodmanRuntime rather than walking through the per-project resolver (which would force a load_project for a one-shot state probe).

Source code in src/terok/lib/orchestration/tasks/query.py
def get_task_container_state(project_id: str, task_id: str, mode: str | None) -> str | None:
    """Get actual container state for a task (TUI helper).

    Container state queries are runtime-agnostic — ``podman inspect``
    returns the same shape regardless of OCI runtime — so this short
    cuts to ``PodmanRuntime`` rather than walking through the
    per-project resolver (which would force a ``load_project`` for a
    one-shot state probe).
    """
    if not mode:
        return None
    from terok.lib.integrations.sandbox import PodmanRuntime

    cname = container_name(project_id, mode, task_id)
    return PodmanRuntime().container(cname).state

get_task_meta(project_id, task_id)

Return metadata for a single task with live container state.

Hydrates container_state from the running container so that TaskMeta.status reflects current reality rather than stale YAML. Raises SystemExit if the task metadata file is not found.

Source code in src/terok/lib/orchestration/tasks/query.py
def get_task_meta(project_id: str, task_id: str) -> TaskMeta:
    """Return metadata for a single task with live container state.

    Hydrates ``container_state`` from the running container so that
    ``TaskMeta.status`` reflects current reality rather than stale YAML.
    Raises ``SystemExit`` if the task metadata file is not found.
    """
    meta_dir = tasks_meta_dir(project_id)
    raw = read_task_meta(meta_dir, task_id)
    if raw is None:
        raise SystemExit(f"Unknown task {task_id}")
    mode = raw.get("mode")
    # Fall back to the caller-known task_id rather than a blank identity
    # when the on-disk record predates the field.
    tid = str(raw.get("task_id") or task_id)
    # Hydrate live container state only for tasks that have actually been started
    # (state probe is runtime-agnostic — see ``get_task_container_state``).
    live_state: str | None = None
    if mode is not None:
        try:
            from terok.lib.integrations.sandbox import PodmanRuntime

            cname = container_name(project_id, mode, task_id)
            live_state = PodmanRuntime().container(cname).state
        except Exception:
            pass
    # Hydrate work status from agent-config (same logic as _get_tasks)
    ws_status: str | None = None
    ws_message: str | None = None
    if tid:
        project = load_project(project_id)
        try:
            agent_cfg = project.tasks_root / tid / "agent-config"
            ws = read_work_status(agent_cfg)
            ws_status = ws.status
            ws_message = ws.message
        except Exception:  # noqa: BLE001 — best-effort; agent-config may not exist yet
            pass
    return TaskMeta(
        task_id=tid,
        # ``or`` (not the dict default) so a migrated record carrying an
        # empty string still falls back to the path-derived project_id.
        project_id=raw.get("project_id") or project_id,
        mode=mode,
        workspace=raw.get("workspace", ""),
        web_port=raw.get("web_port"),
        web_token=raw.get("web_token"),
        backend=raw.get("backend"),
        container_state=live_state,
        exit_code=raw.get("exit_code"),
        deleting=bool(raw.get("deleting")),
        initialized=_is_initialized(raw),
        preset=raw.get("preset"),
        name=raw["name"],
        provider=raw.get("provider"),
        unrestricted=raw.get("unrestricted"),
        work_status=ws_status,
        work_message=ws_message,
        created_at=raw.get("created_at"),
    )

get_tasks(project_id, reverse=False)

Return all task metadata for project_id, sorted by task ID.

Source code in src/terok/lib/orchestration/tasks/query.py
def get_tasks(project_id: str, reverse: bool = False) -> list[TaskMeta]:
    """Return all task metadata for *project_id*, sorted by task ID."""
    return _get_tasks(project_id, reverse=reverse)

get_workspace_git_diff(project_id, task_id, against='HEAD')

Get git diff from a task's workspace via container exec.

Runs git diff inside the task container rather than on the host, so that even poisoned git hooks only execute within the container sandbox.

Parameters:

Name Type Description Default
project_id str

The project ID

required
task_id str

The task ID

required
against str

What to diff against ("HEAD" or "PREV")

'HEAD'

Returns:

Type Description
str | None

The git diff output as a string, or None if failed

Source code in src/terok/lib/orchestration/tasks/query.py
def get_workspace_git_diff(project_id: str, task_id: str, against: str = "HEAD") -> str | None:
    """Get git diff from a task's workspace via container exec.

    Runs ``git diff`` **inside** the task container rather than on the host,
    so that even poisoned git hooks only execute within the container sandbox.

    Args:
        project_id: The project ID
        task_id: The task ID
        against: What to diff against (``"HEAD"`` or ``"PREV"``)

    Returns:
        The git diff output as a string, or ``None`` if failed
    """
    try:
        load_project(project_id)  # validate project exists
        meta_dir = tasks_meta_dir(project_id)
        meta = read_task_meta(meta_dir, task_id)
        if meta is None:
            return None
        mode = meta.get("mode")
        if not mode:
            return None

        if against == "PREV":
            return container_git_diff(project_id, task_id, mode, "HEAD~1", "HEAD")
        return container_git_diff(project_id, task_id, mode, "HEAD")

    except (Exception, SystemExit):
        return None

lookup_container_by_pt(project_id, task_id)

Resolve a project/task pair to the current container name, or None.

Powers the slash-form identity acceptance on per-container terok executor * verbs (stop, future exec / logs / state / login). Reads the recorded mode from the task's meta file; returns None when the task is unknown or has never been launched (no mode recorded), or when either input would escape the project task store (path-traversal guard). The caller decides whether to treat None as "pass the input through verbatim" (raw container id) or "fail with an actionable error" (unknown project/task).

Source code in src/terok/lib/orchestration/tasks/query.py
def lookup_container_by_pt(project_id: str, task_id: str) -> str | None:
    """Resolve a `project/task` pair to the current container name, or `None`.

    Powers the slash-form identity acceptance on per-container
    ``terok executor *`` verbs (``stop``, future ``exec`` / ``logs`` /
    ``state`` / ``login``).  Reads the recorded mode from the task's
    meta file; returns ``None`` when the task is unknown or has never
    been launched (no ``mode`` recorded), or when either input would
    escape the project task store (path-traversal guard).  The caller
    decides whether to treat ``None`` as "pass the input through
    verbatim" (raw container id) or "fail with an actionable error"
    (unknown project/task).
    """
    if not is_valid_project_id(project_id) or not _is_safe_id_segment(task_id):
        return None
    meta_dir = tasks_meta_dir(project_id)
    raw = read_task_meta(meta_dir, task_id)
    if raw is None:
        return None
    mode = raw.get("mode")
    if not mode:
        return None
    return container_name(project_id, mode, task_id)

task_list(project_id, *, status=None, mode=None, agent=None)

List tasks for a project, optionally filtered by status, mode, or agent preset.

Status is computed live from podman container state + task metadata.

Source code in src/terok/lib/orchestration/tasks/query.py
def task_list(
    project_id: str,
    *,
    status: str | None = None,
    mode: str | None = None,
    agent: str | None = None,
) -> None:
    """List tasks for a project, optionally filtered by status, mode, or agent preset.

    Status is computed live from podman container state + task metadata.
    """
    tasks = get_tasks(project_id)

    # Pre-filter by mode/agent before the podman query to reduce work
    if mode:
        tasks = [t for t in tasks if t.mode == mode]
    if agent:
        tasks = [t for t in tasks if t.preset == agent]

    if not tasks:
        print("No tasks found")
        return

    # Batch-query podman for all container states in one call
    live_states = get_all_task_states(project_id, tasks)
    for t in tasks:
        t.container_state = live_states.get(t.task_id)

    # Filter by effective status (computed live)
    if status:
        tasks = [t for t in tasks if effective_status(t) == status]

    if not tasks:
        print("No tasks found")
        return

    for t in tasks:
        t_status = effective_status(t)
        extra = []
        if t.mode:
            extra.append(f"mode={t.mode}")
        if t.web_port:
            extra.append(f"port={t.web_port}")
        if t.work_status:
            extra.append(f"work={t.work_status}")
        extra_s = f" [{'; '.join(extra)}]" if extra else ""
        print(f"- {t.task_id}: {t.name} {t_status}{extra_s}")