rules

nftables ruleset generation and verification.

Generates per-container nftables rulesets (deny-all and bypass modes), provides set operations for runtime allowlist/denylist management, and verifies applied rulesets against security invariants.

Security boundary: only stdlib + nft_constants.py imports allowed. All inputs are validated before interpolation into nft commands.

RulesetBuilder(*, dns=PASTA_DNS, loopback_ports=(), gateway_v4='', gateway_v6='', set_timeout='')

Builder for nftables ruleset generation and verification.

Security boundary: only stdlib + nft_constants imports. All inputs validated before interpolation.

Binds dns and loopback_ports once at construction so callers do not repeat them on every generation or verification call.

Create a builder with validated DNS, gateway, and port config.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dns | str | DNS server address (pasta default forwarder). | PASTA_DNS |
| loopback_ports | tuple[int, ...] | TCP ports to allow on the loopback interface. | () |
| gateway_v4 | str | IPv4 gateway address (e.g. slirp4netns 10.0.2.2). | '' |
| gateway_v6 | str | IPv6 gateway address (e.g. slirp4netns fd00::2). | '' |
| set_timeout | str | nft set element timeout (e.g. 30m). | '' |

Source code in src/terok_shield/nft/rules.py
def __init__(
    self,
    *,
    dns: str = PASTA_DNS,
    loopback_ports: tuple[int, ...] = (),
    gateway_v4: str = "",
    gateway_v6: str = "",
    set_timeout: str = "",
) -> None:
    """Create a builder with validated DNS, gateway, and port config.

    Args:
        dns: DNS server address (pasta default forwarder).
        loopback_ports: TCP ports to allow on the loopback interface.
        gateway_v4: IPv4 gateway address (e.g. slirp4netns ``10.0.2.2``).
        gateway_v6: IPv6 gateway address (e.g. slirp4netns ``fd00::2``).
        set_timeout: nft set element timeout (e.g. ``30m``).
    """
    dns = safe_ip(dns)
    for p in loopback_ports:
        _safe_port(p)
    if set_timeout:
        _safe_timeout(set_timeout)
    self._dns = dns
    self._loopback_ports = loopback_ports
    self._gateway_v4 = _safe_ipv4(gateway_v4) if gateway_v4 else ""
    self._gateway_v6 = _safe_ipv6(gateway_v6) if gateway_v6 else ""
    self._set_timeout = set_timeout
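
The constructor validates every value before storing it. The private helpers `_safe_port` and `_safe_timeout` are not shown on this page, so the following is only a sketch of what such checks might look like. The names `check_port` and `check_timeout`, the 1..65535 range, and the single-unit duration pattern are all assumptions, not the project's implementation.

```python
import re

# Hypothetical stand-ins for the private helpers referenced above; the real
# implementations are not shown on this page and may differ.
_TIMEOUT_RE = re.compile(r"[0-9]+[smhd]")  # assumed: digits plus one unit, e.g. "30m"


def check_port(port: int) -> int:
    """Return the port if it is an int in 1..65535, else raise ValueError."""
    if isinstance(port, bool) or not isinstance(port, int) or not 1 <= port <= 65535:
        raise ValueError(f"Invalid port: {port!r}")
    return port


def check_timeout(value: str) -> str:
    """Accept simple durations such as '30m' or '45s'; reject anything else."""
    if not _TIMEOUT_RE.fullmatch(value):
        raise ValueError(f"Invalid timeout: {value!r}")
    return value
```

Because both checks use a strict whitelist, a value such as `"30m; flush ruleset"` is rejected before it can reach an nft command string.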

build_hook()

Generate the hook-mode (deny-all) nftables ruleset.

Applied by the OCI hook into the container's own netns. Dual-stack: both IPv4 and IPv6 use deny-all + allowlist.

Gateway addresses are baked directly into the ruleset as literal accept rules -- no dynamic sets or runtime discovery needed.

Chain order (output): loopback -> established -> DNS -> gateway ports -> loopback ports -> allow sets -> deny sets -> private-range reject -> terminal deny
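
To make the ordering concrete, here is a much reduced sketch that assembles an output chain in the same relative order (loopback, established, DNS, allow sets, terminal deny). It is not the generator used by this module: the table name, set declarations, and log prefix are illustrative placeholders, and the gateway, loopback-port, deny-set, and private-range rules are omitted.

```python
import ipaddress

TABLE = "inet demo_shield"  # hypothetical table name for this sketch


def build_minimal_hook(dns: str) -> str:
    """Assemble a small deny-all output chain in the documented order."""
    addr = ipaddress.ip_address(dns)  # raises ValueError on bad input
    family = "ip" if addr.version == 4 else "ip6"
    rules = [
        "type filter hook output priority filter; policy drop;",
        'oifname "lo" accept',                      # loopback
        "ct state established,related accept",      # established
        f"udp dport 53 {family} daddr {addr} accept",  # DNS
        f"tcp dport 53 {family} daddr {addr} accept",
        "ip daddr @allow_v4 accept",                # allow sets
        "ip6 daddr @allow_v6 accept",
        'log prefix "DEMO_BLOCKED: " reject with icmpx type admin-prohibited',  # terminal deny
    ]
    body = "\n".join(f"        {rule}" for rule in rules)
    return (
        f"table {TABLE} {{\n"
        "    set allow_v4 { type ipv4_addr; flags interval; }\n"
        "    set allow_v6 { type ipv6_addr; flags interval; }\n"
        "    chain output {\n"
        f"{body}\n"
        "    }\n"
        "}\n"
    )
```

Since nftables evaluates rules top to bottom and stops at the first terminal verdict, the position of each fragment in the list determines which rule wins.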

Source code in src/terok_shield/nft/rules.py
def build_hook(self) -> str:
    """Generate the hook-mode (deny-all) nftables ruleset.

    Applied by the OCI hook into the container's own netns.
    Dual-stack: both IPv4 and IPv6 use deny-all + allowlist.

    Gateway addresses are baked directly into the ruleset as literal
    accept rules -- no dynamic sets or runtime discovery needed.

    Chain order (output):
        loopback -> established -> DNS -> gateway ports -> loopback ports
        -> allow sets -> deny sets -> private-range reject -> terminal deny
    """
    dns_af, dns, infra_block, set_v4, set_v6, set_deny_v4, set_deny_v6 = self._preamble()
    allow_rules = RulesetBuilder._audit_allow_rules()
    deny_rules = RulesetBuilder._deny_set_rules()
    private_rules = RulesetBuilder._private_range_rules()
    terminal_rule = RulesetBuilder._terminal_deny_rule()
    return textwrap.dedent(f"""\
        table {NFT_TABLE} {{
            {set_v4}
            {set_v6}
            {set_deny_v4}
            {set_deny_v6}

            chain output {{
                type filter hook output priority filter; policy drop;
                oifname "lo" accept
                ct state established,related accept
                udp dport 53 {dns_af} daddr {dns} accept
                tcp dport 53 {dns_af} daddr {dns} accept{infra_block}\
        {allow_rules}
        {deny_rules}
        {private_rules}
        {terminal_rule}
            }}

{_INPUT_CHAIN}
        }}
    """)

build_bypass(*, allow_all=False)

Generate the bypass-mode (accept-all + log) ruleset.

Same structure as build_hook() but output chain policy is accept, new connections are logged with the bypass prefix, and deny sets are enforced so deny.list entries still block traffic.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| allow_all | bool | If True, remove private-range reject rules. | False |

Source code in src/terok_shield/nft/rules.py
def build_bypass(self, *, allow_all: bool = False) -> str:
    """Generate the bypass-mode (accept-all + log) ruleset.

    Same structure as ``build_hook()`` but output chain policy is accept,
    new connections are logged with the bypass prefix, and deny sets are
    enforced so ``deny.list`` entries still block traffic.

    Args:
        allow_all: If True, remove private-range reject rules.
    """
    dns_af, dns, infra_block, set_v4, set_v6, set_deny_v4, set_deny_v6 = self._preamble()
    deny_rules = RulesetBuilder._deny_set_rules()
    private_rules = RulesetBuilder._private_range_rules()
    private_block = "" if allow_all else f"\n{private_rules}"
    bypass_log = (
        f'        ct state new log group {NFLOG_GROUP} prefix "{BYPASS_LOG_PREFIX}: " counter'
    )
    return textwrap.dedent(f"""\
        table {NFT_TABLE} {{
            {set_v4}
            {set_v6}
            {set_deny_v4}
            {set_deny_v6}

            chain output {{
                type filter hook output priority filter; policy accept;
                oifname "lo" accept
                ct state established,related accept
                udp dport 53 {dns_af} daddr {dns} accept
                tcp dport 53 {dns_af} daddr {dns} accept{infra_block}\
        {deny_rules}
        {bypass_log}{private_block}
            }}

{_INPUT_CHAIN}
        }}
    """)

build_quarantine() staticmethod

Generate the quarantine-mode (total blackout) ruleset.

Drops all traffic except loopback and established connections. No DNS, no allowlists, no gateway ports. Dropped outbound packets are logged with the blocked prefix for the audit log; the input chain drops without logging.

Source code in src/terok_shield/nft/rules.py
@staticmethod
def build_quarantine() -> str:
    """Generate the quarantine-mode (total blackout) ruleset.

    Drops all traffic except loopback and established connections.
    No DNS, no allowlists, no gateway ports.  Dropped outbound packets
    are logged with the blocked prefix for the audit log; the input
    chain drops without logging.
    """
    blocked_log = f'        log group {NFLOG_GROUP} prefix "{BLOCKED_LOG_PREFIX}: " drop'
    return textwrap.dedent(f"""\
        table {NFT_TABLE} {{
            chain output {{
                type filter hook output priority filter; policy drop;
                oifname "lo" accept
                ct state established,related accept
        {blocked_log}
            }}

            chain input {{
                type filter hook input priority filter; policy drop;
                iifname "lo" accept
                ct state established,related accept
                drop
            }}
        }}
    """)

verify_hook(nft_output)

Check applied hook ruleset invariants. Returns errors (empty = OK).

Expects output from nft list table inet terok_shield (scoped to the managed table), not nft list ruleset.

Verifies:
- Managed table header is present
- Default policy is drop
- Both output and input chains exist
- Reject type is present
- Dual-stack allow sets are declared
- Dual-stack deny sets are declared
- All private ranges are present (RFC 1918 + RFC 4193/4291)
- Terminal deny-all rule with BLOCKED prefix present

Source code in src/terok_shield/nft/rules.py
def verify_hook(self, nft_output: str) -> list[str]:
    """Check applied hook ruleset invariants.  Returns errors (empty = OK).

    Expects output from ``nft list table inet terok_shield`` (scoped to the
    managed table), not ``nft list ruleset``.

    Verifies:
    - Managed table header is present
    - Default policy is drop
    - Both output and input chains exist
    - Reject type is present
    - Dual-stack allow sets are declared
    - Dual-stack deny sets are declared
    - All private ranges are present (RFC 1918 + RFC 4193/4291)
    - Terminal deny-all rule with BLOCKED prefix present
    """
    errors: list[str] = []
    if f"table {NFT_TABLE}" not in nft_output:
        errors.append(f"managed table '{NFT_TABLE}' not found in output")
    if "policy drop" not in nft_output:
        errors.append("policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"{chain} chain missing")
    if "admin-prohibited" not in nft_output:
        errors.append("reject type missing")
    if "allow_v4" not in nft_output:
        errors.append("allow_v4 set missing")
    if "allow_v6" not in nft_output:
        errors.append("allow_v6 set missing")
    if "deny_v4" not in nft_output:
        errors.append("deny_v4 set missing")
    if "deny_v6" not in nft_output:
        errors.append("deny_v6 set missing")
    # Verify the terminal deny-all rule -- a standalone log+reject with the
    # BLOCKED prefix (no daddr selector, unlike deny-set rules).
    terminal_deny_re = re.compile(
        rf'^\s*log\s+.*prefix\s+"{re.escape(BLOCKED_LOG_PREFIX)}',
        re.MULTILINE,
    )
    if not terminal_deny_re.search(nft_output):
        errors.append("terminal deny-all rule missing")
    errors.extend(RulesetBuilder._verify_private_blocks(nft_output))
    return errors
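
The terminal-deny check relies on the rule starting its line with `log`, which distinguishes it from rules that carry a leading selector. The sketch below exercises the same regular expression against two hand-written listings. The prefix value `DEMO_BLOCKED` and the exact shape of the deny-set rule are assumptions made for the demonstration; the real constant lives in `nft_constants`.

```python
import re

BLOCKED_LOG_PREFIX = "DEMO_BLOCKED"  # placeholder value, not the real constant

terminal_deny_re = re.compile(
    rf'^\s*log\s+.*prefix\s+"{re.escape(BLOCKED_LOG_PREFIX)}',
    re.MULTILINE,
)

# Listing that contains both a selector-prefixed rule and a standalone rule.
with_terminal = """
    chain output {
        ip daddr @deny_v4 log group 100 prefix "DEMO_BLOCKED: " reject
        log group 100 prefix "DEMO_BLOCKED: " reject
    }
"""

# Listing where the only BLOCKED rule starts with a daddr selector.
without_terminal = """
    chain output {
        ip daddr @deny_v4 log group 100 prefix "DEMO_BLOCKED: " reject
    }
"""

print(bool(terminal_deny_re.search(with_terminal)))     # True
print(bool(terminal_deny_re.search(without_terminal)))  # False
```

A plain substring test for the prefix would accept both listings, which is why the anchored pattern is used for this invariant.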

verify_bypass(nft_output, *, allow_all=False)

Check applied bypass ruleset invariants. Returns errors (empty = OK).

Expects output from nft list table inet terok_shield (scoped to the managed table), not nft list ruleset.

Verifies:
- Managed table header is present
- Output chain has policy accept
- Input chain has policy drop
- Both output and input chains exist
- Bypass nflog prefix is present
- Dual-stack allow and deny sets are declared
- Private-range reject rules present (unless allow_all)

Source code in src/terok_shield/nft/rules.py
def verify_bypass(self, nft_output: str, *, allow_all: bool = False) -> list[str]:
    """Check applied bypass ruleset invariants.  Returns errors (empty = OK).

    Expects output from ``nft list table inet terok_shield`` (scoped to the
    managed table), not ``nft list ruleset``.

    Verifies:
    - Managed table header is present
    - Output chain has policy accept
    - Input chain has policy drop
    - Both output and input chains exist
    - Bypass nflog prefix is present
    - Dual-stack allow and deny sets are declared
    - Private-range reject rules present (unless *allow_all*)
    """
    errors: list[str] = []
    if f"table {NFT_TABLE}" not in nft_output:
        errors.append(f"managed table '{NFT_TABLE}' not found in output")
    if "policy accept" not in nft_output:
        errors.append("output policy is not accept")
    if "policy drop" not in nft_output:
        errors.append("input policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"{chain} chain missing")
    if BYPASS_LOG_PREFIX not in nft_output:
        errors.append("bypass nflog prefix missing")
    for sname in ("allow_v4", "allow_v6", "deny_v4", "deny_v6"):
        if sname not in nft_output:
            errors.append(f"{sname} set missing")
    if not allow_all:
        errors.extend(RulesetBuilder._verify_private_blocks(nft_output))
    return errors
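
The policy checks above look for `policy accept` and `policy drop` anywhere in the listing, so they do not tie a policy to a specific chain. If a stricter per-chain check were wanted, one possible approach (an alternative sketch, not what this module does) is to extract each chain's policy with a regular expression. The helper name `chain_policies` is hypothetical, and the pattern assumes the usual `nft list table` layout where the `type ... policy ...;` line directly follows the chain's opening brace.

```python
import re

# Matches "chain <name> {" followed by "type ...; policy <verdict>;".
_CHAIN_POLICY_RE = re.compile(
    r"chain\s+(\w+)\s*\{\s*type\s+[^;]*;\s*policy\s+(\w+)\s*;"
)


def chain_policies(nft_output: str) -> dict[str, str]:
    """Map each base chain name to its declared default policy."""
    return {name: policy for name, policy in _CHAIN_POLICY_RE.findall(nft_output)}


sample = """
table inet terok_shield {
    chain output {
        type filter hook output priority filter; policy accept;
        oifname "lo" accept
    }
    chain input {
        type filter hook input priority filter; policy drop;
    }
}
"""

print(chain_policies(sample))  # {'output': 'accept', 'input': 'drop'}
```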

verify_quarantine(nft_output) staticmethod

Check applied quarantine ruleset invariants. Returns errors (empty = OK).

Expects output from nft list table inet terok_shield (scoped to the managed table), not nft list ruleset.

Verifies:
- Managed table header is present
- Both chains present with policy drop
- Blocked log prefix present
- No allow sets (total blackout means no allowlists)

Source code in src/terok_shield/nft/rules.py
@staticmethod
def verify_quarantine(nft_output: str) -> list[str]:
    """Check applied quarantine ruleset invariants.  Returns errors (empty = OK).

    Expects output from ``nft list table inet terok_shield`` (scoped to the
    managed table), not ``nft list ruleset``.

    Verifies:
    - Managed table header is present
    - Both chains present with policy drop
    - Blocked log prefix present
    - No allow sets (total blackout means no allowlists)
    """
    errors: list[str] = []
    if f"table {NFT_TABLE}" not in nft_output:
        errors.append(f"managed table '{NFT_TABLE}' not found in output")
    if "policy drop" not in nft_output:
        errors.append("policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"{chain} chain missing")
    if BLOCKED_LOG_PREFIX not in nft_output:
        errors.append("blocked nflog prefix missing")
    if "allow_v4" in nft_output:
        errors.append("allow_v4 set present in quarantine mode")
    if "allow_v6" in nft_output:
        errors.append("allow_v6 set present in quarantine mode")
    return errors

add_elements_dual(ips)

Classify IPs by family and generate add-element commands for both sets.

When the builder has a set_timeout configured (dnsmasq tier), permanent IPs are written with timeout 0s so they do not auto-expire along with dnsmasq-learned entries.

Source code in src/terok_shield/nft/rules.py
def add_elements_dual(self, ips: list[str]) -> str:
    """Classify IPs by family and generate add-element commands for both sets.

    When the builder has a ``set_timeout`` configured (dnsmasq tier),
    permanent IPs are written with ``timeout 0s`` so they do not auto-expire
    along with dnsmasq-learned entries.
    """
    return add_elements_dual(ips, permanent=bool(self._set_timeout))

add_elements_dual(ips, *, permanent=False)

Classify IPs by family and generate add-element commands for both sets.

IPv4 addresses go to allow_v4, IPv6 to allow_v6. Returns empty string if no valid IPs.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| permanent | bool | When True, elements are annotated with timeout 0s so they never expire in sets that carry a default timeout (dnsmasq tier). Permanent IPs (profile/live allowlists) must not be evicted by the same 30-minute expiry used for dnsmasq-learned IPs. | False |

Source code in src/terok_shield/nft/rules.py
def add_elements_dual(ips: list[str], *, permanent: bool = False) -> str:
    """Classify IPs by family and generate add-element commands for both sets.

    IPv4 addresses go to ``allow_v4``, IPv6 to ``allow_v6``.
    Returns empty string if no valid IPs.

    Args:
        permanent: When ``True``, elements are annotated with ``timeout 0s``
            so they never expire in sets that carry a default timeout
            (dnsmasq tier).  Permanent IPs (profile/live allowlists) must
            not be evicted by the same 30-minute expiry used for
            dnsmasq-learned IPs.
    """
    v4: list[str] = []
    v6: list[str] = []
    for ip in ips:
        try:
            sanitized = safe_ip(ip)
        except ValueError:
            continue
        (v4 if _is_v4(sanitized) else v6).append(sanitized)
    parts: list[str] = []
    cmd = add_elements("allow_v4", v4, timeout_zero=permanent)
    if cmd:
        parts.append(cmd)
    cmd = add_elements("allow_v6", v6, timeout_zero=permanent)
    if cmd:
        parts.append(cmd)
    return "".join(parts)
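
The family split can be illustrated with the standard library alone. The sketch below is a stand-in, not the module's code: `split_by_family` is a hypothetical name, and it uses the `version` attribute of the parsed object where the source calls the private `_is_v4` helper, whose implementation is not shown here.

```python
import ipaddress


def split_by_family(ips: list[str]) -> tuple[list[str], list[str]]:
    """Return (v4, v6) lists of canonical addresses/networks; skip invalid input."""
    v4: list[str] = []
    v6: list[str] = []
    for raw in ips:
        text = raw.strip()
        try:
            parsed = (
                ipaddress.ip_network(text, strict=False)
                if "/" in text
                else ipaddress.ip_address(text)
            )
        except ValueError:
            continue  # invalid entries are dropped silently, as in the source
        (v4 if parsed.version == 4 else v6).append(str(parsed))
    return v4, v6


print(split_by_family(["10.0.0.1", "2001:db8::1", "bogus", "192.168.1.7/24"]))
# (['10.0.0.1', '192.168.1.0/24'], ['2001:db8::1'])
```

Invalid entries are skipped rather than raising, so one malformed line in an allowlist does not prevent the remaining addresses from being applied.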

add_deny_elements_dual(ips)

Classify IPs by family and generate add-element commands for deny sets.

IPv4 addresses go to deny_v4, IPv6 to deny_v6. Returns empty string if no valid IPs.

Source code in src/terok_shield/nft/rules.py
def add_deny_elements_dual(ips: list[str]) -> str:
    """Classify IPs by family and generate add-element commands for deny sets.

    IPv4 addresses go to ``deny_v4``, IPv6 to ``deny_v6``.
    Returns empty string if no valid IPs.
    """
    v4: list[str] = []
    v6: list[str] = []
    for ip in ips:
        try:
            sanitized = safe_ip(ip)
        except ValueError:
            continue
        (v4 if _is_v4(sanitized) else v6).append(sanitized)
    parts: list[str] = []
    cmd = add_elements("deny_v4", v4)
    if cmd:
        parts.append(cmd)
    cmd = add_elements("deny_v6", v6)
    if cmd:
        parts.append(cmd)
    return "".join(parts)

delete_deny_elements_dual(ips)

Classify IPs by family and generate delete-element commands for deny sets.

IPv4 addresses target deny_v4, IPv6 target deny_v6. Returns empty string if no valid IPs.

Source code in src/terok_shield/nft/rules.py
def delete_deny_elements_dual(ips: list[str]) -> str:
    """Classify IPs by family and generate delete-element commands for deny sets.

    IPv4 addresses target ``deny_v4``, IPv6 target ``deny_v6``.
    Returns empty string if no valid IPs.
    """
    v4: list[str] = []
    v6: list[str] = []
    for ip in ips:
        try:
            sanitized = safe_ip(ip)
        except ValueError:
            continue
        (v4 if _is_v4(sanitized) else v6).append(sanitized)
    parts: list[str] = []
    cmd = delete_elements("deny_v4", v4)
    if cmd:
        parts.append(cmd)
    cmd = delete_elements("deny_v6", v6)
    if cmd:
        parts.append(cmd)
    return "".join(parts)

add_elements(set_name, ips, table=NFT_TABLE, *, timeout_zero=False)

Generate nft command to add validated IPs to a set.

Both set_name and table are validated against injection. Returns empty string if no valid IPs.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| timeout_zero | bool | When True, each element is annotated with timeout 0s so it never expires, even in sets that carry a default element timeout (dnsmasq tier). | False |

Source code in src/terok_shield/nft/rules.py
def add_elements(
    set_name: str, ips: list[str], table: str = NFT_TABLE, *, timeout_zero: bool = False
) -> str:
    """Generate nft command to add validated IPs to a set.

    Both ``set_name`` and ``table`` are validated against injection.
    Returns empty string if no valid IPs.

    Args:
        timeout_zero: When ``True``, each element is annotated with
            ``timeout 0s`` so it never expires, even in sets that carry a
            default element timeout (dnsmasq tier).
    """
    set_name = _safe_ident(set_name)
    table = " ".join(_safe_ident(part) for part in table.split())
    valid = [safe_ip(ip) for ip in ips if _try_validate(ip)]
    if not valid:
        return ""
    if timeout_zero:
        elements = ", ".join(f"{ip} timeout 0s" for ip in valid)
    else:
        elements = ", ".join(valid)
    return f"add element {table} {set_name} {{ {elements} }}\n"
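
To show the shape of the generated command, here is a simplified, self-contained imitation. Several things are assumptions: the function name `format_add_element`, the identifier pattern standing in for `_safe_ident`, and the default table `inet terok_shield` (inferred from the `nft list table` command mentioned on this page). It also handles plain addresses only, not CIDR ranges.

```python
import ipaddress
import re

_IDENT_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")  # assumed identifier rule


def format_add_element(
    set_name: str,
    ips: list[str],
    table: str = "inet terok_shield",  # assumed value of NFT_TABLE
    *,
    timeout_zero: bool = False,
) -> str:
    """Build an 'add element' command from validated parts, or '' if nothing is valid."""
    for part in [set_name, *table.split()]:
        if not _IDENT_RE.fullmatch(part):
            raise ValueError(f"Invalid identifier: {part!r}")
    valid: list[str] = []
    for ip in ips:
        try:
            valid.append(str(ipaddress.ip_address(ip.strip())))
        except ValueError:
            continue  # skip anything that is not a plain IP address
    if not valid:
        return ""
    suffix = " timeout 0s" if timeout_zero else ""
    elements = ", ".join(f"{ip}{suffix}" for ip in valid)
    return f"add element {table} {set_name} {{ {elements} }}\n"


print(format_add_element("allow_v4", ["1.1.1.1", "nope", "8.8.8.8"], timeout_zero=True), end="")
# add element inet terok_shield allow_v4 { 1.1.1.1 timeout 0s, 8.8.8.8 timeout 0s }
```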

delete_elements(set_name, ips, table=NFT_TABLE)

Generate nft command to delete validated IPs from a set.

Both set_name and table are validated against injection. Returns empty string if no valid IPs.

Source code in src/terok_shield/nft/rules.py
def delete_elements(set_name: str, ips: list[str], table: str = NFT_TABLE) -> str:
    """Generate nft command to delete validated IPs from a set.

    Both ``set_name`` and ``table`` are validated against injection.
    Returns empty string if no valid IPs.
    """
    set_name = _safe_ident(set_name)
    table = " ".join(_safe_ident(part) for part in table.split())
    valid = [safe_ip(ip) for ip in ips if _try_validate(ip)]
    if not valid:
        return ""
    elements = ", ".join(valid)
    return f"delete element {table} {set_name} {{ {elements} }}\n"

safe_ip(value)

Validate and normalize an IPv4 or IPv6 address or CIDR notation.

Prevents nft command injection by ensuring the value is a valid IP address or network. Returns the canonical string form so that string comparisons across state files (profile.allowed, live.allowed, deny.list) are reliable regardless of input notation.

Raises ValueError on invalid input.

Source code in src/terok_shield/nft/rules.py
def safe_ip(value: str) -> str:
    """Validate and normalize an IPv4 or IPv6 address or CIDR notation.

    Prevents nft command injection by ensuring the value is a valid
    IP address or network.  Returns the canonical string form so that
    string comparisons across state files (profile.allowed, live.allowed,
    deny.list) are reliable regardless of input notation.

    Raises ValueError on invalid input.
    """
    v = value.strip()
    try:
        if "/" in v:
            return str(ipaddress.ip_network(v, strict=False))
        return str(ipaddress.ip_address(v))
    except ValueError as e:
        raise ValueError(f"Invalid IP/CIDR: {v!r}") from e
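
A short usage example may help illustrate the normalization. The function body is copied from the listing above so the snippet runs on its own; the input values are arbitrary examples.

```python
import ipaddress


def safe_ip(value: str) -> str:
    """Copy of the function above, repeated so this snippet is self-contained."""
    v = value.strip()
    try:
        if "/" in v:
            return str(ipaddress.ip_network(v, strict=False))
        return str(ipaddress.ip_address(v))
    except ValueError as e:
        raise ValueError(f"Invalid IP/CIDR: {v!r}") from e


print(safe_ip(" 10.0.0.1 "))       # 10.0.0.1     (whitespace stripped)
print(safe_ip("10.0.0.5/24"))      # 10.0.0.0/24  (host bits cleared)
print(safe_ip("2001:0DB8::0001"))  # 2001:db8::1  (lowercased, zeros compressed)

try:
    safe_ip("1.2.3.4; flush ruleset")
except ValueError as exc:
    print("rejected:", exc)
```

Because `strict=False` is passed to `ip_network`, a CIDR with host bits set is accepted and rewritten to its network address rather than rejected.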