Skip to content

audit_log

audit_log

Tail audit.jsonl and emit events for shield lifecycle changes.

Watches for new JSON-lines entries written by `terok_shield.lib.audit.AuditLogger` and surfaces them as `WatchEvent` instances with `source="audit"`.

AuditLogWatcher(audit_path, container)

Tail audit.jsonl and yield events for shield lifecycle changes.

Open audit_path and seek to end.

Parameters:

Name Type Description Default
audit_path Path

Path to the per-container audit.jsonl file.

required
container str

Container name (for event metadata).

required
Source code in src/terok_shield/lib/watchers/audit_log.py
def __init__(self, audit_path: Path, container: str) -> None:
    """Open *audit_path* and seek to end.

    The file is created if it does not exist yet, then opened for reading
    and positioned at its end, so only lines appended after construction
    are returned by later reads.

    Args:
        audit_path: Path to the per-container ``audit.jsonl`` file.
        container: Container name (for event metadata).

    Raises:
        OSError: If the file cannot be created, opened, or seeked.
    """
    self._audit_path = audit_path
    self._container = container
    audit_path.touch(exist_ok=True)
    # JSON text is UTF-8, so pin the encoding instead of inheriting the
    # locale default.  ``errors="replace"`` keeps a stray corrupt byte from
    # raising UnicodeDecodeError while reading; such a line is at worst
    # rejected by the JSON parser like any other malformed line.
    self._fh = open(  # noqa: SIM115 — needs fileno() for select
        audit_path, encoding="utf-8", errors="replace"
    )
    try:
        self._fh.seek(0, os.SEEK_END)
    except OSError:
        # Do not leak the descriptor when construction fails half-way.
        self._fh.close()
        raise

fileno()

Return the file descriptor for select.select() multiplexing.

Source code in src/terok_shield/lib/watchers/audit_log.py
def fileno(self) -> int:
    """Expose the open file's descriptor so it can be used with ``select.select()``."""
    handle = self._fh
    return handle.fileno()

close()

Close the underlying file handle.

Source code in src/terok_shield/lib/watchers/audit_log.py
def close(self) -> None:
    """Release the file handle held by this watcher."""
    handle = self._fh
    handle.close()

poll()

Read new audit lines and return watch events.

Source code in src/terok_shield/lib/watchers/audit_log.py
def poll(self) -> list[WatchEvent]:
    """Read new audit lines and return watch events."""
    events: list[WatchEvent] = []
    while line := self._fh.readline():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            continue
        events.append(
            WatchEvent(
                ts=entry.get("ts", datetime.now(UTC).isoformat()),
                source="audit",
                action=entry.get("action", "unknown"),
                container=entry.get("container", self._container),
                dest=entry.get("dest", ""),
                detail=entry.get("detail", ""),
            )
        )
    return events