Skip to content

subscriber

subscriber

Render clearance-hub events as desktop notifications.

Turns the event stream from ClearanceClient into calls on an injected Notifier: a block arrives, the operator sees a popup with Allow/Deny actions, clicks route back to the hub as a Verdict. Live-block dedup, shield-down popup tracking, and task-identity resolution live here because they're all presentation concerns — the hub stays transport-only.

EventSubscriber(notifier, client=None, *, identity_resolver=None, socket_path=None)

Bridge clearance-hub events into desktop notifications.

Owns the presentation-layer state a rendering client needs: live-block dedup keyed on (container, target), the tracked ShieldDown popup per container so ShieldUp can retire it, and verdict routing through notifier action callbacks.

Parameters:

Name Type Description Default
notifier Notifier

Desktop notification backend (any Notifier works).

required
client ClearanceClient | None

Pre-configured ClearanceClient. When omitted, one is created on start pointing at socket_path (defaulting to default_clearance_socket_path).

None
identity_resolver Callable[[str], ContainerIdentity] | None

Turns a short container ID into a ContainerIdentity so terok task annotations surface as "Task: project/task_id · name" bodies. Called from a worker thread so a slow podman inspect doesn't stall the event loop. None renders the raw container ID.

None
socket_path Path | None

Clearance-socket override when client isn't supplied (tests).

None

Initialise the subscriber with a notifier and transport.

Source code in src/terok_clearance/client/subscriber.py
def __init__(
    self,
    notifier: Notifier,
    client: ClearanceClient | None = None,
    *,
    identity_resolver: Callable[[str], ContainerIdentity] | None = None,
    socket_path: Path | None = None,
) -> None:
    """Initialise the subscriber with a notifier and transport."""
    self._notifier = notifier
    self._client = client or ClearanceClient(socket_path=socket_path)
    self._identity_resolver = identity_resolver
    # request_id → pending block + its notification.
    self._pending: dict[str, _PendingBlock] = {}
    # container → notification_id of the active ShieldDown popup, so
    # ShieldUp can close the matching one before firing its brief
    # confirmation.  A stale "Shield DOWN" popup after shield is back
    # is a security hazard, not a benign leftover.
    self._shield_down_notifs: dict[str, int] = {}
    # Background action / lifecycle tasks we spawn.
    self._tasks: set[asyncio.Task[None]] = set()

start() async

Connect to the clearance hub and begin rendering its event stream.

Source code in src/terok_clearance/client/subscriber.py
async def start(self) -> None:
    """Connect to the clearance hub and begin rendering its event stream."""
    await self._client.start(self._on_event)
    _log.info("clearance subscriber online")

stop() async

Drain pending tasks and close the transport.

Closes the client first so no new handler tasks are scheduled, then awaits the currently-tracked tasks to settle (with their own CancelledError suppressed). A bare sleep(0) would yield only one loop turn — not enough for cancellation to propagate through chained awaits — and tasks.clear() on its own would drop references to tasks still writing to handles we then close.

Source code in src/terok_clearance/client/subscriber.py
async def stop(self) -> None:
    """Drain pending tasks and close the transport.

    Closes the client first so no new handler tasks are scheduled,
    then awaits the currently-tracked tasks to settle (with their
    own ``CancelledError`` suppressed).  A bare ``sleep(0)`` would
    yield only one loop turn — not enough for cancellation to
    propagate through chained awaits — and ``tasks.clear()`` on its
    own would drop references to tasks still writing to handles we
    then close.
    """
    tasks = list(self._tasks)
    for task in tasks:
        task.cancel()
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)
    self._tasks.clear()
    await self._client.stop()
    self._pending.clear()
    self._shield_down_notifs.clear()

poke_reconnect()

Cut short any in-flight reconnect back-off — forwards to the client.

Source code in src/terok_clearance/client/subscriber.py
def poke_reconnect(self) -> None:
    """Cut short any in-flight reconnect back-off — forwards to the client."""
    self._client.poke_reconnect()