The clearance hub — varlink server + reader ingester + verdict exec.
Fans reader-emitted events (blocks, container lifecycle, shield state)
out to every connected clearance client, and applies verdicts the
clients send back by shelling out to terok-shield allow|deny. The
only D-Bus in sight is what individual clients choose to use on their
own (the desktop notifier reaches for org.freedesktop.Notifications
out-of-band); the hub itself speaks plain unix-socket varlink.
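The same-UID-only socket can be sketched as below. This is a hypothetical illustration (the real hub delegates to its own `bind_hardened` helper, whose exact behaviour may differ); `bind_same_uid_socket` and its handler are invented names:

```python
import asyncio
import os
from pathlib import Path


async def bind_same_uid_socket(path: Path) -> asyncio.AbstractServer:
    """Bind a unix socket only the owning UID can reach (mode 0600).

    Hypothetical sketch of the structural-authz idea; not the hub's
    actual ``bind_hardened`` implementation.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.unlink(missing_ok=True)  # stale socket from a previous run

    async def _handle(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        writer.close()  # placeholder: the real hub speaks varlink here

    server = await asyncio.start_unix_server(_handle, path=str(path))
    os.chmod(path, 0o600)  # same-UID only: strip group/other bits
    return server
```

Because the kernel enforces the mode bits, any process of a different UID fails at `connect()` time, before a single varlink byte is exchanged.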
Authorisation is structural: the socket is mode 0600 (same-UID only),
and every Verdict call must cite a (container, request_id, dest)
triple the hub actually emitted via connection_blocked. The
triple is recorded at emit time and dropped on verdict or lifecycle
change; anything that doesn't match is refused with an
UnknownRequest or VerdictTupleMismatch error.
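The record-at-emit, check-and-drop-at-verdict binding can be sketched in isolation. This is a minimal standalone model, not the hub's internals: `VerdictLedger` and its method names are invented, and only the error names mirror the prose above.

```python
class UnknownRequest(Exception):
    """Verdict cited a request_id the hub never emitted (or already spent)."""


class VerdictTupleMismatch(Exception):
    """Verdict cited a live request_id but the wrong (container, dest)."""


class VerdictLedger:
    """Hypothetical sketch: record triples at emit time, drop on verdict."""

    def __init__(self) -> None:
        self._live: dict[str, tuple[str, str]] = {}

    def record_blocked(self, request_id: str, container: str, dest: str) -> None:
        # Called when the hub emits connection_blocked.
        self._live[request_id] = (container, dest)

    def check_and_drop(self, request_id: str, container: str, dest: str) -> None:
        # Called on Verdict; anything that doesn't match is a refusal.
        try:
            recorded = self._live[request_id]
        except KeyError:
            raise UnknownRequest(request_id) from None
        if recorded != (container, dest):
            raise VerdictTupleMismatch(request_id)
        del self._live[request_id]  # one verdict per emitted triple
```

Deleting the entry on success also makes each triple single-use, so a replayed Verdict call lands in the UnknownRequest branch.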
ClearanceHub(*, clearance_socket=None, reader_socket=None, verdict_client=None)
Server for the org.terok.Clearance1 interface.
Owns three pieces of state:

- _subscribers — a set of bounded per-connection queues; the hub
  puts a ClearanceEvent on each one every time the reader
  ingester delivers an event. Slow clients see their oldest events
  dropped; fast clients aren't affected.
- _live_verdicts — the request_id → (container, dest) map
  the Verdict method checks for the authz binding.
- _ingester — an EventIngester bound to the canonical reader socket.
Lifecycle: start brings everything up; stop tears
it down under individual timeouts so a flaky bus or a stuck
subscriber can't burn systemd's stop-sigterm deadline.
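The drop-oldest policy on the bounded subscriber queues can be sketched as a small helper. This is an assumption-labeled illustration of the policy the prose describes (`put_drop_oldest` is an invented name; the hub's actual relay code is not shown here):

```python
import asyncio


def put_drop_oldest(queue: asyncio.Queue, event: object) -> None:
    """Enqueue ``event``, evicting the oldest entry if the queue is full.

    Hypothetical sketch of the fan-out policy: a slow client loses its
    oldest events, a fast client is unaffected, and the hub never blocks.
    """
    while True:
        try:
            queue.put_nowait(event)
            return
        except asyncio.QueueFull:
            try:
                queue.get_nowait()  # drop the oldest queued event
            except asyncio.QueueEmpty:
                pass  # raced with a consumer; retry the put
```

Using the non-blocking `put_nowait`/`get_nowait` pair keeps the relay synchronous from the ingester's point of view, so one stuck subscriber can't stall event delivery to the others.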
Configure the two sockets and the verdict-helper client.
verdict_client is injected so tests can stub out shield exec
without spawning the helper process. Production callers leave
it defaulted — a fresh VerdictClient pointing at the
canonical helper socket.
Source code in src/terok_clearance/hub/server.py
def __init__(
    self,
    *,
    clearance_socket: Path | None = None,
    reader_socket: Path | None = None,
    verdict_client: VerdictClient | None = None,
) -> None:
    """Configure the two sockets and the verdict-helper client.

    ``verdict_client`` is injected so tests can stub out shield exec
    without spawning the helper process. Production callers leave
    it defaulted — a fresh
    [`VerdictClient`][terok_clearance.hub.server.VerdictClient]
    pointing at the canonical helper socket.
    """
    self._clearance_socket = clearance_socket or default_clearance_socket_path()
    self._reader_socket = reader_socket  # None → EventIngester picks its default.
    self._verdict_client = verdict_client or VerdictClient()
    self._subscribers: set[asyncio.Queue[ClearanceEvent]] = set()
    # request_id → (container, dest) the hub emitted in the matching
    # ConnectionBlocked; Verdict calls must cite a triple that matches.
    self._live_verdicts: dict[str, tuple[str, str]] = {}
    self._ingester: EventIngester | None = None
    self._varlink_server: object | None = None  # asyncvarlink's UnixServer
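The injection seam the docstring describes can be exercised with a stub in place of the real VerdictClient. A hedged sketch, assuming an async `apply`-style method and a `stop` coroutine on the client (the real VerdictClient's method names and signatures may differ):

```python
class StubVerdictClient:
    """Records verdicts instead of shelling out to terok-shield.

    Hypothetical test double; method names are assumptions, not the
    real VerdictClient API.
    """

    def __init__(self) -> None:
        self.applied: list[tuple[str, str]] = []

    async def apply(self, action: str, request_id: str) -> None:
        # The real client would exec ``terok-shield allow|deny`` here.
        self.applied.append((action, request_id))

    async def stop(self) -> None:
        pass  # nothing to tear down


# In a test, the stub slots into the constructor's keyword argument:
# hub = ClearanceHub(verdict_client=StubVerdictClient())
```

Because the hub never constructs its own client when one is passed in, a test can assert on `stub.applied` after driving a Verdict call, with no helper process spawned.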
start()
async
Bring the ingester + varlink server online and accept clients.
Transactional: if the varlink bind fails after the ingester is
already listening, the ingester is stopped before the exception
propagates so a half-started hub doesn't leak a live
reader-side socket on systemd restart paths.
Source code in src/terok_clearance/hub/server.py
async def start(self) -> None:
    """Bring the ingester + varlink server online and accept clients.

    Transactional: if the varlink bind fails after the ingester is
    already listening, the ingester is stopped before the exception
    propagates so a half-started hub doesn't leak a live
    reader-side socket on systemd restart paths.
    """
    self._ingester = EventIngester(
        socket_path=self._reader_socket or _default_reader_socket(),
        on_event=self._relay_reader_event,
    )
    await self._ingester.start()
    try:
        registry = VarlinkInterfaceRegistry()
        registry.register_interface(
            Clearance1Interface(
                event_stream_factory=self._subscribe,
                apply_verdict=self._apply_verdict,
            )
        )
        registry.register_interface(
            VarlinkServiceInterface(
                vendor="terok",
                product="terok-clearance",
                version=_own_version(),
                url="https://github.com/terok-ai/terok-clearance",
                registry=registry,
            )
        )

        from terok_clearance.wire.socket import bind_hardened

        async def _factory(path: str) -> object:
            return await create_unix_server(registry.protocol_factory, path=path)

        self._varlink_server = await bind_hardened(
            _factory, self._clearance_socket, "clearance"
        )
    except BaseException:
        with contextlib.suppress(Exception):
            await self._ingester.stop()
        self._ingester = None
        raise
    _log.info("clearance hub online at %s", self._clearance_socket)
stop()
async
Close the varlink server + ingester; drain subscriber queues.
Source code in src/terok_clearance/hub/server.py
async def stop(self) -> None:
    """Close the varlink server + ingester; drain subscriber queues."""
    if self._varlink_server is not None:
        # ``close()`` on its own only stops accepting new connections;
        # existing subscribers would sit forever in ``queue.get()`` and
        # ``wait_closed`` would hang until the timeout fires.
        # ``close_clients()`` walks the live transports and closes them,
        # which makes the server-side ``_call_async_method_more``'s
        # next ``send_reply`` fail with OSError — that in turn calls
        # ``generator.aclose()`` on the subscriber, propagating cleanly
        # through to our ``finally`` block. This avoids the
        # assertion asyncvarlink fires when a streaming generator
        # ends "normally" with ``continues=True`` on the last reply.
        self._varlink_server.close()
        with contextlib.suppress(AttributeError):
            self._varlink_server.close_clients()
        with contextlib.suppress(TimeoutError, Exception):
            await asyncio.wait_for(self._varlink_server.wait_closed(), timeout=1.0)
        self._varlink_server = None
    if self._ingester is not None:
        with contextlib.suppress(Exception):
            await self._ingester.stop()
        self._ingester = None
    with contextlib.suppress(Exception):
        await self._verdict_client.stop()
    self._subscribers.clear()
    self._live_verdicts.clear()
serve()
async
Run the hub service until SIGINT/SIGTERM.
The entry point terok-clearance serve hands off here. Blocks forever
on a signal-set asyncio.Event; systemd's SIGTERM flips it,
then stop tears down the server under a timeout.
Source code in src/terok_clearance/hub/server.py
async def serve() -> None:  # pragma: no cover — integration path
    """Run the hub service until SIGINT/SIGTERM.

    The entry point ``terok-clearance serve`` hands off here. Blocks forever
    on a signal-set [`asyncio.Event`][asyncio.Event]; systemd's SIGTERM flips
    it, then [`stop`][terok_clearance.hub.server.ClearanceHub.stop] tears down
    the server under a timeout.
    """
    from terok_clearance.runtime.service import configure_logging, wait_for_shutdown_signal

    configure_logging()
    hub = ClearanceHub()
    await hub.start()
    try:
        await wait_for_shutdown_signal()
    finally:
        await hub.stop()
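The signal-set event that `serve` blocks on can be sketched as follows. A minimal illustration of the pattern only, assuming a Unix event loop: the real `wait_for_shutdown_signal` lives in `terok_clearance.runtime.service` and its implementation is not shown here.

```python
import asyncio
import signal


async def wait_for_shutdown_signal() -> None:
    """Block until SIGINT or SIGTERM flips an asyncio.Event.

    Hypothetical sketch of the helper ``serve`` awaits; the real one
    may differ.
    """
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    try:
        await stop.wait()  # parked here until systemd (or Ctrl-C) fires
    finally:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)
```

Installing the handlers via `loop.add_signal_handler` (rather than `signal.signal`) keeps delivery on the event loop thread, so `stop.set()` wakes the waiting coroutine without any cross-thread hand-off.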