Skip to content

interactive

interactive

NFLOG-based interactive connection handler.

Implements a JSON-lines protocol for interactive verdict flow:

  1. pending — emitted when a new outbound connection is detected via NFLOG. Contains id, dest (IP), port, proto, and domain (if resolvable from the dnsmasq log).
  2. verdict — received on stdin from the controlling process. Must contain type: "verdict", id (matching a pending event), and action ("accept" or "deny").
  3. verdict_applied — emitted after the verdict has been persisted to the nft ruleset and state files. Contains id, dest, action, and ok (boolean indicating success).

The handler deduplicates by destination IP — only the first packet to a given IP triggers a pending event. Accepted IPs are added to the allow sets and persisted to live.allowed; denied IPs are added to the deny sets and persisted to deny.list.

InteractiveSession(*, runner, state_dir, container, io=None)

Drive the interactive NFLOG verdict loop.

Creates an :class:NflogWatcher, listens for queued-connection events, emits pending events as JSON lines on stdout, reads verdict commands from stdin, and applies them to the nft ruleset and state files.

Initialise the session.

Parameters:

Name Type Description Default
runner CommandRunner

Command runner for nft operations.

required
state_dir Path

Per-container state directory.

required
container str

Container name (for nft nsenter).

required
io SessionIO | None

I/O protocol implementation (defaults to :class:JsonSessionIO).

None
Source code in src/terok_shield/cli/interactive.py
def __init__(
    self,
    *,
    runner: CommandRunner,
    state_dir: Path,
    container: str,
    io: SessionIO | None = None,
) -> None:
    """Initialise the session.

    Args:
        runner: Command runner for nft operations.
        state_dir: Per-container state directory.
        container: Container name (for nft nsenter).
        io: I/O protocol implementation (defaults to :class:`JsonSessionIO`).
    """
    self._runner = runner
    self._state_dir = state_dir
    self._container = container
    self._io: SessionIO = io if io is not None else JsonSessionIO()
    self._domain_cache = DomainCache(state_dir)

    self._seen_ips: set[str] = set()
    self._pending_by_ip: dict[str, _PendingPacket] = {}
    self._last_domain_refresh: float = 0.0
    self._next_id: int = 1

run()

Enter the interactive event loop.

Creates the NFLOG watcher, sets stdin to non-blocking, installs signal handlers, and delegates to :meth:_loop. Exits with code 1 if the NFLOG watcher cannot be created.

Source code in src/terok_shield/cli/interactive.py
def run(self) -> None:
    """Enter the interactive event loop.

    Creates the NFLOG watcher, sets stdin to non-blocking, installs
    signal handlers, and delegates to :meth:`_loop`.  Exits with
    code 1 if the NFLOG watcher cannot be created.
    """
    watcher = NflogWatcher.create(self._container)
    if watcher is None:
        print(
            "Error: could not create NFLOG watcher (netlink unavailable).",
            file=sys.stderr,
        )
        raise SystemExit(1)

    self._io.emit_banner()

    stdin_fd = sys.stdin.fileno()
    _set_nonblocking(stdin_fd)

    global _running  # noqa: PLW0603
    _running = True
    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    try:
        self._loop(watcher, stdin_fd)
    finally:
        watcher.close()

SessionIO

Bases: Protocol

I/O protocol for the interactive session.

Decouples rendering and parsing from the verdict engine so the same :class:InteractiveSession can drive both machine-readable JSON-lines (--raw) and human-friendly CLI output.

emit_pending(packet_id, dest, port, proto, domain)

Emit a pending-connection event to the operator.

Source code in src/terok_shield/cli/interactive.py
def emit_pending(self, packet_id: int, dest: str, port: int, proto: int, domain: str) -> None:
    """Emit a pending-connection event to the operator."""
    ...

emit_verdict_applied(verdict_id, dest, action, *, ok)

Emit a verdict-applied confirmation.

Source code in src/terok_shield/cli/interactive.py
def emit_verdict_applied(self, verdict_id: int, dest: str, action: str, *, ok: bool) -> None:
    """Emit a verdict-applied confirmation."""
    ...

parse_command(line)

Parse one line of operator input into (packet_id, action).

Returns None when the line is invalid or unparseable.

Source code in src/terok_shield/cli/interactive.py
def parse_command(self, line: str) -> tuple[int, str] | None:
    """Parse one line of operator input into *(packet_id, action)*.

    Returns ``None`` when the line is invalid or unparseable.
    """
    ...

emit_banner()

Print a startup banner (no-op for machine protocols).

Source code in src/terok_shield/cli/interactive.py
def emit_banner(self) -> None:
    """Print a startup banner (no-op for machine protocols)."""
    ...

JsonSessionIO

JSON-lines session I/O — machine-readable protocol.

Emits compact JSON objects (one per line) and expects JSON verdict commands on stdin. This is the original protocol from PR #162.

emit_pending(packet_id, dest, port, proto, domain)

Emit a pending event as a JSON line.

Source code in src/terok_shield/cli/interactive.py
def emit_pending(self, packet_id: int, dest: str, port: int, proto: int, domain: str) -> None:
    """Emit a pending event as a JSON line."""
    out = {
        "type": "pending",
        "id": packet_id,
        "dest": dest,
        "port": port,
        "proto": proto,
        "domain": domain,
    }
    print(json.dumps(out, separators=(",", ":")), flush=True)

emit_verdict_applied(verdict_id, dest, action, *, ok)

Emit a verdict-applied confirmation as a JSON line.

Source code in src/terok_shield/cli/interactive.py
def emit_verdict_applied(self, verdict_id: int, dest: str, action: str, *, ok: bool) -> None:
    """Emit a verdict-applied confirmation as a JSON line."""
    out = {
        "type": "verdict_applied",
        "id": verdict_id,
        "dest": dest,
        "action": action,
        "ok": ok,
    }
    print(json.dumps(out, separators=(",", ":")), flush=True)

parse_command(line)

Parse a JSON verdict command.

Expected format: {"type": "verdict", "id": 1, "action": "accept"}. Returns (id, action) on success or None on any validation failure.

Source code in src/terok_shield/cli/interactive.py
def parse_command(self, line: str) -> tuple[int, str] | None:
    """Parse a JSON verdict command.

    Expected format: ``{"type": "verdict", "id": 1, "action": "accept"}``.
    Returns ``(id, action)`` on success or ``None`` on any validation failure.
    """
    try:
        cmd = json.loads(line)
    except json.JSONDecodeError:
        logger.warning("Invalid JSON on stdin: %s", line)
        return None
    if not isinstance(cmd, dict):
        logger.warning("Expected JSON object, got %s", type(cmd).__name__)
        return None
    if cmd.get("type") != "verdict":
        logger.warning("Unknown command type: %s", cmd.get("type"))
        return None
    verdict_id = cmd.get("id")
    if isinstance(verdict_id, bool) or not isinstance(verdict_id, int):
        logger.warning("Verdict id must be an integer, got %r", verdict_id)
        return None
    action = cmd.get("action")
    if action not in ("accept", "deny"):
        logger.warning("Verdict action must be 'accept' or 'deny', got %r", action)
        return None
    return (verdict_id, action)

emit_banner()

No-op — machine protocol has no banner.

Source code in src/terok_shield/cli/interactive.py
def emit_banner(self) -> None:
    """No-op — machine protocol has no banner."""

CliSessionIO()

Human-friendly interactive CLI session I/O.

Renders blocked connections as readable lines and accepts short operator input (a/d or allow/deny). Pending packets are tracked in a FIFO queue — input always targets the oldest.

Initialise with empty pending-packet queues.

Source code in src/terok_shield/cli/interactive.py
def __init__(self) -> None:
    """Initialise with empty pending-packet queues."""
    self._queue: list[int] = []
    """FIFO of pending packet IDs awaiting a verdict."""

    self._info: dict[int, tuple[str, int, str]] = {}
    """Packet metadata: *id* → *(dest, port, domain)*."""

emit_pending(packet_id, dest, port, proto, domain)

Show a [BLOCKED] line and queue the packet for verdict.

Source code in src/terok_shield/cli/interactive.py
def emit_pending(self, packet_id: int, dest: str, port: int, proto: int, domain: str) -> None:
    """Show a ``[BLOCKED]`` line and queue the packet for verdict."""
    label = f"{dest} ({domain})" if domain else dest
    self._queue.append(packet_id)
    self._info[packet_id] = (dest, port, domain)
    if len(self._queue) == 1:
        self._prompt_head()
    else:
        # Additional pending while the operator is thinking.
        print(f"\n[BLOCKED] {label} :{port} (queued)", flush=True)
        self._prompt_head()

emit_verdict_applied(verdict_id, dest, action, *, ok)

Show verdict result and prompt the next queued packet if any.

On success the packet is removed from the queue. On failure it stays queued so the operator can retry with the same a/d input (mirrors :meth:InteractiveSession._process_command which keeps failed verdicts in _pending_by_ip).

Source code in src/terok_shield/cli/interactive.py
def emit_verdict_applied(self, verdict_id: int, dest: str, action: str, *, ok: bool) -> None:
    """Show verdict result and prompt the next queued packet if any.

    On success the packet is removed from the queue.  On failure it
    stays queued so the operator can retry with the same ``a``/``d``
    input (mirrors :meth:`InteractiveSession._process_command` which
    keeps failed verdicts in ``_pending_by_ip``).
    """
    info = self._info.get(verdict_id)
    target = info[2] if info and info[2] else dest
    if ok:
        self._info.pop(verdict_id, None)
        if verdict_id in self._queue:
            self._queue.remove(verdict_id)
        mark = "\u2713" if action == "accept" else "\u2717"
        verb = "allowed" if action == "accept" else "denied"
        print(f"  {mark} {verb} {target}")
    else:
        print(f"  ! verdict failed for {target} (retry with a/d)")
    self._prompt_head()

parse_command(line)

Map operator input to the oldest pending packet.

Accepts a, d, allow, deny (case-insensitive). Returns None and prints a hint on unrecognised input.

Source code in src/terok_shield/cli/interactive.py
def parse_command(self, line: str) -> tuple[int, str] | None:
    """Map operator input to the oldest pending packet.

    Accepts ``a``, ``d``, ``allow``, ``deny`` (case-insensitive).
    Returns ``None`` and prints a hint on unrecognised input.
    """
    action = _INPUT_MAP.get(line.strip().lower())
    if action is None:
        print("  Unknown input. Type 'a' to allow or 'd' to deny.", flush=True)
        self._prompt_head()
        return None
    if not self._queue:
        return None
    return (self._queue[0], action)

emit_banner()

Print a startup message.

Source code in src/terok_shield/cli/interactive.py
def emit_banner(self) -> None:
    """Print a startup message."""
    print("Watching for blocked connections... (Ctrl-C to stop)\n", flush=True)

run_interactive(state_dir, container, *, raw=False)

Start the interactive NFLOG handler for a container.

The NFLOG netlink socket must be inside the container's network namespace to receive packets logged by nft rules. On first invocation, re-execs via podman unshare nsenter into the container's netns. The re-exec sets _TEROK_SHIELD_NFLOG_NSENTER so the second invocation runs the handler directly.

Parameters:

Name Type Description Default
state_dir Path

Per-container state directory (may be relative).

required
container str

Container name.

required
raw bool

If True, use JSON-lines protocol; otherwise use the human-friendly CLI (default).

False

Raises:

Type Description
SystemExit

If the interactive tier is not configured or NFLOG watcher creation fails.

Source code in src/terok_shield/cli/interactive.py
def run_interactive(state_dir: Path, container: str, *, raw: bool = False) -> None:
    """Start the interactive NFLOG handler for a container.

    The NFLOG netlink socket must be inside the container's network
    namespace to receive packets logged by nft rules.  On first
    invocation, re-execs via ``podman unshare nsenter`` into the
    container's netns.  The re-exec sets ``_TEROK_SHIELD_NFLOG_NSENTER``
    so the second invocation runs the handler directly.

    Args:
        state_dir: Per-container state directory (may be relative).
        container: Container name.
        raw: If ``True``, use JSON-lines protocol; otherwise use the
            human-friendly CLI (default).

    Raises:
        SystemExit: If the interactive tier is not configured or NFLOG
            watcher creation fails.
    """
    state_dir = state_dir.resolve()
    tier = read_interactive_tier(state_dir)
    if tier != "nflog":
        print(
            f"Error: interactive tier not configured (got {tier!r}, expected 'nflog').",
            file=sys.stderr,
        )
        raise SystemExit(1)

    if os.environ.get(_NSENTER_ENV) != "1":
        _nsenter_reexec(state_dir, container, raw=raw)
        return

    io: SessionIO = JsonSessionIO() if raw else CliSessionIO()
    runner = SubprocessRunner()
    session = InteractiveSession(runner=runner, state_dir=state_dir, container=container, io=io)
    session.run()