Skip to content

nft

nft

nftables ruleset generation.

**SECURITY BOUNDARY — read this file first.**

- Every nftables ruleset is generated here.
- All inputs are validated before interpolation.
- Only stdlib + nft_constants.py imports allowed.

RulesetBuilder(*, dns=PASTA_DNS, loopback_ports=(), set_timeout='')

Builder pattern for nftables ruleset generation and verification.

Security boundary: only stdlib + nft_constants imports. All inputs validated before interpolation.

Binds dns, loopback_ports and set_timeout once at construction -- these were previously threaded as parameters to every function call.

Create a builder with validated DNS and loopback port config.

Parameters:

Name Type Description Default
dns str

DNS server address (pasta default forwarder).

PASTA_DNS
loopback_ports tuple[int, ...]

TCP ports to allow on the loopback interface. When set, gateway port rules reference @gateway_v4/ @gateway_v6 sets populated at runtime by the OCI hook.

()
set_timeout str

nft set element timeout (e.g. 30m). When set, allow sets use flags interval, timeout so dnsmasq-populated IPs expire and are refreshed on the next DNS query.

''
Source code in src/terok_shield/core/nft.py
def __init__(
    self,
    *,
    dns: str = PASTA_DNS,
    loopback_ports: tuple[int, ...] = (),
    set_timeout: str = "",
) -> None:
    """Create a builder with validated DNS, loopback port and timeout config.

    Args:
        dns: DNS server address (pasta default forwarder).  Stored in the
            canonical form returned by ``safe_ip``.
        loopback_ports: TCP ports to allow on the loopback interface.
            When set, gateway port rules reference ``@gateway_v4``/
            ``@gateway_v6`` sets populated at runtime by the OCI hook.
        set_timeout: nft set element timeout (e.g. ``30m``).  When set,
            allow sets use ``flags interval, timeout`` so dnsmasq-populated
            IPs expire and are refreshed on the next DNS query.

    Raises:
        ValueError: If ``dns`` is not a valid IP address or CIDR.  The port
            and timeout validators are expected to reject bad input as well
            (their definitions live elsewhere in this module).
    """
    dns = safe_ip(dns)
    # Snapshot the ports into a tuple *before* validating: the values that
    # were checked are then exactly the values that are stored, even when the
    # caller hands over a mutable list or a one-shot iterable.
    ports = tuple(loopback_ports)
    for port in ports:
        _safe_port(port)
    # An empty timeout means "no timeout configured" and is not validated.
    if set_timeout:
        _safe_timeout(set_timeout)
    self._dns = dns
    self._loopback_ports = ports
    self._set_timeout = set_timeout

build_hook(*, interactive=False)

Generate the hook-mode (deny-all) nftables ruleset.

Source code in src/terok_shield/core/nft.py
def build_hook(self, *, interactive: bool = False) -> str:
    """Render the hook-mode (deny-all) ruleset from the builder's bound config.

    Delegates to ``hook_ruleset`` with the DNS address, loopback ports and
    set timeout captured at construction time.

    Args:
        interactive: Forwarded unchanged to ``hook_ruleset``.
    """
    bound_config = {
        "dns": self._dns,
        "loopback_ports": self._loopback_ports,
        "set_timeout": self._set_timeout,
    }
    return hook_ruleset(**bound_config, interactive=interactive)

build_bypass(*, allow_all=False)

Generate the bypass-mode (accept-all + log) ruleset.

Source code in src/terok_shield/core/nft.py
def build_bypass(self, *, allow_all: bool = False) -> str:
    """Render the bypass-mode (accept-all + log) ruleset from the bound config.

    Delegates to ``bypass_ruleset`` with the DNS address, loopback ports and
    set timeout captured at construction time.

    Args:
        allow_all: Forwarded unchanged to ``bypass_ruleset``.
    """
    bound_config = {
        "dns": self._dns,
        "loopback_ports": self._loopback_ports,
        "set_timeout": self._set_timeout,
    }
    return bypass_ruleset(**bound_config, allow_all=allow_all)

verify_hook(nft_output, *, interactive=False)

Check applied hook ruleset invariants. Returns errors (empty = OK).

Source code in src/terok_shield/core/nft.py
def verify_hook(self, nft_output: str, *, interactive: bool = False) -> list[str]:
    """Validate an applied hook ruleset; an empty list means no violations.

    Thin wrapper over ``verify_ruleset``; no builder state is consulted.
    """
    violations = verify_ruleset(nft_output, interactive=interactive)
    return violations

verify_bypass(nft_output, *, allow_all=False)

Check applied bypass ruleset invariants. Returns errors (empty = OK).

Source code in src/terok_shield/core/nft.py
def verify_bypass(self, nft_output: str, *, allow_all: bool = False) -> list[str]:
    """Validate an applied bypass ruleset; an empty list means no violations.

    Thin wrapper over ``verify_bypass_ruleset``; no builder state is consulted.
    """
    violations = verify_bypass_ruleset(nft_output, allow_all=allow_all)
    return violations

add_elements_dual(ips)

Classify IPs by family and generate add-element commands for both sets.

When the builder has a set_timeout configured (dnsmasq tier), permanent IPs are written with timeout 0s so they do not auto-expire along with dnsmasq-learned entries.

Source code in src/terok_shield/core/nft.py
def add_elements_dual(self, ips: list[str]) -> str:
    """Emit add-element commands for the IPv4 and IPv6 allow sets.

    A builder configured with a ``set_timeout`` (dnsmasq tier) marks the
    addresses as permanent, so they are written with ``timeout 0s`` and do
    not auto-expire alongside dnsmasq-learned entries.
    """
    # Any non-empty timeout string switches on the permanent marking.
    pin_elements = bool(self._set_timeout)
    return add_elements_dual(ips, permanent=pin_elements)

safe_ip(value)

Validate and normalize an IPv4 or IPv6 address or CIDR notation.

Prevents nft command injection by ensuring the value is a valid IP address or network. Returns the canonical string form so that string comparisons across state files (profile.allowed, live.allowed, deny.list) are reliable regardless of input notation.

Raises ValueError on invalid input.

Source code in src/terok_shield/core/nft.py
def safe_ip(value: str) -> str:
    """Validate and normalize an IPv4 or IPv6 address or CIDR notation.

    Prevents nft command injection by ensuring the value is a valid
    IP address or network.  Returns the canonical string form so that
    string comparisons across state files (profile.allowed, live.allowed,
    deny.list) are reliable regardless of input notation.

    Scoped IPv6 addresses (``fe80::1%eth0``) are rejected.  ``ipaddress``
    accepts a zone suffix after ``%`` without validating its characters and
    echoes it back verbatim in ``str()``, so a value such as
    ``fe80::1%x } ; flush ruleset`` would otherwise pass validation and be
    interpolated into the nft script.

    Args:
        value: Address or network in text form; surrounding whitespace is
            ignored.  A value containing ``/`` is parsed as a network and
            host bits are masked off rather than rejected.

    Returns:
        The canonical string form of the address or network.

    Raises:
        ValueError: If the value is not a plain (unscoped) IP address or
            network.
    """
    v = value.strip()
    # Reject zone IDs up front: everything after '%' is free-form text that
    # ipaddress neither validates nor normalizes.
    if "%" in v:
        raise ValueError(f"Invalid IP/CIDR: {v!r}")
    try:
        if "/" in v:
            return str(ipaddress.ip_network(v, strict=False))
        return str(ipaddress.ip_address(v))
    except ValueError as e:
        raise ValueError(f"Invalid IP/CIDR: {v!r}") from e

hook_ruleset(dns=PASTA_DNS, loopback_ports=(), set_timeout='', *, interactive=False)

Generate a per-container nftables ruleset for hook mode.

Applied by the OCI hook into the container's own netns. Dual-stack: both IPv4 and IPv6 use deny-all + allowlist.

gateway_v4 and gateway_v6 sets are always defined but start empty. The OCI hook populates them from /proc/{pid}/net/route after applying this ruleset; shield_up() repopulates them from the persisted state/gateway file.

Chain order (output): loopback -> established -> DNS -> gateway ports -> loopback ports -> allow sets -> deny sets -> private-range reject -> deny/interactive-reject

Parameters:

Name Type Description Default
dns str

DNS server address (pasta default forwarder).

PASTA_DNS
loopback_ports tuple[int, ...]

TCP ports to allow on the loopback interface.

()
set_timeout str

nft set element timeout (e.g. 30m).

''
interactive bool

When True, replaces the terminal deny-all rule with an NFLOG+reject rule using the QUEUED prefix for interactive handling.

False
Source code in src/terok_shield/core/nft.py
def hook_ruleset(
    dns: str = PASTA_DNS,
    loopback_ports: tuple[int, ...] = (),
    set_timeout: str = "",
    *,
    interactive: bool = False,
) -> str:
    """Generate a per-container nftables ruleset for hook mode.

    Applied by the OCI hook into the container's own netns.
    Dual-stack: both IPv4 and IPv6 use deny-all + allowlist.

    ``gateway_v4`` and ``gateway_v6`` sets are always defined but start
    empty.  The OCI hook populates them from ``/proc/{pid}/net/route``
    after applying this ruleset; ``shield_up()`` repopulates them from
    the persisted ``state/gateway`` file.

    Chain order (output):
        loopback -> established -> DNS -> gateway ports -> loopback ports
        -> allow sets -> deny sets -> private-range reject -> deny/interactive-reject

    The input chain has policy drop and accepts only loopback traffic,
    established/related connections and packets with source port 53.

    Args:
        dns: DNS server address (pasta default forwarder).
        loopback_ports: TCP ports to allow on the loopback interface.
        set_timeout: nft set element timeout (e.g. ``30m``).  Applied to the
            allow sets only; the deny sets are declared without a timeout.
        interactive: When True, replaces the terminal deny-all rule with an
            NFLOG+reject rule using the QUEUED prefix for interactive handling.

    Returns:
        The complete ruleset as text, with common leading indentation removed
        by ``textwrap.dedent``.

    Raises:
        ValueError: If ``dns`` is not a valid IP address or CIDR (raised by
            ``safe_ip``).  The port and timeout validators presumably raise
            on bad input too -- their definitions are not part of this block.
    """
    # Validate every caller-supplied value before any of it is interpolated
    # into the ruleset text below.
    dns = safe_ip(dns)
    if set_timeout:
        _safe_timeout(set_timeout)
    for p in loopback_ports:
        _safe_port(p)
    port_rules = _loopback_port_rules(loopback_ports)
    gw_rules = _gateway_port_rules(loopback_ports)
    # infra_block is spliced directly after the TCP DNS rule in the template:
    # optional gateway-port rules, then optional loopback-port rules, each
    # introduced by a newline, and always terminated by a newline.
    infra_block = ""
    if gw_rules:
        infra_block += f"\n{gw_rules}"
    if port_rules:
        infra_block += f"\n{port_rules}"
    infra_block += "\n"
    # The DNS rules need the match keyword for the resolver's address family.
    dns_af = "ip" if _is_v4(dns) else "ip6"
    set_v4 = _set_declaration("allow_v4", "ipv4_addr", set_timeout)
    set_v6 = _set_declaration("allow_v6", "ipv6_addr", set_timeout)
    # Deny sets are declared without the timeout argument.
    set_deny_v4 = _set_declaration("deny_v4", "ipv4_addr")
    set_deny_v6 = _set_declaration("deny_v6", "ipv6_addr")
    terminal_rule = _interactive_reject_rule() if interactive else _audit_deny_rule()
    # NOTE(review): the input chain accepts any packet whose source port is 53,
    # from any address.  Replies to the container's own DNS queries should
    # already match ``ct state established,related`` -- confirm whether the
    # sport rules are still required.
    # The template is whitespace-sensitive: the trailing backslash after
    # {infra_block} joins that line with the next one, and the helper-generated
    # rule blocks sit at the template's base indentation.
    return textwrap.dedent(f"""\
        table {NFT_TABLE} {{
            {set_v4}
            {set_v6}
            {set_deny_v4}
            {set_deny_v6}
            set gateway_v4 {{ type ipv4_addr; }}
            set gateway_v6 {{ type ipv6_addr; }}

            chain output {{
                type filter hook output priority filter; policy drop;
                oifname "lo" accept
                ct state established,related accept
                udp dport 53 {dns_af} daddr {dns} accept
                tcp dport 53 {dns_af} daddr {dns} accept{infra_block}\
        {_audit_allow_rules()}
        {_deny_set_rules()}
        {_private_range_rules()}
        {terminal_rule}
            }}

            chain input {{
                type filter hook input priority filter; policy drop;
                iifname "lo" accept
                ct state established,related accept
                udp sport 53 accept
                tcp sport 53 accept
                drop
            }}
        }}
    """)

bypass_ruleset(dns=PASTA_DNS, loopback_ports=(), *, allow_all=False, set_timeout='')

Generate a bypass (accept-all + log) nftables ruleset.

Same structure as hook_ruleset() but the output chain policy is accept and new connections are logged with the bypass prefix. Private-range reject rules are kept unless allow_all is True.

Parameters:

Name Type Description Default
dns str

DNS server address (pasta default forwarder).

PASTA_DNS
loopback_ports tuple[int, ...]

TCP ports to allow on the loopback interface.

()
allow_all bool

If True, remove private-range reject rules.

False
set_timeout str

nft set element timeout (e.g. 30m).

''
Source code in src/terok_shield/core/nft.py
def bypass_ruleset(
    dns: str = PASTA_DNS,
    loopback_ports: tuple[int, ...] = (),
    *,
    allow_all: bool = False,
    set_timeout: str = "",
) -> str:
    """Generate a bypass (accept-all + log) nftables ruleset.

    Same structure as ``hook_ruleset()`` but the output chain policy is
    accept and new connections are logged with the bypass prefix.
    Private-range reject rules are kept unless *allow_all* is True.

    Only the allow and gateway sets are declared here; no deny sets appear
    in this ruleset.  The input chain keeps policy drop.

    Args:
        dns: DNS server address (pasta default forwarder).
        loopback_ports: TCP ports to allow on the loopback interface.
        allow_all: If True, remove private-range reject rules.
        set_timeout: nft set element timeout (e.g. ``30m``).

    Returns:
        The complete ruleset as text, with common leading indentation removed
        by ``textwrap.dedent``.

    Raises:
        ValueError: If ``dns`` is not a valid IP address or CIDR (raised by
            ``safe_ip``).  The port and timeout validators presumably raise
            on bad input too -- their definitions are not part of this block.
    """
    # Validate every caller-supplied value before any of it is interpolated
    # into the ruleset text below.
    dns = safe_ip(dns)
    if set_timeout:
        _safe_timeout(set_timeout)
    for p in loopback_ports:
        _safe_port(p)
    port_rules = _loopback_port_rules(loopback_ports)
    gw_rules = _gateway_port_rules(loopback_ports)
    # infra_block is spliced directly after the TCP DNS rule in the template:
    # optional gateway-port rules, then optional loopback-port rules, each
    # introduced by a newline, and always terminated by a newline.
    infra_block = ""
    if gw_rules:
        infra_block += f"\n{gw_rules}"
    if port_rules:
        infra_block += f"\n{port_rules}"
    infra_block += "\n"
    # The DNS rules need the match keyword for the resolver's address family.
    dns_af = "ip" if _is_v4(dns) else "ip6"
    set_v4 = _set_declaration("allow_v4", "ipv4_addr", set_timeout)
    set_v6 = _set_declaration("allow_v6", "ipv6_addr", set_timeout)
    # With allow_all the private-range block is dropped entirely; otherwise it
    # follows the bypass log rule on a new line.
    private_block = "" if allow_all else f"\n{_private_range_rules()}"
    # Logs each new connection to the NFLOG group with the bypass prefix; the
    # rule carries no verdict, so evaluation continues with the rules after it.
    bypass_log = (
        f'        ct state new log group {NFLOG_GROUP} prefix "{BYPASS_LOG_PREFIX}: " counter'
    )
    # The template is whitespace-sensitive: the trailing backslash after
    # {infra_block} joins that line with the next one.
    return textwrap.dedent(f"""\
        table {NFT_TABLE} {{
            {set_v4}
            {set_v6}
            set gateway_v4 {{ type ipv4_addr; }}
            set gateway_v6 {{ type ipv6_addr; }}

            chain output {{
                type filter hook output priority filter; policy accept;
                oifname "lo" accept
                ct state established,related accept
                udp dport 53 {dns_af} daddr {dns} accept
                tcp dport 53 {dns_af} daddr {dns} accept{infra_block}\
        {bypass_log}{private_block}
            }}

            chain input {{
                type filter hook input priority filter; policy drop;
                iifname "lo" accept
                ct state established,related accept
                udp sport 53 accept
                tcp sport 53 accept
                drop
            }}
        }}
    """)

add_elements(set_name, ips, table=NFT_TABLE, *, timeout_zero=False)

Generate nft command to add validated IPs to a set.

Both set_name and table are validated against injection. Returns empty string if no valid IPs.

Parameters:

Name Type Description Default
timeout_zero bool

When True, each element is annotated with timeout 0s so it never expires, even in sets that carry a default element timeout (dnsmasq tier).

False
Source code in src/terok_shield/core/nft.py
def add_elements(
    set_name: str, ips: list[str], table: str = NFT_TABLE, *, timeout_zero: bool = False
) -> str:
    """Build an ``add element`` command for the addresses that pass validation.

    The set name and every whitespace-separated word of ``table`` are checked
    as identifiers before anything is interpolated.  Addresses that fail
    validation are skipped; the rest are written in canonical form.

    Args:
        set_name: Target nft set.
        ips: Candidate addresses or networks.
        table: Table specification the set belongs to.
        timeout_zero: When ``True``, each element is annotated with
            ``timeout 0s`` so it never expires, even in sets that carry a
            default element timeout (dnsmasq tier).

    Returns:
        A single newline-terminated command, or an empty string when no
        candidate is valid.
    """
    _safe_ident(set_name)
    for word in table.split():
        _safe_ident(word)

    canonical: list[str] = []
    for candidate in ips:
        if _try_validate(candidate):
            canonical.append(safe_ip(candidate))
    if not canonical:
        return ""

    if not timeout_zero:
        members = ", ".join(canonical)
    else:
        members = ", ".join(f"{addr} timeout 0s" for addr in canonical)
    return f"add element {table} {set_name} {{ {members} }}\n"

add_elements_dual(ips, *, permanent=False)

Classify IPs by family and generate add-element commands for both sets.

IPv4 addresses go to allow_v4, IPv6 to allow_v6. Returns empty string if no valid IPs.

Parameters:

Name Type Description Default
permanent bool

When True, elements are annotated with timeout 0s so they never expire in sets that carry a default timeout (dnsmasq tier). Permanent IPs (profile/live allowlists) must not be evicted by the same 30-minute expiry used for dnsmasq-learned IPs.

False
Source code in src/terok_shield/core/nft.py
def add_elements_dual(ips: list[str], *, permanent: bool = False) -> str:
    """Split addresses by family and emit add-element commands for the allow sets.

    IPv4 entries target ``allow_v4`` and IPv6 entries ``allow_v6``; entries
    that fail validation are dropped silently.

    Args:
        ips: Candidate addresses or networks.
        permanent: When ``True``, elements are annotated with ``timeout 0s``
            so they never expire in sets that carry a default timeout
            (dnsmasq tier).  Permanent IPs (profile/live allowlists) must
            not be evicted by the same 30-minute expiry used for
            dnsmasq-learned IPs.

    Returns:
        The concatenated commands (IPv4 first), or an empty string when no
        candidate is valid.
    """
    ipv4_members: list[str] = []
    ipv6_members: list[str] = []
    for candidate in ips:
        try:
            canonical = safe_ip(candidate)
        except ValueError:
            continue
        bucket = ipv4_members if _is_v4(canonical) else ipv6_members
        bucket.append(canonical)

    # Generated lazily so the IPv4 command is built before the IPv6 one.
    commands = (
        add_elements(set_name, members, timeout_zero=permanent)
        for set_name, members in ((_ALLOW_V4, ipv4_members), (_ALLOW_V6, ipv6_members))
    )
    return "".join(command for command in commands if command)

add_deny_elements_dual(ips)

Classify IPs by family and generate add-element commands for deny sets.

IPv4 addresses go to deny_v4, IPv6 to deny_v6. Returns empty string if no valid IPs.

Source code in src/terok_shield/core/nft.py
def add_deny_elements_dual(ips: list[str]) -> str:
    """Split addresses by family and emit add-element commands for the deny sets.

    IPv4 entries target ``deny_v4`` and IPv6 entries ``deny_v6``; entries
    that fail validation are dropped silently.

    Returns:
        The concatenated commands (IPv4 first), or an empty string when no
        candidate is valid.
    """
    ipv4_members: list[str] = []
    ipv6_members: list[str] = []
    for candidate in ips:
        try:
            canonical = safe_ip(candidate)
        except ValueError:
            continue
        if _is_v4(canonical):
            ipv4_members.append(canonical)
        else:
            ipv6_members.append(canonical)

    script = ""
    v4_command = add_elements(_DENY_V4, ipv4_members)
    if v4_command:
        script += v4_command
    v6_command = add_elements(_DENY_V6, ipv6_members)
    if v6_command:
        script += v6_command
    return script

delete_deny_elements_dual(ips)

Classify IPs by family and generate delete-element commands for deny sets.

IPv4 addresses target deny_v4, IPv6 target deny_v6. Returns empty string if no valid IPs.

Source code in src/terok_shield/core/nft.py
def delete_deny_elements_dual(ips: list[str]) -> str:
    """Split addresses by family and emit delete-element commands for the deny sets.

    IPv4 entries target ``deny_v4`` and IPv6 entries ``deny_v6``; entries
    that fail validation are dropped silently.

    Returns:
        The concatenated commands (IPv4 first), or an empty string when no
        candidate is valid.
    """
    ipv4_members: list[str] = []
    ipv6_members: list[str] = []
    for candidate in ips:
        try:
            canonical = safe_ip(candidate)
        except ValueError:
            continue
        bucket = ipv4_members if _is_v4(canonical) else ipv6_members
        bucket.append(canonical)

    commands: list[str] = []
    if ipv4_members:
        elements = ", ".join(ipv4_members)
        commands.append(f"delete element {NFT_TABLE} {_DENY_V4} {{ {elements} }}\n")
    if ipv6_members:
        elements = ", ".join(ipv6_members)
        commands.append(f"delete element {NFT_TABLE} {_DENY_V6} {{ {elements} }}\n")
    return "".join(commands)

verify_ruleset(nft_output, *, interactive=False)

Check applied ruleset invariants. Returns errors (empty = OK).

Verifies:

- Default policy is drop
- Both output and input chains exist
- Reject type is present
- Dual-stack allow sets are declared
- Dual-stack deny sets are declared
- All private ranges are present (RFC 1918 + RFC 4193/4291)
- Interactive mode: queued nflog prefix present
- Non-interactive mode: deny nflog prefix present

Source code in src/terok_shield/core/nft.py
def verify_ruleset(nft_output: str, *, interactive: bool = False) -> list[str]:
    """Check applied ruleset invariants.  Returns errors (empty = OK).

    Verifies:
    - Default policy is drop on both the output and the input chain
    - Both output and input chains exist
    - Reject type is present
    - Dual-stack allow sets are declared
    - Dual-stack deny sets are declared
    - All private ranges are present (RFC 1918 + RFC 4193/4291)
    - Interactive mode: queued nflog prefix present
    - Non-interactive mode: deny nflog prefix present

    Args:
        nft_output: Textual ruleset listing to inspect.
        interactive: Selects which terminal rule is expected (see above).

    Returns:
        Human-readable error strings, one per violated invariant.
    """
    errors: list[str] = []
    # The policy is checked per chain.  A bare ``"policy drop" in nft_output``
    # test is satisfied by the input chain alone and would let an output chain
    # with ``policy accept`` through.  ``[^{}]*?`` keeps the match inside the
    # chain's own header, before any nested block.
    if not all(
        re.search(r"\bchain\s+%s\s*\{[^{}]*?\bpolicy\s+drop\b" % chain, nft_output)
        for chain in ("output", "input")
    ):
        errors.append("policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"{chain} chain missing")
    if "admin-prohibited" not in nft_output:
        errors.append("reject type missing")
    for set_name in ("allow_v4", "allow_v6", "deny_v4", "deny_v6"):
        if set_name not in nft_output:
            errors.append(f"{set_name} set missing")
    if interactive:
        if QUEUED_LOG_PREFIX not in nft_output:
            errors.append("queued nflog prefix missing")
    else:
        # Verify the terminal deny-all rule specifically — not just the prefix
        # string, which also appears in deny-set rules (ip daddr @deny_v4 ...).
        # The terminal rule is a standalone log+reject without a daddr selector.
        # nft output ordering varies by version (group before/after prefix),
        # so match any line that starts with ``log`` and carries the denied prefix.
        terminal_deny_re = re.compile(
            rf'^\s*log\s+.*prefix\s+"{re.escape(DENIED_LOG_PREFIX)}',
            re.MULTILINE,
        )
        if not terminal_deny_re.search(nft_output):
            errors.append("terminal deny-all rule missing")
    errors.extend(_verify_private_blocks(nft_output))
    return errors

verify_bypass_ruleset(nft_output, *, allow_all=False)

Check applied bypass ruleset invariants. Returns errors (empty = OK).

Verifies:

- Output chain has policy accept
- Input chain has policy drop
- Bypass nflog prefix is present
- Dual-stack allow sets are declared
- Private-range reject rules present (unless allow_all)

Source code in src/terok_shield/core/nft.py
def verify_bypass_ruleset(nft_output: str, *, allow_all: bool = False) -> list[str]:
    """Check applied bypass ruleset invariants.  Returns errors (empty = OK).

    Verifies:
    - Output chain has policy accept
    - Input chain has policy drop
    - Both output and input chains exist
    - Bypass nflog prefix is present
    - Dual-stack allow sets are declared
    - Private-range reject rules present (unless *allow_all*)

    Args:
        nft_output: Textual ruleset listing to inspect.
        allow_all: When True, the private-range check is skipped.

    Returns:
        Human-readable error strings, one per violated invariant.
    """

    def chain_has_policy(chain: str, policy: str) -> bool:
        # Tie the policy to its own chain.  A plain substring test cannot tell
        # "output accept / input drop" from the swapped (wrong) configuration.
        # ``[^{}]*?`` keeps the match inside the chain's own header.
        pattern = r"\bchain\s+%s\s*\{[^{}]*?\bpolicy\s+%s\b" % (chain, policy)
        return re.search(pattern, nft_output) is not None

    errors: list[str] = []
    if not chain_has_policy("output", "accept"):
        errors.append("output policy is not accept")
    if not chain_has_policy("input", "drop"):
        errors.append("input policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"{chain} chain missing")
    if BYPASS_LOG_PREFIX not in nft_output:
        errors.append("bypass nflog prefix missing")
    for set_name in ("allow_v4", "allow_v6"):
        if set_name not in nft_output:
            errors.append(f"{set_name} set missing")
    if not allow_all:
        errors.extend(_verify_private_blocks(nft_output))
    return errors