client

Transport-only varlink client for org.terok.Clearance1.

Connects to the hub over the clearance unix socket, streams events via a background subscriber task, and exposes verdict() for the companion RPC channel. Doesn't know anything about notification rendering or desktop state — that's EventSubscriber's job and lives one module up.

Why two connections: varlink is strictly serial per connection (one reply-at-a-time, no multiplexing). A long-lived Subscribe(more=true) would block every Verdict() call on the same transport, so we open one connection for the event stream and a second for the RPC path.
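
The blocking behaviour described above can be illustrated with a toy model. This is a minimal sketch, not the real transport: `SerialConnection` is a hypothetical stand-in that uses an `asyncio.Lock` to mimic "one reply at a time", and the method names are plain strings rather than actual varlink calls.

```python
import asyncio


class SerialConnection:
    """Toy stand-in for one varlink connection: one call in flight at a time."""

    def __init__(self) -> None:
        self._busy = asyncio.Lock()

    async def call(self, name: str, duration: float) -> str:
        # The lock models "no multiplexing": a second call waits for the first reply.
        async with self._busy:
            await asyncio.sleep(duration)
            return name


async def verdict_blocked_on_shared_connection() -> bool:
    """Return True if a Verdict-like call times out behind a long-lived stream."""
    shared = SerialConnection()
    stream = asyncio.create_task(shared.call("Subscribe", 1.0))
    await asyncio.sleep(0)  # let the stream grab the connection first
    try:
        await asyncio.wait_for(shared.call("Verdict", 0.0), timeout=0.05)
        return False
    except asyncio.TimeoutError:
        return True
    finally:
        stream.cancel()


async def verdict_ok_on_separate_connections() -> str:
    """Same scenario, but the RPC path gets its own connection."""
    sub, rpc = SerialConnection(), SerialConnection()
    stream = asyncio.create_task(sub.call("Subscribe", 1.0))
    await asyncio.sleep(0)
    try:
        return await asyncio.wait_for(rpc.call("Verdict", 0.0), timeout=0.05)
    finally:
        stream.cancel()
```

With a shared connection the short call never gets its turn before the timeout; with two connections it returns immediately.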

EventCallback = Callable[[ClearanceEvent], Awaitable[None]] module-attribute

ClearanceClient(*, socket_path=None)

Thin async client for the Clearance1 varlink service.

Two coroutines to drive:

  • start — open the subscribe + RPC connections and begin relaying events to the user-supplied callback. Returns once both channels are live; events arrive via on_event from then on.
  • verdict — RPC call; returns True if terok-shield applied the action, False on any refusal or shield failure. The refusal reason is logged at WARNING.

The callback runs on the same event loop as the rest of the client; exceptions it raises are logged and swallowed so one bad handler can't kill the stream for every subsequent event.
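
The "logged and swallowed" behaviour might look roughly like the following. This is a sketch under assumptions: `dispatch`, `relay`, and the `dict`-based `Event` alias are hypothetical names chosen for illustration, and the real client's internals may differ.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

_log = logging.getLogger("clearance.sketch")

# Stand-in for ClearanceEvent; the real type lives in terok_clearance.
Event = dict
Callback = Callable[[Event], Awaitable[None]]


async def dispatch(callback: Callback, event: Event) -> bool:
    """Run *callback* for one event; log and swallow anything it raises."""
    try:
        await callback(event)
    except Exception:
        # Catching Exception (not BaseException) lets cancellation pass through.
        _log.exception("event callback failed for %r", event)
        return False
    return True


async def relay(events: list[Event], callback: Callback) -> int:
    """Feed every event to *callback*; return how many were handled cleanly."""
    handled = 0
    for event in events:
        if await dispatch(callback, event):
            handled += 1
    return handled
```

A handler that raises on one event still receives every later event, which is the property the paragraph above describes.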

Remember the target socket; defaults to default_clearance_socket_path.

Source code in src/terok_clearance/client/client.py
def __init__(self, *, socket_path: Path | None = None) -> None:
    """Remember the target socket; defaults to [`default_clearance_socket_path`][terok_clearance.client.client.default_clearance_socket_path]."""
    self._socket_path = socket_path or default_clearance_socket_path()
    self._on_event: EventCallback | None = None
    self._sub_transport: object | None = None
    self._rpc_transport: object | None = None
    self._sub_proxy: object | None = None
    self._rpc_proxy: object | None = None
    self._stream_task: asyncio.Task[None] | None = None
    self._stopping = False
    # Set by [`poke_reconnect`][terok_clearance.client.client.ClearanceClient.poke_reconnect]; awaited inside the back-off
    # window.  Constructed here (not lazily) so a focus-gain poke
    # that lands between ``start()`` and the first ``_run_stream``
    # iteration isn't silently dropped.
    self._reconnect_poke = asyncio.Event()

start(on_event) async

Open both connections and begin relaying events to on_event.

The initial connect is awaited synchronously so callers see start() return only after the subscription is live — a hub that's down at startup still propagates as an exception. Subsequent drops are handled by _run_stream's internal reconnect loop so long-running consumers (TUI, notifier) survive a systemctl restart terok-clearance without restarting themselves.

Source code in src/terok_clearance/client/client.py
async def start(self, on_event: EventCallback) -> None:
    """Open both connections and begin relaying events to *on_event*.

    The initial connect is awaited synchronously so callers see
    ``start()`` return only after the subscription is live — a
    hub that's down at startup still propagates as an exception.
    Subsequent drops are handled by `_run_stream`'s internal
    reconnect loop so long-running consumers (TUI, notifier)
    survive a ``systemctl restart terok-clearance`` without
    restarting themselves.
    """
    self._on_event = on_event
    self._stopping = False
    await self._connect()
    self._stream_task = asyncio.create_task(self._run_stream())

stop() async

Close both connections and await the stream task.

Source code in src/terok_clearance/client/client.py
async def stop(self) -> None:
    """Close both connections and await the stream task."""
    self._stopping = True
    if self._stream_task is not None:
        self._stream_task.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await self._stream_task
        self._stream_task = None
    self._close_transports()
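
The cancel-then-await pattern used by stop() can be tried in isolation. The sketch below is illustrative only: `cancel_and_wait` and `demo_stop` are hypothetical names, and a sleeping task stands in for the stream loop.

```python
import asyncio
import contextlib


async def cancel_and_wait(task: asyncio.Task) -> bool:
    """Cancel *task*, wait for it to unwind, and report task.cancelled()."""
    task.cancel()
    # Awaiting after cancel() gives the task's finally-blocks time to run.
    with contextlib.suppress(asyncio.CancelledError, Exception):
        await task
    return task.cancelled()


async def demo_stop() -> tuple[bool, list[str]]:
    log: list[str] = []

    async def stream() -> None:
        try:
            await asyncio.sleep(60)  # stand-in for the long-lived stream loop
        finally:
            log.append("cleanup ran")

    task = asyncio.create_task(stream())
    await asyncio.sleep(0)  # let the task start before cancelling it
    cancelled = await cancel_and_wait(task)
    return cancelled, log
```

Awaiting the cancelled task before closing the transports means the task's own cleanup has finished by the time the sockets go away.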

poke_reconnect()

Skip any in-flight reconnect back-off and retry immediately.

Idempotent; a no-op when the stream is healthy because the event is only awaited inside _run_stream's back-off window.

Source code in src/terok_clearance/client/client.py
def poke_reconnect(self) -> None:
    """Skip any in-flight reconnect back-off and retry immediately.

    Idempotent; a no-op when the stream is healthy because the
    event is only awaited inside `_run_stream`'s back-off
    window.
    """
    self._reconnect_poke.set()
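
One way to build a back-off that a poke can cut short is to wait on the event with a timeout. This is a sketch of that general technique, not the actual `_run_stream` code: `backoff_or_poke` and `demo_poke` are hypothetical names, and when the real loop clears the event is not shown in this page.

```python
import asyncio


async def backoff_or_poke(poke: asyncio.Event, delay: float) -> bool:
    """Wait up to *delay* seconds; return True if a poke cut the wait short."""
    try:
        await asyncio.wait_for(poke.wait(), timeout=delay)
    except asyncio.TimeoutError:
        return False
    poke.clear()  # consume the poke so the next back-off waits in full
    return True


async def demo_poke() -> tuple[bool, bool]:
    poke = asyncio.Event()
    # A poke set before the wait starts is still seen, because the Event latches.
    poke.set()
    poked = await backoff_or_poke(poke, delay=5.0)
    # With no poke pending, the wait runs to its timeout.
    timed_out = not await backoff_or_poke(poke, delay=0.01)
    return poked, timed_out
```

Because `asyncio.Event.set()` is safe to call repeatedly, calling it several times before the waiter wakes has the same effect as calling it once.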

verdict(container, request_id, dest, action) async

Apply action (allow / deny) to dest via the hub's Verdict RPC.

Returns True when the hub accepted and applied the verdict, False for any refusal (unknown request_id, tuple mismatch, invalid action, shield-exec failure). Callers typically ignore the return value and let the subsequent verdict_applied event drive UI updates; refusal reasons are logged at WARNING.

Source code in src/terok_clearance/client/client.py
async def verdict(self, container: str, request_id: str, dest: str, action: str) -> bool:
    """Apply *action* (``allow`` / ``deny``) to *dest* via the hub's ``Verdict`` RPC.

    Returns ``True`` when the hub accepted and applied the verdict,
    ``False`` for any refusal (unknown request_id, tuple mismatch,
    invalid action, shield-exec failure).  Callers typically ignore
    the return value and let the subsequent ``verdict_applied``
    event drive UI updates; refusal reasons are logged at WARNING.
    """
    if self._rpc_proxy is None:
        _log.error("verdict() called before start()")
        return False
    try:
        reply = await self._rpc_proxy.Verdict(
            container=container,
            request_id=request_id,
            dest=dest,
            action=action,
        )
    except VarlinkErrorReply as err:
        _log.warning(
            "Verdict refused for %s (%s, %s): %s",
            container,
            request_id,
            action,
            err,
        )
        return False
    # reply is {"ok": bool} per the return_parameter wrapper.
    return bool(reply.get("ok", False))
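
The return contract of verdict() can be exercised without a running hub. The sketch below is hypothetical throughout: `FakeRefusal` stands in for the varlink error reply type, `FakeProxy` for the hub proxy, and `verdict_sketch` mirrors only the documented True/False mapping.

```python
import asyncio


class FakeRefusal(Exception):
    """Hypothetical stand-in for the varlink error reply type."""


class FakeProxy:
    """Hypothetical hub proxy: knows one request_id and two actions."""

    async def Verdict(self, *, container, request_id, dest, action):
        if request_id != "req-1":
            raise FakeRefusal("unknown request_id")
        if action not in ("allow", "deny"):
            raise FakeRefusal("invalid action")
        return {"ok": True}


async def verdict_sketch(proxy, container, request_id, dest, action) -> bool:
    """Mirror the documented contract: True on success, False on any refusal."""
    try:
        reply = await proxy.Verdict(
            container=container,
            request_id=request_id,
            dest=dest,
            action=action,
        )
    except FakeRefusal:
        return False
    # A missing "ok" key is treated as a refusal, as in the listing above.
    return bool(reply.get("ok", False))
```

Since refusals come back as False instead of an exception, callers that rely on the verdict_applied event can call this without a try/except of their own.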