terok_shield
terok_shield
¶
terok-shield: nftables-based egress firewalling for Podman containers.
Public API for standalone use and integration with terok.
The primary entry point is the Shield facade class:
>>> from terok_shield import Shield, ShieldConfig
>>> shield = Shield(ShieldConfig(state_dir=Path("/tmp/my-container")))
>>> shield.pre_start("my-container", ["dev-standard"])
Core and support layer modules are imported lazily — from terok_shield
import ShieldConfig does not pull in nft, dnsmasq, or subprocess
helpers. Heavy imports are deferred until Shield is instantiated.
AuditFileConfig
¶
Bases: BaseModel
Audit section of config.yml.
DnsTier
¶
Bases: Enum
DNS resolution tier for egress control.
Determines how domain-based allowlists are enforced:
Per-container dnsmasq with --nftset auto-populates nft
allow sets on every DNS query. Handles IP rotation.
DIG: Static resolution at pre-start via dig (current fallback).
GETENT: Single-IP resolution via getent hosts (minimal fallback).
ShieldConfig(state_dir, mode=ShieldMode.HOOK, default_profiles=('dev-standard',), loopback_ports=(), audit_enabled=True, profiles_dir=None, interactive=False)
dataclass
¶
Per-container shield configuration.
The library is a pure function of its inputs. Given a
ShieldConfig with state_dir, it writes to that directory
and nowhere else. No env-var reading, no config-file parsing.
ShieldFileConfig
¶
Bases: BaseModel
Validated schema for config.yml.
Loaded by the CLI at startup. extra="forbid" rejects unknown
keys so typos (e.g. mod: hook) produce a clear error instead
of being silently ignored.
ShieldMode
¶
Bases: Enum
Operating mode for the shield firewall.
Currently only HOOK is supported. Future modes (e.g. bridge) will add members here.
ShieldState
¶
Bases: Enum
Per-container shield state, derived from the live nft ruleset.
UP: Normal enforcing mode (deny-all). DOWN: Bypass mode with private-range protection (RFC 1918 + RFC 4193). DOWN_ALL: Bypass mode without private-range protection. INACTIVE: No ruleset found (container stopped or unshielded). ERROR: Ruleset present but unrecognised.
DnsResolver(*, runner)
¶
Stateless DNS resolver — all persistence lives in the cache file.
The only dependency is a :class:CommandRunner for dig / getent
subprocess calls.
Inject the command runner used for all DNS subprocess calls.
Source code in src/terok_shield/core/dns.py
resolve_and_cache(entries, cache_path, *, max_age=3600)
¶
Resolve profile entries and cache the result.
Profiles mix domain names with literal IPs/CIDRs — domains go through DNS resolution, literals pass through unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entries
|
list[str]
|
Domain names and/or raw IPs from composed profiles. |
required |
cache_path
|
Path
|
File to store resolved IPs in, per-container scoped. |
required |
max_age
|
int
|
Cache freshness threshold in seconds (default: 1 hour). |
3600
|
Returns:
| Type | Description |
|---|---|
list[str]
|
Resolved IPv4/IPv6 addresses combined with raw IPs/CIDRs. |
Source code in src/terok_shield/core/dns.py
resolve_domains(domains)
¶
Resolve domain names to IP addresses (A + AAAA), best-effort.
Unresolvable domains are skipped with a warning. Results are deduplicated in first-seen order.
Source code in src/terok_shield/core/dns.py
RulesetBuilder(*, dns=PASTA_DNS, loopback_ports=(), set_timeout='')
¶
Builder pattern for nftables ruleset generation and verification.
Security boundary: only stdlib + nft_constants imports. All inputs validated before interpolation.
Binds dns and loopback_ports once at construction -- these
were previously threaded as parameters to every function call.
Create a builder with validated DNS and loopback port config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dns
|
str
|
DNS server address (pasta default forwarder). |
PASTA_DNS
|
loopback_ports
|
tuple[int, ...]
|
TCP ports to allow on the loopback interface.
When set, gateway port rules reference |
()
|
set_timeout
|
str
|
nft set element timeout (e.g. |
''
|
Source code in src/terok_shield/core/nft.py
build_hook(*, interactive=False)
¶
Generate the hook-mode (deny-all) nftables ruleset.
Source code in src/terok_shield/core/nft.py
build_bypass(*, allow_all=False)
¶
Generate the bypass-mode (accept-all + log) ruleset.
Source code in src/terok_shield/core/nft.py
verify_hook(nft_output, *, interactive=False)
¶
Check applied hook ruleset invariants. Returns errors (empty = OK).
verify_bypass(nft_output, *, allow_all=False)
¶
Check applied bypass ruleset invariants. Returns errors (empty = OK).
add_elements_dual(ips)
¶
Classify IPs by family and generate add-element commands for both sets.
When the builder has a set_timeout configured (dnsmasq tier),
permanent IPs are written with timeout 0s so they do not auto-expire
along with dnsmasq-learned entries.
Source code in src/terok_shield/core/nft.py
CommandRunner
¶
Bases: Protocol
Protocol for executing external commands.
Decouples all subprocess calls behind a testable interface.
AuditLogger(*, audit_path, enabled=True)
¶
JSON-lines audit logger for a single container.
Writes to a single file (audit_path). When disabled, all
write operations are no-ops.
Create an audit logger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
audit_path
|
Path
|
Path to the |
required |
enabled
|
bool
|
Whether logging is active (can be toggled later). |
True
|
Source code in src/terok_shield/lib/audit.py
enabled
property
writable
¶
Whether audit logging is active.
log_event(container, action, *, dest=None, detail=None)
¶
Write a single audit event to the log file.
No-op when audit is disabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
container
|
str
|
Container name. |
required |
action
|
str
|
Event type (setup, teardown, allowed, denied). |
required |
dest
|
str | None
|
Destination IP/domain (optional). |
None
|
detail
|
str | None
|
Additional detail string (optional). |
None
|
Source code in src/terok_shield/lib/audit.py
tail_log(n=50)
¶
Yield the last n audit events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
n
|
int
|
Number of recent events to yield. |
50
|
Source code in src/terok_shield/lib/audit.py
ProfileLoader(*, user_dir, bundled_dir=None)
¶
Loads and composes .txt allowlist profiles.
Searches user profiles first (overriding bundled), then falls back to the bundled profiles shipped with the package.
Create a profile loader.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
user_dir
|
Path
|
User profiles directory (overrides bundled). |
required |
bundled_dir
|
Path | None
|
Bundled profiles directory (auto-detected if None). |
None
|
Source code in src/terok_shield/lib/profiles.py
load_profile(name)
¶
Load a profile by name and return its entries.
User profiles take precedence over bundled profiles.
Raises:
| Type | Description |
|---|---|
ValueError
|
If the name contains path separators or traversal. |
FileNotFoundError
|
If the profile does not exist. |
Source code in src/terok_shield/lib/profiles.py
compose_profiles(names)
¶
Load and merge multiple profiles, deduplicating entries.
Preserves insertion order (first occurrence wins).
Raises:
| Type | Description |
|---|---|
ValueError
|
If any name contains path separators or traversal. |
FileNotFoundError
|
If any named profile does not exist. |
Source code in src/terok_shield/lib/profiles.py
list_profiles()
¶
List available profile names (bundled + user, deduplicated).
Source code in src/terok_shield/lib/profiles.py
EnvironmentCheck(dns_tier='', ok=True, podman_version=(0,), hooks='per-container', health='ok', issues=list(), needs_setup=False, setup_hint='')
dataclass
¶
Result of :meth:Shield.check_environment.
Machine-readable fields for programmatic consumers (terok TUI, scripts).
Human-readable issues and setup_hint for CLI display.
Attributes:
| Name | Type | Description |
|---|---|---|
ok |
bool
|
True if no issues found. |
podman_version |
tuple[int, ...]
|
Detected podman version tuple. |
hooks |
str
|
Hook installation type ( |
health |
str
|
Environment health ( |
dns_tier |
str
|
Active DNS resolution tier ( |
issues |
list[str]
|
List of human-readable issue descriptions. |
needs_setup |
bool
|
True if one-time setup is required. |
setup_hint |
str
|
Setup instructions (empty if not needed). |
Shield(config, *, runner=None, audit=None, dns=None, profiles=None, ruleset=None)
¶
Facade: primary public API for terok-shield.
Owns and wires together all service objects (audit, DNS, profiles,
ruleset builder, mode backend). Construct once with a
ShieldConfig and call methods for the full shield lifecycle.
All collaborators are injectable for testing.
Create the shield facade.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
ShieldConfig
|
Shield configuration (must include state_dir). |
required |
runner
|
CommandRunner | None
|
Command runner (default: |
None
|
audit
|
AuditLogger | None
|
Audit logger (default: from config.state_dir). |
None
|
dns
|
DnsResolver | None
|
DNS resolver (default: from runner). |
None
|
profiles
|
ProfileLoader | None
|
Profile loader (default: from config.profiles_dir). |
None
|
ruleset
|
RulesetBuilder | None
|
Ruleset builder (default: from config loopback_ports). |
None
|
Source code in src/terok_shield/__init__.py
check_environment()
¶
Check the podman environment for compatibility issues.
Proactive check for API consumers (e.g. terok). Returns an
:class:EnvironmentCheck with detected issues and setup hints.
Does not raise — the caller decides how to handle issues.
Source code in src/terok_shield/__init__.py
status()
¶
Return current shield status information.
pre_start(container, profiles=None)
¶
Prepare shield for container start. Returns extra podman args.
Source code in src/terok_shield/__init__.py
allow(container, target)
¶
Live-allow a domain or IP for a running container.
Source code in src/terok_shield/__init__.py
deny(container, target)
¶
Live-deny a domain or IP for a running container.
Source code in src/terok_shield/__init__.py
rules(container)
¶
down(container, *, allow_all=False)
¶
Switch a running container to bypass mode.
Source code in src/terok_shield/__init__.py
up(container)
¶
state(container)
¶
preview(*, down=False, allow_all=False)
¶
Generate the ruleset that would be applied to a container.
resolve(profiles=None, *, force=False)
¶
Resolve DNS profiles and cache the results.
Source code in src/terok_shield/__init__.py
profiles_list()
¶
tail_log(n=50)
¶
ensure_containers_conf_hooks_dir(hooks_dir)
¶
Ensure ~/.config/containers/containers.conf includes hooks_dir.
Creates the file if absent. Inserts hooks_dir into the existing
[engine] section, or appends a new section if none exists.
Warns (does not fail) if hooks_dir is already set differently.
Uses line-based text manipulation to preserve comments and formatting.
Source code in src/terok_shield/common/podman_info.py
system_hooks_dir()
¶
Return the best system-level hooks directory.
Prefers existing directories; falls back to /etc/containers/oci/hooks.d.
Source code in src/terok_shield/common/podman_info.py
__getattr__(name)
¶
Lazy import for re-exported core/support layer names.