Skip to content

watchers

watchers

Live blocked-access event stream for shield watch.

Multiplexes three event sources into a single JSON-lines stream:

  1. DNS log — tails the per-container dnsmasq query log and emits events for blocked domain lookups.
  2. Audit log — tails audit.jsonl and surfaces shield lifecycle events (allow, deny, up, down, setup, teardown).
  3. NFLOG — reads denied packets via AF_NETLINK and emits events for raw-IP connections that bypassed DNS. Optional — graceful degradation when netlink is unavailable.

WatchEvent(ts, source, action, container, domain='', query_type='', dest='', detail='', port=0, proto=0, extra=dict()) dataclass

A single watch event emitted to the output stream.

Core fields (always present): ts, source, action, container. DNS-specific: domain, query_type. Audit/NFLOG: dest, detail, port, proto.

to_json()

Serialize to a compact JSON line, omitting empty optional fields.

Source code in src/terok_shield/lib/watchers/_event.py
def to_json(self) -> str:
    """Serialize to a compact JSON line, omitting empty optional fields."""
    d = {
        k: v
        for k, v in asdict(self).items()
        if v or k in ("ts", "source", "action", "container")
    }
    return json.dumps(d, separators=(",", ":"))

AuditLogWatcher(audit_path, container)

Tail audit.jsonl and yield events for shield lifecycle changes.

Open audit_path and seek to end.

Parameters:

Name Type Description Default
audit_path Path

Path to the per-container audit.jsonl file.

required
container str

Container name (for event metadata).

required
Source code in src/terok_shield/lib/watchers/audit_log.py
def __init__(self, audit_path: Path, container: str) -> None:
    """Open *audit_path* and seek to end.

    Args:
        audit_path: Path to the per-container ``audit.jsonl`` file.
        container: Container name (for event metadata).
    """
    self._audit_path = audit_path
    self._container = container
    audit_path.touch(exist_ok=True)
    self._fh = open(audit_path)  # noqa: SIM115 — needs fileno() for select
    self._fh.seek(0, os.SEEK_END)

fileno()

Return the file descriptor for select.select() multiplexing.

Source code in src/terok_shield/lib/watchers/audit_log.py
def fileno(self) -> int:
    """Return the file descriptor for ``select.select()`` multiplexing."""
    return self._fh.fileno()

close()

Close the underlying file handle.

Source code in src/terok_shield/lib/watchers/audit_log.py
def close(self) -> None:
    """Close the underlying file handle."""
    self._fh.close()

poll()

Read new audit lines and return watch events.

Source code in src/terok_shield/lib/watchers/audit_log.py
def poll(self) -> list[WatchEvent]:
    """Read new audit lines and return watch events."""
    events: list[WatchEvent] = []
    while line := self._fh.readline():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            continue
        events.append(
            WatchEvent(
                ts=entry.get("ts", datetime.now(UTC).isoformat()),
                source="audit",
                action=entry.get("action", "unknown"),
                container=entry.get("container", self._container),
                dest=entry.get("dest", ""),
                detail=entry.get("detail", ""),
            )
        )
    return events

DnsLogWatcher(log_path, state_dir, container)

Tail the dnsmasq query log and yield events for blocked domains.

Opens the log file, seeks to the end, and watches for new query lines.

Open log_path, seek to end, and load the initial allowed domain set.

Source code in src/terok_shield/lib/watchers/dns_log.py
def __init__(self, log_path: Path, state_dir: Path, container: str) -> None:
    """Open *log_path*, seek to end, and load the initial allowed domain set."""
    self._log_path = log_path
    self._state_dir = state_dir
    self._container = container
    self._fh = open(log_path)  # noqa: SIM115 — needs fileno() for select
    try:
        self._fh.seek(0, os.SEEK_END)
        self._allowed_domains: set[str] = set()
        self._last_refresh = 0.0
        self._refresh_domains()
    except Exception:
        self._fh.close()
        raise

fileno()

Return the file descriptor for select.select() multiplexing.

Source code in src/terok_shield/lib/watchers/dns_log.py
def fileno(self) -> int:
    """Return the file descriptor for ``select.select()`` multiplexing."""
    return self._fh.fileno()

close()

Close the underlying file handle.

Source code in src/terok_shield/lib/watchers/dns_log.py
def close(self) -> None:
    """Close the underlying file handle."""
    self._fh.close()

poll()

Read new lines and return events for blocked queries.

Source code in src/terok_shield/lib/watchers/dns_log.py
def poll(self) -> list[WatchEvent]:
    """Read new lines and return events for blocked queries."""
    if _monotonic() - self._last_refresh > _DOMAIN_REFRESH_INTERVAL:
        self._refresh_domains()

    events: list[WatchEvent] = []
    while line := self._fh.readline():
        m = _QUERY_RE.search(line)
        if not m:
            continue
        query_type, domain = m.group(1), m.group(2).lower().rstrip(".")
        if self._is_allowed(domain):
            continue
        events.append(
            WatchEvent(
                ts=datetime.now(UTC).isoformat(),
                source="dns",
                action="blocked_query",
                domain=domain,
                query_type=query_type,
                container=self._container,
            )
        )
    return events

DomainCache(state_dir)

IP-to-domain reverse lookup cache.

Initialise with the dnsmasq log path derived from state_dir.

Source code in src/terok_shield/lib/watchers/domain_cache.py
def __init__(self, state_dir: Path) -> None:
    """Initialise with the dnsmasq log path derived from *state_dir*."""
    self._log_path = state.dnsmasq_log_path(state_dir)
    self._mapping: dict[str, str] = {}

lookup(ip)

Return the cached domain for ip, or empty string if unknown.

Source code in src/terok_shield/lib/watchers/domain_cache.py
def lookup(self, ip: str) -> str:
    """Return the cached domain for *ip*, or empty string if unknown."""
    return self._mapping.get(ip, "")

refresh()

Reload the IP-to-domain mapping from the dnsmasq query log.

On OSError the previous cache is preserved.

Source code in src/terok_shield/lib/watchers/domain_cache.py
def refresh(self) -> None:
    """Reload the IP-to-domain mapping from the dnsmasq query log.

    On ``OSError`` the previous cache is preserved.
    """
    try:
        text = self._log_path.read_text()
    except OSError:
        return
    mapping: dict[str, str] = {}
    for m in _REPLY_RE.finditer(text):
        domain, ip = m.group(1), m.group(2)
        mapping[ip] = domain.lower().rstrip(".")
    self._mapping = mapping

NflogWatcher(sock, container)

Read NFLOG messages via AF_NETLINK and yield events for denied packets.

Wrap an already-bound NFLOG netlink socket.

Use :meth:create instead of calling this directly.

Source code in src/terok_shield/lib/watchers/nflog.py
def __init__(self, sock: socket.socket, container: str) -> None:
    """Wrap an already-bound NFLOG netlink socket.

    Use :meth:`create` instead of calling this directly.
    """
    self._sock = sock
    self._container = container

create(container, group=NFLOG_GROUP) classmethod

Create and bind an NFLOG watcher, or return None on failure.

Failure is expected in environments without AF_NETLINK support, unprivileged containers, or missing kernel modules.

Parameters:

Name Type Description Default
container str

Container name (for event metadata).

required
group int

NFLOG group number to subscribe to.

NFLOG_GROUP
Source code in src/terok_shield/lib/watchers/nflog.py
@classmethod
def create(cls, container: str, group: int = NFLOG_GROUP) -> "NflogWatcher | None":
    """Create and bind an NFLOG watcher, or return ``None`` on failure.

    Failure is expected in environments without ``AF_NETLINK`` support,
    unprivileged containers, or missing kernel modules.

    Args:
        container: Container name (for event metadata).
        group: NFLOG group number to subscribe to.
    """
    sock: socket.socket | None = None
    try:
        sock = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, _NETLINK_NETFILTER)
        sock.bind((0, 0))

        # Handshake in blocking mode so the bind ACK is reliably received.
        sock.settimeout(2.0)
        sock.send(_build_nflog_bind_msg(group))
        ack = sock.recv(4096)
        if len(ack) >= _NLMSG_HDR.size + 4:
            err = struct.unpack_from("=i", ack, _NLMSG_HDR.size)[0]
            if err < 0:
                sock.close()
                logger.debug("NFLOG bind rejected (errno %d) — skipping", -err)
                return None

        # Switch to non-blocking for the poll() loop
        sock.setblocking(False)
        return cls(sock, container)
    except (OSError, AttributeError):
        # OSError: netlink/timeout unavailable; AttributeError: AF_NETLINK
        # missing on non-Linux platforms.
        logger.debug("NFLOG socket unavailable — skipping packet events")
        if sock is not None:
            sock.close()
        return None

fileno()

Return the file descriptor for select.select() multiplexing.

Source code in src/terok_shield/lib/watchers/nflog.py
def fileno(self) -> int:
    """Return the file descriptor for ``select.select()`` multiplexing."""
    return self._sock.fileno()

close()

Close the netlink socket.

Source code in src/terok_shield/lib/watchers/nflog.py
def close(self) -> None:
    """Close the netlink socket."""
    self._sock.close()

poll()

Read pending NFLOG messages and return watch events.

Source code in src/terok_shield/lib/watchers/nflog.py
def poll(self) -> list[WatchEvent]:
    """Read pending NFLOG messages and return watch events."""
    events: list[WatchEvent] = []
    while True:
        try:
            data = self._sock.recv(65535)
        except OSError:
            break
        if not data:
            break
        events.extend(self._parse_messages(data))
    return events