Skip to content

mode_hook

mode_hook

Hook mode: OCI hooks + per-container netns.

Uses OCI hooks to apply per-container nftables rules inside each container's network namespace. No root required — only podman and nft. The stdlib-only hook_entrypoint.py applies the pre-generated ruleset at container creation; this module handles DNS pre-resolution, live allow/deny, and shield lifecycle (up/down/state).

HookMode(*, config, runner, audit, dns, profiles, ruleset)

Hook-mode shield backend (Strategy, implements ShieldModeBackend).

Manages the full lifecycle of OCI-hook-based container firewalling: pre-start DNS resolution and hook installation, live allow/deny, bypass (down/up), and ruleset preview.

Create a hook mode backend with all collaborators.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| config | ShieldConfig | Shield configuration (provides state_dir). | required |
| runner | CommandRunner | Command runner for subprocess calls. | required |
| audit | AuditLogger | Audit logger for event logging. | required |
| dns | DnsResolver | DNS resolver for domain resolution and caching. | required |
| profiles | ProfileLoader | Profile loader for allowlist profiles. | required |
| ruleset | RulesetBuilder | Ruleset builder for nft generation and verification. | required |
Source code in src/terok_shield/core/mode_hook.py
def __init__(
    self,
    *,
    config: ShieldConfig,
    runner: "CommandRunner",
    audit: "AuditLogger",
    dns: "DnsResolver",
    profiles: "ProfileLoader",
    ruleset: RulesetBuilder,
) -> None:
    """Wire up the hook-mode backend from its injected collaborators.

    All dependencies are keyword-only and are stored as given; nothing
    is called or validated at construction time.

    Args:
        config: Shield configuration (provides state_dir).
        runner: Command runner for subprocess calls.
        audit: Audit logger for event logging.
        dns: DNS resolver for domain resolution and caching.
        profiles: Profile loader for allowlist profiles.
        ruleset: Ruleset builder for nft generation and verification.
    """
    self._config, self._runner, self._audit = config, runner, audit
    self._dns, self._profiles, self._ruleset = dns, profiles, ruleset
    # Starts unset; presumably filled in later by _get_podman_info() — confirm.
    self._podman_info: PodmanInfo | None = None

pre_start(container, profiles)

Prepare for container start in hook mode.

Installs hooks, composes profiles, resolves DNS, writes allowlist, detects DNS tier, sets annotations, and returns the podman CLI arguments needed for shield protection.

Raises:

| Type | Description |
|------|-------------|
| ShieldNeedsSetup | When podman does not persist `--hooks-dir` across restarts and global hooks are not installed (see WORKAROUND(hooks-dir-persist)). |

Source code in src/terok_shield/core/mode_hook.py
def pre_start(self, container: str, profiles: list[str]) -> list[str]:
    """Stage shield state on disk and assemble podman arguments for *container*.

    Steps, in order: ensure the state directories exist and install the
    hook files; pick the DNS tier and upstream resolver; compose the
    requested profiles and persist allowlists, upstream DNS, DNS tier,
    ruleset and dnsmasq configuration; then build the CLI argument list
    (network options, optional resolv.conf mount, annotations, hooks
    directory and capability drops).

    Args:
        container: Container name; recorded in an annotation and used
            for the audit event.
        profiles: Names of the allowlist profiles to compose.

    Returns:
        The podman CLI arguments needed for shield protection.

    Raises:
        ShieldNeedsSetup: When podman does not persist ``--hooks-dir``
            and global hooks are not installed
            (see ``WORKAROUND(hooks-dir-persist)``).
    """
    state_dir = self._config.state_dir.resolve()
    podman = self._get_podman_info()
    interactive = self._config.interactive

    # State directories and hook files first (idempotent).
    state.ensure_state_dirs(state_dir)
    install_hooks(
        hook_entrypoint=state.hook_entrypoint(state_dir),
        hooks_dir=state.hooks_dir(state_dir),
    )

    # DNS tier and the upstream resolver for the active network mode.
    dns_tier = self._detect_dns_tier()
    network_mode = podman.network_mode or "pasta"
    upstream = _upstream_dns_for_mode(network_mode)

    # Compose profiles, then persist everything the hook reads at start.
    composed = self._profiles.compose_profiles(profiles)
    self._resolve_and_write_allowlists(state_dir, dns_tier, composed)
    state.upstream_dns_path(state_dir).write_text(f"{upstream}\n")
    state.dns_tier_path(state_dir).write_text(f"{dns_tier.value}\n")
    self._write_ruleset(state_dir, dns_tier, upstream)
    self._write_dnsmasq_config_or_scrub(state_dir, dns_tier, upstream)

    podman_args = self._build_network_args(network_mode)

    # Redirect container DNS through per-container dnsmasq via volume mount.
    # See commit history for detailed rationale on why --dns cannot be used.
    if dns_tier == DnsTier.DNSMASQ:
        podman_args.extend(
            ["--volume", f"{state.resolv_conf_path(state_dir)}:/etc/resolv.conf:ro,Z"]
        )

    # One --annotation pair per entry, emitted in this exact order.
    annotations = (
        (ANNOTATION_KEY, ",".join(profiles)),
        (ANNOTATION_NAME_KEY, container),
        (ANNOTATION_STATE_DIR_KEY, state_dir),
        (ANNOTATION_LOOPBACK_PORTS_KEY, ",".join(str(p) for p in self._config.loopback_ports)),
        (ANNOTATION_VERSION_KEY, state.BUNDLE_VERSION),
        (ANNOTATION_AUDIT_ENABLED_KEY, str(self._config.audit_enabled).lower()),
        (ANNOTATION_UPSTREAM_DNS_KEY, upstream),
        (ANNOTATION_DNS_TIER_KEY, dns_tier.value),
        (ANNOTATION_INTERACTIVE_KEY, str(interactive).lower()),
    )
    for key, value in annotations:
        podman_args.extend(["--annotation", f"{key}={value}"])

    # WORKAROUND(hooks-dir-persist): currently always takes the global path
    if podman.hooks_dir_persists:
        podman_args.extend(["--hooks-dir", str(state.hooks_dir(state_dir))])
    else:
        globally_installed = has_global_hooks()
        version = ".".join(str(v) for v in podman.version)
        if not globally_installed:
            raise ShieldNeedsSetup(f"Podman {version} detected.\n\n" + global_hooks_hint())
        self._audit.log_event(
            container,
            "setup",
            detail=(
                f"podman {version}: "
                "using global hooks dir (--hooks-dir does not persist on restart)"
            ),
        )

    for capability in ("NET_ADMIN", "NET_RAW"):
        podman_args.extend(["--cap-drop", capability])
    return podman_args

allow_domain(domain)

Add a domain to the dnsmasq config and signal reload.

Delegates to dnsmasq.add_domain(), which persists the domain to live.domains (not profile.domains) and removes any matching entry from denied.domains. When dnsmasq is running, a SIGHUP is sent so the change takes effect immediately without a container restart. These entries are runtime additions: they survive dnsmasq reloads but are separate from the pre-start profile.domains list.

The IP-level allow (nft set update) is handled separately by allow_ip() — this method is the domain-tracking counterpart that ensures future IP rotations are also captured.

No-op when the container is not using the dnsmasq DNS tier (the static IP-level allow already happened via allow_ip()).

Source code in src/terok_shield/core/mode_hook.py
def allow_domain(self, domain: str) -> None:
    """Track *domain* in the dnsmasq configuration and trigger a reload.

    The work is delegated to ``dnsmasq.add_domain()``, which persists the
    domain to ``live.domains`` (not ``profile.domains``) and removes any
    matching entry from ``denied.domains``.  When dnsmasq is running, a
    SIGHUP is sent so the change takes effect immediately without a
    container restart.  These entries are runtime additions: they survive
    dnsmasq reloads but stay separate from the pre-start
    ``profile.domains`` list.

    The IP-level allow (nft set update) is done separately by
    ``allow_ip()``; this method is its domain-tracking counterpart, so
    that future IP rotations are captured as well.

    Nothing happens when the container is not on the dnsmasq DNS tier
    (the static IP-level allow already happened via ``allow_ip()``), or
    when ``dnsmasq.add_domain()`` reports the domain as already present.

    Args:
        domain: Domain name to start tracking.
    """
    state_dir = self._config.state_dir.resolve()
    # Short-circuit: add_domain() runs only on the dnsmasq tier, and the
    # reload runs only when add_domain() returned a truthy result.
    if _is_dnsmasq_tier(state_dir) and dnsmasq.add_domain(state_dir, domain):
        self._reload_dnsmasq(state_dir)

deny_domain(domain)

Remove a domain from the dnsmasq config and signal reload.

Counterpart of allow_domain(). Removes the domain so dnsmasq stops auto-populating nft sets for it on future DNS queries.

No-op when the container is not using the dnsmasq DNS tier.

Source code in src/terok_shield/core/mode_hook.py
def deny_domain(self, domain: str) -> None:
    """Stop tracking *domain* in the dnsmasq configuration and trigger a reload.

    Mirror image of ``allow_domain()``: once the domain is removed,
    dnsmasq stops auto-populating nft sets for it on future DNS queries.

    Nothing happens when the container is not on the dnsmasq DNS tier,
    or when ``dnsmasq.remove_domain()`` reports the domain as not present.

    Args:
        domain: Domain name to stop tracking.
    """
    state_dir = self._config.state_dir.resolve()
    # Short-circuit: remove_domain() runs only on the dnsmasq tier, and the
    # reload runs only when remove_domain() returned a truthy result.
    if _is_dnsmasq_tier(state_dir) and dnsmasq.remove_domain(state_dir, domain):
        self._reload_dnsmasq(state_dir)

allow_ip(container, ip)

Live-allow an IP for a running container via nsenter.

Source code in src/terok_shield/core/mode_hook.py
def allow_ip(self, container: str, ip: str) -> None:
    """Live-allow an IP for a running container via nsenter.

    Three things happen in sequence: a previous deny of the same IP is
    lifted (deny.list rewritten, nft deny set updated best-effort), the
    IP is added to the matching nft allow set inside the container's
    network namespace, and the IP is appended to ``live.allowed`` unless
    it is already listed there.

    Args:
        container: Name of the running container.
        ip: Address to allow; passed through ``safe_ip()`` first.
    """
    ip = safe_ip(ip)
    state_dir = self._config.state_dir.resolve()

    # Un-deny: drop the IP from deny.list and from the nft deny set if present.
    deny_file = state.deny_path(state_dir)
    if deny_file.is_file():
        still_denied = state.read_denied_ips(state_dir)
        if ip in still_denied:
            still_denied.discard(ip)
            deny_file.write_text("".join(f"{addr}\n" for addr in sorted(still_denied)))
            undeny_cmd = delete_deny_elements_dual([ip])
            if undeny_cmd:
                self._nft_apply_best_effort(container, undeny_cmd)

    # When the dnsmasq set has a default timeout (30 m), permanent IPs must use
    # 'timeout 0s' so they are never evicted by the set's per-element expiry clock.
    tier_file = state.dns_tier_path(state_dir)
    on_dnsmasq_tier = (
        tier_file.is_file() and tier_file.read_text().strip() == DnsTier.DNSMASQ.value
    )
    element = f"{{ {ip} timeout 0s }}" if on_dnsmasq_tier else f"{{ {ip} }}"

    add_element = ("add", "element", "inet", "terok_shield", self._set_for_ip(ip), element)
    self._runner.nft_via_nsenter(container, *add_element)

    # Persist to live.allowed, skipping the write when the IP is already listed.
    live_file = self._live_path()
    live_file.parent.mkdir(parents=True, exist_ok=True)
    recorded = live_file.read_text().splitlines() if live_file.is_file() else []
    if ip in recorded:
        return
    with live_file.open("a") as handle:
        handle.write(f"{ip}\n")

deny_ip(container, ip)

Live-deny an IP for a running container via nsenter.

Removes from the nft allow set (best-effort) and from live.allowed. Adds to the nft deny set. Persists to deny.list when the IP would otherwise be re-allowed (profile IP or interactive mode).

Source code in src/terok_shield/core/mode_hook.py
def deny_ip(self, container: str, ip: str) -> None:
    """Live-deny an IP for a running container via nsenter.

    The IP is removed from the nft allow set (best-effort) and from
    live.allowed, then added to the nft deny set.  It is additionally
    persisted to deny.list when it would otherwise be re-allowed
    (profile IP or interactive mode).

    Args:
        container: Name of the running container.
        ip: Address to deny; passed through ``safe_ip()`` first.
    """
    ip = safe_ip(ip)
    state_dir = self._config.state_dir.resolve()

    # Best-effort delete from the allow set; the IP may not be in it at all.
    try:
        self._runner.nft_via_nsenter(
            container,
            "delete",
            "element",
            "inet",
            "terok_shield",
            self._set_for_ip(ip),
            f"{{ {ip} }}",
        )
    except ExecError as exc:
        message = str(exc).lower()
        benign_markers = ("no such file", "element does not exist", "not in set")
        # Only failures that are not "element was absent" are worth a warning.
        if all(marker not in message for marker in benign_markers):
            logger.warning("nft delete element failed for %s: %s", ip, exc)

    # Rewrite live.allowed without the IP (one entry per line).
    live_file = self._live_path()
    if live_file.is_file():
        kept = [entry for entry in live_file.read_text().splitlines() if entry.strip() != ip]
        live_file.write_text("".join(f"{entry}\n" for entry in kept))

    # Add to the nft deny set (prevents dnsmasq from re-allowing).
    deny_cmd = add_deny_elements_dual([ip])
    if deny_cmd:
        self._nft_apply_best_effort(container, deny_cmd)

    # Persist to deny.list so deny sets survive shield_up / restart.
    # In interactive mode: always persist (operator rejects must stick).
    # In strict mode: only persist when the IP came from a profile.
    persist = state.interactive_path(state_dir).is_file()
    if not persist:
        profile_file = state.profile_allowed_path(state_dir)
        if profile_file.is_file():
            profile_ips = {
                ln.strip() for ln in profile_file.read_text().splitlines() if ln.strip()
            }
            persist = ip in profile_ips
    if not persist:
        return

    deny_file = state.deny_path(state_dir)
    if ip in state.read_denied_ips(state_dir):
        return
    with deny_file.open("a") as handle:
        handle.write(f"{ip}\n")

shield_down(container, *, allow_all=False)

Switch a running container to bypass mode.

Source code in src/terok_shield/core/mode_hook.py
def shield_down(self, container: str, *, allow_all: bool = False) -> None:
    """Switch a running container to bypass mode.

    Builds the bypass ruleset, loads it inside the container's network
    namespace (deleting the existing shield table first unless the
    shield is currently inactive), then re-reads the live ruleset and
    verifies it.

    Args:
        container: Name of the running container.
        allow_all: Forwarded to the ruleset builder and verifier to pick
            the bypass variant.

    Raises:
        RuntimeError: When the applied bypass ruleset fails verification.
    """
    builder = self._container_ruleset(container)
    bypass_rules = builder.build_bypass(allow_all=allow_all)

    # Only prepend "delete table" when there is a table to delete.
    if self.shield_state(container) == ShieldState.INACTIVE:
        payload = bypass_rules
    else:
        payload = f"delete table {NFT_TABLE}\n{bypass_rules}"
    self._runner.nft_via_nsenter(container, stdin=payload)

    live_rules = self._runner.nft_via_nsenter(container, "list", "ruleset")
    problems = builder.verify_bypass(live_rules, allow_all=allow_all)
    if problems:
        raise RuntimeError(f"Bypass ruleset verification failed: {'; '.join(problems)}")

shield_up(container)

Restore normal deny-all mode for a running container.

Source code in src/terok_shield/core/mode_hook.py
def shield_up(self, container: str) -> None:
    """Restore normal deny-all mode for a running container.

    Reloads the hook ruleset (the interactive variant when the
    interactive marker file exists in the state directory), then
    repopulates the allow sets, deny sets and gateway sets from the
    persisted state files, and finally verifies the live ruleset.

    Args:
        container: Name of the running container.

    Raises:
        RuntimeError: When the restored ruleset fails verification.
    """
    state_dir = self._config.state_dir.resolve()
    interactive = state.interactive_path(state_dir).is_file()

    builder = self._container_ruleset(container)
    hook_rules = builder.build_hook(interactive=interactive)

    # Only prepend "delete table" when there is a table to delete.
    if self.shield_state(container) == ShieldState.INACTIVE:
        payload = hook_rules
    else:
        payload = f"delete table {NFT_TABLE}\n{hook_rules}"
    self._runner.nft_via_nsenter(container, stdin=payload)

    # Re-add effective IPs (allowed minus denied).
    effective_ips = state.read_effective_ips(state_dir)
    if effective_ips:
        allow_cmd = builder.add_elements_dual(effective_ips)
        if allow_cmd:
            self._runner.nft_via_nsenter(container, stdin=allow_cmd)

    # Repopulate deny sets from deny.list.
    denied = list(state.read_denied_ips(state_dir))
    if denied:
        deny_cmd = add_deny_elements_dual(denied)
        if deny_cmd:
            self._runner.nft_via_nsenter(container, stdin=deny_cmd)

    # Repopulate gateway sets from persisted discovery (hook wrote them at container start).
    gateway_sources = (
        (state.gateway_path(state_dir), "gateway_v4"),
        (state.gateway_v6_path(state_dir), "gateway_v6"),
    )
    for gateway_file, set_name in gateway_sources:
        if not gateway_file.is_file():
            continue
        gateway = gateway_file.read_text().strip()
        if not gateway:
            continue
        self._runner.nft_via_nsenter(
            container,
            "add",
            "element",
            "inet",
            "terok_shield",
            set_name,
            f"{{ {gateway} }}",
        )

    live_rules = self._runner.nft_via_nsenter(container, "list", "ruleset")
    problems = builder.verify_hook(live_rules, interactive=interactive)
    if problems:
        raise RuntimeError(f"Ruleset verification failed: {'; '.join(problems)}")

shield_state(container)

Query the live nft ruleset to determine the container's shield state.

Source code in src/terok_shield/core/mode_hook.py
def shield_state(self, container: str) -> ShieldState:
    """Classify the container's shield state from its live nft table.

    The table listing is matched against the known rulesets in a fixed
    order: bypass, bypass with allow-all, strict hook, interactive hook.
    The first one that verifies cleanly decides the result.

    Args:
        container: Name of the running container.

    Returns:
        ``INACTIVE`` when the listing is blank, ``DOWN`` / ``DOWN_ALL``
        for the two bypass variants, ``UP`` for either hook variant, and
        ``ERROR`` when nothing matches.
    """
    rules = self.list_rules(container)
    if not rules.strip():
        return ShieldState.INACTIVE

    verifier = self._ruleset
    # verify_* returns a list of errors; an empty list means "matches".
    if not verifier.verify_bypass(rules, allow_all=False):
        return ShieldState.DOWN
    if not verifier.verify_bypass(rules, allow_all=True):
        return ShieldState.DOWN_ALL

    # Strict is tried first; interactive is only consulted when strict fails.
    hook_matches = not verifier.verify_hook(rules) or not verifier.verify_hook(
        rules, interactive=True
    )
    return ShieldState.UP if hook_matches else ShieldState.ERROR

list_rules(container)

List current nft rules for a running container.

Source code in src/terok_shield/core/mode_hook.py
def list_rules(self, container: str) -> str:
    """Return the live listing of the shield's nft table for *container*.

    The listing is requested through the runner with ``check=False``.
    An ``ExecError`` from the runner is swallowed and reported as an
    empty string.

    Args:
        container: Name of the running container.

    Returns:
        The runner's output for ``list table inet terok_shield``, or
        ``""`` when the runner raised ``ExecError``.
    """
    nft_query = ("list", "table", "inet", "terok_shield")
    try:
        listing = self._runner.nft_via_nsenter(container, *nft_query, check=False)
    except ExecError:
        return ""
    return listing

preview(*, down=False, allow_all=False)

Generate the ruleset that would be applied to a container.

Source code in src/terok_shield/core/mode_hook.py
def preview(self, *, down: bool = False, allow_all: bool = False) -> str:
    """Render the ruleset that would be applied, without touching a container.

    Args:
        down: When true, render the bypass ruleset instead of the hook
            ruleset.
        allow_all: Forwarded to the bypass builder; ignored unless
            ``down`` is true.

    Returns:
        The bypass ruleset when ``down`` is true; otherwise the hook
        ruleset built with the configured interactive setting.
    """
    builder = self._ruleset
    if not down:
        return builder.build_hook(interactive=self._config.interactive)
    return builder.build_bypass(allow_all=allow_all)