Skip to content

dnsmasq

dnsmasq

Per-container dnsmasq lifecycle: config generation, launch, and cleanup.

dnsmasq runs inside the container's network namespace (via nsenter), listens on 127.0.0.1:53, and uses --nftset to auto-populate nft allow sets on every DNS resolution. This handles IP rotation that static pre-start resolution cannot.

This module is the single owner of dnsmasq config format and CLI args.

has_nftset_support(runner)

Return True if the installed dnsmasq supports --nftset.

Parses dnsmasq --version compile-time options for the nftset feature flag. Returns False if dnsmasq is not installed or its output contains no-nftset (explicitly disabled).

Source code in src/terok_shield/core/dnsmasq.py
def has_nftset_support(runner: CommandRunner) -> bool:
    """Return True if the installed dnsmasq supports ``--nftset``.

    Parses ``dnsmasq --version`` compile-time options for the ``nftset``
    feature flag.  Returns False if dnsmasq is not installed or its
    output contains ``no-nftset`` (explicitly disabled).
    """
    out = runner.run(["dnsmasq", "--version"], check=False)
    return bool(re.search(r"\bnftset\b", out)) and not bool(re.search(r"\bno-nftset\b", out))

nftset_entry(domain)

Generate a dnsmasq nftset config line for a domain.

Maps A records to the IPv4 allow set and AAAA records to the IPv6 allow set. dnsmasq automatically matches the domain and all its subdomains.

Example::

nftset=/github.com/4#inet#terok_shield#allow_v4,6#inet#terok_shield#allow_v6
Source code in src/terok_shield/core/dnsmasq.py
def nftset_entry(domain: str) -> str:
    """Generate a dnsmasq ``nftset`` config line for a domain.

    Maps A records to the IPv4 allow set and AAAA records to the IPv6
    allow set.  dnsmasq automatically matches the domain and all its
    subdomains.

    Example::

        nftset=/github.com/4#inet#terok_shield#allow_v4,6#inet#terok_shield#allow_v6
    """
    domain = _validate_domain(domain)
    # Strip leading wildcard — dnsmasq nftset inherently matches subdomains.
    if domain.startswith("*."):
        domain = domain[2:]
    return f"nftset=/{domain}/4#inet#{NFT_TABLE_NAME}#allow_v4,6#inet#{NFT_TABLE_NAME}#allow_v6"

generate_config(upstream_dns, domains, pid_path, *, log_path=None)

Generate a complete dnsmasq configuration.

Parameters:

Name Type Description Default
upstream_dns str

Upstream DNS forwarder (pasta or slirp4netns address).

required
domains list[str]

Domain names for --nftset auto-population.

required
pid_path Path

Path for the dnsmasq PID file.

required
log_path Path | None

If set, enable query logging to this file (for shield watch).

None

Raises:

Type Description
ValueError

If upstream_dns is not a valid IP address.

Source code in src/terok_shield/core/dnsmasq.py
def generate_config(
    upstream_dns: str,
    domains: list[str],
    pid_path: Path,
    *,
    log_path: Path | None = None,
) -> str:
    """Generate a complete dnsmasq configuration.

    Args:
        upstream_dns: Upstream DNS forwarder (pasta or slirp4netns address).
        domains: Domain names for ``--nftset`` auto-population.
        pid_path: Path for the dnsmasq PID file.
        log_path: If set, enable query logging to this file (for ``shield watch``).

    Raises:
        ValueError: If *upstream_dns* is not a valid IP address.
    """
    ipaddress.ip_address(upstream_dns)
    lines = [
        f"# Generated by terok-shield (pid {os.getpid()})",
        f"listen-address={DNSMASQ_BIND}",
        "port=53",
        "bind-interfaces",
        "no-resolv",
        "no-hosts",
        f"server={upstream_dns}",
        f"pid-file={pid_path}",
    ]
    if log_path is not None:
        lines += ["log-queries", f"log-facility={log_path}"]
    for domain in domains:
        try:
            lines.append(nftset_entry(domain))
        except ValueError:
            logger.warning("generate_config: skipping invalid domain entry")
            continue
    return "\n".join(lines) + "\n"

launch(runner, pid, state_dir, upstream_dns, domains)

Generate config and launch dnsmasq in the container's network namespace.

dnsmasq daemonizes and writes its PID to the state directory. If dnsmasq lacks --nftset support, it will fail immediately with a clear "bad command line options" error (fail-closed).

Parameters:

Name Type Description Default
runner CommandRunner

Command runner for subprocess calls.

required
pid str

Container PID (for nsenter).

required
state_dir Path

Per-container state directory.

required
upstream_dns str

Upstream DNS forwarder address.

required
domains list[str]

Domain names for nftset auto-population.

required

Raises:

Type Description
RuntimeError

If dnsmasq fails to start or PID file is not written.

Source code in src/terok_shield/core/dnsmasq.py
def launch(
    runner: CommandRunner,
    pid: str,
    state_dir: Path,
    upstream_dns: str,
    domains: list[str],
) -> None:
    """Generate config and launch dnsmasq in the container's network namespace.

    dnsmasq daemonizes and writes its PID to the state directory.
    If dnsmasq lacks ``--nftset`` support, it will fail immediately with
    a clear "bad command line options" error (fail-closed).

    Args:
        runner: Command runner for subprocess calls.
        pid: Container PID (for nsenter).
        state_dir: Per-container state directory.
        upstream_dns: Upstream DNS forwarder address.
        domains: Domain names for nftset auto-population.

    Raises:
        RuntimeError: If dnsmasq fails to start or PID file is not written.
    """
    conf_path = state.dnsmasq_conf_path(state_dir)
    pid_path = state.dnsmasq_pid_path(state_dir)

    # Remove any PID file left by a previous run so the post-launch
    # existence check is not fooled by a stale file from a reused state dir.
    _clear_pid_file(state_dir)

    config = generate_config(upstream_dns, domains, pid_path)
    conf_path.write_text(config)

    # Launch dnsmasq inside the container's network namespace.
    # --keep-in-foreground is NOT used: dnsmasq daemonizes and writes PID.
    cmd = [
        "nsenter",
        "-t",
        pid,
        "-n",
        "--",
        "dnsmasq",
        f"--conf-file={conf_path}",
    ]
    try:
        runner.run(cmd, check=True)
    except ExecError as e:
        raise RuntimeError(f"dnsmasq failed to start: {e}") from e

    if not pid_path.is_file():
        raise RuntimeError(
            f"dnsmasq started but PID file not written at {pid_path}. "
            "The container's DNS may not be functional."
        )

kill(state_dir)

Kill dnsmasq for a container (best-effort cleanup).

Reads the PID from the state directory and sends SIGTERM. Silently ignores missing PID files, stale PIDs, and permission errors. Verifies PID identity before signaling to avoid killing unrelated processes.

Needed because dnsmasq runs in the host PID namespace (we only nsenter -n into the network namespace). When the container stops, podman kills container-PID-namespace processes, but dnsmasq is NOT among them — it becomes an orphan with a broken network socket.

Source code in src/terok_shield/core/dnsmasq.py
def kill(state_dir: Path) -> None:
    """Kill dnsmasq for a container (best-effort cleanup).

    Reads the PID from the state directory and sends SIGTERM.
    Silently ignores missing PID files, stale PIDs, and permission errors.
    Verifies PID identity before signaling to avoid killing unrelated
    processes.

    Needed because dnsmasq runs in the host PID namespace (we only
    ``nsenter -n`` into the network namespace).  When the container stops,
    podman kills container-PID-namespace processes, but dnsmasq is NOT
    among them — it becomes an orphan with a broken network socket.
    """
    pid_int = _read_pid(state_dir)
    if pid_int is None:
        return
    if not _is_our_dnsmasq(pid_int, state_dir):
        _clear_pid_file(state_dir)
        return
    try:
        os.kill(pid_int, signal.SIGTERM)
    except (ProcessLookupError, PermissionError):
        pass

reload(state_dir, upstream_dns, domains)

Regenerate dnsmasq config and signal the daemon to reload.

Sends SIGHUP to the running dnsmasq, which re-reads its config file. No-op if dnsmasq is not running (PID file absent).

Parameters:

Name Type Description Default
state_dir Path

Per-container state directory.

required
upstream_dns str

Upstream DNS forwarder address.

required
domains list[str]

Updated domain names for nftset auto-population.

required

Raises:

Type Description
RuntimeError

If dnsmasq PID exists but the process is gone (stale PID). The caller should log this — it means the container's DNS is broken.

Source code in src/terok_shield/core/dnsmasq.py
def reload(state_dir: Path, upstream_dns: str, domains: list[str]) -> None:
    """Regenerate dnsmasq config and signal the daemon to reload.

    Sends SIGHUP to the running dnsmasq, which re-reads its config file.
    No-op if dnsmasq is not running (PID file absent).

    Args:
        state_dir: Per-container state directory.
        upstream_dns: Upstream DNS forwarder address.
        domains: Updated domain names for nftset auto-population.

    Raises:
        RuntimeError: If dnsmasq PID exists but the process is gone
            (stale PID).  The caller should log this — it means the
            container's DNS is broken.
    """
    pid_int = _read_pid(state_dir)
    if pid_int is None:
        return

    if not _is_our_dnsmasq(pid_int, state_dir):
        _clear_pid_file(state_dir)
        raise RuntimeError(
            f"PID {pid_int} is not dnsmasq (stale PID file) — container DNS is broken. "
            "Restart the container to recover."
        )

    # Regenerate config, then signal dnsmasq to re-read it.
    # Preserve log-queries/log-facility if pre_start enabled them.
    pid_path = state.dnsmasq_pid_path(state_dir)
    conf_path = state.dnsmasq_conf_path(state_dir)
    old_conf = conf_path.read_text() if conf_path.is_file() else ""
    log_path = state.dnsmasq_log_path(state_dir) if "log-queries" in old_conf else None
    conf_path.write_text(generate_config(upstream_dns, domains, pid_path, log_path=log_path))
    try:
        os.kill(pid_int, signal.SIGHUP)
    except ProcessLookupError as e:
        raise RuntimeError(
            f"dnsmasq (pid {pid_int}) is dead — container DNS is broken. "
            "Restart the container to recover."
        ) from e

add_domain(state_dir, domain)

Append a domain to the live.domains file.

Writes to live.domains (not profile.domains) so that runtime additions survive container restarts without overwriting the profile-derived domain list.

Returns True if the domain was added, False if already present in the merged domain set (profile + live - denied).

Source code in src/terok_shield/core/dnsmasq.py
def add_domain(state_dir: Path, domain: str) -> bool:
    """Append a domain to the live.domains file.

    Writes to ``live.domains`` (not ``profile.domains``) so that
    runtime additions survive container restarts without overwriting
    the profile-derived domain list.

    Returns True if the domain was added, False if already present
    in the merged domain set (profile + live - denied).
    """
    domain = _validate_domain(domain)
    existing = read_merged_domains(state_dir)
    if domain in existing:
        return False

    # Remove from denied.domains if present (un-deny)
    denied_path = state.denied_domains_path(state_dir)
    if denied_path.is_file():
        denied = read_domains(denied_path)
        if domain in denied:
            denied.remove(domain)
            denied_path.write_text("\n".join(denied) + "\n" if denied else "")

    live_path = state.live_domains_path(state_dir)
    with live_path.open("a") as f:
        f.write(f"{domain}\n")
    return True

remove_domain(state_dir, domain)

Remove a domain by adding it to the denied.domains file.

Writes to denied.domains so the denial persists across dnsmasq reloads. Also removes from live.domains if present.

Returns True if the domain was removed, False if not found in the merged domain set.

Source code in src/terok_shield/core/dnsmasq.py
def remove_domain(state_dir: Path, domain: str) -> bool:
    """Remove a domain by adding it to the denied.domains file.

    Writes to ``denied.domains`` so the denial persists across
    dnsmasq reloads.  Also removes from ``live.domains`` if present.

    Returns True if the domain was removed, False if not found
    in the merged domain set.
    """
    domain = _validate_domain(domain)
    existing = read_merged_domains(state_dir)
    if domain not in existing:
        return False

    # Remove from live.domains if present
    live_path = state.live_domains_path(state_dir)
    if live_path.is_file():
        live = read_domains(live_path)
        if domain in live:
            live.remove(domain)
            live_path.write_text("\n".join(live) + "\n" if live else "")

    # Add to denied.domains
    denied_path = state.denied_domains_path(state_dir)
    denied = read_domains(denied_path)
    if domain not in denied:
        with denied_path.open("a") as f:
            f.write(f"{domain}\n")

    return True

read_domains(domains_path)

Read and normalize domain names from a domains file.

Validates and lowercases each entry so comparisons with add_domain()/remove_domain() are consistent. Invalid entries are silently skipped.

Source code in src/terok_shield/core/dnsmasq.py
def read_domains(domains_path: Path) -> list[str]:
    """Read and normalize domain names from a domains file.

    Validates and lowercases each entry so comparisons with
    ``add_domain()``/``remove_domain()`` are consistent.
    Invalid entries are silently skipped.
    """
    if not domains_path.is_file():
        return []
    domains: list[str] = []
    for line in domains_path.read_text().splitlines():
        if not line.strip():
            continue
        try:
            domains.append(_validate_domain(line))
        except ValueError:
            logger.warning("read_domains: skipping invalid entry in %s", domains_path)
            continue
    return list(dict.fromkeys(domains))

read_merged_domains(state_dir)

Compute effective domains: (profile + live) - denied.

Returns a deduplicated, stable-order list.

Source code in src/terok_shield/core/dnsmasq.py
def read_merged_domains(state_dir: Path) -> list[str]:
    """Compute effective domains: (profile + live) - denied.

    Returns a deduplicated, stable-order list.
    """
    profile = read_domains(state.profile_domains_path(state_dir))
    live = read_domains(state.live_domains_path(state_dir))
    denied = set(read_domains(state.denied_domains_path(state_dir)))

    merged: list[str] = []
    seen: set[str] = set()
    for d in profile + live:
        if d not in seen and d not in denied:
            seen.add(d)
            merged.append(d)
    return merged

write_resolv_conf(pid, nameserver=DNSMASQ_BIND)

Overwrite the container's resolv.conf to point to dnsmasq.

Safety measure backing up the --dns podman flag. Writes directly to /proc/{pid}/root/etc/resolv.conf from the host side.

Raises:

Type Description
ValueError

If pid is not a numeric string or nameserver is not a valid IPv4 or IPv6 address.

Source code in src/terok_shield/core/dnsmasq.py
def write_resolv_conf(pid: str, nameserver: str = DNSMASQ_BIND) -> None:
    """Overwrite the container's resolv.conf to point to dnsmasq.

    Safety measure backing up the ``--dns`` podman flag.  Writes directly
    to ``/proc/{pid}/root/etc/resolv.conf`` from the host side.

    Raises:
        ValueError: If *pid* is not a numeric string or *nameserver* is not
            a valid IPv4 or IPv6 address.
    """
    if not pid.isdigit():
        raise ValueError(f"pid must be numeric, got {pid!r}")
    try:
        ipaddress.ip_address(nameserver)
    except ValueError:
        raise ValueError(f"nameserver must be a valid IP address, got {nameserver!r}") from None
    resolv_path = Path(f"/proc/{pid}/root/etc/resolv.conf")
    resolv_path.write_text(f"nameserver {nameserver}\n")