rules
nftables ruleset generation and verification.
Generates per-container nftables rulesets (deny-all and bypass modes), provides set operations for runtime allowlist/denylist management, and verifies applied rulesets against security invariants.
Security boundary: only stdlib + nft_constants.py imports allowed. All inputs are validated before interpolation into nft commands.
RulesetBuilder(*, dns=PASTA_DNS, loopback_ports=(), gateway_v4='', gateway_v6='', set_timeout='')
Builder for nftables ruleset generation and verification.
Security boundary: only stdlib + nft_constants imports. All inputs validated before interpolation.
Binds dns and loopback_ports once at construction so callers do not repeat them on every generation or verification call.
Create a builder with validated DNS, gateway, and port config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dns | str | DNS server address (pasta default forwarder). | PASTA_DNS |
| loopback_ports | tuple[int, ...] | TCP ports to allow on the loopback interface. | () |
| gateway_v4 | str | IPv4 gateway address (e.g. slirp4netns | '' |
| gateway_v6 | str | IPv6 gateway address (e.g. slirp4netns | '' |
| set_timeout | str | nft set element timeout (e.g. | '' |
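As a rough illustration of what "validated DNS, gateway, and port config" could involve, the sketch below checks constructor-style inputs using only the standard library. The helper name validate_builder_inputs and its exact rules are assumptions made for illustration; they are not taken from RulesetBuilder itself, and set_timeout is left out because its accepted format is not stated here.

```python
import ipaddress


def validate_builder_inputs(dns, loopback_ports=(), gateway_v4="", gateway_v6=""):
    """Hypothetical helper: raise ValueError if any input is unusable."""
    # Each configured address must parse. A "%" zone suffix is refused because
    # ipaddress can accept a free-form zone identifier after it on Python 3.9+.
    for label, value, parser in (
        ("dns", dns, ipaddress.ip_address),
        ("gateway_v4", gateway_v4, ipaddress.IPv4Address),
        ("gateway_v6", gateway_v6, ipaddress.IPv6Address),
    ):
        if label != "dns" and value == "":
            continue  # assumption: an empty gateway string means "not configured"
        if not isinstance(value, str) or "%" in value:
            raise ValueError(f"invalid {label}: {value!r}")
        parser(value)  # raises ValueError (or a subclass) on malformed input
    for port in loopback_ports:
        # bool is a subclass of int, so it is rejected explicitly.
        if isinstance(port, bool) or not isinstance(port, int) or not 1 <= port <= 65535:
            raise ValueError(f"invalid TCP port: {port!r}")
```

Validating once at construction time means every later generation call can interpolate these values without re-checking them.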
Source code in src/terok_shield/nft/rules.py
build_hook()
Generate the hook-mode (deny-all) nftables ruleset.
Applied by the OCI hook into the container's own netns. Dual-stack: both IPv4 and IPv6 use deny-all + allowlist.
Gateway addresses are baked directly into the ruleset as literal accept rules -- no dynamic sets or runtime discovery needed.
Chain order (output): loopback -> established -> DNS -> gateway ports -> loopback ports -> allow sets -> deny sets -> private-range reject -> terminal deny
build_bypass(*, allow_all=False)
Generate the bypass-mode (accept-all + log) ruleset.
Same structure as build_hook(), but the output chain policy is accept, new connections are logged with the bypass prefix, and deny sets are still enforced so that deny.list entries continue to block traffic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| allow_all | bool | If True, remove private-range reject rules. | False |
build_quarantine()
staticmethod
Generate the quarantine-mode (total blackout) ruleset.
Drops all traffic except loopback and established connections. No DNS, no allowlists, no gateway ports. All dropped packets are tagged for the audit log.
verify_hook(nft_output)
Check applied hook ruleset invariants. Returns errors (empty = OK).
Expects output from nft list table inet terok_shield (scoped to the managed table), not nft list ruleset.
Verifies:
- Managed table header is present
- Default policy is drop
- Both output and input chains exist
- Reject type is present
- Dual-stack allow sets are declared
- Dual-stack deny sets are declared
- All private ranges are present (RFC 1918 + RFC 4193/4291)
- Terminal deny-all rule with BLOCKED prefix present
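The "returns errors, empty means OK" contract can be sketched as a series of substring checks over the nft listing. This is a minimal illustration, not the actual verify_hook logic: the function name check_hook_invariants and the error messages are hypothetical, the private ranges are the standard RFC 1918, RFC 4193, and RFC 4291 link-local prefixes, and the reject-type and BLOCKED-prefix checks are omitted because their exact strings are not given here.

```python
# RFC 1918 private IPv4 ranges, RFC 4193 unique-local, RFC 4291 link-local.
PRIVATE_RANGES = (
    "10.0.0.0/8",
    "172.16.0.0/12",
    "192.168.0.0/16",
    "fc00::/7",
    "fe80::/10",
)
REQUIRED_SETS = ("allow_v4", "allow_v6", "deny_v4", "deny_v6")


def check_hook_invariants(nft_output: str) -> list[str]:
    """Hypothetical checker: collect every violated invariant as a message."""
    errors = []
    if "table inet terok_shield" not in nft_output:
        errors.append("managed table header missing")
    if "policy drop" not in nft_output:
        errors.append("default policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"chain {chain} missing")
    for name in REQUIRED_SETS:
        if f"set {name}" not in nft_output:
            errors.append(f"set {name} not declared")
    for net in PRIVATE_RANGES:
        if net not in nft_output:
            errors.append(f"private range {net} missing")
    return errors
```

Collecting all failures instead of stopping at the first one lets a caller report the full list in a single pass.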
verify_bypass(nft_output, *, allow_all=False)
Check applied bypass ruleset invariants. Returns errors (empty = OK).
Expects output from nft list table inet terok_shield (scoped to the managed table), not nft list ruleset.
Verifies:
- Managed table header is present
- Output chain has policy accept
- Input chain has policy drop
- Bypass nflog prefix is present
- Dual-stack allow sets are declared
- Private-range reject rules present (unless allow_all)
verify_quarantine(nft_output)
staticmethod
Check applied quarantine ruleset invariants. Returns errors (empty = OK).
Expects output from nft list table inet terok_shield (scoped to the managed table), not nft list ruleset.
Verifies:
- Managed table header is present
- Both chains present with policy drop
- Blocked log prefix present
- No allow sets (total blackout means no allowlists)
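Quarantine verification differs from the other modes in that one invariant is an absence check: allow sets must not be declared at all. A short sketch of that idea follows, with the hypothetical name check_quarantine_invariants. Counting "policy drop" occurrences is a crude stand-in for per-chain parsing, and the blocked log prefix check is omitted because its exact string is not given here.

```python
def check_quarantine_invariants(nft_output: str) -> list[str]:
    """Hypothetical checker for a total-blackout ruleset listing."""
    errors = []
    if "table inet terok_shield" not in nft_output:
        errors.append("managed table header missing")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"chain {chain} missing")
    # Crude approximation: expect one "policy drop" per chain.
    if nft_output.count("policy drop") < 2:
        errors.append("both chains must have policy drop")
    # Negative invariant: an allow set would contradict the blackout.
    for name in ("allow_v4", "allow_v6"):
        if f"set {name}" in nft_output:
            errors.append(f"unexpected allow set {name}")
    return errors
```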
add_elements_dual(ips)
Classify IPs by family and generate add-element commands for both sets.
When the builder has a set_timeout configured (dnsmasq tier), permanent IPs are written with timeout 0s so they do not auto-expire along with dnsmasq-learned entries.
add_elements_dual(ips, *, permanent=False)
Classify IPs by family and generate add-element commands for both sets.
IPv4 addresses go to allow_v4, IPv6 to allow_v6.
Returns empty string if no valid IPs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| permanent | bool | When | False |
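The family classification step can be sketched with the standard ipaddress module. The helper name classify_for_allow_sets is hypothetical, and silently skipping invalid entries is an assumption inferred from the "returns empty string if no valid IPs" behaviour, not a confirmed detail.

```python
import ipaddress


def classify_for_allow_sets(ips):
    """Hypothetical helper: map each allow set name to its canonical entries."""
    buckets = {"allow_v4": [], "allow_v6": []}
    for raw in ips:
        # Refuse non-strings and IPv6 zone suffixes before parsing.
        if not isinstance(raw, str) or "%" in raw:
            continue
        try:
            parsed = (
                ipaddress.ip_network(raw, strict=False)
                if "/" in raw
                else ipaddress.ip_address(raw)
            )
        except ValueError:
            continue  # assumption: invalid entries are skipped, not fatal
        key = "allow_v4" if parsed.version == 4 else "allow_v6"
        buckets[key].append(str(parsed))
    return buckets
```

For example, `classify_for_allow_sets(["1.2.3.4", "2001:DB8::1", "bogus"])` places the first entry in allow_v4, the normalized second entry in allow_v6, and drops the third.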
add_deny_elements_dual(ips)
Classify IPs by family and generate add-element commands for deny sets.
IPv4 addresses go to deny_v4, IPv6 to deny_v6.
Returns empty string if no valid IPs.
delete_deny_elements_dual(ips)
Classify IPs by family and generate delete-element commands for deny sets.
IPv4 addresses target deny_v4, IPv6 target deny_v6.
Returns empty string if no valid IPs.
add_elements(set_name, ips, table=NFT_TABLE, *, timeout_zero=False)
Generate nft command to add validated IPs to a set.
Both set_name and table are validated against injection.
Returns empty string if no valid IPs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_zero | bool | When | False |
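To illustrate validating identifiers before interpolation, the sketch below builds an nft add element command only after the set and table names pass a conservative pattern and each IP parses. The name add_elements_sketch, the identifier pattern, the "terok_shield" default (taken from the table name used elsewhere on this page), and the reading of timeout_zero as "append timeout 0s to each element" are all assumptions for illustration.

```python
import ipaddress
import re

# Assumption: a conservative identifier pattern; real nft names may allow more.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def add_elements_sketch(set_name, ips, table="terok_shield", timeout_zero=False):
    """Hypothetical generator for an 'add element' command string."""
    for label, name in (("set", set_name), ("table", table)):
        if not isinstance(name, str) or not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe {label} name: {name!r}")
    elements = []
    for raw in ips:
        if not isinstance(raw, str) or "%" in raw:
            continue
        try:
            parsed = (
                ipaddress.ip_network(raw, strict=False)
                if "/" in raw
                else ipaddress.ip_address(raw)
            )
        except ValueError:
            continue  # assumption: invalid IPs are dropped
        elements.append(str(parsed) + (" timeout 0s" if timeout_zero else ""))
    if not elements:
        return ""  # mirrors the documented "empty string if no valid IPs"
    return f"add element inet {table} {set_name} {{ {', '.join(elements)} }}"
```

Because only the canonical string produced by ipaddress is interpolated, raw caller input never reaches the command text.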
delete_elements(set_name, ips, table=NFT_TABLE)
Generate nft command to delete validated IPs from a set.
Both set_name and table are validated against injection.
Returns empty string if no valid IPs.
safe_ip(value)
Validate and normalize an IPv4 or IPv6 address or CIDR notation.
Prevents nft command injection by ensuring the value is a valid IP address or network. Returns the canonical string form so that string comparisons across state files (profile.allowed, live.allowed, deny.list) are reliable regardless of input notation.
Raises ValueError on invalid input.
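A minimal sketch of this validate-and-normalize behaviour, using only the standard ipaddress module, might look like the following. The name safe_ip_sketch is hypothetical, and two choices are assumptions, not confirmed details of safe_ip: host bits in a CIDR are masked off (strict=False), and IPv6 zone suffixes ("%...") are rejected outright.

```python
import ipaddress


def safe_ip_sketch(value):
    """Hypothetical validator: return the canonical form or raise ValueError."""
    # Zone suffixes are refused so that no free-form text survives parsing.
    if not isinstance(value, str) or "%" in value:
        raise ValueError(f"invalid IP or CIDR: {value!r}")
    if "/" in value:
        # Assumption: host bits are masked off instead of being an error.
        return str(ipaddress.ip_network(value, strict=False))
    return str(ipaddress.ip_address(value))
```

With this sketch, `safe_ip_sketch("2001:DB8:0:0:0:0:0:1")` returns `"2001:db8::1"`, so two spellings of the same address compare equal as strings, while input such as `"1.2.3.4; flush ruleset"` raises ValueError.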