Skip to content

watch

watch

shield watch — stream blocked-access events as JSON lines.

Tails the dnsmasq query log, per-container audit log, and (optionally) the NFLOG netlink socket. Only works when the dnsmasq DNS tier is active. Clean exit on SIGINT or SIGTERM.

run_watch(state_dir, container)

Stream blocked-access events as JSON lines to stdout.

Only meaningful under the dnsmasq tier — the query log and nftset integration that feed the watchers do not exist in the dig/getent tiers. Uses select so a single thread can multiplex the DNS log, audit log, and NFLOG socket without blocking on any one source.

Parameters:

Name Type Description Default
state_dir Path

Per-container state directory.

required
container str

Container name (for event metadata).

required

Raises:

Type Description
SystemExit

If the DNS tier is not dnsmasq.

Source code in src/terok_shield/cli/watch.py
def run_watch(state_dir: Path, container: str) -> None:
    """Stream blocked-access events as JSON lines to stdout.

    Only meaningful under the dnsmasq tier — the query log and nftset
    integration that feed the watchers do not exist in the dig/getent
    tiers.  Uses ``select`` so a single thread can multiplex the DNS
    log, audit log, and NFLOG socket without blocking on any one source.

    Args:
        state_dir: Per-container state directory.
        container: Container name (for event metadata).

    Raises:
        SystemExit: If the DNS tier is not dnsmasq.
    """
    _validate_dnsmasq_tier(state_dir)

    log_path = state.dnsmasq_log_path(state_dir)
    _ensure_log_file(log_path)

    _install_signal_handlers()

    dns_watcher = DnsLogWatcher(log_path, state_dir, container)
    audit_watcher = AuditLogWatcher(state.audit_path(state_dir), container)
    nflog_watcher = NflogWatcher.create(container)
    domain_cache = DomainCache(state_dir)

    try:
        while _running:
            _poll_nflog_or_sleep(nflog_watcher, domain_cache)
            _emit_events(dns_watcher.poll())
            _emit_events(audit_watcher.poll())
    finally:
        dns_watcher.close()
        audit_watcher.close()
        if nflog_watcher:
            nflog_watcher.close()