Skip to content

dns_log

dns_log

Tail the dnsmasq query log and emit events for blocked domain lookups.

Watches for new query[A] / query[AAAA] lines and classifies each queried domain by suffix-matching it against the merged allowed-domain set (profile domains plus live additions, minus denied domains). Requires the dnsmasq DNS tier.

DnsLogWatcher(log_path, state_dir, container)

Tail the dnsmasq query log and yield events for blocked domains.

Opens the log file, seeks to the end, and watches for new query lines.

Open log_path, seek to end, and load the initial allowed domain set.

Source code in src/terok_shield/lib/watchers/dns_log.py
def __init__(self, log_path: Path, state_dir: Path, container: str) -> None:
    """Open *log_path*, seek to end, and load the initial allowed domain set.

    Only lines appended after construction are ever seen: the handle is
    positioned at the current end of the file before anything is read.

    Args:
        log_path: Path of the dnsmasq query log to tail.
        state_dir: Stored on the instance as ``_state_dir``; presumably the
            location ``_refresh_domains()`` reads from — confirm there.
        container: Container name attached to every event this watcher emits.

    Raises:
        OSError: If the log file cannot be opened or positioned.
        Exception: Anything raised while loading the initial domain set is
            re-raised after the file handle has been closed.
    """
    self._log_path = log_path
    self._state_dir = state_dir
    self._container = container
    # Decode explicitly instead of relying on the locale default, and replace
    # undecodable bytes rather than raising: a single stray byte in the log
    # must not surface as UnicodeDecodeError from a later readline().
    self._fh = open(  # noqa: SIM115 — needs fileno() for select
        log_path, encoding="utf-8", errors="replace"
    )
    try:
        self._fh.seek(0, os.SEEK_END)
        self._allowed_domains: set[str] = set()
        self._last_refresh = 0.0
        self._refresh_domains()
    except Exception:
        # Do not leak the descriptor when construction fails part-way.
        self._fh.close()
        raise

fileno()

Return the file descriptor for select.select() multiplexing.

Source code in src/terok_shield/lib/watchers/dns_log.py
def fileno(self) -> int:
    """Expose the log file's descriptor so the watcher works with ``select.select()``.

    Returns:
        The integer file descriptor of the underlying log file handle.
    """
    handle = self._fh
    return handle.fileno()

close()

Close the underlying file handle.

Source code in src/terok_shield/lib/watchers/dns_log.py
def close(self) -> None:
    """Release the log file handle held by this watcher."""
    handle = self._fh
    handle.close()

poll()

Read new lines and return events for blocked queries.

Source code in src/terok_shield/lib/watchers/dns_log.py
def poll(self) -> list[WatchEvent]:
    """Read newly appended log lines and return events for blocked queries.

    Calls ``_refresh_domains()`` first when more than
    ``_DOMAIN_REFRESH_INTERVAL`` has elapsed since ``_last_refresh``. Then
    every complete line written since the previous call is consumed. Lines
    that do not match ``_QUERY_RE`` and queries for allowed domains are
    skipped silently.

    A trailing line that has not yet been terminated with a newline is left
    unread and picked up by a later call, so a line the writer is still in the
    middle of emitting is never parsed as a truncated fragment.

    Returns:
        One ``WatchEvent`` per blocked query, in log order. The list is empty
        when nothing new was blocked.
    """
    if _monotonic() - self._last_refresh > _DOMAIN_REFRESH_INTERVAL:
        self._refresh_domains()

    events: list[WatchEvent] = []
    while True:
        # Remember where this line begins so an incomplete read can be undone.
        line_start = self._fh.tell()
        line = self._fh.readline()
        if not line:
            break
        if not line.endswith("\n"):
            # The writer has not finished this line yet; rewind and retry on
            # the next poll instead of matching (and losing) half a record.
            self._fh.seek(line_start)
            break
        m = _QUERY_RE.search(line)
        if not m:
            continue
        # Normalise for matching: case-insensitive, no trailing root dot.
        query_type, domain = m.group(1), m.group(2).lower().rstrip(".")
        if self._is_allowed(domain):
            continue
        events.append(
            WatchEvent(
                # Timestamp is when the line was observed, not parsed from the log.
                ts=datetime.now(UTC).isoformat(),
                source="dns",
                action="blocked_query",
                domain=domain,
                query_type=query_type,
                container=self._container,
            )
        )
    return events