Skip to content

_event

_event

Shared event type emitted by all watchers.

WatchEvent(ts, source, action, container, domain='', query_type='', dest='', detail='', port=0, proto=0, extra=dict()) dataclass

A single watch event emitted to the output stream.

Core fields (always present): ts, source, action, container. DNS-specific: domain, query_type. Audit/NFLOG: dest, detail, port, proto.

ts instance-attribute

source instance-attribute

action instance-attribute

container instance-attribute

domain = '' class-attribute instance-attribute

query_type = '' class-attribute instance-attribute

dest = '' class-attribute instance-attribute

detail = '' class-attribute instance-attribute

port = 0 class-attribute instance-attribute

proto = 0 class-attribute instance-attribute

extra = field(default_factory=dict) class-attribute instance-attribute

to_json()

Serialise to a compact JSON line, omitting empty optional fields.

Every string value passes through the producer-side WIRE_SPEC(safe-string) sanitiser — container-controlled bytes (DNS query name, dest IP, audit detail) reach this method straight from kernel logs / dnsmasq output, so the boundary lives here, not in any caller.

Source code in src/terok_shield/watchers/_event.py
def to_json(self) -> str:
    """Serialise to a compact JSON line, omitting empty optional fields.

    Every string value passes through the producer-side
    ``WIRE_SPEC(safe-string)`` sanitiser — container-controlled
    bytes (DNS query name, dest IP, audit detail) reach this
    method straight from kernel logs / dnsmasq output, so the
    boundary lives here, not in any caller.
    """
    d: dict[str, Any] = {}
    for k, v in asdict(self).items():
        if not v and k not in ("ts", "source", "action", "container"):
            continue
        if isinstance(v, str):
            d[k] = sanitize(v)
        elif isinstance(v, dict):
            d[k] = sanitize_mapping(v)
        else:
            d[k] = v
    return json.dumps(d, separators=(",", ":"))