audit

Per-container JSON-lines audit logging.

Writes structured events (setup, teardown, allowed, denied) to a single file per container. Logging can be toggled on and off at runtime; the configured log path is kept while logging is disabled, so re-enabling resumes appending to the same file.

AuditLogger(*, audit_path, enabled=True)

JSON-lines audit logger for a single container.

Writes to a single file (audit_path). When disabled, all write operations are no-ops.

Create an audit logger.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| audit_path | Path | Path to the .jsonl audit log file. | required |
| enabled | bool | Whether logging is active (can be toggled later). | True |

Source code in src/terok_shield/lib/audit.py
def __init__(self, *, audit_path: Path, enabled: bool = True) -> None:
    """Create an audit logger.

    Args:
        audit_path: Path to the ``.jsonl`` audit log file.
        enabled: Whether logging is active (can be toggled later).
    """
    self._audit_path = audit_path
    self._enabled = enabled

enabled (property, writable)

Whether audit logging is active.
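
The source of the `enabled` property is not shown on this page. As a minimal sketch of how a writable property like this is commonly written, assuming it simply wraps the `_enabled` attribute set in `__init__`, the class below (`ToggleableLogger`, a hypothetical stand-in and not the real `AuditLogger`) exposes a getter and a setter:

```python
from pathlib import Path


class ToggleableLogger:
    """Hypothetical stand-in for illustration; not the real AuditLogger."""

    def __init__(self, *, audit_path: Path, enabled: bool = True) -> None:
        self._audit_path = audit_path
        self._enabled = enabled

    @property
    def enabled(self) -> bool:
        """Whether logging is active."""
        return self._enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        # Only the flag changes. The path is untouched, so switching
        # logging back on continues with the same file.
        self._enabled = value


demo = ToggleableLogger(audit_path=Path("audit.jsonl"))
demo.enabled = False  # pause logging at runtime
```

The file name `audit.jsonl` is illustrative only. Nothing is written to disk by this sketch.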

log_event(container, action, *, dest=None, detail=None)

Write a single audit event to the log file.

No-op when audit is disabled.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| container | str | Container name. | required |
| action | str | Event type (setup, teardown, allowed, denied). | required |
| dest | str \| None | Destination IP/domain (optional). | None |
| detail | str \| None | Additional detail string (optional). | None |

Source code in src/terok_shield/lib/audit.py
def log_event(
    self,
    container: str,
    action: str,
    *,
    dest: str | None = None,
    detail: str | None = None,
) -> None:
    """Write a single audit event to the log file.

    No-op when audit is disabled.

    Args:
        container: Container name.
        action: Event type (setup, teardown, allowed, denied).
        dest: Destination IP/domain (optional).
        detail: Additional detail string (optional).
    """
    if not self._enabled:
        return
    entry: dict = {
        "ts": datetime.now(UTC).isoformat(timespec="seconds"),
        "container": container,
        "action": action,
    }
    if dest is not None:
        entry["dest"] = dest
    if detail is not None:
        entry["detail"] = detail

    try:
        self._audit_path.parent.mkdir(parents=True, exist_ok=True)
        with self._audit_path.open("a") as f:
            f.write(json.dumps(entry, separators=(",", ":")) + "\n")
    except OSError as e:
        logger.warning("Failed to write audit log to %s: %s", self._audit_path, e)
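
To make the on-disk format concrete, the sketch below repeats the serialization steps from `log_event` in a standalone function and prints one resulting line. `build_line` and its `now` parameter are hypothetical helpers added for reproducibility, the container name and address are made-up sample values, and `timezone.utc` is used in place of `UTC` so the snippet also runs on Python versions older than 3.11.

```python
import json
from datetime import datetime, timezone


def build_line(container, action, *, dest=None, detail=None, now=None):
    """Hypothetical helper mirroring how log_event builds one record."""
    now = now or datetime.now(timezone.utc)
    entry = {
        "ts": now.isoformat(timespec="seconds"),
        "container": container,
        "action": action,
    }
    # Optional keys are left out entirely rather than written as null.
    if dest is not None:
        entry["dest"] = dest
    if detail is not None:
        entry["detail"] = detail
    # Compact separators keep each record on one short line.
    return json.dumps(entry, separators=(",", ":"))


fixed = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
line = build_line("web", "denied", dest="203.0.113.7", now=fixed)
print(line)
# {"ts":"2024-01-02T03:04:05+00:00","container":"web","action":"denied","dest":"203.0.113.7"}
```

Because `detail` was not passed, the record has no `detail` key at all, so consumers should read optional fields with `dict.get` rather than indexing.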

tail_log(n=50)

Yield the last n audit events.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| n | int | Number of recent events to yield. | 50 |

Source code in src/terok_shield/lib/audit.py
def tail_log(self, n: int = 50) -> Iterator[dict]:
    """Yield the last *n* audit events.

    Args:
        n: Number of recent events to yield.
    """
    if not self._audit_path.is_file():
        return

    lines = self._audit_path.read_text().splitlines()
    for line in lines[-n:] if n > 0 else []:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue
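
Note that `tail_log` selects the last n lines first and only then drops malformed ones, so it can yield fewer than n events. It also reads the whole file into memory. For large logs, one possible alternative (a sketch only, not the library's implementation) streams the file through a bounded `collections.deque` so that at most n lines are held at once. The function name `tail_jsonl` is hypothetical.

```python
import json
from collections import deque
from collections.abc import Iterator
from pathlib import Path


def tail_jsonl(path: Path, n: int = 50) -> Iterator[dict]:
    """Hypothetical streaming variant of tail_log."""
    if n <= 0 or not path.is_file():
        return
    with path.open() as f:
        # deque(maxlen=n) discards older lines as newer ones arrive,
        # leaving only the last n lines of the file.
        recent = deque(f, maxlen=n)
    for line in recent:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            # Skip malformed lines, as tail_log does.
            continue
```

Iterating a file splits on newlines only, whereas `splitlines()` also splits on a few other separator characters, so the two approaches should agree for files written by `log_event` but may differ on hand-edited input.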