Skip to content

server

server

The clearance hub — varlink server + reader ingester + verdict exec.

Fans reader-emitted events (blocks, container lifecycle, shield state) out to every connected clearance client, and applies verdicts the clients send back by shelling out to terok-shield allow|deny. The only D-Bus in sight is what individual clients choose to use on their own (the desktop notifier reaches for org.freedesktop.Notifications out-of-band); the hub itself speaks plain unix-socket varlink.

Authorisation is structural: the socket is mode 0600 (same-UID only), and every Verdict call must cite a (container, request_id, dest) triple the hub actually emitted via connection_blocked. The triple is recorded at emit time and dropped on verdict or lifecycle change; anything that doesn't match is a UnknownRequest or VerdictTupleMismatch refusal.

ClearanceHub(*, clearance_socket=None, reader_socket=None, verdict_client=None)

Server for the org.terok.Clearance1 interface.

Owns three pieces of state:

  • _subscribers — a set of bounded per-connection queues; the hub puts a ClearanceEvent on each one every time the reader ingester delivers an event. Slow clients see their oldest events dropped; fast clients aren't affected.
  • _live_verdicts — the request_id → (container, dest) map the Verdict method checks for the authz binding.
  • An EventIngester bound to the canonical reader socket.

Lifecycle: start brings everything up; stop tears it down under individual timeouts so a flaky bus or a stuck subscriber can't burn systemd's stop-sigterm deadline.

Configure the two sockets and the verdict-helper client.

verdict_client is injected so tests can stub out shield exec without spawning the helper process. Production callers leave it defaulted — a fresh VerdictClient pointing at the canonical helper socket.

Source code in src/terok_clearance/hub/server.py
def __init__(
    self,
    *,
    clearance_socket: Path | None = None,
    reader_socket: Path | None = None,
    verdict_client: VerdictClient | None = None,
) -> None:
    """Configure the two sockets and the verdict-helper client.

    ``verdict_client`` is injected so tests can stub out shield exec
    without spawning the helper process.  Production callers leave
    it defaulted — a fresh [`VerdictClient`][terok_clearance.hub.server.VerdictClient] pointing at the
    canonical helper socket.
    """
    self._clearance_socket = clearance_socket or default_clearance_socket_path()
    self._reader_socket = reader_socket  # None → EventIngester picks its default.
    self._verdict_client = verdict_client or VerdictClient()

    self._subscribers: set[asyncio.Queue[ClearanceEvent]] = set()
    # request_id → (container, dest) the hub emitted in the matching
    # ConnectionBlocked; Verdict calls must cite a triple that matches.
    self._live_verdicts: dict[str, tuple[str, str]] = {}

    self._ingester: EventIngester | None = None
    self._varlink_server: object | None = None  # asyncvarlink's UnixServer

start() async

Bring the ingester + varlink server online and accept clients.

Transactional: if the varlink bind fails after the ingester is already listening, the ingester is stopped before the exception propagates so a half-started hub doesn't leak a live reader-side socket on systemd restart paths.

Source code in src/terok_clearance/hub/server.py
async def start(self) -> None:
    """Bring the ingester + varlink server online and accept clients.

    Transactional: if the varlink bind fails after the ingester is
    already listening, the ingester is stopped before the exception
    propagates so a half-started hub doesn't leak a live
    reader-side socket on systemd restart paths.
    """
    self._ingester = EventIngester(
        socket_path=self._reader_socket or _default_reader_socket(),
        on_event=self._relay_reader_event,
    )
    await self._ingester.start()
    try:
        registry = VarlinkInterfaceRegistry()
        registry.register_interface(
            Clearance1Interface(
                event_stream_factory=self._subscribe,
                apply_verdict=self._apply_verdict,
            )
        )
        registry.register_interface(
            VarlinkServiceInterface(
                vendor="terok",
                product="terok-clearance",
                version=_own_version(),
                url="https://github.com/terok-ai/terok-clearance",
                registry=registry,
            )
        )

        from terok_clearance.wire.socket import bind_hardened

        async def _factory(path: str) -> object:
            return await create_unix_server(registry.protocol_factory, path=path)

        self._varlink_server = await bind_hardened(
            _factory, self._clearance_socket, "clearance"
        )
    except BaseException:
        with contextlib.suppress(Exception):
            await self._ingester.stop()
        self._ingester = None
        raise
    _log.info("clearance hub online at %s", self._clearance_socket)

stop() async

Close the varlink server + ingester; drain subscriber queues.

Source code in src/terok_clearance/hub/server.py
async def stop(self) -> None:
    """Close the varlink server + ingester; drain subscriber queues."""
    if self._varlink_server is not None:
        # ``close()`` on its own only stops accepting new connections;
        # existing subscribers would sit forever in ``queue.get()`` and
        # ``wait_closed`` would hang until the timeout fires.
        # ``close_clients()`` walks the live transports and closes them,
        # which makes the server-side ``_call_async_method_more``'s
        # next ``send_reply`` fail with OSError — that in turn calls
        # ``generator.aclose()`` on the subscriber, propagating cleanly
        # through to our ``finally`` block.  This avoids the
        # assertion asyncvarlink fires when a streaming generator
        # ends "normally" with ``continues=True`` on the last reply.
        self._varlink_server.close()
        with contextlib.suppress(AttributeError):
            self._varlink_server.close_clients()
        with contextlib.suppress(TimeoutError, Exception):
            await asyncio.wait_for(self._varlink_server.wait_closed(), timeout=1.0)
        self._varlink_server = None
    if self._ingester is not None:
        with contextlib.suppress(Exception):
            await self._ingester.stop()
        self._ingester = None
    with contextlib.suppress(Exception):
        await self._verdict_client.stop()
    self._subscribers.clear()
    self._live_verdicts.clear()

serve() async

Run the hub service until SIGINT/SIGTERM.

The entry point terok-clearance serve hands off here. Blocks forever on a signal-set asyncio.Event; systemd's SIGTERM flips it, then stop tears down the server under a timeout.

Source code in src/terok_clearance/hub/server.py
async def serve() -> None:  # pragma: no cover — integration path
    """Run the hub service until SIGINT/SIGTERM.

    The entry point ``terok-clearance serve`` hands off here.  Blocks forever
    on a signal-set [`asyncio.Event`][asyncio.Event]; systemd's SIGTERM flips it,
    then [`stop`][terok_clearance.hub.server.ClearanceHub.stop] tears down the server under a timeout.
    """
    from terok_clearance.runtime.service import configure_logging, wait_for_shutdown_signal

    configure_logging()
    hub = ClearanceHub()
    await hub.start()
    try:
        await wait_for_shutdown_signal()
    finally:
        await hub.stop()