Skip to content

dnsmasq

dnsmasq

Per-container dnsmasq config generation, reload, and domain management.

dnsmasq runs inside the container's network namespace (via nsenter) on a runtime-dependent listen address: 127.0.0.1:53 for ordinary runtimes that share the netns loopback, or a link-local address under krun, whose guest cannot reach the netns 127.0.0.1. The --nftset option auto-populates the nft allow sets on every DNS resolution, which handles IP rotation that static pre-start resolution cannot.

This module is the single package-side owner of the dnsmasq config format and CLI args; the per-container start/stop dance is owned by the OCI hook resource (resources/nft_hook.py), which has its own stdlib-only copy because hook scripts run outside the package venv.

logger = logging.getLogger(__name__) module-attribute

reload(state_dir, upstream_dns, domains)

Regenerate dnsmasq config and signal the daemon to reload.

Sends SIGHUP to the running dnsmasq, which re-reads its config file. No-op if dnsmasq is not running (PID file absent).

Parameters:

Name Type Description Default
state_dir Path

Per-container state directory.

required
upstream_dns str

Upstream DNS forwarder address.

required
domains list[str]

Updated domain names for nftset auto-population.

required

Raises:

Type Description
RuntimeError

If dnsmasq PID exists but the process is gone (stale PID). The caller should log this — it means the container's DNS is broken.

Source code in src/terok_shield/dns/dnsmasq.py
def reload(state_dir: Path, upstream_dns: str, domains: list[str]) -> None:
    """Rewrite the container's dnsmasq config and send the daemon SIGHUP.

    Returns immediately, without touching any file, when ``_read_pid``
    reports no PID for *state_dir*.

    Args:
        state_dir: Per-container state directory.
        upstream_dns: Upstream DNS forwarder address.
        domains: Updated domain names for nftset auto-population.

    Raises:
        RuntimeError: If the recorded PID does not belong to our dnsmasq
            (the PID file is cleared before raising), or if the process
            has disappeared by the time the signal is sent.  Either way
            the container's DNS is broken and the caller should log it.
    """
    daemon_pid = _read_pid(state_dir)
    if daemon_pid is None:
        return

    if not _is_our_dnsmasq(daemon_pid, state_dir):
        _clear_pid_file(state_dir)
        raise RuntimeError(
            f"PID {daemon_pid} is not dnsmasq (stale PID file) — container DNS is broken. "
            "Restart the container to recover."
        )

    bundle = StateBundle(state_dir)
    conf_file = bundle.dnsmasq_conf
    previous = conf_file.read_text() if conf_file.is_file() else ""

    # Carry over the settings of the config being replaced: query logging
    # stays on only if it was on, and the listen address is reused so a
    # live reload never rebinds dnsmasq onto a different interface.
    query_log = bundle.dnsmasq_log if "log-queries" in previous else None
    bind_address = _extract_listen_address(previous) or DNSMASQ_BIND_DEFAULT

    rendered = generate_config(
        upstream_dns,
        domains,
        bundle.dnsmasq_pid,
        listen_address=bind_address,
        log_path=query_log,
    )
    conf_file.write_text(rendered)

    # NOTE(review): the dnsmasq man page states that SIGHUP does not
    # re-read the configuration file — confirm that updated nftset= lines
    # actually take effect after this signal.
    try:
        os.kill(daemon_pid, signal.SIGHUP)
    except ProcessLookupError as exc:
        raise RuntimeError(
            f"dnsmasq (pid {daemon_pid}) is dead — container DNS is broken. "
            "Restart the container to recover."
        ) from exc

add_domain(state_dir, domain)

Append a domain to the live.domains file.

Writes to live.domains (not profile.domains) so that runtime additions survive container restarts without overwriting the profile-derived domain list.

Returns True if the domain was added, False if already present in the merged domain set (profile + live - denied).

Source code in src/terok_shield/dns/dnsmasq.py
def add_domain(state_dir: Path, domain: str) -> bool:
    """Allow *domain* at runtime by appending it to ``live.domains``.

    The entry goes to ``live.domains``; ``profile.domains`` is never
    written here, so the profile-derived list stays untouched.  If the
    domain is currently listed in ``denied.domains`` it is removed from
    that file first (un-deny).

    Returns:
        True if the domain was appended, False if it is already part of
        the merged domain set (profile + live - denied).
    """
    name = _validate_domain(domain)
    if name in read_merged_domains(state_dir):
        return False

    bundle = StateBundle(state_dir)

    # Un-deny: rewrite denied.domains without this entry, if it is there.
    deny_file = bundle.denied_domains
    if deny_file.is_file():
        blocked = read_domains(deny_file)
        if name in blocked:
            blocked.remove(name)
            # One entry per line; an empty list yields an empty file.
            deny_file.write_text("".join(f"{entry}\n" for entry in blocked))

    with bundle.live_domains.open("a") as handle:
        handle.write(f"{name}\n")
    return True

remove_domain(state_dir, domain)

Remove a domain by adding it to the denied.domains file.

Writes to denied.domains so the denial persists across dnsmasq reloads. Also removes from live.domains if present.

Returns True if the domain was removed, False if not found in the merged domain set.

Source code in src/terok_shield/dns/dnsmasq.py
def remove_domain(state_dir: Path, domain: str) -> bool:
    """Deny *domain* by recording it in ``denied.domains``.

    The domain is dropped from ``live.domains`` when present there, and
    appended to ``denied.domains`` unless that file already lists it.

    Returns:
        True if the domain was removed, False if it is not part of the
        merged domain set to begin with.
    """
    name = _validate_domain(domain)
    if name not in read_merged_domains(state_dir):
        return False

    bundle = StateBundle(state_dir)

    # Drop the runtime allow entry, if there is one.
    live_file = bundle.live_domains
    if live_file.is_file():
        allowed = read_domains(live_file)
        if name in allowed:
            allowed.remove(name)
            # One entry per line; an empty list yields an empty file.
            live_file.write_text("".join(f"{entry}\n" for entry in allowed))

    # Record the denial, avoiding a duplicate line.
    deny_file = bundle.denied_domains
    already_denied = name in read_domains(deny_file)
    if not already_denied:
        with deny_file.open("a") as handle:
            handle.write(f"{name}\n")

    return True

read_domains(domains_path)

Read and normalize domain names from a domains file.

Validates and lowercases each entry so comparisons with add_domain()/remove_domain() are consistent. Invalid entries are silently skipped.

Source code in src/terok_shield/dns/dnsmasq.py
def read_domains(domains_path: Path) -> list[str]:
    """Load the domain names stored in *domains_path*.

    Every non-blank line is passed through ``_validate_domain``; lines
    it rejects with ``ValueError`` are skipped after a warning is
    logged.  Duplicates are collapsed, keeping the first occurrence.

    Returns:
        The accepted entries in file order, or an empty list when
        *domains_path* is not a regular file.
    """
    if not domains_path.is_file():
        return []

    # A dict preserves insertion order, so it works as an ordered set.
    unique: dict[str, None] = {}
    for raw in domains_path.read_text().splitlines():
        if not raw.strip():
            continue
        try:
            normalized = _validate_domain(raw)
        except ValueError:
            logger.warning("read_domains: skipping invalid entry in %s", domains_path)
        else:
            unique.setdefault(normalized, None)
    return list(unique)

read_merged_domains(state_dir)

Compute effective domains: (profile + live) - denied.

Returns a deduplicated, stable-order list.

Source code in src/terok_shield/dns/dnsmasq.py
def read_merged_domains(state_dir: Path) -> list[str]:
    """Return the effective domain list: (profile + live) - denied.

    Profile entries come first, then live entries; each name appears
    once, at the position of its first occurrence.
    """
    bundle = StateBundle(state_dir)
    candidates = read_domains(bundle.profile_domains) + read_domains(bundle.live_domains)
    blocked = frozenset(read_domains(bundle.denied_domains))
    # dict.fromkeys deduplicates while keeping first-seen order.
    return list(dict.fromkeys(name for name in candidates if name not in blocked))

generate_config(upstream_dns, domains, pid_path, *, listen_address, log_path=None)

Generate a complete dnsmasq configuration.

Parameters:

Name Type Description Default
upstream_dns str

Upstream DNS forwarder (pasta or slirp4netns address).

required
domains list[str]

Domain names for --nftset auto-population.

required
pid_path Path

Path for the dnsmasq PID file.

required
listen_address str

Address dnsmasq binds to inside the netns. See DNSMASQ_BIND_DEFAULT / DNSMASQ_BIND_KRUN.

required
log_path Path | None

If set, enable query logging to this file (for shield watch).

None

Raises:

Type Description
ValueError

If upstream_dns or listen_address is not a valid IP address.

Source code in src/terok_shield/dns/dnsmasq.py
def generate_config(
    upstream_dns: str,
    domains: list[str],
    pid_path: Path,
    *,
    listen_address: str,
    log_path: Path | None = None,
) -> str:
    """Generate a complete dnsmasq configuration.

    Args:
        upstream_dns: Upstream DNS forwarder (pasta or slirp4netns address).
        domains: Domain names for ``--nftset`` auto-population.
        pid_path: Path for the dnsmasq PID file.
        listen_address: Address dnsmasq binds to inside the netns.  See
            [`DNSMASQ_BIND_DEFAULT`][terok_shield.nft.constants.DNSMASQ_BIND_DEFAULT]
            /
            [`DNSMASQ_BIND_KRUN`][terok_shield.nft.constants.DNSMASQ_BIND_KRUN].
        log_path: If set, enable query logging to this file (for ``shield watch``).

    Raises:
        ValueError: If *upstream_dns* or *listen_address* is not a valid IP address.
    """
    ipaddress.ip_address(upstream_dns)
    ipaddress.ip_address(listen_address)
    lines = [
        f"# Generated by terok-shield (pid {os.getpid()})",
        f"listen-address={listen_address}",
        "port=53",
        "bind-interfaces",
        "no-resolv",
        "no-hosts",
        f"server={upstream_dns}",
        f"pid-file={pid_path}",
    ]
    if log_path is not None:
        lines += ["log-queries", f"log-facility={log_path}"]
    for domain in domains:
        try:
            lines.append(nftset_entry(domain))
        except ValueError:
            logger.warning("generate_config: skipping invalid domain entry")
            continue
    return "\n".join(lines) + "\n"

nftset_entry(domain)

Generate a dnsmasq nftset config line for a domain.

Maps A records to the IPv4 allow set and AAAA records to the IPv6 allow set. dnsmasq automatically matches the domain and all its subdomains.

Example::

nftset=/github.com/4#inet#terok_shield#allow_v4,6#inet#terok_shield#allow_v6
Source code in src/terok_shield/dns/dnsmasq.py
def nftset_entry(domain: str) -> str:
    """Build the dnsmasq ``nftset`` config line for one domain.

    The line targets the ``allow_v4`` set for IPv4 (``4#``) and the
    ``allow_v6`` set for IPv6 (``6#``), both in the ``inet`` table
    named by ``NFT_TABLE_NAME``.

    Example::

        nftset=/github.com/4#inet#terok_shield#allow_v4,6#inet#terok_shield#allow_v6
    """
    # A leading "*." is dropped — dnsmasq nftset inherently matches subdomains.
    bare = _validate_domain(domain).removeprefix("*.")
    v4_set = f"4#inet#{NFT_TABLE_NAME}#allow_v4"
    v6_set = f"6#inet#{NFT_TABLE_NAME}#allow_v6"
    return f"nftset=/{bare}/{v4_set},{v6_set}"

has_nftset_support(runner)

Return True if the installed dnsmasq supports --nftset.

Parses dnsmasq --version compile-time options for the nftset feature flag. Returns False if dnsmasq is not installed or its output contains no-nftset (explicitly disabled).

Source code in src/terok_shield/dns/dnsmasq.py
def has_nftset_support(runner: CommandRunner) -> bool:
    """Report whether the installed dnsmasq was built with ``--nftset``.

    Runs ``dnsmasq --version`` through *runner* (with ``check=False``)
    and scans the output for the ``nftset`` feature flag.

    Returns:
        True only when the output contains the word ``nftset`` and does
        not contain ``no-nftset`` (explicitly disabled).
    """
    version_text = runner.run(["dnsmasq", "--version"], check=False)
    # "no-nftset" also contains the word "nftset", so rule it out first.
    if re.search(r"\bno-nftset\b", version_text):
        return False
    return re.search(r"\bnftset\b", version_text) is not None