Skip to content

mode

mode

Hook mode: OCI hooks + per-container netns.

Uses OCI hooks to apply per-container nftables rules inside each container's network namespace. No root required — only podman and nft.

Orchestrates collaborators per lifecycle phase:

  • RulesetBuilder (nft.rules) — generates and verifies nft rulesets
  • DnsResolver (dns.resolver) — pre-start domain resolution
  • ProfileLoader (profiles) — allowlist profile composition
  • AuditLogger (audit) — event logging
  • CommandRunner (run) — subprocess execution (nft, nsenter)
  • dnsmasq (dns.dnsmasq) — runtime DNS with nftset auto-population
  • hook_install (hooks.install) — OCI hook file generation
  • state (state) — per-container state bundle I/O

logger = logging.getLogger(__name__) module-attribute

HookMode(*, config, runner, audit, dns, profiles, ruleset)

Hook-mode shield backend (Strategy, implements ShieldModeBackend).

Coordinates the full lifecycle of OCI-hook-based container firewalling. Delegates to RulesetBuilder for nft generation, DnsResolver for name resolution, ProfileLoader for allowlists, dnsmasq for runtime DNS, and state for per-container persistence.

Create a hook mode backend with all collaborators.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | ShieldConfig | Shield configuration (provides state_dir). | required |
| runner | CommandRunner | Command runner for subprocess calls. | required |
| audit | AuditLogger | Audit logger for event logging. | required |
| dns | DnsResolver | DNS resolver for domain resolution and caching. | required |
| profiles | ProfileLoader | Profile loader for allowlist profiles. | required |
| ruleset | RulesetBuilder | Ruleset builder for nft generation and verification. | required |
Source code in src/terok_shield/hooks/mode.py
def __init__(
    self,
    *,
    config: ShieldConfig,
    runner: "CommandRunner",
    audit: "AuditLogger",
    dns: "DnsResolver",
    profiles: "ProfileLoader",
    ruleset: RulesetBuilder,
) -> None:
    """Wire up a hook mode backend from its injected collaborators.

    Every argument is keyword-only and is stored unchanged on a private
    attribute; nothing is called or validated at construction time.

    Args:
        config: Shield configuration (provides state_dir).
        runner: Command runner for subprocess calls.
        audit: Audit logger for event logging.
        dns: DNS resolver for domain resolution and caching.
        profiles: Profile loader for allowlist profiles.
        ruleset: Ruleset builder for nft generation and verification.
    """
    # Slots that start out empty; this constructor only sets them to None.
    self._gateways: tuple[str, str] | None = None
    self._podman_info: PodmanInfo | None = None

    # Settings and rule generation.
    self._config = config
    self._ruleset = ruleset
    self._profiles = profiles

    # Runtime services.
    self._runner = runner
    self._dns = dns
    self._audit = audit

pre_start(container, profiles)

Prepare for container start in hook mode.

Installs hooks, composes profiles, resolves DNS, writes allowlist, detects DNS tier, sets annotations, and returns the podman CLI arguments needed for shield protection.

Raises:

| Type | Description |
| --- | --- |
| ShieldNeedsSetup | When global hooks are not installed (see WORKAROUND(hooks-dir-persist)). |

Source code in src/terok_shield/hooks/mode.py
def pre_start(self, container: str, profiles: list[str]) -> list[str]:
    """Stage everything a hook-mode container needs before it starts.

    In order: ensures the state directories and installs the hooks;
    detects the DNS tier, upstream DNS and gateway addresses; composes the
    profiles and writes the allowlists; persists upstream DNS, DNS tier
    and loopback ports to the state bundle; writes the ruleset and the
    dnsmasq config; then assembles the podman CLI arguments.

    Args:
        container: Container name (recorded in an annotation and in the
            audit event).
        profiles: Names of the allowlist profiles to compose.

    Returns:
        The podman CLI arguments needed for shield protection.

    Raises:
        ShieldNeedsSetup: When global hooks are not installed
            (see ``WORKAROUND(hooks-dir-persist)``).
    """
    state_dir = self._config.state_dir.resolve()
    podman = self._get_podman_info()
    bundle = StateBundle(state_dir)

    # Both steps are idempotent: directories first, then the hook files.
    bundle.ensure_dirs()
    install_hooks(hook_entrypoint=bundle.hook_entrypoint, hooks_dir=bundle.hooks_dir)

    # DNS tier, upstream DNS and gateways all follow from the network mode.
    dns_tier = self._detect_dns_tier(container, state_dir)
    net_mode = podman.network_mode or "pasta"
    upstream = _upstream_dns_for_mode(net_mode)
    gateways = _gateways_for_mode(net_mode)
    gateway_v4, gateway_v6 = gateways
    self._gateways = gateways

    # ``loopback.ports`` must be on disk before ``_write_ruleset`` runs:
    # the builder reads ports from the bundle (SSOT), and later up/down
    # rebuilds use that same source.
    entries = self._profiles.compose_profiles(profiles)
    self._resolve_and_write_allowlists(state_dir, dns_tier, entries)
    bundle.upstream_dns.write_text(f"{upstream}\n")
    bundle.dns_tier.write_text(f"{dns_tier.value}\n")
    bundle.loopback_ports.write_text(
        "".join(f"{port}\n" for port in self._config.loopback_ports)
    )
    self._write_ruleset(state_dir, dns_tier, upstream, gateway_v4, gateway_v6)
    self._write_dnsmasq_config_or_scrub(state_dir, dns_tier, upstream)

    args = self._build_network_args(net_mode)

    # Container DNS goes through the per-container dnsmasq by mounting a
    # resolv.conf over the container's own.  See commit history for why
    # --dns cannot be used.
    if dns_tier == DnsTier.DNSMASQ:
        args.extend(("--volume", f"{bundle.resolv_conf}:/etc/resolv.conf:ro,Z"))

    # loopback_ports is deliberately not an annotation: it lives in the
    # per-container state bundle written above, and annotations are
    # write-only on shield's side.
    annotations = (
        (ANNOTATION_KEY, ANNOTATION_LIST_SEP.join(profiles)),
        (ANNOTATION_NAME_KEY, container),
        (ANNOTATION_STATE_DIR_KEY, state_dir),
        (ANNOTATION_VERSION_KEY, state.BUNDLE_VERSION),
        (ANNOTATION_AUDIT_ENABLED_KEY, str(self._config.audit_enabled).lower()),
        (ANNOTATION_UPSTREAM_DNS_KEY, upstream),
        (ANNOTATION_DNS_TIER_KEY, dns_tier.value),
    )
    args += [
        token
        for key, value in annotations
        for token in ("--annotation", f"{key}={value}")
    ]

    # WORKAROUND(hooks-dir-persist): currently always takes the global path
    if podman.hooks_dir_persists:
        args.extend(("--hooks-dir", str(bundle.hooks_dir)))
    elif has_global_hooks():
        version = ".".join(str(part) for part in podman.version)
        self._audit.log_event(
            container,
            "setup",
            detail=f"podman {version}: "
            "using global hooks dir (--hooks-dir does not persist on restart)",
        )
    else:
        version = ".".join(str(part) for part in podman.version)
        raise ShieldNeedsSetup(f"Podman {version} detected.\n\n" + global_hooks_hint())

    for capability in ("NET_ADMIN", "NET_RAW"):
        args.extend(("--cap-drop", capability))
    return args

allow_domain(domain)

Add a domain to the dnsmasq config and signal reload.

Delegates to dnsmasq.add_domain(), which persists the domain to live.domains (not profile.domains) and removes any matching entry from denied.domains. When dnsmasq is running, a SIGHUP is sent so the change takes effect immediately without a container restart. These entries are runtime additions: they survive dnsmasq reloads but are separate from the pre-start profile.domains list.

The IP-level allow (nft set update) is handled separately by allow_ip() — this method is the domain-tracking counterpart that ensures future IP rotations are also captured.

No-op when the container is not using the dnsmasq DNS tier (the static IP-level allow already happened via allow_ip()).

Source code in src/terok_shield/hooks/mode.py
def allow_domain(self, domain: str) -> None:
    """Register a domain with dnsmasq and reload it when something changed.

    The work is done by ``dnsmasq.add_domain()``, which persists the
    domain to ``live.domains`` (not ``profile.domains``) and removes any
    matching entry from ``denied.domains``.  When dnsmasq is running, a
    SIGHUP is sent so the change applies immediately, without a container
    restart.  Such entries are runtime additions: they survive dnsmasq
    reloads but stay separate from the pre-start ``profile.domains`` list.

    This is the domain-tracking counterpart of ``allow_ip()``, which
    handles the IP-level allow (nft set update); tracking the domain
    ensures future IP rotations are captured as well.

    Nothing happens when the container is not on the dnsmasq DNS tier
    (the static IP-level allow already happened via ``allow_ip()``), or
    when ``add_domain()`` reports the domain as already present.

    Args:
        domain: Domain name to start tracking.
    """
    state_dir = self._config.state_dir.resolve()
    # Short-circuit order matters: the tier check runs first, and the add
    # is only attempted on the dnsmasq tier.  A falsy add result means the
    # domain was already present, so no reload is needed.
    if _is_dnsmasq_tier(state_dir) and dnsmasq.add_domain(state_dir, domain):
        self._reload_dnsmasq(state_dir)

deny_domain(domain)

Remove a domain from the dnsmasq config and signal reload.

Counterpart of allow_domain(). Removes the domain so dnsmasq stops auto-populating nft sets for it on future DNS queries.

No-op when the container is not using the dnsmasq DNS tier.

Source code in src/terok_shield/hooks/mode.py
def deny_domain(self, domain: str) -> None:
    """Drop a domain from the dnsmasq config and reload it when something changed.

    Mirror image of ``allow_domain()``: once the domain is removed,
    dnsmasq stops auto-populating nft sets for it on future DNS queries.

    Nothing happens when the container is not on the dnsmasq DNS tier, or
    when ``dnsmasq.remove_domain()`` reports the domain as not present.

    Args:
        domain: Domain name to stop tracking.
    """
    state_dir = self._config.state_dir.resolve()
    # Short-circuit order matters: the tier check runs first, and the
    # removal is only attempted on the dnsmasq tier.  A falsy removal
    # result means the domain was not present, so no reload is needed.
    if _is_dnsmasq_tier(state_dir) and dnsmasq.remove_domain(state_dir, domain):
        self._reload_dnsmasq(state_dir)

allow_ip(container, ip)

Live-allow an IP for a running container via nsenter.

Source code in src/terok_shield/hooks/mode.py
def allow_ip(self, container: str, ip: str) -> None:
    """Live-allow an IP for a running container via nsenter.

    The address is first passed through ``safe_ip``.  Then, in order:

    1. If it is listed in deny.list, it is removed from that file and
       (best-effort) from the nft deny set.
    2. It is added to the nft allow set chosen by ``_set_for_ip``.
    3. It is appended to live.allowed unless it is already listed there.

    An error raised by the nft add in step 2 propagates to the caller,
    and live.allowed is then left untouched.

    Args:
        container: Running container whose nft table is updated.
        ip: Address to allow.
    """
    ip = safe_ip(ip)
    sd = self._config.state_dir.resolve()
    bundle = StateBundle(sd)

    # Un-deny: remove from deny.list and nft deny set if present
    dp = bundle.deny
    if dp.is_file():
        denied = bundle.read_denied_ips()
        if ip in denied:
            denied.discard(ip)
            dp.write_text("".join(f"{d}\n" for d in sorted(denied)))
            nft_cmd = delete_deny_elements_dual([ip])
            if nft_cmd:
                self._nft_apply_best_effort(container, nft_cmd)

    # When the dnsmasq set has a default timeout (30 m), permanent IPs must use
    # 'timeout 0s' so they are never evicted by the set's per-element expiry clock.
    tier_path = bundle.dns_tier
    if tier_path.is_file() and tier_path.read_text().strip() == DnsTier.DNSMASQ.value:
        element = f"{{ {ip} timeout 0s }}"
    else:
        element = f"{{ {ip} }}"

    self._runner.nft_via_nsenter(
        container,
        "add",
        "element",
        "inet",
        "terok_shield",
        self._set_for_ip(ip),
        element,
    )

    # Persist to live.allowed (skip if already present).  Lines are
    # compared stripped, matching how deny_ip() filters this same file, so
    # stray whitespace cannot produce a duplicate entry.
    live_path = self._live_path()
    live_path.parent.mkdir(parents=True, exist_ok=True)
    recorded = live_path.read_text() if live_path.is_file() else ""
    if ip in {line.strip() for line in recorded.splitlines()}:
        return

    # A hand-edited file may lack its final newline; without a separator
    # the new address would be glued onto the previous entry.
    separator = "\n" if recorded and not recorded.endswith("\n") else ""
    with live_path.open("a") as f:
        f.write(f"{separator}{ip}\n")

deny_ip(container, ip)

Live-deny an IP for a running container via nsenter.

Removes from the nft allow set (best-effort) and from live.allowed. Adds to the nft deny set. Always persists to deny.list so operator deny decisions stick across restarts.

Source code in src/terok_shield/hooks/mode.py
def deny_ip(self, container: str, ip: str) -> None:
    """Live-deny an IP for a running container via nsenter.

    Removes from the nft allow set (best-effort) and from live.allowed.
    Adds to the nft deny set.  Always persists to deny.list so operator
    deny decisions stick across restarts.

    Args:
        container: Running container whose nft table is updated.
        ip: Address to deny; passed through ``safe_ip`` first.
    """
    ip = safe_ip(ip)
    sd = self._config.state_dir.resolve()

    # Best-effort nft delete (IP may not be in the set)
    try:
        self._runner.nft_via_nsenter(
            container,
            "delete",
            "element",
            "inet",
            "terok_shield",
            self._set_for_ip(ip),
            f"{{ {ip} }}",
        )
    except ExecError as e:
        # Errors matching these patterns are ignored without logging;
        # anything else is logged as a warning but still does not abort
        # the deny.
        stderr = str(e).lower()
        if not any(
            pat in stderr for pat in ("no such file", "element does not exist", "not in set")
        ):
            logger.warning("nft delete element failed for %s: %s", ip, e)

    # Remove from live.allowed.  One "<entry>\n" per surviving line; an
    # empty result writes an empty file.
    live_path = self._live_path()
    if live_path.is_file():
        kept = [line for line in live_path.read_text().splitlines() if line.strip() != ip]
        live_path.write_text("".join(f"{line}\n" for line in kept))

    # Add to nft deny set (prevents dnsmasq from re-allowing)
    nft_cmd = add_deny_elements_dual([ip])
    if nft_cmd:
        self._nft_apply_best_effort(container, nft_cmd)

    # Persist to deny.list so deny sets survive shield_up / restart.
    # Operator deny decisions always stick.
    bundle = StateBundle(sd)
    dp = bundle.deny
    if ip in bundle.read_denied_ips():
        return

    # Same parent-dir guard allow_ip() applies to live.allowed.
    dp.parent.mkdir(parents=True, exist_ok=True)
    # A hand-edited deny.list may lack its final newline; without a
    # separator the new address would be glued onto the previous entry.
    recorded = dp.read_text() if dp.is_file() else ""
    separator = "\n" if recorded and not recorded.endswith("\n") else ""
    with dp.open("a") as f:
        f.write(f"{separator}{ip}\n")

shield_down(container, *, allow_all=False)

Switch a running container to shield-down (accept-all + deny.list).

Source code in src/terok_shield/hooks/mode.py
def shield_down(self, container: str, *, allow_all: bool = False) -> None:
    """Put a running container into shield-down (accept-all + deny.list).

    Loads the bypass ruleset, replacing the existing table unless the
    container is currently offline, re-adds the deny.list entries, and
    verifies the resulting table.

    Args:
        container: Running container to switch.
        allow_all: Forwarded to the bypass builder and verifier.

    Raises:
        RuntimeError: When the live table fails bypass verification.
    """
    state_dir = self._config.state_dir.resolve()
    builder = self._container_ruleset(container)
    bypass = builder.build_bypass(allow_all=allow_all)

    # Only an existing table has to be deleted first; when the container
    # is offline the bypass ruleset is loaded on its own.
    offline = self.shield_state(container) == ShieldState.OFFLINE
    script = bypass if offline else f"delete table {NFT_TABLE}\n{bypass}"
    self._runner.nft_via_nsenter(container, stdin=script)

    # deny.list stays enforced even while the shield is down.
    denied = list(StateBundle(state_dir).read_denied_ips())
    deny_script = add_deny_elements_dual(denied) if denied else None
    if deny_script:
        self._runner.nft_via_nsenter(container, stdin=deny_script)

    listing = self._runner.nft_via_nsenter(container, "list", "table", "inet", NFT_TABLE_NAME)
    problems = builder.verify_bypass(listing, allow_all=allow_all)
    if not problems:
        return
    raise RuntimeError(f"Shield-down ruleset verification failed: {'; '.join(problems)}")

shield_quarantine(container)

Total network blackout — drop all traffic, log dropped traffic.

Reads no settings — no DNS, no allowlists, no loopback ports, no gateway probe, no profile lookup. build_quarantine / verify_quarantine are static; the only inputs are the container name and the live ruleset state (table-or-no-table). Any config-conditional branch added here is a bug.

Source code in src/terok_shield/hooks/mode.py
def shield_quarantine(self, container: str) -> None:
    """Cut a running container off the network entirely (drop and log all traffic).

    By design this path reads no settings: no DNS, no allowlists, no
    loopback ports, no gateway probe, no profile lookup.
    ``build_quarantine`` / ``verify_quarantine`` are static, so the only
    inputs are the container name and the live ruleset state
    (table-or-no-table).  Any config-conditional branch added here is a
    bug.

    Args:
        container: Running container to quarantine.

    Raises:
        RuntimeError: When the live table fails quarantine verification.
    """
    quarantine = RulesetBuilder.build_quarantine()

    # Only an existing table has to be deleted before loading the new one.
    if self.shield_state(container) == ShieldState.OFFLINE:
        script = quarantine
    else:
        script = f"delete table {NFT_TABLE}\n{quarantine}"
    self._runner.nft_via_nsenter(container, stdin=script)

    listing = self._runner.nft_via_nsenter(container, "list", "table", "inet", NFT_TABLE_NAME)
    problems = RulesetBuilder.verify_quarantine(listing)
    if not problems:
        return
    raise RuntimeError(f"Quarantine ruleset verification failed: {'; '.join(problems)}")

shield_up(container)

Restore normal deny-all mode for a running container.

Source code in src/terok_shield/hooks/mode.py
def shield_up(self, container: str) -> None:
    """Bring a running container back to normal deny-all mode.

    Loads the hook ruleset, replacing the existing table unless the
    container is currently offline, re-adds the effective allowed IPs and
    the deny.list entries, and verifies the resulting table.

    Args:
        container: Running container to switch.

    Raises:
        RuntimeError: When the live table fails hook verification.
    """
    state_dir = self._config.state_dir.resolve()
    builder = self._container_ruleset(container)
    hook_rules = builder.build_hook()

    # Only an existing table has to be deleted first; when the container
    # is offline the hook ruleset is loaded on its own.
    offline = self.shield_state(container) == ShieldState.OFFLINE
    script = hook_rules if offline else f"delete table {NFT_TABLE}\n{hook_rules}"
    self._runner.nft_via_nsenter(container, stdin=script)

    # Effective IPs are the allowed ones minus the denied ones.
    effective = StateBundle(state_dir).read_effective_ips()
    allow_script = builder.add_elements_dual(effective) if effective else None
    if allow_script:
        self._runner.nft_via_nsenter(container, stdin=allow_script)

    # The deny sets are rebuilt from deny.list.
    denied = list(StateBundle(state_dir).read_denied_ips())
    deny_script = add_deny_elements_dual(denied) if denied else None
    if deny_script:
        self._runner.nft_via_nsenter(container, stdin=deny_script)

    # Gateway addresses need no repopulation: they are baked into the ruleset.

    listing = self._runner.nft_via_nsenter(container, "list", "table", "inet", NFT_TABLE_NAME)
    problems = builder.verify_hook(listing)
    if not problems:
        return
    raise RuntimeError(f"Ruleset verification failed: {'; '.join(problems)}")

shield_state(container)

Query the live nft ruleset to determine the container's shield state.

Source code in src/terok_shield/hooks/mode.py
def shield_state(self, container: str) -> ShieldState:
    """Classify the container's shield state from its live nft ruleset.

    Args:
        container: Running container to inspect.

    Returns:
        ``OFFLINE`` when ``list_rules`` yields only whitespace; otherwise
        the state of the first verifier that reports no errors, or
        ``ERROR`` when none of them accepts the ruleset.
    """
    listing = self.list_rules(container)
    if not listing.strip():
        return ShieldState.OFFLINE

    verifier = self._ruleset
    # Each verify_* call returns a list of errors; an empty list means the
    # ruleset matches.  The probes run lazily, in this order.  Quarantine
    # goes first because its minimal ruleset (no sets, no DNS) would fail
    # every other verifier.
    probes = (
        (ShieldState.QUARANTINE, lambda: verifier.verify_quarantine(listing)),
        (ShieldState.DOWN, lambda: verifier.verify_bypass(listing, allow_all=False)),
        (ShieldState.DISENGAGED, lambda: verifier.verify_bypass(listing, allow_all=True)),
        (ShieldState.UP, lambda: verifier.verify_hook(listing)),
    )
    for candidate, find_errors in probes:
        if not find_errors():
            return candidate
    return ShieldState.ERROR

list_rules(container)

List current nft rules for a running container.

Source code in src/terok_shield/hooks/mode.py
def list_rules(self, container: str) -> str:
    """Return the current nft rules of a running container.

    Args:
        container: Running container to inspect.

    Returns:
        Whatever the runner returns for ``list table inet terok_shield``
        (invoked with ``check=False``), or an empty string when the
        runner raises ``ExecError``.
    """
    query = ("list", "table", "inet", "terok_shield")
    try:
        listing = self._runner.nft_via_nsenter(container, *query, check=False)
    except ExecError:
        listing = ""
    return listing

preview(*, down=False, allow_all=False)

Generate the ruleset that would be applied to a container.

Source code in src/terok_shield/hooks/mode.py
def preview(self, *, down: bool = False, allow_all: bool = False) -> str:
    """Render the ruleset that would be applied to a container.

    Args:
        down: When true, render the bypass ruleset instead of the hook
            ruleset.
        allow_all: Forwarded to the bypass builder; ignored unless
            ``down`` is true.

    Returns:
        The ruleset text produced by the configured ruleset builder.
    """
    builder = self._ruleset
    return builder.build_bypass(allow_all=allow_all) if down else builder.build_hook()