
terok_clearance

Clearance hub + desktop notification library for terok.

Two unrelated wire formats live under this one package:

  • org.terok.Clearance1 over a unix-socket varlink transport — the hub (ClearanceHub) and the client library (ClearanceClient, EventSubscriber) that drive the per-container block / verdict / lifecycle flow.
  • org.freedesktop.Notifications over D-Bus — the DbusNotifier wrapper that renders those events as desktop popups. Kept because that's the OS API; every other D-Bus path in this package (org.terok.Shield1) was removed in favour of the varlink transport.

CLEARANCE_INTERFACE_NAME = 'org.terok.Clearance1' module-attribute

__all__ = ['CLEARANCE_INTERFACE_NAME', 'CallbackNotifier', 'Clearance1Interface', 'ClearanceClient', 'ClearanceEvent', 'ClearanceHub', 'ContainerIdentity', 'ContainerInfo', 'ContainerInspector', 'DbusNotifier', 'EventSubscriber', 'IdentityResolver', 'InvalidAction', 'Notification', 'Notifier', 'NullInspector', 'NullNotifier', 'ShieldCliFailed', 'UnknownRequest', 'VerdictTupleMismatch', 'check_units_outdated', 'configure_logging', 'create_notifier', 'default_clearance_socket_path', 'install_notifier_service', 'read_installed_unit_version', 'serve', 'uninstall_notifier_service', 'uninstall_service', 'wait_for_shutdown_signal'] module-attribute

__version__ = '0.0.0' module-attribute

ClearanceClient(*, socket_path=None)

Thin async client for the Clearance1 varlink service.

Two async coroutines to drive:

  • start — open the subscribe + RPC connections and begin relaying events to the user-supplied callback. Returns once both channels are live; events arrive via on_event from then on.
  • verdict — RPC call; returns True if terok-shield applied the action, False on any refusal or shield failure. The refusal reason is logged at WARNING.

The callback runs on the same event loop as the rest of the client; exceptions it raises are logged and swallowed so one bad handler can't kill the stream for every subsequent event.
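The swallow-and-log policy can be sketched with a minimal dispatch loop. This is a stand-alone illustration of the pattern, not the client's actual internals:

```python
import asyncio
import logging

log = logging.getLogger("sketch")

async def relay(events, on_event) -> int:
    """Deliver each event to on_event; a raising handler is logged, not fatal."""
    delivered = 0
    for event in events:
        try:
            on_event(event)
        except Exception:
            # Mirror of the documented policy: one bad handler invocation
            # must not kill the stream for the events that follow.
            log.warning("on_event raised for %r", event, exc_info=True)
        delivered += 1
    return delivered

def flaky(event: str) -> None:
    if event == "boom":
        raise RuntimeError("handler bug")

print(asyncio.run(relay(["a", "boom", "b"], flaky)))  # 3 — the stream survived
```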

Remember the target socket; defaults to default_clearance_socket_path.

Source code in src/terok_clearance/client/client.py
def __init__(self, *, socket_path: Path | None = None) -> None:
    """Remember the target socket; defaults to [`default_clearance_socket_path`][terok_clearance.client.client.default_clearance_socket_path]."""
    self._socket_path = socket_path or default_clearance_socket_path()
    self._on_event: EventCallback | None = None
    self._sub_transport: object | None = None
    self._rpc_transport: object | None = None
    self._sub_proxy: object | None = None
    self._rpc_proxy: object | None = None
    self._stream_task: asyncio.Task[None] | None = None
    self._stopping = False
    # Set by [`poke_reconnect`][terok_clearance.client.client.ClearanceClient.poke_reconnect]; awaited inside the back-off
    # window.  Constructed here (not lazily) so a focus-gain poke
    # that lands between ``start()`` and the first ``_run_stream``
    # iteration isn't silently dropped.
    self._reconnect_poke = asyncio.Event()

start(on_event) async

Open both connections and begin relaying events to on_event.

The initial connect is awaited synchronously so callers see start() return only after the subscription is live — a hub that's down at startup still propagates as an exception. Subsequent drops are handled by _run_stream's internal reconnect loop so long-running consumers (TUI, notifier) survive a systemctl restart terok-clearance without restarting themselves.

Source code in src/terok_clearance/client/client.py
async def start(self, on_event: EventCallback) -> None:
    """Open both connections and begin relaying events to *on_event*.

    The initial connect is awaited synchronously so callers see
    ``start()`` return only after the subscription is live — a
    hub that's down at startup still propagates as an exception.
    Subsequent drops are handled by `_run_stream`'s internal
    reconnect loop so long-running consumers (TUI, notifier)
    survive a ``systemctl restart terok-clearance`` without
    restarting themselves.
    """
    self._on_event = on_event
    self._stopping = False
    await self._connect()
    self._stream_task = asyncio.create_task(self._run_stream())

stop() async

Close both connections and await the stream task.

Source code in src/terok_clearance/client/client.py
async def stop(self) -> None:
    """Close both connections and await the stream task."""
    self._stopping = True
    if self._stream_task is not None:
        self._stream_task.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await self._stream_task
        self._stream_task = None
    self._close_transports()

poke_reconnect()

Skip any in-flight reconnect back-off and retry immediately.

Idempotent; a no-op when the stream is healthy because the event is only awaited inside _run_stream's back-off window.

Source code in src/terok_clearance/client/client.py
def poke_reconnect(self) -> None:
    """Skip any in-flight reconnect back-off and retry immediately.

    Idempotent; a no-op when the stream is healthy because the
    event is only awaited inside `_run_stream`'s back-off
    window.
    """
    self._reconnect_poke.set()
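The back-off window the poke cuts short can be sketched with plain asyncio — an illustration of the mechanism, not the client's actual reconnect loop:

```python
import asyncio

async def backoff_wait(poke: asyncio.Event, delay: float) -> str:
    """Wait out a reconnect delay, but return early when poked."""
    try:
        await asyncio.wait_for(poke.wait(), timeout=delay)
        poke.clear()  # consume the poke so the next window waits again
        return "poked"
    except asyncio.TimeoutError:
        return "timed out"

async def main() -> str:
    poke = asyncio.Event()
    # Simulate a focus-gain poke landing mid back-off.
    asyncio.get_running_loop().call_later(0.01, poke.set)
    return await backoff_wait(poke, delay=5.0)

print(asyncio.run(main()))  # poked
```

Because the Event is constructed eagerly in `__init__`, a poke that lands before the first back-off window is still observed the first time the window awaits it.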

verdict(container, request_id, dest, action) async

Apply action (allow / deny) to dest via the hub's Verdict RPC.

Returns True when the hub accepted and applied the verdict, False for any refusal (unknown request_id, tuple mismatch, invalid action, shield-exec failure). Callers typically ignore the return value and let the subsequent verdict_applied event drive UI updates; refusal reasons are logged at WARNING.

Source code in src/terok_clearance/client/client.py
async def verdict(self, container: str, request_id: str, dest: str, action: str) -> bool:
    """Apply *action* (``allow`` / ``deny``) to *dest* via the hub's ``Verdict`` RPC.

    Returns ``True`` when the hub accepted and applied the verdict,
    ``False`` for any refusal (unknown request_id, tuple mismatch,
    invalid action, shield-exec failure).  Callers typically ignore
    the return value and let the subsequent ``verdict_applied``
    event drive UI updates; refusal reasons are logged at WARNING.
    """
    if self._rpc_proxy is None:
        _log.error("verdict() called before start()")
        return False
    try:
        reply = await self._rpc_proxy.Verdict(
            container=container,
            request_id=request_id,
            dest=dest,
            action=action,
        )
    except VarlinkErrorReply as err:
        _log.warning(
        "Verdict refused for %s (%s, %s): %s",
            container,
            request_id,
            action,
            err,
        )
        return False
    # reply is {"ok": bool} per the return_parameter wrapper.
    return bool(reply.get("ok", False))

IdentityResolver(inspector)

Compose a ContainerInspector lookup + task-meta YAML into an identity.

Callable: resolver(container_id) -> ContainerIdentity. Four soft-fail paths, all returning a degraded identity that keeps the notification pipeline usable:

  • The inspector failed → empty ContainerIdentity; the subscriber falls back to the raw container ID.
  • Container carries no terok annotations (a standalone container that happened to hit the firewall) → container-name-only.
  • ai.terok.task_meta_path annotation absent → identity without task_name (project + task_id still present).
  • task_meta_path YAML unreadable / missing / malformed → same as above; the name field is left empty.
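The degraded-identity fallbacks can be sketched with local stand-ins. The dataclasses and annotation key strings below are illustrative assumptions for the sketch, not the package's own definitions:

```python
from dataclasses import dataclass, field

# Assumed key strings for illustration only.
PROJECT_KEY = "ai.terok.project"
TASK_KEY = "ai.terok.task"

@dataclass
class Info:
    container_id: str = ""
    name: str = ""
    annotations: dict = field(default_factory=dict)

@dataclass
class Identity:
    container_name: str = ""
    project: str = ""
    task_id: str = ""

def resolve(info: Info) -> Identity:
    if not info.container_id:
        # Inspector failed → empty identity; the caller falls back
        # to the raw container ID.
        return Identity()
    project = info.annotations.get(PROJECT_KEY, "")
    task_id = info.annotations.get(TASK_KEY, "")
    # No terok annotations → container-name-only identity.
    return Identity(container_name=info.name, project=project, task_id=task_id)

assert resolve(Info()) == Identity()
assert resolve(Info(container_id="abc", name="web")).container_name == "web"
print("ok")
```

Every path returns a usable (if sparse) identity, so the notification pipeline never crashes on a lookup hiccup.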

Configure the resolver with a ContainerInspector implementation.

The inspector is required (no default) so the caller owns the runtime-selection decision — clearance is runtime-neutral and must not reach for a specific backend itself. The notifier entry point picks an appropriate implementation at startup (terok-sandbox's create_container_inspector when available, NullInspector otherwise).

Source code in src/terok_clearance/client/identity_resolver.py
def __init__(self, inspector: ContainerInspector) -> None:
    """Configure the resolver with a [`ContainerInspector`][terok_clearance.client.identity_resolver.ContainerInspector] implementation.

    The inspector is required (no default) so the caller owns the
    runtime-selection decision — clearance is runtime-neutral and
    must not reach for a specific backend itself.  The notifier
    entry point picks an appropriate implementation at startup
    (terok-sandbox's ``create_container_inspector`` when available,
    [`NullInspector`][terok_clearance.NullInspector] otherwise).
    """
    self._inspector = inspector

__call__(container_id)

Return the task-aware identity for container_id.

Source code in src/terok_clearance/client/identity_resolver.py
def __call__(self, container_id: str) -> ContainerIdentity:
    """Return the task-aware identity for *container_id*."""
    try:
        info = self._inspector(container_id)
    except Exception:
        # The contract says inspectors soft-fail by returning an
        # empty ``ContainerInfo``, but a runtime-side race or an
        # unexpected error path in a third-party backend can still
        # raise.  Clamp it here so the caller (notifier / TUI)
        # never takes a crash from identity resolution.
        _log.debug("ContainerInspector raised for %s", container_id, exc_info=True)
        return ContainerIdentity()
    if not info.container_id:
        return ContainerIdentity()
    project = info.annotations.get(ANNOTATION_PROJECT, "")
    task_id = info.annotations.get(ANNOTATION_TASK, "")
    if not (project and task_id):
        return ContainerIdentity(container_name=info.name, project=project, task_id=task_id)
    meta_path = info.annotations.get(ANNOTATION_TASK_META_PATH, "")
    return ContainerIdentity(
        container_name=info.name,
        project=project,
        task_id=task_id,
        task_name=_read_task_name(meta_path) if meta_path else "",
    )

EventSubscriber(notifier, client=None, *, identity_resolver=None, socket_path=None)

Bridge clearance-hub events into desktop notifications.

Owns the presentation-layer state a rendering client needs: live-block dedup keyed on (container, target), the tracked ShieldDown popup per container so ShieldUp can retire it, and verdict routing through notifier action callbacks.

Parameters:

  • notifier (Notifier, required) — Desktop notification backend (any Notifier works).
  • client (ClearanceClient | None, default None) — Pre-configured ClearanceClient. When omitted, one is created on start pointing at socket_path (defaulting to default_clearance_socket_path).
  • identity_resolver (Callable[[str], ContainerIdentity] | None, default None) — Turns a short container ID into a ContainerIdentity so terok task annotations surface as "Task: project/task_id · name" bodies. Called from a worker thread so a slow podman inspect doesn't stall the event loop. None renders the raw container ID.
  • socket_path (Path | None, default None) — Clearance-socket override when client isn't supplied (tests).
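The worker-thread offload for a slow lookup can be sketched with `asyncio.to_thread` — one way to get the behaviour described above, not necessarily the subscriber's exact mechanism:

```python
import asyncio
import time

def slow_inspect(container_id: str) -> str:
    """Stand-in for a blocking `podman inspect`-style lookup."""
    time.sleep(0.1)
    return f"name-of-{container_id}"

async def main() -> str:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    hb = asyncio.create_task(heartbeat())
    # Run the blocking resolver on a worker thread so the event loop
    # keeps servicing other tasks during the lookup.
    name = await asyncio.to_thread(slow_inspect, "abc123")
    hb.cancel()
    assert ticks > 0  # the loop stayed responsive while the lookup ran
    return name

print(asyncio.run(main()))  # name-of-abc123
```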

Initialise the subscriber with a notifier and transport.

Source code in src/terok_clearance/client/subscriber.py
def __init__(
    self,
    notifier: Notifier,
    client: ClearanceClient | None = None,
    *,
    identity_resolver: Callable[[str], ContainerIdentity] | None = None,
    socket_path: Path | None = None,
) -> None:
    """Initialise the subscriber with a notifier and transport."""
    self._notifier = notifier
    self._client = client or ClearanceClient(socket_path=socket_path)
    self._identity_resolver = identity_resolver
    # request_id → pending block + its notification.
    self._pending: dict[str, _PendingBlock] = {}
    # container → notification_id of the active ShieldDown popup, so
    # ShieldUp can close the matching one before firing its brief
    # confirmation.  A stale "Shield DOWN" popup after shield is back
    # is a security hazard, not a benign leftover.
    self._shield_down_notifs: dict[str, int] = {}
    # Background action / lifecycle tasks we spawn.
    self._tasks: set[asyncio.Task[None]] = set()

start() async

Connect to the clearance hub and begin rendering its event stream.

Source code in src/terok_clearance/client/subscriber.py
async def start(self) -> None:
    """Connect to the clearance hub and begin rendering its event stream."""
    await self._client.start(self._on_event)
    _log.info("clearance subscriber online")

stop() async

Drain pending tasks and close the transport.

Closes the client first so no new handler tasks are scheduled, then awaits the currently-tracked tasks to settle (with their own CancelledError suppressed). A bare sleep(0) would yield only one loop turn — not enough for cancellation to propagate through chained awaits — and tasks.clear() on its own would drop references to tasks still writing to handles we then close.
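The drain step can be demonstrated in isolation: `gather(return_exceptions=True)` actually waits for every cancelled task to settle, where a bare `sleep(0)` would yield only one loop turn. A stand-alone sketch of the pattern:

```python
import asyncio

async def main() -> list:
    settled: list[int] = []

    async def worker(i: int) -> None:
        try:
            await asyncio.sleep(60)
        except asyncio.CancelledError:
            # Cleanup still runs before the task finally settles.
            settled.append(i)
            raise

    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    await asyncio.sleep(0)  # let the workers start and block
    for t in tasks:
        t.cancel()
    # Waits for cancellation to propagate through chained awaits;
    # exceptions (including CancelledError) are collected, not raised.
    await asyncio.gather(*tasks, return_exceptions=True)
    return sorted(settled)

print(asyncio.run(main()))  # [0, 1, 2]
```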

Source code in src/terok_clearance/client/subscriber.py
async def stop(self) -> None:
    """Drain pending tasks and close the transport.

    Closes the client first so no new handler tasks are scheduled,
    then awaits the currently-tracked tasks to settle (with their
    own ``CancelledError`` suppressed).  A bare ``sleep(0)`` would
    yield only one loop turn — not enough for cancellation to
    propagate through chained awaits — and ``tasks.clear()`` on its
    own would drop references to tasks still writing to handles we
    then close.
    """
    tasks = list(self._tasks)
    for task in tasks:
        task.cancel()
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)
    self._tasks.clear()
    await self._client.stop()
    self._pending.clear()
    self._shield_down_notifs.clear()

poke_reconnect()

Cut short any in-flight reconnect back-off — forwards to the client.

Source code in src/terok_clearance/client/subscriber.py
def poke_reconnect(self) -> None:
    """Cut short any in-flight reconnect back-off — forwards to the client."""
    self._client.poke_reconnect()

ContainerInfo(container_id='', name='', state='', annotations=(lambda: _EMPTY_ANNOTATIONS)()) dataclass

What podman inspect tells us about one container.

Empty instance (ContainerInfo()) represents "not found" or "lookup failed" — callers should treat missing fields as best-effort and fall back to the raw container ID when they don't have a better label.

container_id = '' class-attribute instance-attribute

The short ID podman reported back, or empty on failure.

name = '' class-attribute instance-attribute

The container's name without podman's leading / prefix.

state = '' class-attribute instance-attribute

Lifecycle state: running, exited, created, etc. Empty when unknown.

annotations = field(default_factory=(lambda: _EMPTY_ANNOTATIONS)) class-attribute instance-attribute

Every OCI annotation podman recorded for this container.

Exposed as a read-only Mapping — cached instances are shared across inspector callers, so mutating the underlying dict would poison future lookups. Build with types.MappingProxyType at construction time; callers (clearance's task-aware resolver, anything else that cares) pluck out the keys they know about.
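The read-only wrapping is plain `types.MappingProxyType`; a minimal sketch of why shared cached instances stay safe:

```python
import types

_annotations = {"ai.terok.task_meta_path": "/run/meta.yaml"}  # illustrative key/value
view = types.MappingProxyType(_annotations)

print(view["ai.terok.task_meta_path"])
try:
    view["x"] = "y"  # mutation through the proxy is rejected
except TypeError as err:
    print("rejected:", type(err).__name__)
```

Reads pass through to the underlying dict; writes raise `TypeError`, so one caller can never poison lookups made by the next.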

ClearanceEvent(type, container, request_id='', dest='', port=0, proto=0, domain='', action='', ok=False, reason='') dataclass

One event fanned out to every Subscribe() caller.

type + container are always populated; the remaining fields are filled in per-kind and default to zero-values otherwise.

Known values of type (additional fields beyond container):

  • connection_blocked — request_id, dest, port, proto, domain. Requires an operator verdict.
  • verdict_applied — request_id, action, ok.
  • container_started — no extras.
  • container_exited — reason.
  • shield_up / shield_down / shield_down_all — no extras.

Unknown values are forwarded unchanged so the wire format can grow without breaking clients pinned to older schemas.
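A tolerant decode honouring that rule can be sketched as follows. The `Event` dataclass here is an illustrative mirror of a few documented fields, not the package's `ClearanceEvent`:

```python
from dataclasses import dataclass, fields

@dataclass
class Event:
    type: str
    container: str
    request_id: str = ""
    dest: str = ""
    port: int = 0
    action: str = ""

def decode(raw: dict) -> Event:
    known = {f.name for f in fields(Event)}
    # Drop fields this schema version doesn't know, but never reject an
    # unfamiliar `type` value — the wire format can grow without
    # breaking clients pinned to older schemas.
    return Event(**{k: v for k, v in raw.items() if k in known})

e = decode({"type": "quantum_blocked", "container": "abc", "new_field": 1})
print(e.type)  # quantum_blocked
```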

type instance-attribute

container instance-attribute

request_id = '' class-attribute instance-attribute

dest = '' class-attribute instance-attribute

port = 0 class-attribute instance-attribute

proto = 0 class-attribute instance-attribute

domain = '' class-attribute instance-attribute

action = '' class-attribute instance-attribute

ok = False class-attribute instance-attribute

reason = '' class-attribute instance-attribute

ContainerIdentity(container_name='', project='', task_id='', task_name='') dataclass

Host-side facts about a container, as much as the resolver found.

Terok-managed task containers carry project and task_id via OCI annotations set at podman run time; task_name is looked up live from terok's task metadata so a rename between block and verdict is reflected in the resolved popup. Standalone containers produce an instance with only container_name set (or empty everywhere when podman inspect itself failed).

container_name = '' class-attribute instance-attribute

project = '' class-attribute instance-attribute

task_id = '' class-attribute instance-attribute

task_name = '' class-attribute instance-attribute

ContainerInspector

Bases: Protocol

Callable that maps a container id to a ContainerInfo.

The protocol intentionally covers only the notification-rendering use case — name + OCI annotations + lifecycle state. Broader runtime operations (exec, mount, signals) live on terok_sandbox.runtime.ContainerRuntime and are not part of this contract.

Implementations MUST soft-fail: an unreachable runtime / missing container / malformed metadata returns an empty ContainerInfo rather than raising, so notification pipelines keep their fallback label instead of crashing on a lookup hiccup.
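The contract can be sketched with a local Protocol and a backend that clamps its own failures — stand-in types for illustration, not the package's definitions:

```python
from dataclasses import dataclass
from typing import Protocol

@dataclass
class Info:
    container_id: str = ""
    name: str = ""

class Inspector(Protocol):
    def __call__(self, container_id: str) -> Info: ...

class FlakyBackend:
    """Inspector that honours the soft-fail contract."""

    def __call__(self, container_id: str) -> Info:
        try:
            raise OSError("runtime unreachable")  # simulated lookup failure
        except Exception:
            # Contract: return an empty Info instead of raising, so the
            # notification pipeline keeps its fallback label.
            return Info()

inspector: Inspector = FlakyBackend()
print(inspector("abc").container_id == "")  # True
```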

__call__(container_id)

Return the best-effort ContainerInfo for container_id.

Source code in src/terok_clearance/domain/inspector.py
def __call__(self, container_id: str) -> ContainerInfo:
    """Return the best-effort [`ContainerInfo`][terok_clearance.ContainerInfo] for *container_id*."""
    ...

NullInspector

Always-empty ContainerInspector — the graceful-degradation default.

Installed when no runtime-aware package provides a concrete backend. Every lookup returns ContainerInfo() so the notifier still renders (raw container id, no enrichment).

__call__(_container_id)

Return the universal empty ContainerInfo.

Source code in src/terok_clearance/domain/inspector.py
def __call__(self, _container_id: str) -> ContainerInfo:
    """Return the universal empty [`ContainerInfo`][terok_clearance.ContainerInfo]."""
    return ContainerInfo()

ClearanceHub(*, clearance_socket=None, reader_socket=None, verdict_client=None)

Server for the org.terok.Clearance1 interface.

Owns three pieces of state:

  • _subscribers — a set of bounded per-connection queues; the hub puts a ClearanceEvent on each one every time the reader ingester delivers an event. Slow clients see their oldest events dropped; fast clients aren't affected.
  • _live_verdicts — the request_id → (container, dest) map the Verdict method checks for the authz binding.
  • An EventIngester bound to the canonical reader socket.

Lifecycle: start brings everything up; stop tears it down under individual timeouts so a flaky bus or a stuck subscriber can't burn systemd's stop-sigterm deadline.
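The drop-oldest behaviour for slow subscribers can be sketched with a bounded `asyncio.Queue` — an illustration of the policy, not the hub's actual fan-out code:

```python
import asyncio

def put_drop_oldest(q: asyncio.Queue, item: str) -> None:
    """Enqueue item; when the queue is full, evict the oldest event
    so a slow subscriber lags rather than stalling the hub."""
    while True:
        try:
            q.put_nowait(item)
            return
        except asyncio.QueueFull:
            q.get_nowait()  # drop the oldest queued event

async def main() -> list:
    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    for event in ("e1", "e2", "e3", "e4"):
        put_drop_oldest(q, event)
    return [q.get_nowait() for _ in range(q.qsize())]

print(asyncio.run(main()))  # ['e3', 'e4']
```

A fast client always sees every event; a stalled one wakes to the most recent window of events instead of blocking every other subscriber.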

Configure the two sockets and the verdict-helper client.

verdict_client is injected so tests can stub out shield exec without spawning the helper process. Production callers leave it defaulted — a fresh VerdictClient pointing at the canonical helper socket.

Source code in src/terok_clearance/hub/server.py
def __init__(
    self,
    *,
    clearance_socket: Path | None = None,
    reader_socket: Path | None = None,
    verdict_client: VerdictClient | None = None,
) -> None:
    """Configure the two sockets and the verdict-helper client.

    ``verdict_client`` is injected so tests can stub out shield exec
    without spawning the helper process.  Production callers leave
    it defaulted — a fresh [`VerdictClient`][terok_clearance.hub.server.VerdictClient] pointing at the
    canonical helper socket.
    """
    self._clearance_socket = clearance_socket or default_clearance_socket_path()
    self._reader_socket = reader_socket  # None → EventIngester picks its default.
    self._verdict_client = verdict_client or VerdictClient()

    self._subscribers: set[asyncio.Queue[ClearanceEvent]] = set()
    # request_id → (container, dest) the hub emitted in the matching
    # ConnectionBlocked; Verdict calls must cite a triple that matches.
    self._live_verdicts: dict[str, tuple[str, str]] = {}

    self._ingester: EventIngester | None = None
    self._varlink_server: object | None = None  # asyncvarlink's UnixServer

start() async

Bring the ingester + varlink server online and accept clients.

Transactional: if the varlink bind fails after the ingester is already listening, the ingester is stopped before the exception propagates so a half-started hub doesn't leak a live reader-side socket on systemd restart paths.

Source code in src/terok_clearance/hub/server.py
async def start(self) -> None:
    """Bring the ingester + varlink server online and accept clients.

    Transactional: if the varlink bind fails after the ingester is
    already listening, the ingester is stopped before the exception
    propagates so a half-started hub doesn't leak a live
    reader-side socket on systemd restart paths.
    """
    self._ingester = EventIngester(
        socket_path=self._reader_socket or _default_reader_socket(),
        on_event=self._relay_reader_event,
    )
    await self._ingester.start()
    try:
        registry = VarlinkInterfaceRegistry()
        registry.register_interface(
            Clearance1Interface(
                event_stream_factory=self._subscribe,
                apply_verdict=self._apply_verdict,
            )
        )
        registry.register_interface(
            VarlinkServiceInterface(
                vendor="terok",
                product="terok-clearance",
                version=_own_version(),
                url="https://github.com/terok-ai/terok-clearance",
                registry=registry,
            )
        )

        from terok_clearance.wire.socket import bind_hardened

        async def _factory(path: str) -> object:
            return await create_unix_server(registry.protocol_factory, path=path)

        self._varlink_server = await bind_hardened(
            _factory, self._clearance_socket, "clearance"
        )
    except BaseException:
        with contextlib.suppress(Exception):
            await self._ingester.stop()
        self._ingester = None
        raise
    _log.info("clearance hub online at %s", self._clearance_socket)

stop() async

Close the varlink server + ingester; drain subscriber queues.

Source code in src/terok_clearance/hub/server.py
async def stop(self) -> None:
    """Close the varlink server + ingester; drain subscriber queues."""
    if self._varlink_server is not None:
        # ``close()`` on its own only stops accepting new connections;
        # existing subscribers would sit forever in ``queue.get()`` and
        # ``wait_closed`` would hang until the timeout fires.
        # ``close_clients()`` walks the live transports and closes them,
        # which makes the server-side ``_call_async_method_more``'s
        # next ``send_reply`` fail with OSError — that in turn calls
        # ``generator.aclose()`` on the subscriber, propagating cleanly
        # through to our ``finally`` block.  This avoids the
        # assertion asyncvarlink fires when a streaming generator
        # ends "normally" with ``continues=True`` on the last reply.
        self._varlink_server.close()
        with contextlib.suppress(AttributeError):
            self._varlink_server.close_clients()
        with contextlib.suppress(TimeoutError, Exception):
            await asyncio.wait_for(self._varlink_server.wait_closed(), timeout=1.0)
        self._varlink_server = None
    if self._ingester is not None:
        with contextlib.suppress(Exception):
            await self._ingester.stop()
        self._ingester = None
    with contextlib.suppress(Exception):
        await self._verdict_client.stop()
    self._subscribers.clear()
    self._live_verdicts.clear()

CallbackNotifier(on_notify=None, *, on_container_started=None, on_container_exited=None, on_shield_up=None, on_shield_down=None, on_shield_down_all=None)

Notifier backend that delegates rendering to caller-supplied hooks.

Parameters:

  • on_notify (Callable[[Notification], None] | None, default None) — Called for every notify() with a Notification. Receives new notifications (replaces_id == 0) and in-place updates (replaces_id > 0, e.g. verdict results).
  • on_container_started (Callable[[str], None] | None, default None) — Called for every ContainerStarted signal with the short container ID. Optional — consumers that don't care about container lifecycle skip the parameter.
  • on_container_exited (Callable[[str, str], None] | None, default None) — Called for every ContainerExited signal with (container, reason). Optional, same semantics.
  • on_shield_up (Callable[[str], None] | None, default None) — Called for every ShieldUp signal with the container identifier. Lets the TUI flip a "shielded" badge on the per-container row without polling nft state.
  • on_shield_down (Callable[[str], None] | None, default None) — Called for every ShieldDown signal — partial bypass (loopback-only traffic still allowed).
  • on_shield_down_all (Callable[[str], None] | None, default None) — Called for every ShieldDownAll signal — unrestricted bypass. Split from on_shield_down so the consumer can render the two modes differently.

Bind optional notify and lifecycle callbacks.

Source code in src/terok_clearance/notifications/callback.py
def __init__(
    self,
    on_notify: Callable[[Notification], None] | None = None,
    *,
    on_container_started: Callable[[str], None] | None = None,
    on_container_exited: Callable[[str, str], None] | None = None,
    on_shield_up: Callable[[str], None] | None = None,
    on_shield_down: Callable[[str], None] | None = None,
    on_shield_down_all: Callable[[str], None] | None = None,
) -> None:
    """Bind optional notify and lifecycle callbacks."""
    self._on_notify = on_notify
    self._on_container_started = on_container_started
    self._on_container_exited = on_container_exited
    self._on_shield_up = on_shield_up
    self._on_shield_down = on_shield_down
    self._on_shield_down_all = on_shield_down_all
    self._next_id = 1
    self._callbacks: dict[int, Callable[[str], None]] = {}

notify(summary, body='', *, actions=(), timeout_ms=-1, hints=None, replaces_id=0, app_icon='', container_id='', container_name='', project='', task_id='', task_name='') async

Record the notification and invoke the on_notify hook.

Returns a monotonically increasing ID, or replaces_id for updates.

Source code in src/terok_clearance/notifications/callback.py
async def notify(
    self,
    summary: str,
    body: str = "",
    *,
    actions: Sequence[tuple[str, str]] = (),
    timeout_ms: int = -1,
    hints: Mapping[str, Any] | None = None,
    replaces_id: int = 0,
    app_icon: str = "",
    container_id: str = "",
    container_name: str = "",
    project: str = "",
    task_id: str = "",
    task_name: str = "",
) -> int:
    """Record the notification and invoke the ``on_notify`` hook.

    Returns a monotonically increasing ID, or *replaces_id* for updates.
    """
    nid = replaces_id if replaces_id else self._next_id
    if not replaces_id:
        self._next_id += 1
    notification = Notification(
        nid=nid,
        summary=summary,
        body=body,
        actions=list(actions),
        replaces_id=replaces_id,
        timeout_ms=timeout_ms,
        container_id=container_id,
        container_name=container_name,
        project=project,
        task_id=task_id,
        task_name=task_name,
    )
    if self._on_notify:
        self._on_notify(notification)
    return nid

on_action(notification_id, callback) async

Store the action callback for later invocation.

Source code in src/terok_clearance/notifications/callback.py
async def on_action(
    self,
    notification_id: int,
    callback: Callable[[str], None],
) -> None:
    """Store the action callback for later invocation."""
    self._callbacks[notification_id] = callback

close(notification_id) async

Remove the callback for a closed notification.

Source code in src/terok_clearance/notifications/callback.py
async def close(self, notification_id: int) -> None:
    """Remove the callback for a closed notification."""
    self._callbacks.pop(notification_id, None)

disconnect() async

Release all stored callbacks.

Source code in src/terok_clearance/notifications/callback.py
async def disconnect(self) -> None:
    """Release all stored callbacks."""
    self._callbacks.clear()

invoke_action(notification_id, action_key)

Invoke the stored callback for a user verdict.

This is the entry point for consumers that handle user input (Allow/Deny) and need to route the decision back through EventSubscriber to the hub's Verdict RPC.

Source code in src/terok_clearance/notifications/callback.py
def invoke_action(self, notification_id: int, action_key: str) -> None:
    """Invoke the stored callback for a user verdict.

    This is the entry point for consumers that handle user input
    (Allow/Deny) and need to route the decision back through
    ``EventSubscriber`` to the hub's ``Verdict`` RPC.
    """
    if cb := self._callbacks.pop(notification_id, None):
        cb(action_key)

on_container_started(container)

Forward a ContainerStarted lifecycle event to the consumer hook.

Source code in src/terok_clearance/notifications/callback.py
def on_container_started(self, container: str) -> None:
    """Forward a ``ContainerStarted`` lifecycle event to the consumer hook."""
    if self._on_container_started:
        self._on_container_started(container)

on_container_exited(container, reason)

Forward a ContainerExited lifecycle event to the consumer hook.

Source code in src/terok_clearance/notifications/callback.py
def on_container_exited(self, container: str, reason: str) -> None:
    """Forward a ``ContainerExited`` lifecycle event to the consumer hook."""
    if self._on_container_exited:
        self._on_container_exited(container, reason)

on_shield_up(container)

Forward a ShieldUp signal to the consumer hook.

Source code in src/terok_clearance/notifications/callback.py
def on_shield_up(self, container: str) -> None:
    """Forward a ``ShieldUp`` signal to the consumer hook."""
    if self._on_shield_up:
        self._on_shield_up(container)

on_shield_down(container)

Forward a ShieldDown signal (partial bypass) to the consumer hook.

Source code in src/terok_clearance/notifications/callback.py
def on_shield_down(self, container: str) -> None:
    """Forward a ``ShieldDown`` signal (partial bypass) to the consumer hook."""
    if self._on_shield_down:
        self._on_shield_down(container)

on_shield_down_all(container)

Forward a ShieldDownAll signal (full bypass) to the consumer hook.

Source code in src/terok_clearance/notifications/callback.py
def on_shield_down_all(self, container: str) -> None:
    """Forward a ``ShieldDownAll`` signal (full bypass) to the consumer hook."""
    if self._on_shield_down_all:
        self._on_shield_down_all(container)

Notification(nid, summary, body, actions, replaces_id, timeout_ms, container_id='', container_name='', project='', task_id='', task_name='') dataclass

Snapshot of a single notification posted by the subscriber.

The identity fields (container_id, container_name, project, task_id, task_name) are presentation-layer context the subscriber's identity_resolver produced — empty strings when unresolved. The desktop DbusNotifier discards all of them; the TUI uses the task triple to render a Task column for terok-managed containers and falls back to the container name for standalone ones.

nid instance-attribute

summary instance-attribute

body instance-attribute

actions instance-attribute

replaces_id instance-attribute

timeout_ms instance-attribute

container_id = '' class-attribute instance-attribute

container_name = '' class-attribute instance-attribute

project = '' class-attribute instance-attribute

task_id = '' class-attribute instance-attribute

task_name = '' class-attribute instance-attribute
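The Task-column fallback described above can be sketched with a minimal mirror of the dataclass, so the snippet runs without the package installed. The `task_column` helper and its `project/task` rendering are illustrative stand-ins, not the TUI's actual code; the real class lives in `terok_clearance.notifications.callback`.

```python
from dataclasses import dataclass


# Minimal mirror of the Notification dataclass documented above.
@dataclass(frozen=True)
class Notification:
    nid: int
    summary: str
    body: str
    actions: tuple[tuple[str, str], ...]
    replaces_id: int
    timeout_ms: int
    container_id: str = ""
    container_name: str = ""
    project: str = ""
    task_id: str = ""
    task_name: str = ""


def task_column(n: Notification) -> str:
    # Rule from the docs: use the task triple for terok-managed
    # containers, fall back to the container name for standalone
    # ones.  The exact rendering here is illustrative.
    if n.task_id:
        return f"{n.project}/{n.task_name or n.task_id}"
    return n.container_name


managed = Notification(1, "Blocked", "", (), 0, -1,
                       container_name="web", project="demo",
                       task_id="t-42", task_name="build deps")
standalone = Notification(2, "Blocked", "", (), 0, -1,
                          container_name="adhoc")

print(task_column(managed))     # demo/build deps
print(task_column(standalone))  # adhoc
```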

DbusNotifier(app_name='terok')

Send desktop notifications over the D-Bus session bus.

The connection is established lazily on the first notify call. Action callbacks are dispatched from the ActionInvoked signal; stale callbacks are cleaned up automatically on NotificationClosed.

Parameters:

Name Type Description Default
app_name str

Application name sent with every notification.

'terok'

Initialise with the given application name.

Source code in src/terok_clearance/notifications/desktop.py
def __init__(self, app_name: str = "terok") -> None:
    """Initialise with the given application name."""
    self._app_name = app_name
    self._conn: _Connection | None = None
    self._callbacks: dict[int, Callable[[str], None]] = {}
    self._connect_lock = asyncio.Lock()

connect() async

Idempotently open the session-bus connection and subscribe to signals.

Safe to call concurrently and repeatedly: the lock serialises racing callers so exactly one MessageBus is ever created for this notifier.

Source code in src/terok_clearance/notifications/desktop.py
async def connect(self) -> None:
    """Idempotently open the session-bus connection and subscribe to signals.

    Safe to call concurrently and repeatedly: the lock serialises racing
    callers so exactly one MessageBus is ever created for this notifier.
    """
    if self._conn is not None:
        return
    async with self._connect_lock:
        if self._conn is not None:
            return
        bus = await MessageBus().connect()
        try:
            introspection = await bus.introspect(BUS_NAME, OBJECT_PATH)
            proxy = bus.get_proxy_object(BUS_NAME, OBJECT_PATH, introspection)
            iface = proxy.get_interface(INTERFACE_NAME)
            if hasattr(iface, "on_action_invoked"):
                iface.on_action_invoked(self._handle_action)
            if hasattr(iface, "on_notification_closed"):
                iface.on_notification_closed(self._handle_closed)
        except BaseException:
            # Catch ``BaseException`` so an ``asyncio.CancelledError``
            # (``BaseException`` subclass on 3.11+) mid-handshake doesn't
            # leak the already-connected bus.
            bus.disconnect()
            raise
        self._conn = _Connection(bus=bus, interface=iface)

notify(summary, body='', *, actions=(), timeout_ms=-1, hints=None, replaces_id=0, app_icon='', container_id='', container_name='', project='', task_id='', task_name='') async

Send a desktop notification.

Freedesktop notifications render summary + body + actions only, so the structured identity kwargs (container_id and the terok task triple) are dropped on the floor here — callers are expected to have folded the user-facing identity into body already. The kwargs stay in the signature for Notifier conformance so callers don't have to branch on notifier kind.

Source code in src/terok_clearance/notifications/desktop.py
async def notify(
    self,
    summary: str,
    body: str = "",
    *,
    actions: Sequence[tuple[str, str]] = (),
    timeout_ms: int = -1,
    hints: Mapping[str, Any] | None = None,
    replaces_id: int = 0,
    app_icon: str = "",
    container_id: str = "",  # noqa: ARG002 — protocol kwarg ignored by desktop
    container_name: str = "",  # noqa: ARG002 — protocol kwarg ignored by desktop
    project: str = "",  # noqa: ARG002 — protocol kwarg ignored by desktop
    task_id: str = "",  # noqa: ARG002 — protocol kwarg ignored by desktop
    task_name: str = "",  # noqa: ARG002 — protocol kwarg ignored by desktop
) -> int:
    """Send a desktop notification.

    Freedesktop notifications render summary + body + actions only,
    so the structured identity kwargs (``container_id`` and the
    terok task triple) are dropped on the floor here — callers are
    expected to have folded the user-facing identity into ``body``
    already.  The kwargs stay in the signature for
    [`Notifier`][terok_clearance.notifications.protocol.Notifier] conformance so callers
    don't have to branch on notifier kind.
    """
    await self.connect()
    assert self._conn is not None  # connect() post-condition

    actions_flat: list[str] = []
    for action_id, label in actions:
        actions_flat.extend((action_id, label))

    return await self._conn.interface.call_notify(
        self._app_name,
        replaces_id,
        app_icon or _DEFAULT_APP_ICON,
        summary,
        body,
        actions_flat,
        dict(hints) if hints is not None else {},
        timeout_ms,
    )
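The `actions_flat` loop in `notify` produces the flat alternating list the freedesktop `Notify` method expects — each `(action_id, label)` pair becomes two adjacent strings:

```python
# Flatten (action_id, label) pairs into the freedesktop wire shape:
# [id1, label1, id2, label2, ...].
actions = [("allow", "Allow"), ("deny", "Deny")]
actions_flat: list[str] = []
for action_id, label in actions:
    actions_flat.extend((action_id, label))
print(actions_flat)  # ['allow', 'Allow', 'deny', 'Deny']
```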

on_action(notification_id, callback) async

Register a callback for when the user clicks an action button.

Parameters:

Name Type Description Default
notification_id int

ID returned by notify.

required
callback Callable[[str], None]

Called with the action_id string when invoked.

required
Source code in src/terok_clearance/notifications/desktop.py
async def on_action(
    self,
    notification_id: int,
    callback: Callable[[str], None],
) -> None:
    """Register a callback for when the user clicks an action button.

    Args:
        notification_id: ID returned by ``notify``.
        callback: Called with the ``action_id`` string when invoked.
    """
    self._callbacks[notification_id] = callback

close(notification_id) async

Close an active notification.

Parameters:

Name Type Description Default
notification_id int

ID returned by notify.

required
Source code in src/terok_clearance/notifications/desktop.py
async def close(self, notification_id: int) -> None:
    """Close an active notification.

    Args:
        notification_id: ID returned by ``notify``.
    """
    self._callbacks.pop(notification_id, None)
    if self._conn is not None:
        await self._conn.interface.call_close_notification(notification_id)

disconnect() async

Tear down the session-bus connection.

Source code in src/terok_clearance/notifications/desktop.py
async def disconnect(self) -> None:
    """Tear down the session-bus connection."""
    conn = self._conn
    if conn is None:
        return
    if hasattr(conn.interface, "off_action_invoked"):
        conn.interface.off_action_invoked(self._handle_action)
    if hasattr(conn.interface, "off_notification_closed"):
        conn.interface.off_notification_closed(self._handle_closed)
    conn.bus.disconnect()
    self._conn = None
    self._callbacks.clear()

NullNotifier

Silent fallback that satisfies the Notifier protocol.

Every method is a no-op. notify always returns 0.

notify(summary, body='', *, actions=(), timeout_ms=-1, hints=None, replaces_id=0, app_icon='', container_id='', container_name='', project='', task_id='', task_name='') async

Accept and discard a notification, returning 0.

Source code in src/terok_clearance/notifications/null.py
async def notify(
    self,
    summary: str,
    body: str = "",
    *,
    actions: Sequence[tuple[str, str]] = (),
    timeout_ms: int = -1,
    hints: Mapping[str, Any] | None = None,
    replaces_id: int = 0,
    app_icon: str = "",
    container_id: str = "",
    container_name: str = "",
    project: str = "",
    task_id: str = "",
    task_name: str = "",
) -> int:
    """Accept and discard a notification, returning ``0``."""
    return 0

on_action(notification_id, callback) async

Accept and discard an action callback registration.

Source code in src/terok_clearance/notifications/null.py
async def on_action(
    self,
    notification_id: int,
    callback: Callable[[str], None],
) -> None:
    """Accept and discard an action callback registration."""

close(notification_id) async

Accept and discard a close request.

Source code in src/terok_clearance/notifications/null.py
async def close(self, notification_id: int) -> None:
    """Accept and discard a close request."""

disconnect() async

Accept and discard a teardown request.

Source code in src/terok_clearance/notifications/null.py
async def disconnect(self) -> None:
    """Accept and discard a teardown request."""

Notifier

Bases: Protocol

Structural type for desktop notification backends.

Implementations must provide notify, on_action, close, and disconnect. DbusNotifier talks to a real session bus; NullNotifier silently discards everything for headless environments.

notify(summary, body='', *, actions=(), timeout_ms=-1, hints=None, replaces_id=0, app_icon='', container_id='', container_name='', project='', task_id='', task_name='') async

Send a desktop notification.

Parameters:

Name Type Description Default
summary str

Notification title.

required
body str

Optional body text.

''
actions Sequence[tuple[str, str]]

(action_id, label) pairs rendered as buttons.

()
timeout_ms int

Expiration hint in milliseconds (-1 = server default).

-1
hints Mapping[str, Any] | None

Freedesktop hint dict (values are dbus_fast.Variant for DbusNotifier, ignored by NullNotifier).

None
replaces_id int

Replace an existing notification in-place.

0
app_icon str

Icon name or file:/// URI.

''
container_id str

Presentation-layer hint: the 12-char podman container ID the event refers to. The desktop DbusNotifier ignores it; CallbackNotifier attaches it to the Notification so rich consumers can render it alongside the user-facing name.

''
container_name str

Podman --name matching the ID. Same propagation rules as container_id.

''
project str

Terok project slug when the container is orchestrator-managed (from the ai.terok.project annotation). Empty for standalone containers.

''
task_id str

Terok task ID (ai.terok.task annotation); empty for standalone containers.

''
task_name str

Human-readable task label from terok's metadata — mutable at any point in the task's life, so resolved live by callers, not snapshotted. Empty when unknown.

''

Returns:

Type Description
int

Server-assigned notification ID (0 for null implementations).

Source code in src/terok_clearance/notifications/protocol.py
async def notify(
    self,
    summary: str,
    body: str = "",
    *,
    actions: Sequence[tuple[str, str]] = (),
    timeout_ms: int = -1,
    hints: Mapping[str, Any] | None = None,
    replaces_id: int = 0,
    app_icon: str = "",
    container_id: str = "",
    container_name: str = "",
    project: str = "",
    task_id: str = "",
    task_name: str = "",
) -> int:
    """Send a desktop notification.

    Args:
        summary: Notification title.
        body: Optional body text.
        actions: ``(action_id, label)`` pairs rendered as buttons.
        timeout_ms: Expiration hint in milliseconds (``-1`` = server default).
        hints: Freedesktop hint dict (values are ``dbus_fast.Variant`` for
            ``DbusNotifier``, ignored by ``NullNotifier``).
        replaces_id: Replace an existing notification in-place.
        app_icon: Icon name or ``file:///`` URI.
        container_id: Presentation-layer hint: the 12-char podman
            container ID the event refers to.  The desktop
            ``DbusNotifier`` ignores it; ``CallbackNotifier`` attaches
            it to the [`Notification`][terok_clearance.notifications.callback.Notification] so
            rich consumers can render it alongside the user-facing name.
        container_name: Podman ``--name`` matching the ID.  Same
            propagation rules as ``container_id``.
        project: Terok project slug when the container is orchestrator-
            managed (from the ``ai.terok.project`` annotation).  Empty
            for standalone containers.
        task_id: Terok task ID (``ai.terok.task`` annotation); empty
            for standalone containers.
        task_name: Human-readable task label from terok's metadata —
            mutable at any point in the task's life, so resolved live
            by callers, not snapshotted.  Empty when unknown.

    Returns:
        Server-assigned notification ID (``0`` for null implementations).
    """
    ...

on_action(notification_id, callback) async

Register a callback for when the user clicks an action button.

Parameters:

Name Type Description Default
notification_id int

ID returned by notify.

required
callback Callable[[str], None]

Called with the action_id string when invoked.

required
Source code in src/terok_clearance/notifications/protocol.py
async def on_action(
    self,
    notification_id: int,
    callback: Callable[[str], None],
) -> None:
    """Register a callback for when the user clicks an action button.

    Args:
        notification_id: ID returned by ``notify``.
        callback: Called with the ``action_id`` string when invoked.
    """
    ...

close(notification_id) async

Close an active notification.

Parameters:

Name Type Description Default
notification_id int

ID returned by notify.

required
Source code in src/terok_clearance/notifications/protocol.py
async def close(self, notification_id: int) -> None:
    """Close an active notification.

    Args:
        notification_id: ID returned by ``notify``.
    """
    ...

disconnect() async

Release backend resources (no-op for null backends).

Source code in src/terok_clearance/notifications/protocol.py
async def disconnect(self) -> None:
    """Release backend resources (no-op for null backends)."""
    ...

InvalidAction

Bases: TypedVarlinkErrorReply

action wasn't one of allow / deny.

Parameters

Typed payload for the varlink error reply.

action instance-attribute

ShieldCliFailed

Bases: TypedVarlinkErrorReply

terok-shield allow|deny exited non-zero or timed out.

Clients render this as the red "Allow failed" / "Deny failed" popup variant: the user's click reached the hub but the firewall didn't accept it, so the notification's premise ("you decided X") is misleading. stderr is whatever terok-shield wrote, truncated to a reasonable length by the hub.

Parameters

Typed payload for the varlink error reply.

action instance-attribute
stderr instance-attribute

UnknownRequest

Bases: TypedVarlinkErrorReply

Verdict referenced a request_id the hub didn't emit.

Fails closed against fabricated verdicts: a peer connecting to the clearance socket synthesises a verdict for a block that was never broadcast. No binding, no action.

Parameters

Typed payload for the varlink error reply.

request_id instance-attribute

VerdictTupleMismatch

Bases: TypedVarlinkErrorReply

(container, dest) don't match the hub's pending record.

Cheap defence against replay attackers who sniffed a request_id on this connection but try to apply a verdict against a different destination. expected_* are what the hub recorded when it emitted connection_blocked; got_* are what the call carried.

Parameters

Typed payload for the varlink error reply.

expected_container instance-attribute
expected_dest instance-attribute
got_container instance-attribute
got_dest instance-attribute

Clearance1Interface(event_stream_factory, apply_verdict)

Bases: VarlinkInterface

Varlink interface served by the clearance hub.

Two callables are injected so the state machine stays testable without a live varlink connection:

  • event_stream_factory — returns a fresh AsyncIterator yielding ClearanceEvent instances. The hub owns one per connected subscriber so backpressure is local to the slow client.
  • apply_verdict — validates the triple and, on success, shells out to terok-shield. Raises a typed varlink error for any refusal path; returns True only when the shield invocation itself succeeded.

Bind the per-subscriber event stream factory and the verdict callable.

Source code in src/terok_clearance/wire/interface.py
def __init__(
    self,
    event_stream_factory: Callable[[], AsyncIterator[ClearanceEvent]],
    apply_verdict: Callable[[str, str, str, str], Awaitable[bool]],
) -> None:
    """Bind the per-subscriber event stream factory and the verdict callable."""
    self._event_stream_factory = event_stream_factory
    self._apply_verdict = apply_verdict

Subscribe() async

Stream hub events to this caller until the connection closes.

Every yield is forwarded immediately with continues=true; the stream ends only when the client disconnects. A buffered (delay_generator=True) stream would hold the first event until a second arrives, breaking the "something just happened" liveness contract operators expect from a notification channel.

Source code in src/terok_clearance/wire/interface.py
@varlinkmethod(return_parameter="event", delay_generator=False)
async def Subscribe(self) -> AsyncIterator[ClearanceEvent]:  # noqa: N802
    """Stream hub events to this caller until the connection closes.

    Every yield is forwarded immediately with ``continues=true``;
    the stream ends only when the client disconnects.  A buffered
    (``delay_generator=True``) stream would hold the first event
    until a second arrives, breaking the "something just happened"
    liveness contract operators expect from a notification channel.
    """
    async for event in self._event_stream_factory():
        yield event

Verdict(*, container, request_id, dest, action) async

Apply action (allow / deny) to dest for container.

Returns True when terok-shield accepted the verdict. Raises UnknownRequest, VerdictTupleMismatch, InvalidAction, or ShieldCliFailed on the four refusal paths — clients get a typed error they can render without stringly-matching the message.

Source code in src/terok_clearance/wire/interface.py
@varlinkmethod(return_parameter="ok")
async def Verdict(  # noqa: N802
    self, *, container: str, request_id: str, dest: str, action: str
) -> bool:
    """Apply *action* (``allow`` / ``deny``) to *dest* for *container*.

    Returns ``True`` when ``terok-shield`` accepted the verdict.
    Raises [`UnknownRequest`][terok_clearance.wire.errors.UnknownRequest],
    [`VerdictTupleMismatch`][terok_clearance.wire.errors.VerdictTupleMismatch],
    [`InvalidAction`][terok_clearance.wire.errors.InvalidAction], or
    [`ShieldCliFailed`][terok_clearance.wire.errors.ShieldCliFailed] on the
    four refusal paths — clients get a typed error they can render
    without stringly-matching the message.
    """
    return await self._apply_verdict(container, request_id, dest, action)
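The injected callables are what keep this state machine testable: the verdict path can be driven with in-memory fakes and no varlink server. A sketch using a stand-in class that mirrors the constructor contract (the `varlinkmethod` decorators are omitted; the real class lives in `terok_clearance.wire.interface`):

```python
import asyncio


class FakeClearance1Interface:
    """Stand-in mirroring the injection seam, decorators omitted."""

    def __init__(self, event_stream_factory, apply_verdict):
        self._event_stream_factory = event_stream_factory
        self._apply_verdict = apply_verdict

    async def Verdict(self, *, container, request_id, dest, action):
        return await self._apply_verdict(container, request_id, dest, action)


applied: list[tuple[str, str, str, str]] = []


async def fake_apply_verdict(container, request_id, dest, action):
    # In the hub this validates the triple and shells out to
    # terok-shield; here it just records the call and reports success.
    applied.append((container, request_id, dest, action))
    return True


async def no_events():
    # Empty async generator standing in for event_stream_factory.
    return
    yield


iface = FakeClearance1Interface(no_events, fake_apply_verdict)
ok = asyncio.run(iface.Verdict(
    container="web", request_id="r-1", dest="1.2.3.4:443", action="allow"))
print(ok, applied)
```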

serve() async

Run the hub service until SIGINT/SIGTERM.

The entry point terok-clearance serve hands off here. Blocks forever on a signal-set asyncio.Event; systemd's SIGTERM flips it, then stop tears down the server under a timeout.

Source code in src/terok_clearance/hub/server.py
async def serve() -> None:  # pragma: no cover — integration path
    """Run the hub service until SIGINT/SIGTERM.

    The entry point ``terok-clearance serve`` hands off here.  Blocks forever
    on a signal-set [`asyncio.Event`][asyncio.Event]; systemd's SIGTERM flips it,
    then [`stop`][terok_clearance.hub.server.ClearanceHub.stop] tears down the server under a timeout.
    """
    from terok_clearance.runtime.service import configure_logging, wait_for_shutdown_signal

    configure_logging()
    hub = ClearanceHub()
    await hub.start()
    try:
        await wait_for_shutdown_signal()
    finally:
        await hub.stop()

create_notifier(app_name='terok') async

Return a connected DbusNotifier, or a NullNotifier on failure.

Parameters:

Name Type Description Default
app_name str

Application name sent with every notification.

'terok'

Returns:

Type Description
Notifier

A Notifier-compatible instance.

Source code in src/terok_clearance/notifications/factory.py
async def create_notifier(app_name: str = "terok") -> Notifier:
    """Return a connected ``DbusNotifier``, or a ``NullNotifier`` on failure.

    Args:
        app_name: Application name sent with every notification.

    Returns:
        A ``Notifier``-compatible instance.
    """
    notifier = DbusNotifier(app_name)
    try:
        await notifier.connect()
    except (OSError, DBusError, ValueError) as exc:
        _log.debug("D-Bus session bus unavailable, falling back to NullNotifier: %s", exc)
        return NullNotifier()
    return notifier

check_units_outdated()

Return a one-line drift warning if any installed unit is stale, else None.

Checks hub + verdict together (they're installed as a pair by install_service) plus the notifier independently (headless hosts may install it later, or not at all). None is returned when neither pair nor notifier is installed (headless host, or no setup command has run yet); a one-sided hub/verdict pair is reported as stale so the operator is prompted to restore it. A legacy terok-dbus.service on disk counts as "stale" so the operator is prompted to rerun setup and get the split pair.

Source code in src/terok_clearance/runtime/installer.py
def check_units_outdated() -> str | None:
    """Return a one-line drift warning if any installed unit is stale, else ``None``.

    Checks hub + verdict together (they're installed as a pair by
    [`install_service`][terok_clearance.runtime.installer.install_service]) plus the notifier independently (headless
    hosts may install it later, or not at all).  ``None`` is returned
    when neither pair nor notifier is installed (headless host, or
    no setup command has run yet); a one-sided hub/verdict pair is
    reported as stale so the operator is prompted to restore it.  A
    legacy ``terok-dbus.service`` on disk counts as "stale" so the
    operator is prompted to rerun setup and get the split pair.
    """
    legacy = _user_systemd_dir() / _LEGACY_UNIT_NAME
    if legacy.is_file():
        return (
            f"{_LEGACY_UNIT_NAME} is from a pre-split release — "
            f"{_RERUN_HINT} to migrate to the hub/verdict pair."
        )
    if (verdict := _check_pair_outdated()) is not None:
        return verdict
    return _check_notifier_outdated()

install_notifier_service(bin_path=None)

Render + write the notifier unit into the user systemd directory.

Paired with install_service: headless hosts that installed the hub + verdict pair can opt into the desktop notifier later by calling only this function. Daemon-reloads once at the end.

Parameters:

Name Type Description Default
bin_path Path | list[str] | None

Path to the notifier launcher, or a list[str] argv. None (the default) renders python -m terok_clearance.notifier.app against the running interpreter — same rationale as install_service.

None

Returns:

Type Description
Path

The on-disk path of the written unit file.

Source code in src/terok_clearance/runtime/installer.py
def install_notifier_service(bin_path: Path | list[str] | None = None) -> Path:
    """Render + write the notifier unit into the user systemd directory.

    Paired with [`install_service`][terok_clearance.runtime.installer.install_service]: headless hosts that installed
    the hub + verdict pair can opt into the desktop notifier later by
    calling only this function.  Daemon-reloads once at the end.

    Args:
        bin_path: ``Path`` to the notifier launcher, or a ``list[str]``
            argv.  ``None`` (the default) renders
            ``python -m terok_clearance.notifier.app`` against the
            running interpreter — same rationale as [`install_service`][terok_clearance.runtime.installer.install_service].

    Returns:
        The on-disk path of the written unit file.
    """
    bin_rendered = _render_exec_start(
        bin_path if bin_path is not None else list(_DEFAULT_NOTIFIER_ARGV)
    )
    dest_dir = _user_systemd_dir()
    dest_dir.mkdir(parents=True, exist_ok=True)
    template = _read_template(NOTIFIER_UNIT_NAME)
    rendered = template.replace("{{UNIT_VERSION}}", str(_NOTIFIER_UNIT_VERSION)).replace(
        "{{BIN}}", bin_rendered
    )
    dest = dest_dir / NOTIFIER_UNIT_NAME
    dest.write_text(rendered)
    _daemon_reload()
    return dest

read_installed_unit_version()

Return the hub unit's # terok-clearance-hub-version: stamp, or None.

None is either "unit not installed" or "unit installed without a marker" (the pre-split legacy unit) — check_units_outdated differentiates between those in its operator-facing message.

Source code in src/terok_clearance/runtime/installer.py
def read_installed_unit_version() -> int | None:
    """Return the hub unit's ``# terok-clearance-hub-version:`` stamp, or ``None``.

    ``None`` is either "unit not installed" or "unit installed without
    a marker" (the pre-split legacy unit) — ``check_units_outdated``
    differentiates between those in its operator-facing message.
    """
    return _version_for(HUB_UNIT_NAME, _HUB[1])

uninstall_notifier_service()

Disable + unlink the notifier unit; daemon-reload once.

Symmetric teardown for install_notifier_service. Soft-fail on every step so a half-installed tree still ends up clean.

Source code in src/terok_clearance/runtime/installer.py
def uninstall_notifier_service() -> None:
    """Disable + unlink the notifier unit; daemon-reload once.

    Symmetric teardown for [`install_notifier_service`][terok_clearance.runtime.installer.install_notifier_service].  Soft-fail
    on every step so a half-installed tree still ends up clean.
    """
    _disable_and_unlink(NOTIFIER_UNIT_NAME)
    _daemon_reload()

uninstall_service()

Disable + unlink both new units + any pre-split legacy leftover.

Symmetric teardown for install_service — terok uninstall calls this instead of rolling its own systemctl + unlink sequence. Daemon-reloads once at the end so systemd's in-memory registry drops the now-missing units. All individual steps soft-fail so a half-installed tree still ends up clean.

Source code in src/terok_clearance/runtime/installer.py
def uninstall_service() -> None:
    """Disable + unlink both new units + any pre-split legacy leftover.

    Symmetric teardown for [`install_service`][terok_clearance.runtime.installer.install_service] — ``terok uninstall``
    calls this instead of rolling its own systemctl + unlink sequence.
    Daemon-reloads once at the end so systemd's in-memory registry
    drops the now-missing units.  All individual steps soft-fail so a
    half-installed tree still ends up clean.
    """
    for name in (HUB_UNIT_NAME, VERDICT_UNIT_NAME, _LEGACY_UNIT_NAME):
        _disable_and_unlink(name)
    _daemon_reload()

configure_logging(level=logging.INFO)

Send INFO-level logs to stderr so journald / systemd pick them up.

Source code in src/terok_clearance/runtime/service.py
def configure_logging(level: int = logging.INFO) -> None:
    """Send INFO-level logs to stderr so journald / systemd pick them up."""
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=level,
        stream=sys.stderr,
    )

wait_for_shutdown_signal() async

Block the current task until SIGINT or SIGTERM arrives.

Source code in src/terok_clearance/runtime/service.py
async def wait_for_shutdown_signal() -> None:  # pragma: no cover — real signals
    """Block the current task until ``SIGINT`` or ``SIGTERM`` arrives."""
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    await stop.wait()

default_clearance_socket_path()

Return the canonical clearance-socket path under $XDG_RUNTIME_DIR.

Source code in src/terok_clearance/wire/socket.py
def default_clearance_socket_path() -> Path:
    """Return the canonical clearance-socket path under ``$XDG_RUNTIME_DIR``."""
    return runtime_socket_path(_CLEARANCE_SOCKET_BASENAME)
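A plausible expansion of the `runtime_socket_path` helper, resolving a basename against `$XDG_RUNTIME_DIR`. Both the `/run/user/<uid>` fallback policy and the socket basename shown are assumptions for illustration, not the library's actual behaviour:

```python
import os
from pathlib import Path


# Hypothetical sketch of runtime_socket_path: resolve against
# $XDG_RUNTIME_DIR, falling back to /run/user/<uid> when unset.
def runtime_socket_path(basename: str) -> Path:
    runtime_dir = os.environ.get("XDG_RUNTIME_DIR") or f"/run/user/{os.getuid()}"
    return Path(runtime_dir) / basename


os.environ["XDG_RUNTIME_DIR"] = "/tmp/xdg-demo"
print(runtime_socket_path("terok-clearance.sock"))
# /tmp/xdg-demo/terok-clearance.sock
```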