Subscribe to Shield1 and Clearance1 D-Bus signals and present desktop notifications.
Creates desktop notifications with Allow/Deny action buttons for blocked
connections (Shield) and clearance requests (Clearance). Operator actions are
routed back as Verdict / Resolve D-Bus method calls.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| notifier | Notifier | Desktop notification backend. | required |
| bus | MessageBus \| None | Optional pre-connected MessageBus (for testing). If None, a new session-bus connection is created on start(). | None |
Initialise the subscriber with a notifier and optional bus.
Source code in src/terok_dbus/_subscriber.py
def __init__(self, notifier: Notifier, bus: MessageBus | None = None) -> None:
    """Store the notification backend and the optional pre-connected bus.

    Args:
        notifier: Desktop notification backend kept on the instance.
        bus: Message bus to reuse, or ``None`` when the subscriber has
            no bus yet.
    """
    # Collaborators handed in by the caller.
    self._notifier = notifier
    self._bus = bus
    # True only when no bus was injected; ``stop()`` disconnects the bus
    # only in that case.
    self._owns_bus = bus is None

    # Interface proxies; both stay ``None`` until ``start()`` assigns them.
    self._clearance_iface: Any | None = None
    self._shield_iface: Any | None = None

    # Bookkeeping containers, both initially empty.
    self._tasks: set[asyncio.Task[None]] = set()
    self._pending: dict[int, str] = {}  # notification_id → request_id
start()
async
Connect to the session bus and subscribe to Shield1 and Clearance1 signals.
Source code in src/terok_dbus/_subscriber.py
async def start(self) -> None:
    """Attach the signal handlers for the Shield1 and Clearance1 interfaces.

    A connection is opened with ``MessageBus().connect()`` only when the
    instance holds no bus yet; otherwise the existing bus is reused.
    """
    bus = self._bus
    if bus is None:
        bus = await MessageBus().connect()
        self._bus = bus

    # Shield1: connection-blocked and verdict-applied signals.
    shield = bus.get_proxy_object(
        SHIELD_BUS_NAME, SHIELD_OBJECT_PATH, Node.parse(SHIELD_XML)
    ).get_interface(SHIELD_INTERFACE_NAME)
    self._shield_iface = shield
    shield.on_connection_blocked(self._on_connection_blocked)
    shield.on_verdict_applied(self._on_verdict_applied)
    _log.info("Subscribed to %s", SHIELD_INTERFACE_NAME)

    # Clearance1: request-received and request-resolved signals.
    clearance = bus.get_proxy_object(
        CLEARANCE_BUS_NAME, CLEARANCE_OBJECT_PATH, Node.parse(CLEARANCE_XML)
    ).get_interface(CLEARANCE_INTERFACE_NAME)
    self._clearance_iface = clearance
    clearance.on_request_received(self._on_request_received)
    clearance.on_request_resolved(self._on_request_resolved)
    _log.info("Subscribed to %s", CLEARANCE_INTERFACE_NAME)
stop()
async
Unsubscribe from signals and disconnect the bus if owned.
Source code in src/terok_dbus/_subscriber.py
async def stop(self) -> None:
    """Unsubscribe from signals and disconnect the bus if owned.

    Steps, in order:

    1. Cancel every tracked task and wait until each one has finished.
    2. Detach the four signal handlers, each only when the interface
       object exposes the matching ``off_*`` method.
    3. Drop the interface references and the pending-notification map.
    4. Disconnect the bus if this instance created it, and forget it so
       that a later ``start()`` opens a fresh connection instead of
       reusing the closed one.

    Steps 3 and 4 run even if detaching a handler raises.
    """
    # Snapshot and empty the set before awaiting, so nothing that touches
    # ``self._tasks`` while we wait can disturb the iteration.
    tasks = tuple(self._tasks)
    self._tasks.clear()
    for task in tasks:
        task.cancel()
    if tasks:
        # A single event-loop yield is not enough for tasks that do async
        # cleanup on cancellation; wait for them to finish. Their outcomes
        # (CancelledError or other exceptions) are collected and ignored.
        await asyncio.gather(*tasks, return_exceptions=True)

    try:
        shield = self._shield_iface
        if shield is not None:
            # The hasattr guards keep unsubscription best-effort for
            # interface objects that lack the off_* helpers.
            if hasattr(shield, "off_connection_blocked"):
                shield.off_connection_blocked(self._on_connection_blocked)
            if hasattr(shield, "off_verdict_applied"):
                shield.off_verdict_applied(self._on_verdict_applied)

        clearance = self._clearance_iface
        if clearance is not None:
            if hasattr(clearance, "off_request_received"):
                clearance.off_request_received(self._on_request_received)
            if hasattr(clearance, "off_request_resolved"):
                clearance.off_request_resolved(self._on_request_resolved)
    finally:
        # Reset local state first so it is cleared even if disconnect() fails.
        self._shield_iface = None
        self._clearance_iface = None
        self._pending.clear()
        if self._owns_bus and self._bus is not None:
            owned_bus = self._bus
            # Forget the owned bus: start() only reconnects when _bus is None.
            self._bus = None
            owned_bus.disconnect()