dbus_bridge

D-Bus event bridge for interactive NFLOG sessions.

Translates between InteractiveSession's JSON-lines protocol and D-Bus org.terok.Shield1 signals/methods. Each bridge serves one container; MPRIS-style per-container bus names (org.terok.Shield1.Container_<short_id>) let any number of bridges coexist on the same bus.

The bridge does not own the bus name: the caller (standalone CLI or orchestrator) acquires the name and passes in the already-connected bus. This lets a single orchestrator manage multiple bridges on one bus connection.

Requires optional dependencies dbus-fast and terok-dbus. Install via poetry install --with dbus.

BUS_NAME_PREFIX = 'org.terok.Shield1.Container_' module-attribute

Per-container bus name prefix. Suffixed with the short container ID.

ShieldBridge(*, state_dir, container, bus)

D-Bus bridge for one container's interactive NFLOG session.

Spawns InteractiveSession (JSON-lines mode) as a subprocess that enters the container's network namespace via nsenter. Translates JSON-lines events to D-Bus Shield1 signals and routes verdicts from D-Bus method calls back to the subprocess's stdin.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state_dir | Path | Per-container state directory. | required |
| container | str | Container name (used for nsenter and signal payloads). | required |
| bus | MessageBus | Connected MessageBus instance (caller-owned). | required |

Initialise the bridge with state directory, container name, and bus.

Source code in src/terok_shield/lib/dbus_bridge.py
def __init__(self, *, state_dir: Path, container: str, bus: MessageBus) -> None:
    """Initialise the bridge with state directory, container name, and bus."""
    self._state_dir = state_dir
    self._container = container
    self._bus = bus
    self._process: asyncio.subprocess.Process | None = None
    self._read_task: asyncio.Task[None] | None = None
    self._interface = _ShieldInterface(self)
    self._container_id: str | None = None

container_id property

Short container ID read from state_dir/container.id.

bus_name property

Per-container well-known bus name.

start() async

Spawn the interactive subprocess and begin the event relay loop.

Exports the Shield1 interface on the bus at /org/terok/Shield1, then reads JSON lines from the subprocess stdout and emits D-Bus signals for each event.

Source code in src/terok_shield/lib/dbus_bridge.py
async def start(self) -> None:
    """Spawn the interactive subprocess and begin the event relay loop.

    Exports the Shield1 interface on the bus at
    ``/org/terok/Shield1``, then reads JSON lines from the subprocess
    stdout and emits D-Bus signals for each event.
    """
    # Preload bus_name (file I/O) before any side effects so a
    # missing container.id raises before we export or spawn.
    name = self.bus_name

    self._bus.export(SHIELD_OBJECT_PATH, self._interface)
    try:
        env = {**os.environ, _RAW_ENV: "1"}
        env.pop(_NSENTER_ENV, None)
        self._process = await asyncio.create_subprocess_exec(
            sys.executable,
            "-m",
            "terok_shield.cli.interactive",
            str(self._state_dir),
            self._container,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=None,
            env=env,
        )
    except Exception:
        try:
            self._bus.unexport(SHIELD_OBJECT_PATH, self._interface)
        except Exception:
            logger.debug("Unexport failed during start rollback", exc_info=True)
        raise
    self._read_task = asyncio.create_task(self._read_loop())
    logger.info(
        "Bridge started for %s (bus name: %s, pid: %s)",
        self._container,
        name,
        self._process.pid,
    )
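
The relay loop itself (`_read_loop`) is not shown on this page. As a rough illustration of the "read JSON lines, emit one signal per event" step, here is a minimal sketch. The function name `relay_json_lines`, the `emit` callback, and the event payloads in the demo are all hypothetical stand-ins, not the module's actual code; in the real bridge the callback would emit a Shield1 signal.

```python
import asyncio
import json
from collections.abc import Callable


async def relay_json_lines(
    reader: asyncio.StreamReader,
    emit: Callable[[dict], None],
) -> int:
    """Read JSON lines until EOF and hand each decoded object to ``emit``.

    Hypothetical helper: ``emit`` stands in for "emit a D-Bus signal".
    Returns the number of events relayed.
    """
    relayed = 0
    while True:
        raw = await reader.readline()
        if not raw:  # empty bytes means the subprocess closed stdout
            break
        line = raw.strip()
        if not line:  # tolerate blank lines
            continue
        try:
            event = json.loads(line)
        except ValueError:  # malformed JSON or undecodable bytes: skip
            continue
        if isinstance(event, dict):
            emit(event)
            relayed += 1
    return relayed


async def demo() -> list[dict]:
    """Feed a few made-up lines through the loop and collect the events."""
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"type":"pending","id":7}\nnot json\n\n{"type":"closed"}\n')
    reader.feed_eof()
    seen: list[dict] = []
    await relay_json_lines(reader, seen.append)
    return seen
```

Skipping malformed lines rather than raising is a design choice of this sketch; the actual bridge may log or handle them differently.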

submit_verdict(request_id, action) async

Write a verdict command to the subprocess stdin.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request_id | str | Compound ID "{container}:{packet_id}". | required |
| action | str | "accept" or "deny". | required |

Returns:

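| Type | Description |
| --- | --- |
| bool | True if the verdict was written successfully; False if the subprocess is not running, the action or request_id is invalid, or stdin closed during the write. |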

Source code in src/terok_shield/lib/dbus_bridge.py
async def submit_verdict(self, request_id: str, action: str) -> bool:
    """Write a verdict command to the subprocess stdin.

    Args:
        request_id: Compound ID ``"{container}:{packet_id}"``.
        action: ``"accept"`` or ``"deny"``.

    Returns:
        ``True`` if the verdict was written successfully.
    """
    if not self._process or self._process.stdin is None:
        logger.warning("Cannot submit verdict — subprocess not running")
        return False

    if action not in ("accept", "deny"):
        logger.warning("Invalid verdict action: %s", action)
        return False

    container, sep, packet_raw = request_id.partition(":")
    if not sep or not container:
        logger.warning("Invalid request_id format: %s", request_id)
        return False
    if container != self._container:
        logger.warning(
            "request_id container mismatch: expected=%s got=%s",
            self._container,
            container,
        )
        return False
    try:
        packet_id = int(packet_raw)
    except ValueError:
        logger.warning("Non-integer packet ID in request_id: %s", request_id)
        return False

    verdict = {"type": "verdict", "id": packet_id, "action": action}
    line = json.dumps(verdict, separators=(",", ":")) + "\n"
    try:
        self._process.stdin.write(line.encode())
        await self._process.stdin.drain()
    except (BrokenPipeError, ConnectionResetError):
        logger.warning("Subprocess stdin closed while writing verdict")
        return False
    return True
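
The validation and encoding steps above can be isolated into a pure function, which makes the wire format easier to see. This is a sketch that mirrors the checks in `submit_verdict`; the name `encode_verdict` and the `expected_container` parameter are assumptions introduced for illustration and are not part of the module.

```python
import json
from typing import Optional


def encode_verdict(
    request_id: str, action: str, expected_container: str
) -> Optional[bytes]:
    """Return the JSON line for a verdict, or None if the input is rejected.

    Hypothetical helper mirroring the checks in ``submit_verdict``.
    """
    if action not in ("accept", "deny"):
        return None
    # Split only on the first colon: "{container}:{packet_id}".
    container, sep, packet_raw = request_id.partition(":")
    if not sep or not container or container != expected_container:
        return None
    try:
        packet_id = int(packet_raw)
    except ValueError:
        return None
    verdict = {"type": "verdict", "id": packet_id, "action": action}
    # Compact separators keep each verdict on a single short line.
    return (json.dumps(verdict, separators=(",", ":")) + "\n").encode()
```

For a bridge serving a container named `web`, `encode_verdict("web:42", "accept", "web")` yields `b'{"type":"verdict","id":42,"action":"accept"}\n'`, while `"db:42"`, `"web:abc"`, or an action of `"drop"` are all rejected.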

stop() async

Terminate the subprocess and clean up resources.

Runs cleanup to completion even if the caller's task is cancelled, then re-raises CancelledError.

Source code in src/terok_shield/lib/dbus_bridge.py
async def stop(self) -> None:
    """Terminate the subprocess and clean up resources.

    Runs cleanup to completion even if the caller's task is
    cancelled, then re-raises ``CancelledError``.
    """
    await self._cancel_read_task()
    cancelled = await self._terminate_process()
    self._unexport_bus()
    logger.info("Bridge stopped for %s", self._container)
    if cancelled:
        raise asyncio.CancelledError
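
The helpers `_cancel_read_task`, `_terminate_process`, and `_unexport_bus` are not shown here, so how they survive cancellation is not visible. One common way to get "finish cleanup, then re-raise" behaviour is `asyncio.shield` in a retry loop. The sketch below shows that general pattern only; `finish_despite_cancel`, `cleanup`, and `stop_like` are hypothetical names, and the actual module may do this differently.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


async def finish_despite_cancel(coro: Coroutine[Any, Any, Any]) -> bool:
    """Run ``coro`` to completion; return True if the caller was cancelled.

    The work runs in its own task, and ``shield`` keeps a cancellation of
    the caller from propagating into it.
    """
    task = asyncio.ensure_future(coro)
    cancelled = False
    while not task.done():
        try:
            await asyncio.shield(task)
        except asyncio.CancelledError:
            # Only the shield wrapper was cancelled; the work continues.
            cancelled = True
    return cancelled


async def demo() -> list[str]:
    """Cancel a stop-like coroutine mid-cleanup and record what happens."""
    log: list[str] = []

    async def cleanup() -> None:
        await asyncio.sleep(0.05)  # stands in for terminating a process
        log.append("cleaned")

    async def stop_like() -> None:
        cancelled = await finish_despite_cancel(cleanup())
        log.append("stopped")
        if cancelled:
            raise asyncio.CancelledError

    task = asyncio.create_task(stop_like())
    await asyncio.sleep(0.01)
    task.cancel()  # arrives while cleanup is still sleeping
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log
```

In the demo the cleanup still finishes and the stop step still runs before the cancellation surfaces to the awaiting caller.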

bus_name_for_container(short_id)

Derive the per-container well-known bus name.

D-Bus bus name segments must start with [A-Za-z_], so hex IDs (which may start with a digit) are prefixed with Container_.

Source code in src/terok_shield/lib/dbus_bridge.py
def bus_name_for_container(short_id: str) -> str:
    """Derive the per-container well-known bus name.

    D-Bus bus name segments must start with ``[A-Za-z_]``, so hex IDs
    (which may start with a digit) are prefixed with ``Container_``.
    """
    return f"{BUS_NAME_PREFIX}{short_id}"
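
To make the naming rule concrete, the sketch below checks candidate names against the D-Bus well-known-name grammar (at least two dot-separated elements, each drawn from `[A-Za-z0-9_-]` and not starting with a digit, at most 255 characters in total). The validator `is_valid_well_known_name` and the sample ID are illustrative assumptions; only `BUS_NAME_PREFIX` and `bus_name_for_container` come from the module, and they are re-declared here so the block stands alone.

```python
import re

BUS_NAME_PREFIX = "org.terok.Shield1.Container_"

# One bus-name element: allowed characters, no leading digit.
_ELEMENT = re.compile(r"[A-Za-z_-][A-Za-z0-9_-]*")


def bus_name_for_container(short_id: str) -> str:
    """Same derivation as the module function, repeated for the demo."""
    return f"{BUS_NAME_PREFIX}{short_id}"


def is_valid_well_known_name(name: str) -> bool:
    """Hypothetical validator for D-Bus well-known bus names."""
    if not name or len(name) > 255:
        return False
    elements = name.split(".")
    return len(elements) >= 2 and all(_ELEMENT.fullmatch(e) for e in elements)


short_id = "0a1b2c3d4e5f"  # made-up hex ID that starts with a digit
prefixed = bus_name_for_container(short_id)
unprefixed = f"org.terok.Shield1.{short_id}"
```

Here `prefixed` passes the check, whereas `unprefixed` fails because its last element begins with `0`.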