Skip to content

_subscriber

_subscriber

Event subscriber bridging Shield1/Clearance1 D-Bus signals to desktop notifications.

EventSubscriber(notifier, bus=None)

Subscribe to Shield1 and Clearance1 D-Bus signals and present desktop notifications.

Creates desktop notifications with Allow/Deny action buttons for blocked connections (Shield) and clearance requests (Clearance). Operator actions are routed back as Verdict / Resolve D-Bus method calls.

Parameters:

Name Type Description Default
notifier Notifier

Desktop notification backend.

required
bus MessageBus | None

Optional pre-connected MessageBus (for testing). If None, a new session-bus connection is created on start().

None

Initialise the subscriber with a notifier and optional bus.

Source code in src/terok_dbus/_subscriber.py
def __init__(self, notifier: Notifier, bus: MessageBus | None = None) -> None:
    """Initialise the subscriber with a notifier and optional bus."""
    self._notifier = notifier
    self._bus = bus
    self._owns_bus = bus is None
    self._shield_iface: Any | None = None
    self._clearance_iface: Any | None = None
    self._pending: dict[int, str] = {}  # notification_id → request_id
    self._tasks: set[asyncio.Task[None]] = set()

start() async

Connect to the session bus and subscribe to Shield1 and Clearance1 signals.

Source code in src/terok_dbus/_subscriber.py
async def start(self) -> None:
    """Connect to the session bus and subscribe to Shield1 and Clearance1 signals."""
    if self._bus is None:
        self._bus = await MessageBus().connect()

    shield_node = Node.parse(SHIELD_XML)
    shield_proxy = self._bus.get_proxy_object(SHIELD_BUS_NAME, SHIELD_OBJECT_PATH, shield_node)
    self._shield_iface = shield_proxy.get_interface(SHIELD_INTERFACE_NAME)
    self._shield_iface.on_connection_blocked(self._on_connection_blocked)
    self._shield_iface.on_verdict_applied(self._on_verdict_applied)
    _log.info("Subscribed to %s", SHIELD_INTERFACE_NAME)

    clearance_node = Node.parse(CLEARANCE_XML)
    clearance_proxy = self._bus.get_proxy_object(
        CLEARANCE_BUS_NAME, CLEARANCE_OBJECT_PATH, clearance_node
    )
    self._clearance_iface = clearance_proxy.get_interface(CLEARANCE_INTERFACE_NAME)
    self._clearance_iface.on_request_received(self._on_request_received)
    self._clearance_iface.on_request_resolved(self._on_request_resolved)
    _log.info("Subscribed to %s", CLEARANCE_INTERFACE_NAME)

stop() async

Unsubscribe from signals and disconnect the bus if owned.

Source code in src/terok_dbus/_subscriber.py
async def stop(self) -> None:
    """Unsubscribe from signals and disconnect the bus if owned."""
    for task in self._tasks:
        task.cancel()
    await asyncio.sleep(0)  # yield to let cancellations propagate
    self._tasks.clear()

    if self._shield_iface is not None:
        if hasattr(self._shield_iface, "off_connection_blocked"):
            self._shield_iface.off_connection_blocked(self._on_connection_blocked)
        if hasattr(self._shield_iface, "off_verdict_applied"):
            self._shield_iface.off_verdict_applied(self._on_verdict_applied)
    if self._clearance_iface is not None:
        if hasattr(self._clearance_iface, "off_request_received"):
            self._clearance_iface.off_request_received(self._on_request_received)
        if hasattr(self._clearance_iface, "off_request_resolved"):
            self._clearance_iface.off_request_resolved(self._on_request_resolved)

    if self._owns_bus and self._bus is not None:
        self._bus.disconnect()

    self._shield_iface = None
    self._clearance_iface = None
    self._pending.clear()