nft
nftables ruleset generation.
+=====================================================+
| SECURITY BOUNDARY -- read this file first.          |
|                                                     |
| Every nftables ruleset is generated here.           |
| All inputs are validated before interpolation.      |
| Only stdlib + nft_constants.py imports allowed.     |
+=====================================================+
RulesetBuilder(*, dns=PASTA_DNS, loopback_ports=(), set_timeout='')
Builder pattern for nftables ruleset generation and verification.
Security boundary: only stdlib + nft_constants imports. All inputs validated before interpolation.
Binds dns and loopback_ports once at construction; these were
previously threaded as parameters to every function call.
The constructor creates a builder with validated DNS and loopback port config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dns | str | DNS server address (pasta default forwarder). | PASTA_DNS |
| loopback_ports | tuple[int, ...] | TCP ports to allow on the loopback interface. When set, gateway port rules reference … | () |
| set_timeout | str | nft set element timeout (e.g. …) | '' |
Source code in src/terok_shield/core/nft.py
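The snippet below is not the project source. It is a minimal, self-contained sketch of the idea described above: validate dns and loopback_ports once at construction, then let later methods interpolate the bound values without re-checking them. The class name BuilderSketch, the method loopback_rule, the rule text it returns, and the placeholder DNS address are all assumptions made for illustration; the real RulesetBuilder may validate and render differently.

```python
import ipaddress


class BuilderSketch:
    """Illustrative stand-in; not the real RulesetBuilder."""

    def __init__(self, *, dns: str, loopback_ports: tuple[int, ...] = ()) -> None:
        # Validate once, up front, so later methods can interpolate safely.
        # ip_address() raises ValueError for anything that is not a bare IP.
        self._dns = str(ipaddress.ip_address(dns))
        for port in loopback_ports:
            # bool is a subclass of int, so reject it explicitly.
            if isinstance(port, bool) or not isinstance(port, int):
                raise ValueError(f"port must be an int: {port!r}")
            if not 1 <= port <= 65535:
                raise ValueError(f"port out of range: {port!r}")
        self._loopback_ports = tuple(loopback_ports)

    def loopback_rule(self) -> str:
        """Return a hypothetical accept rule for the bound loopback ports."""
        if not self._loopback_ports:
            return ""
        ports = ", ".join(str(p) for p in self._loopback_ports)
        return f'oifname "lo" tcp dport {{ {ports} }} accept'


# Placeholder address; the real default comes from PASTA_DNS.
builder = BuilderSketch(dns="169.254.1.1", loopback_ports=(8080, 9090))
print(builder.loopback_rule())
```

Because the constructor rejects bad input, a string such as "1.1.1.1; flush ruleset" never reaches the point where rule text is assembled.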
build_hook(*, interactive=False)
Generate the hook-mode (deny-all) nftables ruleset.
build_bypass(*, allow_all=False)
Generate the bypass-mode (accept-all + log) ruleset.
verify_hook(nft_output, *, interactive=False)
Check applied hook ruleset invariants. Returns errors (empty = OK).
verify_bypass(nft_output, *, allow_all=False)
Check applied bypass ruleset invariants. Returns errors (empty = OK).
add_elements_dual(ips)
Classify IPs by family and generate add-element commands for both sets.
When the builder has a set_timeout configured (dnsmasq tier),
permanent IPs are written with timeout 0s so they do not auto-expire
along with dnsmasq-learned entries.
safe_ip(value)
Validate and normalize an IPv4 or IPv6 address or CIDR notation.
Prevents nft command injection by ensuring the value is a valid IP address or network. Returns the canonical string form so that string comparisons across state files (profile.allowed, live.allowed, deny.list) are reliable regardless of input notation.
Raises ValueError on invalid input.
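As a rough illustration of the validate-and-normalize behaviour described above, the following sketch uses the standard-library ipaddress module. The name safe_ip_sketch is hypothetical, and the use of strict=False for CIDR input is an assumption; the real safe_ip() may treat host bits or other edge cases differently.

```python
import ipaddress


def safe_ip_sketch(value: str) -> str:
    """Return the canonical form of an IP address or CIDR, or raise ValueError."""
    try:
        if "/" in value:
            # Assumption: host bits are masked off rather than rejected.
            return str(ipaddress.ip_network(value, strict=False))
        return str(ipaddress.ip_address(value))
    except ValueError as exc:
        raise ValueError(f"invalid IP address or CIDR: {value!r}") from exc


print(safe_ip_sketch("2001:DB8:0:0:0:0:0:1"))  # canonical lowercase, compressed
print(safe_ip_sketch("10.0.0.0/8"))
```

Anything that does not parse as an address or network, including input carrying extra nft syntax, ends in ValueError before it can be interpolated into a command.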
hook_ruleset(dns=PASTA_DNS, loopback_ports=(), set_timeout='', *, interactive=False)
Generate a per-container nftables ruleset for hook mode.
Applied by the OCI hook into the container's own netns. Dual-stack: both IPv4 and IPv6 use deny-all + allowlist.
gateway_v4 and gateway_v6 sets are always defined but start
empty. The OCI hook populates them from /proc/{pid}/net/route
after applying this ruleset; shield_up() repopulates them from
the persisted state/gateway file.
Chain order (output): loopback -> established -> DNS -> gateway ports -> loopback ports -> allow sets -> deny sets -> private-range reject -> deny/interactive-reject
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dns | str | DNS server address (pasta default forwarder). | PASTA_DNS |
| loopback_ports | tuple[int, ...] | TCP ports to allow on the loopback interface. | () |
| set_timeout | str | nft set element timeout (e.g. …) | '' |
| interactive | bool | When True, replaces the terminal deny-all rule with an NFLOG+reject rule using the QUEUED prefix for interactive handling. | False |
bypass_ruleset(dns=PASTA_DNS, loopback_ports=(), *, allow_all=False, set_timeout='')
Generate a bypass (accept-all + log) nftables ruleset.
Same structure as hook_ruleset(), but the output chain policy is accept
and new connections are logged with the bypass prefix. Private-range
reject rules are kept unless allow_all is True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dns | str | DNS server address (pasta default forwarder). | PASTA_DNS |
| loopback_ports | tuple[int, ...] | TCP ports to allow on the loopback interface. | () |
| allow_all | bool | If True, remove private-range reject rules. | False |
| set_timeout | str | nft set element timeout (e.g. …) | '' |
add_elements(set_name, ips, table=NFT_TABLE, *, timeout_zero=False)
Generate nft command to add validated IPs to a set.
Both set_name and table are validated against injection.
Returns empty string if no valid IPs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_zero | bool | When … | False |
add_elements_dual(ips, *, permanent=False)
Classify IPs by family and generate add-element commands for both sets.
IPv4 addresses go to allow_v4, IPv6 to allow_v6.
Returns empty string if no valid IPs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| permanent | bool | When … | False |
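The family classification described for add_elements_dual() can be sketched as follows. This is an illustration under stated assumptions, not the module's code: the function name, the table name example_table (standing in for NFT_TABLE), the choice to skip invalid entries silently, and the exact command layout are all hypothetical. Only the set names allow_v4 and allow_v6 and the timeout 0s marker for permanent entries come from the text above.

```python
import ipaddress

TABLE = "example_table"  # hypothetical; the real value is NFT_TABLE


def add_elements_dual_sketch(ips, *, permanent=False):
    """Split IPs by family and build one add-element command per non-empty set."""
    by_family = {4: [], 6: []}
    for raw in ips:
        try:
            if "/" in raw:
                parsed = ipaddress.ip_network(raw, strict=False)
            else:
                parsed = ipaddress.ip_address(raw)
        except ValueError:
            continue  # assumption: invalid entries are skipped, not fatal
        by_family[parsed.version].append(str(parsed))

    # Permanent entries carry "timeout 0s" so they do not expire with the set default.
    suffix = " timeout 0s" if permanent else ""
    commands = []
    for version, set_name in ((4, "allow_v4"), (6, "allow_v6")):
        if by_family[version]:
            elements = ", ".join(ip + suffix for ip in by_family[version])
            commands.append(f"add element inet {TABLE} {set_name} {{ {elements} }}")
    return "\n".join(commands)


print(add_elements_dual_sketch(["10.0.0.1", "2001:db8::1", "not-an-ip"]))
```

With no valid input the sketch returns an empty string, matching the documented behaviour.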
add_deny_elements_dual(ips)
Classify IPs by family and generate add-element commands for deny sets.
IPv4 addresses go to deny_v4, IPv6 to deny_v6.
Returns empty string if no valid IPs.
delete_deny_elements_dual(ips)
Classify IPs by family and generate delete-element commands for deny sets.
IPv4 addresses target deny_v4, IPv6 target deny_v6.
Returns empty string if no valid IPs.
verify_ruleset(nft_output, *, interactive=False)
Check applied ruleset invariants. Returns errors (empty = OK).
Verifies:
- Default policy is drop
- Both output and input chains exist
- Reject type is present
- Dual-stack allow sets are declared
- Dual-stack deny sets are declared
- All private ranges are present (RFC 1918 + RFC 4193/4291)
- Interactive mode: queued nflog prefix present
- Non-interactive mode: deny nflog prefix present
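To show the "returns errors (empty = OK)" contract, here is a small sketch that covers only a few of the invariants listed above. It assumes plain substring checks against the listed ruleset text and assumes the chains are literally named output and input; the function name verify_sketch and the error messages are hypothetical, and the real verify_ruleset() checks more (reject type, private ranges, nflog prefixes).

```python
def verify_sketch(nft_output: str) -> list[str]:
    """Return a list of invariant violations; an empty list means OK."""
    errors: list[str] = []
    if "policy drop" not in nft_output:
        errors.append("default policy is not drop")
    for chain in ("output", "input"):
        if f"chain {chain}" not in nft_output:
            errors.append(f"missing chain: {chain}")
    for set_name in ("allow_v4", "allow_v6", "deny_v4", "deny_v6"):
        if f"set {set_name}" not in nft_output:
            errors.append(f"missing set: {set_name}")
    return errors


# Hand-written sample text, not captured from a real system.
sample = """table inet t {
    set allow_v4 { }
    set allow_v6 { }
    set deny_v4 { }
    set deny_v6 { }
    chain output { policy drop; }
    chain input { policy drop; }
}"""
print(verify_sketch(sample))
```

Collecting every failure into one list, rather than raising on the first, lets a caller report all broken invariants at once.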
verify_bypass_ruleset(nft_output, *, allow_all=False)
Check applied bypass ruleset invariants. Returns errors (empty = OK).
Verifies:
- Output chain has policy accept
- Input chain has policy drop
- Bypass nflog prefix is present
- Dual-stack allow sets are declared
- Private-range reject rules present (unless allow_all)