terok_shield

terok-shield: nftables-based egress firewalling for Podman containers.

Public API for standalone use and integration with terok.

The primary entry point is the Shield facade class:

>>> from pathlib import Path
>>> from terok_shield import Shield, ShieldConfig
>>> shield = Shield(ShieldConfig(state_dir=Path("/tmp/my-container")))
>>> shield.pre_start("my-container", ["dev-standard"])

Core and support layer modules are imported lazily — from terok_shield import ShieldConfig does not pull in nft, dnsmasq, or subprocess helpers. Heavy imports are deferred until Shield is instantiated.

AuditFileConfig

Bases: BaseModel

Audit section of config.yml.

DnsTier

Bases: Enum

DNS resolution tier for egress control.

Determines how domain-based allowlists are enforced:

DNSMASQ: Per-container dnsmasq with --nftset auto-populates nft allow sets on every DNS query. Handles IP rotation.

DIG: Static resolution at pre-start via dig (current fallback).

GETENT: Single-IP resolution via getent hosts (minimal fallback).

ShieldConfig(state_dir, mode=ShieldMode.HOOK, default_profiles=('dev-standard',), loopback_ports=(), audit_enabled=True, profiles_dir=None, interactive=False) dataclass

Per-container shield configuration.

The library is a pure function of its inputs. Given a ShieldConfig with state_dir, it writes to that directory and nowhere else. No env-var reading, no config-file parsing.

ShieldFileConfig

Bases: BaseModel

Validated schema for config.yml.

Loaded by the CLI at startup. extra="forbid" rejects unknown keys so typos (e.g. mod: hook) produce a clear error instead of being silently ignored.

ShieldMode

Bases: Enum

Operating mode for the shield firewall.

Currently only HOOK is supported. Future modes (e.g. bridge) will add members here.

ShieldState

Bases: Enum

Per-container shield state, derived from the live nft ruleset.

UP: Normal enforcing mode (deny-all).

DOWN: Bypass mode with private-range protection (RFC 1918 + RFC 4193).

DOWN_ALL: Bypass mode without private-range protection.

INACTIVE: No ruleset found (container stopped or unshielded).

ERROR: Ruleset present but unrecognised.

DnsResolver(*, runner)

Stateless DNS resolver — all persistence lives in the cache file.

The only dependency is a CommandRunner for dig / getent subprocess calls.

Inject the command runner used for all DNS subprocess calls.

Source code in src/terok_shield/core/dns.py
def __init__(self, *, runner: CommandRunner) -> None:
    """Inject the command runner used for all DNS subprocess calls."""
    self._runner = runner

resolve_and_cache(entries, cache_path, *, max_age=3600)

Resolve profile entries and cache the result.

Profiles mix domain names with literal IPs/CIDRs — domains go through DNS resolution, literals pass through unchanged.

Parameters:

entries (list[str], required): Domain names and/or raw IPs from composed profiles.
cache_path (Path, required): File to store resolved IPs in, per-container scoped.
max_age (int, default 3600): Cache freshness threshold in seconds (default: 1 hour).

Returns:

list[str]: Resolved IPv4/IPv6 addresses combined with raw IPs/CIDRs.

Source code in src/terok_shield/core/dns.py
def resolve_and_cache(
    self,
    entries: list[str],
    cache_path: Path,
    *,
    max_age: int = 3600,
) -> list[str]:
    """Resolve profile entries and cache the result.

    Profiles mix domain names with literal IPs/CIDRs — domains go
    through DNS resolution, literals pass through unchanged.

    Args:
        entries: Domain names and/or raw IPs from composed profiles.
        cache_path: File to store resolved IPs in, per-container scoped.
        max_age: Cache freshness threshold in seconds (default: 1 hour).

    Returns:
        Resolved IPv4/IPv6 addresses combined with raw IPs/CIDRs.
    """
    if self._cache_fresh(cache_path, max_age):
        return self._read_cache(cache_path)

    domains, raw_ips = self._split_entries(entries)
    resolved = self.resolve_domains(domains)
    all_ips = raw_ips + resolved

    self._write_cache(cache_path, all_ips)
    return all_ips
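
The helpers _split_entries and _cache_fresh are not shown on this page. As a rough sketch of what such helpers might look like, the snippet below separates literals from domains with the standard ipaddress module and judges freshness from the file's modification time. The names split_entries and cache_fresh, and their exact behaviour, are assumptions for illustration and not the library's code.

```python
import ipaddress
import time
from pathlib import Path


def split_entries(entries: list[str]) -> tuple[list[str], list[str]]:
    """Split entries into (domains, literal IPs/CIDRs). Hypothetical helper."""
    domains: list[str] = []
    literals: list[str] = []
    for entry in entries:
        try:
            # strict=False accepts single addresses as well as CIDR blocks.
            ipaddress.ip_network(entry, strict=False)
        except ValueError:
            domains.append(entry)  # not an IP literal, so treat it as a domain
        else:
            literals.append(entry)
    return domains, literals


def cache_fresh(cache_path: Path, max_age: int) -> bool:
    """Return True if the cache file exists and is younger than max_age seconds."""
    try:
        age = time.time() - cache_path.stat().st_mtime
    except OSError:
        return False  # a missing or unreadable cache is never fresh
    # A modification time in the future (clock skew) is treated as stale.
    return 0 <= age < max_age
```

Under this sketch, max_age=0 never reports a fresh cache, so passing 0 always forces a new resolution.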

resolve_domains(domains)

Resolve domain names to IP addresses (A + AAAA), best-effort.

Unresolvable domains are skipped with a warning. Results are deduplicated in first-seen order.

Source code in src/terok_shield/core/dns.py
def resolve_domains(self, domains: list[str]) -> list[str]:
    """Resolve domain names to IP addresses (A + AAAA), best-effort.

    Unresolvable domains are skipped with a warning.  Results are
    deduplicated in first-seen order.
    """
    seen: set[str] = set()
    result: list[str] = []
    use_getent = False
    for domain in domains:
        try:
            ips = self._resolve_one(domain, use_getent=use_getent)
        except DigNotFoundError:
            # dig missing — degrade gracefully for the rest of this batch
            logger.warning("dig not found — falling back to getent for DNS resolution")
            use_getent = True
            ips = self._resolve_one(domain, use_getent=True)
        if not ips:
            logger.warning("Domain %r resolved to no IPs (typo or DNS failure?)", domain)
        for ip in ips:
            if ip not in seen:
                seen.add(ip)
                result.append(ip)
    return result

RulesetBuilder(*, dns=PASTA_DNS, loopback_ports=(), set_timeout='')

Builder pattern for nftables ruleset generation and verification.

Security boundary: only stdlib + nft_constants imports. All inputs validated before interpolation.

Binds dns and loopback_ports once at construction -- these were previously threaded as parameters to every function call.

Create a builder with validated DNS and loopback port config.

Parameters:

dns (str, default PASTA_DNS): DNS server address (pasta default forwarder).
loopback_ports (tuple[int, ...], default ()): TCP ports to allow on the loopback interface. When set, gateway port rules reference @gateway_v4/@gateway_v6 sets populated at runtime by the OCI hook.
set_timeout (str, default ''): nft set element timeout (e.g. 30m). When set, allow sets use flags interval, timeout so dnsmasq-populated IPs expire and are refreshed on the next DNS query.

Source code in src/terok_shield/core/nft.py
def __init__(
    self,
    *,
    dns: str = PASTA_DNS,
    loopback_ports: tuple[int, ...] = (),
    set_timeout: str = "",
) -> None:
    """Create a builder with validated DNS and loopback port config.

    Args:
        dns: DNS server address (pasta default forwarder).
        loopback_ports: TCP ports to allow on the loopback interface.
            When set, gateway port rules reference ``@gateway_v4``/
            ``@gateway_v6`` sets populated at runtime by the OCI hook.
        set_timeout: nft set element timeout (e.g. ``30m``).  When set,
            allow sets use ``flags interval, timeout`` so dnsmasq-populated
            IPs expire and are refreshed on the next DNS query.
    """
    dns = safe_ip(dns)
    for p in loopback_ports:
        _safe_port(p)
    if set_timeout:
        _safe_timeout(set_timeout)
    self._dns = dns
    self._loopback_ports = loopback_ports
    self._set_timeout = set_timeout

build_hook(*, interactive=False)

Generate the hook-mode (deny-all) nftables ruleset.

Source code in src/terok_shield/core/nft.py
def build_hook(self, *, interactive: bool = False) -> str:
    """Generate the hook-mode (deny-all) nftables ruleset."""
    return hook_ruleset(
        dns=self._dns,
        loopback_ports=self._loopback_ports,
        set_timeout=self._set_timeout,
        interactive=interactive,
    )

build_bypass(*, allow_all=False)

Generate the bypass-mode (accept-all + log) ruleset.

Source code in src/terok_shield/core/nft.py
def build_bypass(self, *, allow_all: bool = False) -> str:
    """Generate the bypass-mode (accept-all + log) ruleset."""
    return bypass_ruleset(
        dns=self._dns,
        loopback_ports=self._loopback_ports,
        allow_all=allow_all,
        set_timeout=self._set_timeout,
    )

verify_hook(nft_output, *, interactive=False)

Check applied hook ruleset invariants. Returns errors (empty = OK).

Source code in src/terok_shield/core/nft.py
def verify_hook(self, nft_output: str, *, interactive: bool = False) -> list[str]:
    """Check applied hook ruleset invariants.  Returns errors (empty = OK)."""
    return verify_ruleset(nft_output, interactive=interactive)

verify_bypass(nft_output, *, allow_all=False)

Check applied bypass ruleset invariants. Returns errors (empty = OK).

Source code in src/terok_shield/core/nft.py
def verify_bypass(self, nft_output: str, *, allow_all: bool = False) -> list[str]:
    """Check applied bypass ruleset invariants.  Returns errors (empty = OK)."""
    return verify_bypass_ruleset(nft_output, allow_all=allow_all)

add_elements_dual(ips)

Classify IPs by family and generate add-element commands for both sets.

When the builder has a set_timeout configured (dnsmasq tier), permanent IPs are written with timeout 0s so they do not auto-expire along with dnsmasq-learned entries.

Source code in src/terok_shield/core/nft.py
def add_elements_dual(self, ips: list[str]) -> str:
    """Classify IPs by family and generate add-element commands for both sets.

    When the builder has a ``set_timeout`` configured (dnsmasq tier),
    permanent IPs are written with ``timeout 0s`` so they do not auto-expire
    along with dnsmasq-learned entries.
    """
    return add_elements_dual(ips, permanent=bool(self._set_timeout))
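
The module-level add_elements_dual function is not shown. The following is a sketch of how classification by address family and command generation could work. The table and set names (inet example_table, allow_v4, allow_v6) are placeholders chosen for this example, not the names the library uses.

```python
import ipaddress


def _canonical(entry: str):
    """Parse an address or CIDR block; raises ValueError on anything else."""
    if "/" in entry:
        return ipaddress.ip_network(entry, strict=False)
    return ipaddress.ip_address(entry)


def add_elements(
    ips: list[str],
    *,
    table: str = "inet example_table",  # placeholder table name
    v4_set: str = "allow_v4",           # placeholder set names
    v6_set: str = "allow_v6",
    permanent: bool = False,
) -> str:
    """Build one 'add element' command per address family that has members."""
    v4: list[str] = []
    v6: list[str] = []
    for entry in ips:
        parsed = _canonical(entry)
        # Interpolate the canonical form, never the raw input string.
        (v4 if parsed.version == 4 else v6).append(str(parsed))

    # 'timeout 0s' marks an element that should not expire with the set default.
    suffix = " timeout 0s" if permanent else ""
    commands = []
    for set_name, members in ((v4_set, v4), (v6_set, v6)):
        if members:
            elements = ", ".join(f"{m}{suffix}" for m in members)
            commands.append(f"add element {table} {set_name} {{ {elements} }}")
    return "\n".join(commands)
```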

CommandRunner

Bases: Protocol

Protocol for executing external commands.

Decouples all subprocess calls behind a testable interface.

run(cmd, *, check=True, stdin=None, timeout=None)

Run a command, return stdout.

Source code in src/terok_shield/core/run.py
def run(
    self,
    cmd: list[str],
    *,
    check: bool = True,
    stdin: str | None = None,
    timeout: int | None = None,
) -> str:
    """Run a command, return stdout."""
    ...

has(name)

Return True if an executable is on PATH.

Source code in src/terok_shield/core/run.py
def has(self, name: str) -> bool:
    """Return True if an executable is on PATH."""
    ...

nft(*args, stdin=None, check=True)

Run nft command directly (inside container netns).

Source code in src/terok_shield/core/run.py
def nft(self, *args: str, stdin: str | None = None, check: bool = True) -> str:
    """Run nft command directly (inside container netns)."""
    ...

nft_via_nsenter(container, *args, pid=None, stdin=None, check=True)

Run nft inside a running container's network namespace.

Source code in src/terok_shield/core/run.py
def nft_via_nsenter(
    self,
    container: str,
    *args: str,
    pid: str | None = None,
    stdin: str | None = None,
    check: bool = True,
) -> str:
    """Run nft inside a running container's network namespace."""
    ...

podman_inspect(container, fmt)

Inspect a container attribute via podman.

Source code in src/terok_shield/core/run.py
def podman_inspect(self, container: str, fmt: str) -> str:
    """Inspect a container attribute via podman."""
    ...

dig_all(domain, *, timeout=10)

Resolve domain to both IPv4 and IPv6 addresses.

Source code in src/terok_shield/core/run.py
def dig_all(self, domain: str, *, timeout: int = 10) -> list[str]:
    """Resolve domain to both IPv4 and IPv6 addresses."""
    ...

getent_hosts(domain)

Resolve domain via getent hosts (fallback when dig is missing).

Source code in src/terok_shield/core/run.py
def getent_hosts(self, domain: str) -> list[str]:
    """Resolve domain via ``getent hosts`` (fallback when dig is missing)."""
    ...

AuditLogger(*, audit_path, enabled=True)

JSON-lines audit logger for a single container.

Writes to a single file (audit_path). When disabled, all write operations are no-ops.

Create an audit logger.

Parameters:

audit_path (Path, required): Path to the .jsonl audit log file.
enabled (bool, default True): Whether logging is active (can be toggled later).

Source code in src/terok_shield/lib/audit.py
def __init__(self, *, audit_path: Path, enabled: bool = True) -> None:
    """Create an audit logger.

    Args:
        audit_path: Path to the ``.jsonl`` audit log file.
        enabled: Whether logging is active (can be toggled later).
    """
    self._audit_path = audit_path
    self._enabled = enabled

enabled property writable

Whether audit logging is active.

log_event(container, action, *, dest=None, detail=None)

Write a single audit event to the log file.

No-op when audit is disabled.

Parameters:

container (str, required): Container name.
action (str, required): Event type (setup, teardown, allowed, denied).
dest (str | None, default None): Destination IP/domain (optional).
detail (str | None, default None): Additional detail string (optional).

Source code in src/terok_shield/lib/audit.py
def log_event(
    self,
    container: str,
    action: str,
    *,
    dest: str | None = None,
    detail: str | None = None,
) -> None:
    """Write a single audit event to the log file.

    No-op when audit is disabled.

    Args:
        container: Container name.
        action: Event type (setup, teardown, allowed, denied).
        dest: Destination IP/domain (optional).
        detail: Additional detail string (optional).
    """
    if not self._enabled:
        return
    entry: dict = {
        "ts": datetime.now(UTC).isoformat(timespec="seconds"),
        "container": container,
        "action": action,
    }
    if dest is not None:
        entry["dest"] = dest
    if detail is not None:
        entry["detail"] = detail

    try:
        self._audit_path.parent.mkdir(parents=True, exist_ok=True)
        with self._audit_path.open("a") as f:
            f.write(json.dumps(entry, separators=(",", ":")) + "\n")
    except OSError as e:
        logger.warning("Failed to write audit log to %s: %s", self._audit_path, e)

tail_log(n=50)

Yield the last n audit events.

Parameters:

n (int, default 50): Number of recent events to yield.

Source code in src/terok_shield/lib/audit.py
def tail_log(self, n: int = 50) -> Iterator[dict]:
    """Yield the last *n* audit events.

    Args:
        n: Number of recent events to yield.
    """
    if not self._audit_path.is_file():
        return

    lines = self._audit_path.read_text().splitlines()
    for line in lines[-n:] if n > 0 else []:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue
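
The JSON-lines format used by log_event and tail_log can be exercised without the library. This standalone sketch follows the same append-and-tail approach. The function names append_event and tail_events are made up for the example.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def append_event(path: Path, container: str, action: str, **extra: str) -> None:
    """Append one compact JSON object per line."""
    entry = {
        "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "container": container,
        "action": action,
        **extra,
    }
    with path.open("a") as f:
        f.write(json.dumps(entry, separators=(",", ":")) + "\n")


def tail_events(path: Path, n: int = 50) -> list[dict]:
    """Return the events found in the last n lines, skipping malformed lines."""
    if n <= 0 or not path.is_file():
        return []
    events = []
    for line in path.read_text().splitlines()[-n:]:
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # a truncated or corrupt line does not abort the read
    return events
```

As in the source above, n counts lines rather than valid events, so a malformed line inside the window reduces the number of events returned.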

ProfileLoader(*, user_dir, bundled_dir=None)

Loads and composes .txt allowlist profiles.

Searches user profiles first (overriding bundled), then falls back to the bundled profiles shipped with the package.

Create a profile loader.

Parameters:

user_dir (Path, required): User profiles directory (overrides bundled).
bundled_dir (Path | None, default None): Bundled profiles directory (auto-detected if None).

Source code in src/terok_shield/lib/profiles.py
def __init__(
    self,
    *,
    user_dir: Path,
    bundled_dir: Path | None = None,
) -> None:
    """Create a profile loader.

    Args:
        user_dir: User profiles directory (overrides bundled).
        bundled_dir: Bundled profiles directory (auto-detected if None).
    """
    self._user_dir = user_dir
    self._bundled_dir = bundled_dir or _bundled_dir()

load_profile(name)

Load a profile by name and return its entries.

User profiles take precedence over bundled profiles.

Raises:

ValueError: If the name contains path separators or traversal.
FileNotFoundError: If the profile does not exist.

Source code in src/terok_shield/lib/profiles.py
def load_profile(self, name: str) -> list[str]:
    """Load a profile by name and return its entries.

    User profiles take precedence over bundled profiles.

    Raises:
        ValueError: If the name contains path separators or traversal.
        FileNotFoundError: If the profile does not exist.
    """
    path = self._find_profile(name)
    if path is None:
        raise FileNotFoundError(f"Profile not found: {name!r}")
    return _parse_entries(path.read_text())

compose_profiles(names)

Load and merge multiple profiles, deduplicating entries.

Preserves insertion order (first occurrence wins).

Raises:

ValueError: If any name contains path separators or traversal.
FileNotFoundError: If any named profile does not exist.

Source code in src/terok_shield/lib/profiles.py
def compose_profiles(self, names: list[str]) -> list[str]:
    """Load and merge multiple profiles, deduplicating entries.

    Preserves insertion order (first occurrence wins).

    Raises:
        ValueError: If any name contains path separators or traversal.
        FileNotFoundError: If any named profile does not exist.
    """
    seen: set[str] = set()
    result: list[str] = []
    for name in names:
        for entry in self.load_profile(name):
            if entry not in seen:
                seen.add(entry)
                result.append(entry)
    return result

list_profiles()

List available profile names (bundled + user, deduplicated).

Source code in src/terok_shield/lib/profiles.py
def list_profiles(self) -> list[str]:
    """List available profile names (bundled + user, deduplicated)."""
    names: set[str] = set()
    for directory in (self._bundled_dir, self._user_dir):
        if directory.is_dir():
            names.update(f.stem for f in directory.glob("*.txt"))
    return sorted(names)

EnvironmentCheck(dns_tier='', ok=True, podman_version=(0,), hooks='per-container', health='ok', issues=list(), needs_setup=False, setup_hint='') dataclass

Result of Shield.check_environment().

Machine-readable fields for programmatic consumers (terok TUI, scripts). Human-readable issues and setup_hint for CLI display.

Attributes:

ok (bool): True if no issues found.
podman_version (tuple[int, ...]): Detected podman version tuple.
hooks (str): Hook installation type (per-container, global-system, global-user, not-installed).
health (str): Environment health (ok, setup-needed, stale-hooks).
dns_tier (str): Active DNS resolution tier (dnsmasq, dig, getent).
issues (list[str]): List of human-readable issue descriptions.
needs_setup (bool): True if one-time setup is required.
setup_hint (str): Setup instructions (empty if not needed).

Shield(config, *, runner=None, audit=None, dns=None, profiles=None, ruleset=None)

Facade: primary public API for terok-shield.

Owns and wires together all service objects (audit, DNS, profiles, ruleset builder, mode backend). Construct once with a ShieldConfig and call methods for the full shield lifecycle.

All collaborators are injectable for testing.

Create the shield facade.

Parameters:

config (ShieldConfig, required): Shield configuration (must include state_dir).
runner (CommandRunner | None, default None): Command runner (default: SubprocessRunner).
audit (AuditLogger | None, default None): Audit logger (default: from config.state_dir).
dns (DnsResolver | None, default None): DNS resolver (default: from runner).
profiles (ProfileLoader | None, default None): Profile loader (default: from config.profiles_dir).
ruleset (RulesetBuilder | None, default None): Ruleset builder (default: from config loopback_ports).

Source code in src/terok_shield/__init__.py
def __init__(
    self,
    config: ShieldConfig,
    *,
    runner: "CommandRunner | None" = None,
    audit: "AuditLogger | None" = None,
    dns: "DnsResolver | None" = None,
    profiles: "ProfileLoader | None" = None,
    ruleset: "RulesetBuilder | None" = None,
) -> None:
    """Create the shield facade.

    Args:
        config: Shield configuration (must include state_dir).
        runner: Command runner (default: ``SubprocessRunner``).
        audit: Audit logger (default: from config.state_dir).
        dns: DNS resolver (default: from runner).
        profiles: Profile loader (default: from config.profiles_dir).
        ruleset: Ruleset builder (default: from config loopback_ports).
    """
    from .core import state
    from .core.dns import DnsResolver
    from .core.nft import RulesetBuilder
    from .core.run import SubprocessRunner
    from .lib.audit import AuditLogger
    from .lib.profiles import ProfileLoader

    self.config = config
    self.runner = runner or SubprocessRunner()
    self.audit = audit or AuditLogger(
        audit_path=state.audit_path(config.state_dir),
        enabled=config.audit_enabled,
    )
    self.dns = dns or DnsResolver(runner=self.runner)
    self.profiles = profiles or ProfileLoader(
        user_dir=config.profiles_dir or Path("/nonexistent"),
    )
    self.ruleset = ruleset or RulesetBuilder(loopback_ports=config.loopback_ports)
    self._mode = self._create_mode(config.mode)

check_environment()

Check the podman environment for compatibility issues.

Proactive check for API consumers (e.g. terok). Returns an EnvironmentCheck with detected issues and setup hints. Does not raise; the caller decides how to handle issues.

Source code in src/terok_shield/__init__.py
def check_environment(self) -> EnvironmentCheck:
    """Check the podman environment for compatibility issues.

    Proactive check for API consumers (e.g. terok).  Returns an
    :class:`EnvironmentCheck` with detected issues and setup hints.
    Does not raise — the caller decides how to handle issues.
    """
    from .core import dnsmasq, state

    output = self.runner.run(["podman", "info", "-f", "json"], check=False)
    info = parse_podman_info(output)
    issues: list[str] = []
    needs_setup = False
    setup_hint = ""
    hooks = "per-container"
    health = "ok"

    tier = detect_dns_tier(self.runner.has, lambda: dnsmasq.has_nftset_support(self.runner))
    dns_tier = tier.value
    if tier == DnsTier.DIG:
        issues.append(
            "dnsmasq not found — domain allowlisting uses static pre-start "
            "resolution (no IP rotation handling). "
            "Install dnsmasq for dynamic domain-based egress control"
        )
    elif tier == DnsTier.GETENT:
        issues.append(
            "Neither dnsmasq nor dig found — DNS resolution uses getent "
            "(single IP, no AAAA). Install dnsmasq or at minimum dnsutils/bind-utils"
        )

    hooks_dirs = find_hooks_dirs()
    global_hooks = has_global_hooks(hooks_dirs)

    if not info.hooks_dir_persists:
        if global_hooks:
            sys_dir = system_hooks_dir()
            hooks = "global-system" if has_global_hooks([sys_dir]) else "global-user"
            health = "ok"
            # Check hook version matches current package
            hook_ver = _read_installed_hook_version(hooks_dirs)
            if hook_ver is not None and hook_ver != state.BUNDLE_VERSION:
                health = "stale-hooks"
                issues.append(
                    f"Installed hook version {hook_ver} != expected {state.BUNDLE_VERSION}. "
                    "Run `terok-shield setup` to update."
                )
        else:
            hooks = "not-installed"
            health = "setup-needed"
            needs_setup = True
            setup_hint = global_hooks_hint()
            issues.append(
                "Global hooks not installed - containers will lose firewall on restart"
            )
    elif global_hooks:
        health = "stale-hooks"
        issues.append(
            "Stale global hooks detected - not needed on podman >= 5.6.0. "
            "Consider removing them."
        )

    return EnvironmentCheck(
        ok=not issues,
        podman_version=info.version,
        hooks=hooks,
        health=health,
        dns_tier=dns_tier,
        issues=issues,
        needs_setup=needs_setup,
        setup_hint=setup_hint,
    )

status()

Return current shield status information.

Source code in src/terok_shield/__init__.py
def status(self) -> dict:
    """Return current shield status information."""
    return {
        "mode": self.config.mode.value,
        "profiles": self.profiles.list_profiles(),
        "audit_enabled": self.config.audit_enabled,
    }

pre_start(container, profiles=None)

Prepare shield for container start. Returns extra podman args.

Source code in src/terok_shield/__init__.py
def pre_start(self, container: str, profiles: list[str] | None = None) -> list[str]:
    """Prepare shield for container start.  Returns extra podman args."""
    if profiles is None:
        profiles = list(self.config.default_profiles)
    result = self._mode.pre_start(container, profiles)
    self.audit.log_event(container, "setup", detail=f"profiles={','.join(profiles)}")
    return result

allow(container, target)

Live-allow a domain or IP for a running container.

Source code in src/terok_shield/__init__.py
def allow(self, container: str, target: str) -> list[str]:
    """Live-allow a domain or IP for a running container."""
    from .core.run import ExecError

    is_domain = not _is_ip(target)
    ips = [target] if not is_domain else self.dns.resolve_domains([target])
    allowed: list[str] = []
    for ip in ips:
        try:
            self._mode.allow_ip(container, ip)
        except (ExecError, OSError) as exc:
            logger.warning("allow_ip failed for %s on %s: %s", ip, container, exc)
            continue
        allowed.append(ip)
        self.audit.log_event(container, "allowed", dest=ip, detail=f"target={target}")
    # Update dnsmasq config for domain targets (so future IP rotations are captured)
    if is_domain and allowed:
        self._mode.allow_domain(target)
    return allowed

deny(container, target)

Live-deny a domain or IP for a running container.

Source code in src/terok_shield/__init__.py
def deny(self, container: str, target: str) -> list[str]:
    """Live-deny a domain or IP for a running container."""
    from .core.run import ExecError

    is_domain = not _is_ip(target)
    ips = [target] if not is_domain else self.dns.resolve_domains([target])
    denied: list[str] = []
    for ip in ips:
        try:
            self._mode.deny_ip(container, ip)
        except (ExecError, OSError) as exc:
            logger.warning("deny_ip failed for %s on %s: %s", ip, container, exc)
            continue
        denied.append(ip)
        self.audit.log_event(container, "denied", dest=ip, detail=f"target={target}")
    # Remove domain from dnsmasq config (stops future auto-population)
    if is_domain and denied:
        self._mode.deny_domain(target)
    return denied

rules(container)

Return current nft rules for a container.

Source code in src/terok_shield/__init__.py
def rules(self, container: str) -> str:
    """Return current nft rules for a container."""
    return self._mode.list_rules(container)

down(container, *, allow_all=False)

Switch a running container to bypass mode.

Source code in src/terok_shield/__init__.py
def down(self, container: str, *, allow_all: bool = False) -> None:
    """Switch a running container to bypass mode."""
    self._mode.shield_down(container, allow_all=allow_all)
    self.audit.log_event(
        container,
        "shield_down",
        detail="allow_all=True" if allow_all else None,
    )

up(container)

Restore normal deny-all mode for a running container.

Source code in src/terok_shield/__init__.py
def up(self, container: str) -> None:
    """Restore normal deny-all mode for a running container."""
    self._mode.shield_up(container)
    self.audit.log_event(container, "shield_up")

state(container)

Query the live nft ruleset to determine a container's shield state.

Source code in src/terok_shield/__init__.py
def state(self, container: str) -> ShieldState:
    """Query the live nft ruleset to determine a container's shield state."""
    return self._mode.shield_state(container)

preview(*, down=False, allow_all=False)

Generate the ruleset that would be applied to a container.

Source code in src/terok_shield/__init__.py
def preview(self, *, down: bool = False, allow_all: bool = False) -> str:
    """Generate the ruleset that would be applied to a container."""
    return self._mode.preview(down=down, allow_all=allow_all)

resolve(profiles=None, *, force=False)

Resolve DNS profiles and cache the results.

Source code in src/terok_shield/__init__.py
def resolve(
    self,
    profiles: list[str] | None = None,
    *,
    force: bool = False,
) -> list[str]:
    """Resolve DNS profiles and cache the results."""
    from .core import state

    if profiles is None:
        profiles = list(self.config.default_profiles)
    entries = self.profiles.compose_profiles(profiles)
    if not entries:
        return []
    max_age = 0 if force else 3600
    cache_path = state.profile_allowed_path(self.config.state_dir)
    return self.dns.resolve_and_cache(entries, cache_path, max_age=max_age)

profiles_list()

List available profile names.

Source code in src/terok_shield/__init__.py
def profiles_list(self) -> list[str]:
    """List available profile names."""
    return self.profiles.list_profiles()

tail_log(n=50)

Yield the last n audit events.

Source code in src/terok_shield/__init__.py
def tail_log(self, n: int = 50) -> Iterator[dict]:
    """Yield the last *n* audit events."""
    return self.audit.tail_log(n)

compose_profiles(names)

Load and merge multiple profiles.

Source code in src/terok_shield/__init__.py
def compose_profiles(self, names: list[str]) -> list[str]:
    """Load and merge multiple profiles."""
    return self.profiles.compose_profiles(names)

ensure_containers_conf_hooks_dir(hooks_dir)

Ensure ~/.config/containers/containers.conf includes hooks_dir.

Creates the file if absent. Inserts hooks_dir into the existing [engine] section, or appends a new section if none exists. Warns (does not fail) if hooks_dir is already set differently.

Uses line-based text manipulation to preserve comments and formatting.

Source code in src/terok_shield/common/podman_info.py
def ensure_containers_conf_hooks_dir(hooks_dir: Path) -> None:
    """Ensure ``~/.config/containers/containers.conf`` includes *hooks_dir*.

    Creates the file if absent.  Inserts ``hooks_dir`` into the existing
    ``[engine]`` section, or appends a new section if none exists.
    Warns (does not fail) if ``hooks_dir`` is already set differently.

    Uses line-based text manipulation to preserve comments and formatting.
    """
    conf_path = _user_containers_conf()
    hooks_str = str(hooks_dir)
    hooks_line = f'hooks_dir = ["{hooks_str}"]'

    if not conf_path.is_file():
        conf_path.parent.mkdir(parents=True, exist_ok=True)
        conf_path.write_text(f"[engine]\n{hooks_line}\n")
        return

    existing = _parse_hooks_dir_from_conf(conf_path)
    if not existing:
        _insert_hooks_line(conf_path, hooks_line)
        return

    if hooks_str in existing or str(hooks_dir.expanduser()) in existing:
        return  # already configured
    print(
        f"Warning: {conf_path} already has hooks_dir = {existing}\n"
        f"Add {hooks_str!r} to the list manually if needed."
    )
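
The helper _insert_hooks_line is not shown. As a sketch of the line-based approach described in the docstring, the function below works on the file's text and leaves comments and other settings untouched. The name insert_hooks_line is hypothetical, and the section match is simplified: a header with a trailing comment would not be recognised.

```python
def insert_hooks_line(text: str, hooks_line: str) -> str:
    """Insert hooks_line under [engine], or append a new [engine] section."""
    lines = text.splitlines()
    for index, line in enumerate(lines):
        if line.strip() == "[engine]":
            # Place the setting directly below the section header.
            lines.insert(index + 1, hooks_line)
            break
    else:
        # No [engine] section found: append one, separated by a blank line.
        if lines and lines[-1].strip():
            lines.append("")
        lines += ["[engine]", hooks_line]
    return "\n".join(lines) + "\n"
```

Operating on lines instead of parsing and re-serialising TOML is what allows existing comments and formatting to survive the edit.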

system_hooks_dir()

Return the best system-level hooks directory.

Prefers existing directories; falls back to /etc/containers/oci/hooks.d.

Source code in src/terok_shield/common/podman_info.py
def system_hooks_dir() -> Path:
    """Return the best system-level hooks directory.

    Prefers existing directories; falls back to ``/etc/containers/oci/hooks.d``.
    """
    for d in _SYSTEM_HOOKS_DIRS:
        if d.is_dir():
            return d
    return _SYSTEM_HOOKS_DIRS[-1]

__getattr__(name)

Lazy import for re-exported core/support layer names.

Source code in src/terok_shield/__init__.py
def __getattr__(name: str) -> object:
    """Lazy import for re-exported core/support layer names."""
    if name in _LAZY_IMPORTS:
        mod_path, attr = _LAZY_IMPORTS[name]
        mod = _importlib.import_module(mod_path)
        value = getattr(mod, attr)
        globals()[name] = value  # cache for subsequent access
        return value
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
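
The same module-level __getattr__ technique (PEP 562) can be tried in isolation. This sketch builds a throwaway module object and maps two names to standard-library attributes. The module name lazy_demo and the contents of the mapping are invented for the demonstration.

```python
import importlib
import types

# name -> (module path, attribute); the targets here are arbitrary stdlib picks.
_LAZY_IMPORTS = {
    "dumps": ("json", "dumps"),
    "Fraction": ("fractions", "Fraction"),
}

demo = types.ModuleType("lazy_demo")


def _lazy_getattr(name: str) -> object:
    """Import the target on first access and cache it on the module."""
    if name in _LAZY_IMPORTS:
        mod_path, attr = _LAZY_IMPORTS[name]
        value = getattr(importlib.import_module(mod_path), attr)
        setattr(demo, name, value)  # later lookups find it without __getattr__
        return value
    raise AttributeError(f"module {demo.__name__!r} has no attribute {name!r}")


# A module consults __getattr__ from its own namespace when normal lookup fails.
demo.__getattr__ = _lazy_getattr
```

Accessing demo.dumps triggers the import once. After that the attribute lives in the module namespace, so the hook is no longer involved.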