ingester

Unix-socket ingester that relays container events onto the session bus.

Per-container NFLOG readers live in NS_ROOTLESS (the rootless-podman user namespace that owns the container netns). From there, the session dbus-daemon's SO_PEERCRED check rejects their connection attempts — even when DBUS_SESSION_BUS_ADDRESS points at the right socket.

The hub runs in the host user namespace, so it can reach the session bus. EventIngester bridges that gap for the readers: it owns a unix socket that accepts line-delimited JSON events from any local connection, decodes them, and calls the matching ShieldHub signal methods on the bus, where emission works.
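
The reader's side of this exchange can be sketched as follows. This is a minimal sketch, not the project's reader: the helper names `encode_event` and `send_event` and any event fields are hypothetical, since the wire schema is not specified here. Only the framing (one JSON object per line over an AF_UNIX stream) comes from the description above.

```python
import asyncio
import json


def encode_event(event: dict) -> bytes:
    """Serialise one event as a single newline-terminated JSON line."""
    # json.dumps escapes newlines inside strings, so the only raw "\n"
    # in the output is the trailing line delimiter.
    return json.dumps(event, separators=(",", ":")).encode("utf-8") + b"\n"


async def send_event(socket_path: str, event: dict) -> None:
    """Connect to the ingester socket, write one event line, and disconnect."""
    _reader, writer = await asyncio.open_unix_connection(socket_path)
    try:
        writer.write(encode_event(event))
        await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()
```

A real reader would more likely keep one connection open and write many lines to it; the one-shot form just keeps the framing easy to see.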

One socket per hub, one hub per user session. Readers reconnect on their own if the hub restarts; the hub tolerates disconnected readers without logging.
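
How a reader reconnects is not shown on this page. One plausible shape, assuming a capped exponential backoff (the helper names `backoff_delays` and `connect_with_retry`, and the delay values, are illustrative assumptions rather than the project's behaviour):

```python
import asyncio
from collections.abc import Iterator


def backoff_delays(base: float = 0.5, cap: float = 8.0) -> Iterator[float]:
    """Yield retry delays that double each time until they reach ``cap``."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)


async def connect_with_retry(
    socket_path: str,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Keep trying to connect until the hub's socket accepts us."""
    for delay in backoff_delays():
        try:
            return await asyncio.open_unix_connection(socket_path)
        except (FileNotFoundError, ConnectionRefusedError):
            # Socket file missing or nobody listening: the hub is down
            # or restarting, so wait and try again.
            await asyncio.sleep(delay)
    raise AssertionError("unreachable: backoff_delays() never ends")
```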

EventIngester(*, socket_path, on_event)

Accepts JSON event lines from container readers and forwards to the hub.

Owns one AF_UNIX listener and a set of accepted-connection handler tasks. The socket file mode is 0600: only the user running the hub can read or write it, matching the session bus's own ACL model.

Bind the ingester to a filesystem path and a sink coroutine.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| socket_path | Path | Where the listening AF_UNIX socket will live. The path is unlinked first if a stale file exists, so a crashed previous run doesn't deadlock startup. | required |
| on_event | Callable[[dict], Awaitable[None]] | Coroutine the ingester awaits once per parsed event. Expected to emit the corresponding D-Bus signal; exceptions raised here are logged and swallowed so one bad event can't tear down the ingester. | required |
Source code in src/terok_clearance/hub/ingester.py
def __init__(
    self,
    *,
    socket_path: Path,
    on_event: Callable[[dict], Awaitable[None]],
) -> None:
    """Bind the ingester to a filesystem path and a sink coroutine.

    Args:
        socket_path: Where the listening AF_UNIX socket will live.  The
            path is unlinked first if a stale file exists, so a crashed
            previous run doesn't deadlock startup.
        on_event: Coroutine the ingester awaits once per parsed event.
            Expected to emit the corresponding D-Bus signal; exceptions
            raised here are logged and swallowed so one bad event can't
            tear down the ingester.
    """
    self._socket_path = socket_path
    self._on_event = on_event
    self._server: asyncio.AbstractServer | None = None
    self._clients: set[asyncio.Task] = set()

start() async

Bind the socket and start accepting connections in the background.

Source code in src/terok_clearance/hub/ingester.py
async def start(self) -> None:
    """Bind the socket and start accepting connections in the background."""
    from terok_clearance.wire.socket import bind_hardened

    async def _factory(path: str) -> asyncio.AbstractServer:
        return await asyncio.start_unix_server(self._handle_client, path=path)

    self._server = await bind_hardened(_factory, self._socket_path, "ingester")
    _log.info("event ingester listening on %s", self._socket_path)
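
`bind_hardened` lives in another module and is not shown here. A rough, self-contained approximation of the behaviour this page describes (unlink a stale path, then create the socket file with mode 0600) might look like the following. The name `bind_private` and the umask-based approach are assumptions, not the library's implementation.

```python
import asyncio
import contextlib
import os
import socket
from pathlib import Path


async def bind_private(handler, path: Path) -> asyncio.AbstractServer:
    """Listen on ``path`` with a socket file only the current user can access."""
    # Remove whatever a crashed previous run may have left behind.
    with contextlib.suppress(FileNotFoundError):
        path.unlink()
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # Bind synchronously under a tight umask so the file is created as
    # 0600 from the start, and no other task runs while the umask is changed.
    old_umask = os.umask(0o177)
    try:
        sock.bind(str(path))
    except BaseException:
        sock.close()
        raise
    finally:
        os.umask(old_umask)
    # asyncio calls listen() on the pre-bound socket when serving starts.
    return await asyncio.start_unix_server(handler, sock=sock)
```

Binding before handing the socket to asyncio avoids a window in which the file exists with looser permissions, which a bind-then-chmod sequence would have.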

stop() async

Close the server and await any in-flight client tasks.

Source code in src/terok_clearance/hub/ingester.py
async def stop(self) -> None:
    """Close the server and await any in-flight client tasks."""
    # Cancel client handlers *before* awaiting ``wait_closed()``: from
    # Python 3.12.1 onwards the server tracks active connections and
    # ``wait_closed()`` blocks until every one of them returns.  If we
    # waited first we'd deadlock against our own accepted tasks.
    if self._server is not None:
        self._server.close()
    # Snapshot once: each ``await task`` below yields to the event loop
    # which resumes the handler's ``finally`` and discards itself from
    # ``self._clients``.  Iterating the live set while that happens would
    # raise ``RuntimeError: Set changed size during iteration``.
    pending = tuple(self._clients)
    for task in pending:
        task.cancel()
    for task in pending:
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await task
    if self._server is not None:
        await self._server.wait_closed()
        self._server = None
    with contextlib.suppress(FileNotFoundError):
        self._socket_path.unlink()
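
The snapshot-then-cancel pattern used by `stop()` can be tried in isolation. In this small sketch, `cancel_all` and `demo` are illustrative names, and the handlers are stand-ins that only sleep:

```python
import asyncio
import contextlib


async def cancel_all(tasks: set[asyncio.Task]) -> int:
    """Cancel every task in ``tasks`` and wait for each one to finish."""
    # Snapshot first: handlers remove themselves from ``tasks`` as they
    # unwind, so iterating the live set would raise RuntimeError.
    pending = tuple(tasks)
    for task in pending:
        task.cancel()
    for task in pending:
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await task
    return len(pending)


async def demo() -> tuple[int, int]:
    clients: set[asyncio.Task] = set()

    async def handler() -> None:
        try:
            await asyncio.sleep(3600)  # stand-in for serving a connection
        finally:
            clients.discard(asyncio.current_task())

    for _ in range(3):
        clients.add(asyncio.create_task(handler()))
    await asyncio.sleep(0)  # let the handlers start running
    cancelled = await cancel_all(clients)
    return cancelled, len(clients)
```

`asyncio.run(demo())` returns the number of tasks cancelled and the number still registered afterwards.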

default_socket_path()

Return the canonical ingester path under $XDG_RUNTIME_DIR.

Source code in src/terok_clearance/hub/ingester.py
def default_socket_path() -> Path:
    """Return the canonical ingester path under ``$XDG_RUNTIME_DIR``."""
    from terok_clearance.wire.socket import runtime_socket_path

    return runtime_socket_path(_SOCKET_BASENAME)
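
`runtime_socket_path` and the value of `_SOCKET_BASENAME` are defined elsewhere and not shown here. As a hedged illustration of the documented behaviour (a path under `$XDG_RUNTIME_DIR`), a resolver could look like this; the name `runtime_path` and the decision to raise when the variable is unset are assumptions:

```python
import os
from pathlib import Path


def runtime_path(basename: str) -> Path:
    """Return ``$XDG_RUNTIME_DIR/<basename>``, failing if the variable is unset."""
    runtime_dir = os.environ.get("XDG_RUNTIME_DIR")
    if not runtime_dir:
        # Assumption: refuse to guess a fallback location. The real
        # helper may behave differently.
        raise RuntimeError("XDG_RUNTIME_DIR is not set")
    return Path(runtime_dir) / basename
```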