Skip to content

tasks

tasks

Task metadata, lifecycle, and query operations.

Provides module-level functions for CRUD over YAML-backed task metadata.

Container runner functions (task_run_cli, task_run_headless, task_restart) live in the companion task_runners module. Display types and status computation live in task_display. Log viewing lives in task_logs.

TASK_NAME_MAX_LEN = 60 module-attribute

Maximum length of a sanitized task name.

TaskMeta(container_state=None, exit_code=None, deleting=False, initialized=False, *, task_id, mode, workspace, web_port, backend=None, preset=None, name='', provider=None, unrestricted=None, work_status=None, work_message=None, shield_state=None) dataclass

Bases: TaskState

Lightweight metadata snapshot for a single task.

Inherits lifecycle fields (container_state, exit_code, deleting, initialized) from terok.lib.core.task_display.TaskState.

status property

Compute effective status from live container state + metadata.

ArchivedTask(archive_dir, archived_at, task_id, name, mode, exit_code) dataclass

Metadata snapshot of an archived (deleted) task.

container_name(project_id, mode, task_id)

Return the canonical container name for a task.

Source code in src/terok/lib/orchestration/tasks.py
def container_name(project_id: str, mode: str, task_id: str) -> str:
    """Build the canonical container name ``<project_id>-<mode>-<task_id>``."""
    parts = (project_id, mode, task_id)
    return "{}-{}-{}".format(*parts)

get_task_container_state(project_id, task_id, mode)

Get actual container state for a task (TUI helper).

Source code in src/terok/lib/orchestration/tasks.py
def get_task_container_state(project_id: str, task_id: str, mode: str | None) -> str | None:
    """Get actual container state for a task (TUI helper)."""
    if not mode:
        return None
    cname = container_name(project_id, mode, task_id)
    return get_container_state(cname)

sanitize_task_name(raw)

Sanitize a raw task name into a slug-style identifier.

Strips whitespace, lowercases, replaces spaces with hyphens, removes characters outside [a-z0-9_-], collapses consecutive hyphens, strips trailing hyphens, and truncates to TASK_NAME_MAX_LEN. Returns None if the result is empty.

Leading hyphens are preserved so callers can detect and reject them (a name starting with - looks like a CLI flag).

Source code in src/terok/lib/orchestration/tasks.py
def sanitize_task_name(raw: str | None) -> str | None:
    """Sanitize a raw task name into a slug-style identifier.

    Strips whitespace, lowercases, replaces spaces with hyphens,
    removes characters outside ``[a-z0-9_-]``, collapses consecutive
    hyphens, strips trailing hyphens, and truncates to
    ``TASK_NAME_MAX_LEN``.  Returns ``None`` if the result is empty.

    Leading hyphens are preserved so callers can detect and reject them
    (a name starting with ``-`` looks like a CLI flag).
    """
    if raw is None:
        return None
    name = raw.strip().lower()
    name = name.replace(" ", "-")
    name = re.sub(r"[^a-z0-9_-]", "", name)
    name = re.sub(r"-{2,}", "-", name)
    name = name.rstrip("-")
    name = name[:TASK_NAME_MAX_LEN]
    return name or None

validate_task_name(sanitized)

Return an error message if sanitized is not a valid task name, else None.

A name is invalid if it starts with a hyphen (looks like a CLI flag). Callers should first check for None from sanitize_task_name() (which indicates the name was empty after sanitization).

Source code in src/terok/lib/orchestration/tasks.py
def validate_task_name(sanitized: str) -> str | None:
    """Return an error message if *sanitized* is not a valid task name, else ``None``.

    A name is invalid if it starts with a hyphen (looks like a CLI flag).
    Callers should first check for ``None`` from :func:`sanitize_task_name`
    (which indicates the name was empty after sanitization).
    """
    if sanitized.startswith("-"):
        return "name must not start with a hyphen"
    return None

generate_task_name(project_id=None)

Generate a random human-readable task name (e.g. talented-toucan).

When project_id is given, name categories are resolved from config: project tasks.name_categories → global tasks.name_categories → deterministic 3-category selection based on project ID hash.

Source code in src/terok/lib/orchestration/tasks.py
def generate_task_name(project_id: str | None = None) -> str:
    """Produce a random, hyphen-separated task name (e.g. ``talented-toucan``).

    With a truthy *project_id* the name categories come from
    ``_resolve_name_categories``, which per the module contract checks
    project ``tasks.name_categories``, then global
    ``tasks.name_categories``, then falls back to a deterministic
    3-category selection based on the project ID hash.  Without a
    project ID no category restriction is passed to ``namer``.
    """
    # Imported lazily, as in the rest of this function's history, so that
    # merely importing this module does not require ``namer``.
    import namer

    if project_id:
        categories = _resolve_name_categories(project_id)
    else:
        categories = None
    return namer.generate(separator="-", category=categories)

get_task_meta(project_id, task_id)

Return metadata for a single task with live container state.

Hydrates container_state from the running container so that TaskMeta.status reflects current reality rather than stale YAML. Raises SystemExit if the task metadata file is not found.

Source code in src/terok/lib/orchestration/tasks.py
def get_task_meta(project_id: str, task_id: str) -> TaskMeta:
    """Return metadata for a single task with live container state.

    Reads ``<tasks_meta_dir>/<task_id>.yml`` and overlays the state reported
    by ``get_container_state`` plus the agent's work status, so that
    ``TaskMeta.status`` reflects current reality rather than stale YAML.

    Args:
        project_id: The project the task belongs to.
        task_id: The task whose metadata file should be loaded.

    Returns:
        A ``TaskMeta`` snapshot.  A missing or empty ``name`` field in the
        YAML yields ``""`` rather than raising ``KeyError``.

    Raises:
        SystemExit: If the task metadata file is not found.
    """
    meta_dir = tasks_meta_dir(project_id)
    meta_path = meta_dir / f"{task_id}.yml"
    if not meta_path.is_file():
        raise SystemExit(f"Unknown task {task_id}")
    raw = _yaml_load(meta_path.read_text()) or {}
    mode = raw.get("mode")
    tid = str(raw.get("task_id", ""))
    # Hydrate live container state only for tasks that have actually been started
    live_state: str | None = None
    if mode is not None:
        try:
            cname = container_name(project_id, mode, task_id)
            live_state = get_container_state(cname)
        except Exception:  # noqa: BLE001 — best-effort; fall back to "no live state"
            pass
    # Hydrate work status from agent-config (same logic as _get_tasks)
    ws_status: str | None = None
    ws_message: str | None = None
    if tid:
        project = load_project(project_id)
        try:
            agent_cfg = project.tasks_root / tid / "agent-config"
            ws = read_work_status(agent_cfg)
            ws_status = ws.status
            ws_message = ws.message
        except Exception:  # noqa: BLE001 — best-effort; agent-config may not exist yet
            pass
    return TaskMeta(
        task_id=tid,
        mode=mode,
        workspace=raw.get("workspace", ""),
        web_port=raw.get("web_port"),
        backend=raw.get("backend"),
        container_state=live_state,
        exit_code=raw.get("exit_code"),
        deleting=bool(raw.get("deleting")),
        initialized=_is_initialized(raw),
        preset=raw.get("preset"),
        # Metadata without a name must not crash the lookup; TaskMeta's own
        # default for ``name`` is the empty string.
        name=raw.get("name") or "",
        provider=raw.get("provider"),
        unrestricted=raw.get("unrestricted"),
        work_status=ws_status,
        work_message=ws_message,
    )

get_workspace_git_diff(project_id, task_id, against='HEAD')

Get git diff from a task's workspace via container exec.

Runs git diff inside the task container rather than on the host, so that even poisoned git hooks only execute within the container sandbox.

Parameters:

Name Type Description Default
project_id str

The project ID

required
task_id str

The task ID

required
against str

What to diff against ("HEAD" or "PREV")

'HEAD'

Returns:

Type Description
str | None

The git diff output as a string, or None if failed

Source code in src/terok/lib/orchestration/tasks.py
def get_workspace_git_diff(project_id: str, task_id: str, against: str = "HEAD") -> str | None:
    """Return the git diff of a task's workspace, computed inside its container.

    The diff is delegated to ``container_git_diff`` so that ``git`` runs
    **inside** the task container rather than on the host; even poisoned
    git hooks then only execute within the container sandbox.

    Args:
        project_id: The project ID
        task_id: The task ID
        against: ``"PREV"`` diffs ``HEAD~1`` against ``HEAD``; any other
            value (including the default ``"HEAD"``) diffs against ``HEAD``.

    Returns:
        The diff text, or ``None`` when the metadata file is missing, the
        task has no mode, or any error (including ``SystemExit``) occurs.
    """
    try:
        load_project(project_id)  # validate project exists
        meta_path = tasks_meta_dir(project_id) / f"{task_id}.yml"
        if not meta_path.is_file():
            return None
        mode = (_yaml_load(meta_path.read_text()) or {}).get("mode")
        if not mode:
            return None

        # Revision arguments forwarded to the in-container diff.
        refs = ("HEAD~1", "HEAD") if against == "PREV" else ("HEAD",)
        return container_git_diff(project_id, task_id, mode, *refs)
    except (Exception, SystemExit):
        return None

tasks_meta_dir(project_id)

Return the directory containing task metadata YAML files for project_id.

Source code in src/terok/lib/orchestration/tasks.py
def tasks_meta_dir(project_id: str) -> Path:
    """Locate the folder holding the task metadata YAML files of *project_id*."""
    project_state = state_dir() / "projects" / project_id
    return project_state / "tasks"

tasks_archive_dir(project_id)

Return the directory containing archived task data for project_id.

Source code in src/terok/lib/orchestration/tasks.py
def tasks_archive_dir(project_id: str) -> Path:
    """Locate the folder holding archived task data of *project_id*."""
    project_state = state_dir() / "projects" / project_id
    return project_state / "archive"

update_task_exit_code(project_id, task_id, exit_code)

Update task metadata with exit code and final status.

Parameters:

Name Type Description Default
project_id str

The project ID

required
task_id str

The task ID

required
exit_code int | None

The exit code from the task, or None if unknown/failed

required
Source code in src/terok/lib/orchestration/tasks.py
def update_task_exit_code(project_id: str, task_id: str, exit_code: int | None) -> None:
    """Record *exit_code* in the task's metadata YAML.

    Does nothing when the metadata file does not exist.

    Args:
        project_id: The project ID
        task_id: The task ID
        exit_code: The exit code from the task, or None if unknown/failed
    """
    meta_path = tasks_meta_dir(project_id) / f"{task_id}.yml"
    if meta_path.is_file():
        # Read-modify-write: only the ``exit_code`` key is touched.
        meta = _yaml_load(meta_path.read_text()) or {}
        meta["exit_code"] = exit_code
        meta_path.write_text(_yaml_dump(meta))

task_new(project_id, *, name=None)

Create a new task with a fresh workspace for a project.

Parameters:

Name Type Description Default
project_id str

The project to create the task under.

required
name str | None

Optional human-readable name. Allowed characters are lowercase letters, digits, hyphens, and underscores. If None, a random slug-style name is generated via generate_task_name().

None
Workspace Initialization Protocol:

Each task gets its own workspace directory that persists across container runs. When a container starts, the init script (init-ssh-and-repo.sh) needs to know whether this is:

  1. A NEW task that should be reset to the latest remote HEAD
  2. A RESTARTED task where local changes should be preserved

We use a marker file (.new-task-marker) to signal intent:

  • task_new() creates the marker in the workspace directory
  • init-ssh-and-repo.sh checks for the marker:
      ◦ If marker exists: reset to origin/HEAD, then delete marker
      ◦ If no marker: fetch only, preserve local state
  • Subsequent container runs on the same task won't see the marker, so local work is preserved

This handles edge cases like:

  • Stale workspace from incompletely deleted previous task with same ID
  • Ensuring new tasks always start with latest code

Source code in src/terok/lib/orchestration/tasks.py
def task_new(project_id: str, *, name: str | None = None) -> str:
    """Create a new task with a fresh workspace for a project.

    Loads the project configuration for *project_id* and delegates the
    actual creation to ``_task_new``, returning its result.

    Args:
        project_id: The project to create the task under.
        name: Optional human-readable name.  Allowed characters are
            lowercase letters, digits, hyphens, and underscores.
            If ``None``, a random slug-style name is generated via
            :func:`generate_task_name`.

    Workspace Initialization Protocol:
    ----------------------------------
    A task's workspace directory outlives individual container runs, so
    the container init script (init-ssh-and-repo.sh) has to distinguish
    two situations when it starts:

    1. A NEW task, which should be reset to the latest remote HEAD
    2. A RESTARTED task, whose local changes should be preserved

    Intent is signalled through a marker file (.new-task-marker):

    - task_new() creates the marker in the workspace directory
    - init-ssh-and-repo.sh checks for the marker:
      - If marker exists: reset to origin/HEAD, then delete marker
      - If no marker: fetch only, preserve local state
    - Later container runs on the same task no longer see the marker,
      so local work is preserved

    This covers edge cases such as:
    - Stale workspace from incompletely deleted previous task with same ID
    - Ensuring new tasks always start with latest code
    """
    project = load_project(project_id)
    return _task_new(project, name=name)

task_rename(project_id, task_id, new_name)

Rename a task by updating its metadata YAML.

Sanitizes new_name and writes the result to the task's metadata file. Raises SystemExit if the task is unknown or the sanitized name is invalid.

Source code in src/terok/lib/orchestration/tasks.py
def task_rename(project_id: str, task_id: str, new_name: str) -> None:
    """Give a task a new name by rewriting its metadata YAML.

    Loads the project configuration and delegates to ``_task_rename``,
    which sanitizes *new_name* before persisting it.  ``SystemExit`` is
    raised if the task is unknown or the sanitized name is invalid.
    """
    project = load_project(project_id)
    _task_rename(project, task_id, new_name)

get_tasks(project_id, reverse=False)

Return all task metadata for project_id, sorted by task ID.

Source code in src/terok/lib/orchestration/tasks.py
def get_tasks(project_id: str, reverse: bool = False) -> list[TaskMeta]:
    """Collect the metadata of every task in *project_id*, ordered by task ID.

    Thin public wrapper around ``_get_tasks``; *reverse* is forwarded
    unchanged.
    """
    tasks = _get_tasks(project_id, reverse=reverse)
    return tasks

get_all_task_states(project_id, tasks)

Map each task to its live container state via a single batch query.

Parameters:

Name Type Description Default
project_id str

The project whose containers to query.

required
tasks list[TaskMeta]

List of TaskMeta instances (must have task_id and mode).

required

Returns:

Type Description
dict[str, str | None]

{task_id: container_state_or_None} dict.

Source code in src/terok/lib/orchestration/tasks.py
def get_all_task_states(
    project_id: str,
    tasks: list[TaskMeta],
) -> dict[str, str | None]:
    """Resolve the live container state of every task with one batch query.

    Args:
        project_id: The project whose containers to query.
        tasks: List of ``TaskMeta`` instances (must have ``task_id`` and ``mode``).

    Returns:
        ``{task_id: container_state_or_None}`` dict, keyed by
        ``str(task_id)``.  Tasks without a mode, and tasks whose
        container is absent from the batch result, map to ``None``.
    """
    # One lookup table for the whole project instead of one query per task.
    live = get_project_container_states(project_id)
    return {
        str(task.task_id): (
            live.get(container_name(project_id, task.mode, str(task.task_id)))
            if task.mode
            else None
        )
        for task in tasks
    }

task_list(project_id, *, status=None, mode=None, agent=None)

List tasks for a project, optionally filtered by status, mode, or agent preset.

Status is computed live from podman container state + task metadata.

Source code in src/terok/lib/orchestration/tasks.py
def task_list(
    project_id: str,
    *,
    status: str | None = None,
    mode: str | None = None,
    agent: str | None = None,
) -> None:
    """List tasks for a project, optionally filtered by status, mode, or agent preset.

    Status is computed live from podman container state + task metadata.
    One line per matching task is printed to stdout, or ``No tasks found``
    when nothing matches.

    Args:
        project_id: The project whose tasks to list.
        status: If given, keep only tasks whose effective status equals it.
        mode: If given, keep only tasks with this mode.
        agent: If given, keep only tasks whose ``preset`` equals it.
    """
    tasks = get_tasks(project_id)

    # Pre-filter by mode/agent before the podman query to reduce work
    if mode:
        tasks = [t for t in tasks if t.mode == mode]
    if agent:
        tasks = [t for t in tasks if t.preset == agent]

    if not tasks:
        print("No tasks found")
        return

    # Batch-query podman for all container states in one call
    live_states = get_all_task_states(project_id, tasks)
    for t in tasks:
        # get_all_task_states keys its result by str(task_id); look up with
        # the same normalization so a non-string ID cannot miss its entry.
        t.container_state = live_states.get(str(t.task_id))

    # Compute the effective status once per task and reuse it for both
    # filtering and display.
    rows = [(t, effective_status(t)) for t in tasks]
    if status:
        rows = [(t, t_status) for t, t_status in rows if t_status == status]

    if not rows:
        print("No tasks found")
        return

    for t, t_status in rows:
        extra = []
        if t.mode:
            extra.append(f"mode={t.mode}")
        if t.web_port:
            extra.append(f"port={t.web_port}")
        if t.work_status:
            extra.append(f"work={t.work_status}")
        extra_s = f" [{'; '.join(extra)}]" if extra else ""
        print(f"- {t.task_id:>3}: {t.name} {t_status}{extra_s}")

load_task_meta(project_id, task_id, expected_mode=None)

Load task metadata and optionally validate mode.

Returns (meta, meta_path). Raises SystemExit if task is unknown or mode conflicts with expected_mode.

Source code in src/terok/lib/orchestration/tasks.py
def load_task_meta(
    project_id: str, task_id: str, expected_mode: str | None = None
) -> tuple[dict, Path]:
    """Read a task's metadata YAML, optionally checking its mode.

    Returns:
        ``(meta, meta_path)`` — the parsed metadata (``{}`` for an empty
        file) and the path it was read from.

    Raises:
        SystemExit: If the metadata file does not exist.  When
            *expected_mode* is given, ``_check_mode`` validates the stored
            mode against it and signals a conflict.
    """
    meta_path = tasks_meta_dir(project_id) / f"{task_id}.yml"
    if meta_path.is_file():
        meta = _yaml_load(meta_path.read_text()) or {}
        if expected_mode is not None:
            _check_mode(meta, expected_mode)
        return meta, meta_path
    raise SystemExit(f"Unknown task {task_id}")

mark_task_deleting(project_id, task_id)

Persist deleting: true to the task's YAML metadata file.

Source code in src/terok/lib/orchestration/tasks.py
def mark_task_deleting(project_id: str, task_id: str) -> None:
    """Set ``deleting: true`` in the task's YAML metadata file.

    Best-effort: a missing metadata file is ignored, and any exception is
    reported through ``_log_debug`` instead of being raised.
    """
    try:
        meta_path = tasks_meta_dir(project_id) / f"{task_id}.yml"
        if meta_path.is_file():
            meta = _yaml_load(meta_path.read_text()) or {}
            meta["deleting"] = True
            meta_path.write_text(_yaml_dump(meta))
    except Exception as exc:
        _log_debug(f"mark_task_deleting: failed project_id={project_id} task_id={task_id}: {exc}")

capture_task_logs(project, task_id, mode)

Capture container logs to the task's logs/ directory on the host.

Writes stdout/stderr from podman logs to <tasks_root>/<task_id>/logs/container.log. Returns the log file path on success, or None if the container doesn't exist or podman fails.

project may be a ProjectConfig or a project-ID string (the string form loads the config internally for backward compat).

Source code in src/terok/lib/orchestration/tasks.py
def capture_task_logs(project: ProjectConfig | str, task_id: str, mode: str) -> Path | None:
    """Dump a task container's logs into the task's ``logs/`` directory on the host.

    Runs ``podman logs --timestamps`` for the task's container and
    streams its stdout into ``<tasks_root>/<task_id>/logs/container.log``
    (stderr is captured separately and not written to the file).

    *project* may be a :class:`ProjectConfig` or a project-ID string
    (the string form loads the config internally for backward compat).

    Returns:
        The log file path on success.  ``None`` — with any partial log
        file removed — if ``podman`` cannot be found, the command times
        out after 60 seconds, or it exits with a non-zero status.
    """
    cfg = load_project(project) if isinstance(project, str) else project
    log_file = cfg.tasks_root / str(task_id) / "logs" / "container.log"
    ensure_dir(log_file.parent)

    cname = container_name(cfg.id, mode, task_id)
    command = ["podman", "logs", "--timestamps", cname]
    try:
        with log_file.open("wb") as sink:
            completed = subprocess.run(
                command,
                stdout=sink,
                stderr=subprocess.PIPE,
                timeout=60,
            )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        completed = None

    # Both failure modes share the same cleanup: drop the partial log file.
    if completed is None or completed.returncode != 0:
        log_file.unlink(missing_ok=True)
        return None

    return log_file

task_delete(project_id, task_id)

Delete a task's workspace, metadata, and any associated containers.

Before removing the task, captures container logs and archives the task metadata and logs to <state_root>/projects/<project_id>/archive/. The archive directory is named by archival timestamp + task ID + name for unique identification (task numbers and names can be reused).

This mirrors the behavior used by the TUI when deleting a task, but is exposed here so both CLI and TUI share the same logic. Containers are stopped best-effort via podman using the naming scheme "<project.id>-<mode>-<task_id>".

Source code in src/terok/lib/orchestration/tasks.py
def task_delete(project_id: str, task_id: str) -> None:
    """Remove a task: its workspace, metadata, and any associated containers.

    Loads the project configuration and delegates to ``_task_delete``.

    Prior to removal, the container logs are captured and the task's
    metadata and logs are archived under
    ``<state_root>/projects/<project_id>/archive/``.  The archive
    directory name combines archival timestamp, task ID, and name, which
    keeps it unique even though task numbers and names can be reused.

    The TUI uses the same behavior when deleting a task; it is exposed
    here so CLI and TUI share one implementation.  Containers are stopped
    best-effort via podman using the naming scheme
    "<project.id>-<mode>-<task_id>".
    """
    project = load_project(project_id)
    _task_delete(project, task_id)

get_login_command(project_id, task_id)

Return the podman exec command to log into a task container.

Source code in src/terok/lib/orchestration/tasks.py
def get_login_command(project_id: str, task_id: str) -> list[str]:
    """Build the podman exec command used to log into a task container.

    Loads the project configuration and delegates to ``_get_login_command``.
    """
    project = load_project(project_id)
    return _get_login_command(project, task_id)

task_login(project_id, task_id)

Open an interactive shell in a running task container.

Source code in src/terok/lib/orchestration/tasks.py
def task_login(project_id: str, task_id: str) -> None:
    """Start an interactive shell inside a running task container.

    Loads the project configuration and delegates to ``_task_login``.
    """
    project = load_project(project_id)
    _task_login(project, task_id)

task_stop(project_id, task_id, *, timeout=None)

Gracefully stop a running task container.

Uses podman stop --time <N> to give the container timeout seconds before SIGKILL. When timeout is None the project's run.shutdown_timeout setting is used (default 10 s).

Source code in src/terok/lib/orchestration/tasks.py
def task_stop(project_id: str, task_id: str, *, timeout: int | None = None) -> None:
    """Stop a running task container gracefully.

    Loads the project configuration and delegates to ``_task_stop``.

    The container is stopped with ``podman stop --time <N>``, which gives
    it *timeout* seconds before SIGKILL.  A *timeout* of ``None`` selects
    the project's ``run.shutdown_timeout`` setting (default 10 s).
    """
    project = load_project(project_id)
    _task_stop(project, task_id, timeout=timeout)

task_status(project_id, task_id)

Show live task status with container state diagnostics.

Source code in src/terok/lib/orchestration/tasks.py
def task_status(project_id: str, task_id: str) -> None:
    """Show live task status with container state diagnostics.

    Prints a multi-line report to stdout: name, effective status, mode,
    container name and state (when the task has a mode), permissions, exit
    code, web port, and the agent's work status where available.

    Args:
        project_id: The project the task belongs to.
        task_id: The task to report on.

    Raises:
        SystemExit: If the task metadata file is not found.
    """
    project = load_project(project_id)
    meta_dir = tasks_meta_dir(project.id)
    meta_path = meta_dir / f"{task_id}.yml"
    if not meta_path.is_file():
        raise SystemExit(f"Unknown task {task_id}")
    meta = _yaml_load(meta_path.read_text()) or {}

    mode = meta.get("mode")
    web_port = meta.get("web_port")
    exit_code = meta.get("exit_code")

    color_enabled = _supports_color()

    # Query live container state
    cname = None
    cs = None
    if mode:
        cname = container_name(project.id, mode, task_id)
        cs = get_container_state(cname)

    # Build TaskMeta for effective_status / mode_emoji computation
    task = TaskMeta(
        task_id=task_id,
        mode=mode,
        workspace=meta.get("workspace", ""),
        web_port=web_port,
        backend=meta.get("backend"),
        exit_code=exit_code,
        deleting=bool(meta.get("deleting")),
        initialized=_is_initialized(meta),
        container_state=cs,
        # Metadata without a name must not crash the status report;
        # TaskMeta's own default for ``name`` is the empty string.
        name=meta.get("name") or "",
        provider=meta.get("provider"),
        unrestricted=meta.get("unrestricted"),
    )
    status = effective_status(task)
    # Unknown statuses fall back to the "created" display entry.
    info = STATUS_DISPLAY.get(status, STATUS_DISPLAY["created"])

    # Unknown colour names fall back to yellow.
    status_color = {"green": _green, "yellow": _yellow, "red": _red}.get(info.color, _yellow)
    m = mode_info(task.mode)
    m_emoji = render_emoji(m)

    print(f"Task {task_id}:")
    print(f"  Name:            {task.name}")
    print(f"  Status:          {render_emoji(info)} {status_color(info.label, color_enabled)}")
    print(f"  Mode:            {m_emoji} {m.label or 'not set'}")
    if cname:
        print(f"  Container:       {cname}")
    if cs:
        state_color = _green if cs == "running" else _yellow
        print(f"  Container state: {state_color(cs, color_enabled)}")
    elif mode:
        # A mode is recorded but no container state was reported.
        print(f"  Container state: {_red('not found', color_enabled)}")
    if task.unrestricted is not None:
        perm_label = "unrestricted" if task.unrestricted else "restricted"
        print(f"  Permissions:     {perm_label}")
    if exit_code is not None:
        print(f"  Exit code:       {exit_code}")
    if web_port:
        print(f"  Web port:        {web_port}")
    # Work status from agent
    tasks_root = project.tasks_root
    agent_cfg = tasks_root / task_id / "agent-config"
    ws = read_work_status(agent_cfg)
    if ws.status:
        print(f"  Work status:     {ws.status}")
        if ws.message:
            print(f"  Work message:    {ws.message}")

list_archived_tasks(project_id)

Return archived tasks for project_id, sorted newest-first.

Source code in src/terok/lib/orchestration/tasks.py
def list_archived_tasks(project_id: str) -> list[ArchivedTask]:
    """Collect the archived tasks of *project_id*, newest first.

    Archive directories are visited in descending name order; since names
    start with the archival timestamp, that puts the newest first.
    Entries that are not directories, lack a ``task.yml``, or whose
    metadata cannot be loaded are skipped.  An absent archive root yields
    an empty list.
    """
    archive_root = tasks_archive_dir(project_id)
    if not archive_root.is_dir():
        return []
    archived: list[ArchivedTask] = []
    for entry in sorted(archive_root.iterdir(), reverse=True):
        meta_path = entry / "task.yml"
        if not (entry.is_dir() and meta_path.is_file()):
            continue
        try:
            meta = _yaml_load(meta_path.read_text()) or {}
        except Exception:
            continue
        # Directory names look like <timestamp>_<task_id>[_<name>]; the
        # timestamp is everything before the first underscore.
        stamp = entry.name.partition("_")[0]
        archived.append(
            ArchivedTask(
                archive_dir=entry,
                archived_at=stamp,
                task_id=str(meta.get("task_id", "")),
                name=meta.get("name", ""),
                mode=meta.get("mode"),
                exit_code=meta.get("exit_code"),
            )
        )
    return archived

task_archive_list(project_id)

Print archived tasks for project_id.

Source code in src/terok/lib/orchestration/tasks.py
def task_archive_list(project_id: str) -> None:
    """Print one line per archived task of *project_id*.

    Each line shows the archive timestamp, task ID, and name, followed by
    the mode and exit code in brackets when they are set.  Prints
    ``No archived tasks found`` when there is nothing to list.
    """
    archived = list_archived_tasks(project_id)
    if not archived:
        print("No archived tasks found")
        return
    for item in archived:
        details = []
        if item.mode:
            details.append(f"mode={item.mode}")
        if item.exit_code is not None:
            details.append(f"exit={item.exit_code}")
        suffix = f" [{'; '.join(details)}]" if details else ""
        print(f"- {item.archived_at} #{item.task_id}: {item.name}{suffix}")

task_archive_logs(project_id, archive_id)

Return the log file path for an archived task identified by archive_id.

archive_id is matched against archive directory names (prefix match). Returns the log file path if found, or None.

Source code in src/terok/lib/orchestration/tasks.py
def task_archive_logs(project_id: str, archive_id: str) -> Path | None:
    """Find the container log of the archived task identified by *archive_id*.

    *archive_id* is a prefix of the archive directory name.  Directories
    are examined in descending name order, and the first matching one that
    actually contains ``logs/container.log`` wins.

    Returns:
        The log file path, or ``None`` if the archive root is missing or
        no matching directory has a log file.
    """
    archive_root = tasks_archive_dir(project_id)
    if not archive_root.is_dir():
        return None
    # Lazily built so that scanning stops at the first existing log file.
    candidates = (
        entry / "logs" / "container.log"
        for entry in sorted(archive_root.iterdir(), reverse=True)
        if entry.is_dir() and entry.name.startswith(archive_id)
    )
    return next((log for log in candidates if log.is_file()), None)