Skip to content

db

db

SQLite-backed credential store, SSH key registry, and phantom token registry.

Provides host-side storage for the three kinds of secret material the vault mediates:

  • Provider credentials (API keys, OAuth tokens) stored as JSON blobs keyed by (credential_set, provider).
  • SSH keys stored as unencrypted PKCS#8 DER + SSH wire-format public blob, deduplicated by standard-format fingerprint, linked to project scopes through an assignments join table.
  • Phantom tokens minted per-(scope, subject) so containers can authenticate to the vault without ever seeing real credentials. subject is an opaque caller-supplied correlation label — the sandbox stores it verbatim and never interprets its contents. Callers (the orchestrator) decide what it identifies; today terok puts the task id there, but the sandbox treats it as a string label.

The database is never mounted into task containers — only the vault daemon reads it. sqlite3 in WAL mode gives lock-free concurrent reads across multiple terok processes (CLI commands, vault daemon, task runners).

Schema declarations and forward migrations live in terok_sandbox.vault.store.migrations — this module is the data-access layer only.

The on-disk file is always SQLCipher-encrypted; the passphrase resolution chain (keyring → credentials.passphrase config field) and the SQLCipher open helpers live in terok_sandbox.vault.store.encryption.

__all__ = ['CredentialDB', 'InvalidScopeName', 'NoPassphraseError', 'PlaintextDBFoundError', 'SSHKeyRecord', 'SSHKeyRow', 'UnsafeCommentError', 'WrongPassphraseError', 'ensure_credentials_schema', 'migrate_credential_db_schema', 'open_credential_db', 'open_credential_db_with_source'] module-attribute

NoPassphraseError

Bases: RuntimeError

No SQLCipher passphrase resolved — the DB cannot be opened.

WrongPassphraseError

Bases: RuntimeError

SQLCipher could not decrypt the DB — passphrase doesn't match its encryption key.

InvalidScopeName

Bases: ValueError

Raised when a scope name would be unsafe as a filesystem path segment.

Scopes are embedded verbatim in per-scope Unix-socket paths (ssh-agent-local-<scope>.sock), so unrestricted input could lead to traversal (../) or oversized sockaddr strings. Every write path that persists a scope validates through this helper first, so a malicious or buggy caller can't slip a hostile name past the CLI.

UnsafeCommentError

Bases: ValueError

Raised when a comment contains control characters or is too long.

Comments flow into SSH authorized_keys lines, public-line rendering, ssh-add -L output, and terminal summaries — so embedded newlines or escape sequences could break the wire format or spoof terminal output. Rejection happens at the storage entry points; every display site then trusts the DB to hold only safe strings.

SSHKeyRow(id, key_type, fingerprint, comment, created_at) dataclass

SSH key metadata — everything except the private material.

Returned from listing operations where the caller wants to render information about what is stored without decoding the private key.

id instance-attribute

key_type instance-attribute

fingerprint instance-attribute

comment instance-attribute

created_at instance-attribute

SSHKeyRecord(id, key_type, private_der, public_blob, comment, fingerprint) dataclass

SSH key record carrying both metadata and raw key bytes.

Returned from loading operations that feed the signer. The raw bytes are not decoded here — decoding is the signer's responsibility so the storage layer stays free of cryptography imports.

id instance-attribute

key_type instance-attribute

private_der instance-attribute

public_blob instance-attribute

comment instance-attribute

fingerprint instance-attribute

PlaintextDBFoundError

Bases: RuntimeError

A legacy plaintext sqlite DB was found where an encrypted one was expected.

CredentialDB(db_path, *, passphrase)

SQLite-backed store for provider credentials, SSH keys, and phantom tokens.

The on-disk file is always SQLCipher-encrypted. Callers either supply passphrase explicitly or leave it None to walk the runtime resolution chain (keyring → credentials.passphrase). A missing passphrase raises NoPassphraseError; a stale plaintext file raises PlaintextDBFoundError — both are diagnostic-only. Operator-facing remediation (which CLI verb to run, which doc page to read) is the caller's job: library code shouldn't bake one frontend's verbs into its exception text.

Source code in src/terok_sandbox/vault/store/db.py
def __init__(self, db_path: Path, *, passphrase: str) -> None:
    if not passphrase:
        raise NoPassphraseError(f"no SQLCipher passphrase available for {db_path}")
    db_path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = _open_connection(db_path, passphrase)
    # Set by ``transaction()`` so write methods know whether to
    # commit themselves or defer to the outer scope.  Bool is fine
    # — ``BEGIN IMMEDIATE`` rejects nested calls, so the flag never
    # needs to count.
    self._in_outer_tx: bool = False
    try:
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA foreign_keys=ON")
        ensure_credentials_schema(self._conn)
        migrate_credential_db_schema(self._conn)
    except _DB_ERRORS as exc:
        self._conn.close()
        if _looks_like_plaintext_db(db_path):
            raise PlaintextDBFoundError(
                f"{db_path} is a legacy plaintext sqlite DB — run "
                "`terok-sandbox credentials encrypt-db` to migrate it.\n"
                "  The migration path is deprecated in 0.8.0 and will be "
                "removed in 0.9.0; run it before upgrading past 0.8.x."
            ) from exc
        raise WrongPassphraseError(
            f"could not decrypt {db_path} — wrong passphrase, or the DB was"
            " created with a different key"
        ) from exc

transaction()

Run the body in an explicit BEGIN IMMEDIATE transaction.

Take the write lock up front so callers can compose read-then-write sequences and trust the whole thing serialises against concurrent writers. Every mutating method on this class (credentials, SSH keys, phantom tokens) consults the self._in_outer_tx flag this context manager sets and skips its own per-call commit — so the API contract is "any composition of write methods inside with db.transaction(): is atomic", with no kwarg plumbing at the call site.

On exit: COMMIT on clean exit, ROLLBACK on any BaseException (KeyboardInterrupt / SystemExit included — leaving a half-written %scope keypair around would be worse than a re-mint on retry).

Source code in src/terok_sandbox/vault/store/db.py
@contextlib.contextmanager
def transaction(self) -> Iterator[Any]:
    """Run the body in an explicit ``BEGIN IMMEDIATE`` transaction.

    Take the write lock up front so callers can compose
    read-then-write sequences and trust the whole thing serialises
    against concurrent writers.  Every mutating method on this
    class (credentials, SSH keys, phantom tokens) consults the
    ``self._in_outer_tx`` flag this context manager sets and skips
    its own per-call commit — so the API contract is "any
    composition of write methods inside ``with db.transaction():``
    is atomic", with no kwarg plumbing at the call site.

    On exit: ``COMMIT`` on clean exit, ``ROLLBACK`` on any
    ``BaseException`` (``KeyboardInterrupt`` / ``SystemExit``
    included — leaving a half-written ``%scope`` keypair around
    would be worse than a re-mint on retry).
    """
    self._conn.execute("BEGIN IMMEDIATE")
    self._in_outer_tx = True
    try:
        yield self._conn
    except BaseException:
        self._conn.execute("ROLLBACK")
        raise
    else:
        self._conn.execute("COMMIT")
    finally:
        self._in_outer_tx = False

store_credential(credential_set, provider, data)

Insert or replace a credential entry.

Source code in src/terok_sandbox/vault/store/db.py
def store_credential(self, credential_set: str, provider: str, data: dict) -> None:
    """Insert or replace a credential entry."""
    self._conn.execute(
        "INSERT OR REPLACE INTO credentials (credential_set, provider, data) VALUES (?, ?, ?)",
        (credential_set, provider, json.dumps(data)),
    )
    if not self._in_outer_tx:
        self._conn.commit()

load_credential(credential_set, provider)

Return the credential dict, or None if not found.

Source code in src/terok_sandbox/vault/store/db.py
def load_credential(self, credential_set: str, provider: str) -> dict | None:
    """Return the credential dict, or ``None`` if not found."""
    row = self._conn.execute(
        "SELECT data FROM credentials WHERE credential_set = ? AND provider = ?",
        (credential_set, provider),
    ).fetchone()
    return json.loads(row[0]) if row else None

list_credentials(credential_set)

Return provider names that have stored credentials.

Source code in src/terok_sandbox/vault/store/db.py
def list_credentials(self, credential_set: str) -> list[str]:
    """Return provider names that have stored credentials."""
    rows = self._conn.execute(
        "SELECT provider FROM credentials WHERE credential_set = ? ORDER BY provider",
        (credential_set,),
    ).fetchall()
    return [r[0] for r in rows]

list_credential_sets()

Return distinct credential-set names with at least one stored credential.

Source code in src/terok_sandbox/vault/store/db.py
def list_credential_sets(self) -> list[str]:
    """Return distinct credential-set names with at least one stored credential."""
    rows = self._conn.execute(
        "SELECT DISTINCT credential_set FROM credentials ORDER BY credential_set"
    ).fetchall()
    return [r[0] for r in rows]

delete_credential(credential_set, provider)

Remove a credential entry (idempotent).

Source code in src/terok_sandbox/vault/store/db.py
def delete_credential(self, credential_set: str, provider: str) -> None:
    """Remove a credential entry (idempotent)."""
    self._conn.execute(
        "DELETE FROM credentials WHERE credential_set = ? AND provider = ?",
        (credential_set, provider),
    )
    if not self._in_outer_tx:
        self._conn.commit()

store_ssh_key(key_type, private_der, public_blob, comment, fingerprint)

Register a keypair, dedup-by-fingerprint; return the ssh_keys.id.

When a row with the same fingerprint already exists the stored bytes and comment are left untouched (the caller is re-asserting an already-known key, which is expected on repeat ssh-import).

Auto-commits unless called inside a transaction() scope — in which case the outer block owns the commit.

Source code in src/terok_sandbox/vault/store/db.py
def store_ssh_key(
    self,
    key_type: str,
    private_der: bytes,
    public_blob: bytes,
    comment: str,
    fingerprint: str,
) -> int:
    """Register a keypair, dedup-by-fingerprint; return the ``ssh_keys.id``.

    When a row with the same fingerprint already exists the stored bytes
    and comment are left untouched (the caller is re-asserting an
    already-known key, which is expected on repeat ``ssh-import``).

    Auto-commits unless called inside a
    [`transaction()`][terok_sandbox.vault.store.db.CredentialDB.transaction]
    scope — in which case the outer block owns the commit.
    """
    self._conn.execute(
        "INSERT OR IGNORE INTO ssh_keys"
        " (key_type, private_der, public_blob, comment, fingerprint)"
        " VALUES (?, ?, ?, ?, ?)",
        (key_type, private_der, public_blob, comment, fingerprint),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    row = self._conn.execute(
        "SELECT id FROM ssh_keys WHERE fingerprint = ?",
        (fingerprint,),
    ).fetchone()
    return row[0]

get_ssh_key_by_fingerprint(fingerprint)

Look up a key by fingerprint; returns metadata only.

Source code in src/terok_sandbox/vault/store/db.py
def get_ssh_key_by_fingerprint(self, fingerprint: str) -> SSHKeyRow | None:
    """Look up a key by fingerprint; returns metadata only."""
    row = self._conn.execute(
        "SELECT id, key_type, fingerprint, comment, created_at"
        " FROM ssh_keys WHERE fingerprint = ?",
        (fingerprint,),
    ).fetchone()
    return SSHKeyRow(*row) if row else None

set_ssh_key_comment(fingerprint, comment)

Update the comment of the key with fingerprint.

Returns True if a row was updated, False if the fingerprint is unknown. The comment is validated by the same safety helper that gates import_ssh_keypair — control characters and overlong strings raise UnsafeCommentError so the storage-entry-point invariant holds for this path too.

The new comment surfaces to subsequent ssh-add -L queries from the container because the signer resolves keys fresh from the DB on every request.

Source code in src/terok_sandbox/vault/store/db.py
def set_ssh_key_comment(self, fingerprint: str, comment: str) -> bool:
    """Update the comment of the key with *fingerprint*.

    Returns ``True`` if a row was updated, ``False`` if the fingerprint
    is unknown.  The comment is validated by the same safety helper
    that gates ``import_ssh_keypair`` — control characters and
    overlong strings raise
    [`UnsafeCommentError`][terok_sandbox.vault.store.db.UnsafeCommentError]
    so the storage-entry-point invariant holds for this path too.

    The new comment surfaces to subsequent ``ssh-add -L`` queries from
    the container because the signer resolves keys fresh from the DB
    on every request.
    """
    _require_safe_comment(comment)
    cur = self._conn.execute(
        "UPDATE ssh_keys SET comment = ? WHERE fingerprint = ?",
        (comment, fingerprint),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    return bool(cur.rowcount)

assign_ssh_key(scope, key_id, *, allow_infra=False)

Grant scope access to key_id (idempotent).

Rejects unsafe scope names with InvalidScopeName — the value is later embedded in per-scope Unix-socket paths, so traversal-like strings (../, /) must not be persisted.

By default also rejects %-prefixed infrastructure scopes so callers driven by user input can't write to sandbox-reserved names (%host for the krun host-side keypair, future %name slots). Sandbox internals that legitimately provision infrastructure scopes pass allow_infra=True.

Auto-commits unless called inside a transaction() scope — in which case the outer block owns the commit.

Source code in src/terok_sandbox/vault/store/db.py
def assign_ssh_key(self, scope: str, key_id: int, *, allow_infra: bool = False) -> None:
    """Grant *scope* access to *key_id* (idempotent).

    Rejects unsafe scope names with [`InvalidScopeName`][terok_sandbox.vault.store.db.InvalidScopeName] — the
    value is later embedded in per-scope Unix-socket paths, so
    traversal-like strings (``../``, ``/``) must not be persisted.

    By default also rejects ``%``-prefixed infrastructure scopes so
    callers driven by user input can't write to sandbox-reserved
    names (``%host`` for the krun host-side keypair, future
    ``%name`` slots).  Sandbox internals that legitimately provision
    infrastructure scopes pass ``allow_infra=True``.

    Auto-commits unless called inside a
    [`transaction()`][terok_sandbox.vault.store.db.CredentialDB.transaction]
    scope — in which case the outer block owns the commit.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)
    self._conn.execute(
        "INSERT OR IGNORE INTO ssh_key_assignments (scope, key_id) VALUES (?, ?)",
        (scope, key_id),
    )
    if not self._in_outer_tx:
        self._conn.commit()

unassign_ssh_key(scope, key_id, *, allow_infra=False)

Revoke scope's access to key_id; drop the key row if orphaned.

Refuses %-prefixed infrastructure scopes by default — pair with allow_infra=True for sandbox internals that need to decommission a reserved scope.

Source code in src/terok_sandbox/vault/store/db.py
def unassign_ssh_key(self, scope: str, key_id: int, *, allow_infra: bool = False) -> None:
    """Revoke *scope*'s access to *key_id*; drop the key row if orphaned.

    Refuses ``%``-prefixed infrastructure scopes by default — pair
    with ``allow_infra=True`` for sandbox internals that need to
    decommission a reserved scope.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)
    cur = self._conn.execute(
        "DELETE FROM ssh_key_assignments WHERE scope = ? AND key_id = ?",
        (scope, key_id),
    )
    if cur.rowcount:
        self._conn.execute(
            "DELETE FROM ssh_keys WHERE id = ? AND NOT EXISTS ("
            "  SELECT 1 FROM ssh_key_assignments WHERE key_id = ?"
            ")",
            (key_id, key_id),
        )
    if not self._in_outer_tx:
        self._conn.commit()

replace_ssh_keys_for_scope(scope, *, keep_key_id, allow_infra=False)

Atomically make keep_key_id the scope's sole assigned key.

Wraps the "assign new + revoke every other" sequence in a single SQLite transaction so two concurrent init(force=True) calls can't both leave their own keys assigned — whichever transaction commits last wins the scope, and exactly one primary survives. Orphaned ssh_keys rows for revoked keys are cleaned up in the same step via unassign_ssh_key semantics.

Refuses %-prefixed infrastructure scopes by default; sandbox internals provisioning infra keys pass allow_infra=True.

Source code in src/terok_sandbox/vault/store/db.py
def replace_ssh_keys_for_scope(
    self, scope: str, *, keep_key_id: int, allow_infra: bool = False
) -> None:
    """Atomically make *keep_key_id* the scope's sole assigned key.

    Wraps the "assign new + revoke every other" sequence in a single
    SQLite transaction so two concurrent ``init(force=True)`` calls
    can't both leave their own keys assigned — whichever transaction
    commits last wins the scope, and exactly one primary survives.
    Orphaned ``ssh_keys`` rows for revoked keys are cleaned up in the
    same step via ``unassign_ssh_key`` semantics.

    Refuses ``%``-prefixed infrastructure scopes by default; sandbox
    internals provisioning infra keys pass ``allow_infra=True``.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)

    def _body() -> None:
        self._conn.execute(
            "INSERT OR IGNORE INTO ssh_key_assignments (scope, key_id) VALUES (?, ?)",
            (scope, keep_key_id),
        )
        stale_ids = [
            r[0]
            for r in self._conn.execute(
                "SELECT key_id FROM ssh_key_assignments WHERE scope = ? AND key_id != ?",
                (scope, keep_key_id),
            ).fetchall()
        ]
        if stale_ids:
            # ``placeholders`` is a fixed-length string of ``?`` marks,
            # never user input — the variadic IN() clause is the reason
            # we build the SQL with f-string instead of plain params.
            placeholders = ",".join("?" * len(stale_ids))
            self._conn.execute(
                f"DELETE FROM ssh_key_assignments"  # nosec B608
                f" WHERE scope = ? AND key_id IN ({placeholders})",
                (scope, *stale_ids),
            )
            self._conn.execute(
                f"DELETE FROM ssh_keys WHERE id IN ({placeholders})"  # nosec B608
                f" AND NOT EXISTS ("
                f"  SELECT 1 FROM ssh_key_assignments WHERE key_id = ssh_keys.id"
                f")",
                tuple(stale_ids),
            )

    # Same ``_in_outer_tx`` pattern as the rest of the write methods.
    # ``with self._conn:`` is the sqlite3 connection's own auto-commit
    # context — it would clobber the outer ``BEGIN IMMEDIATE`` that
    # ``transaction()`` started.  When inside an outer scope, run the
    # body raw and let the outer block own the commit; standalone
    # callers still get the self-contained connection-managed
    # transaction they used to.
    if self._in_outer_tx:
        _body()
    else:
        with self._conn:
            _body()

unassign_all_ssh_keys(scope, *, allow_infra=False)

Revoke every key currently assigned to scope. Returns count removed.

Refuses %-prefixed infrastructure scopes by default — pair with allow_infra=True for sandbox internals.

Source code in src/terok_sandbox/vault/store/db.py
def unassign_all_ssh_keys(self, scope: str, *, allow_infra: bool = False) -> int:
    """Revoke every key currently assigned to *scope*.  Returns count removed.

    Refuses ``%``-prefixed infrastructure scopes by default — pair
    with ``allow_infra=True`` for sandbox internals.
    """
    if allow_infra:
        _require_safe_scope(scope)
    else:
        _require_user_scope(scope)
    key_ids = [
        r[0]
        for r in self._conn.execute(
            "SELECT key_id FROM ssh_key_assignments WHERE scope = ?",
            (scope,),
        ).fetchall()
    ]
    for kid in key_ids:
        self.unassign_ssh_key(scope, kid, allow_infra=allow_infra)
    return len(key_ids)

list_ssh_keys_for_scope(scope)

Return metadata rows for every key assigned to scope.

Ordered by assigned_at with k.id as a secondary key so two assignments inside the same SQLite-second (datetime('now') has 1-second resolution) sort by insert order rather than implementation-defined order. Callers that do rows[-1] to pick "the most recently assigned" get a deterministic answer even under sub-second concurrency.

Source code in src/terok_sandbox/vault/store/db.py
def list_ssh_keys_for_scope(self, scope: str) -> list[SSHKeyRow]:
    """Return metadata rows for every key assigned to *scope*.

    Ordered by ``assigned_at`` with ``k.id`` as a secondary key so
    two assignments inside the same SQLite-second (``datetime('now')``
    has 1-second resolution) sort by insert order rather than
    implementation-defined order.  Callers that do ``rows[-1]`` to
    pick "the most recently assigned" get a deterministic answer
    even under sub-second concurrency.
    """
    rows = self._conn.execute(
        "SELECT k.id, k.key_type, k.fingerprint, k.comment, k.created_at"
        " FROM ssh_keys k"
        " JOIN ssh_key_assignments a ON a.key_id = k.id"
        " WHERE a.scope = ?"
        " ORDER BY a.assigned_at, k.id",
        (scope,),
    ).fetchall()
    return [SSHKeyRow(*r) for r in rows]

load_ssh_keys_for_scope(scope)

Return full records (with raw bytes) for every key assigned to scope.

Same deterministic ordering as list_ssh_keys_for_scopeassigned_at first, then k.id as the sub-second tiebreak.

Source code in src/terok_sandbox/vault/store/db.py
def load_ssh_keys_for_scope(self, scope: str) -> list[SSHKeyRecord]:
    """Return full records (with raw bytes) for every key assigned to *scope*.

    Same deterministic ordering as
    [`list_ssh_keys_for_scope`][terok_sandbox.vault.store.db.CredentialDB.list_ssh_keys_for_scope]
    — ``assigned_at`` first, then ``k.id`` as the sub-second tiebreak.
    """
    rows = self._conn.execute(
        "SELECT k.id, k.key_type, k.private_der, k.public_blob,"
        " k.comment, k.fingerprint"
        " FROM ssh_keys k"
        " JOIN ssh_key_assignments a ON a.key_id = k.id"
        " WHERE a.scope = ?"
        " ORDER BY a.assigned_at, k.id",
        (scope,),
    ).fetchall()
    return [SSHKeyRecord(*r) for r in rows]

list_scopes_with_ssh_keys()

Return every scope that currently has at least one assigned key.

Source code in src/terok_sandbox/vault/store/db.py
def list_scopes_with_ssh_keys(self) -> list[str]:
    """Return every scope that currently has at least one assigned key."""
    rows = self._conn.execute(
        "SELECT DISTINCT scope FROM ssh_key_assignments ORDER BY scope",
    ).fetchall()
    return [r[0] for r in rows]

count_ssh_keys()

Return the number of distinct keypairs stored in the DB.

Counts ssh_keys rows (deduplicated by fingerprint) rather than ssh_key_assignments rows — a single key shared across scopes is one stored key, not N. Surfaces to TUI/CLI status consumers so they can show a count without opening the DB themselves.

Source code in src/terok_sandbox/vault/store/db.py
def count_ssh_keys(self) -> int:
    """Return the number of distinct keypairs stored in the DB.

    Counts ``ssh_keys`` rows (deduplicated by fingerprint) rather
    than ``ssh_key_assignments`` rows — a single key shared across
    scopes is one stored key, not N.  Surfaces to TUI/CLI status
    consumers so they can show a count without opening the DB
    themselves.
    """
    row = self._conn.execute("SELECT count(*) FROM ssh_keys").fetchone()
    return row[0] if row else 0

create_token(scope, subject, credential_set, provider)

Mint a phantom token bound to (scope, subject, credential_set, provider).

subject is an opaque caller-supplied correlation label — the sandbox stores it verbatim and never interprets its contents. Today terok puts the orchestrator's task id there; the sandbox treats the value as a string.

Token format: terok-p-<32 hex chars>.

Source code in src/terok_sandbox/vault/store/db.py
def create_token(self, scope: str, subject: str, credential_set: str, provider: str) -> str:
    """Mint a phantom token bound to ``(scope, subject, credential_set, provider)``.

    ``subject`` is an opaque caller-supplied correlation label — the
    sandbox stores it verbatim and never interprets its contents.
    Today terok puts the orchestrator's task id there; the sandbox
    treats the value as a string.

    Token format: ``terok-p-<32 hex chars>``.
    """
    token = f"terok-p-{secrets.token_hex(16)}"
    self._conn.execute(
        "INSERT INTO proxy_tokens (token, scope, subject, credential_set, provider)"
        " VALUES (?, ?, ?, ?, ?)",
        (token, scope, subject, credential_set, provider),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    return token

lookup_token(token)

Return {scope, subject, credential_set, provider} or None.

Source code in src/terok_sandbox/vault/store/db.py
def lookup_token(self, token: str) -> dict | None:
    """Return ``{scope, subject, credential_set, provider}`` or ``None``."""
    row = self._conn.execute(
        "SELECT scope, subject, credential_set, provider FROM proxy_tokens WHERE token = ?",
        (token,),
    ).fetchone()
    if row is None:
        return None
    return {
        "scope": row[0],
        "subject": row[1],
        "credential_set": row[2],
        "provider": row[3],
    }

list_tokens()

Return every proxy-token row as a list of dicts.

Read-only inventory for operator-facing CLI inspection (terok vault list --include-tokens). The raw token value is included so the operator can cross-reference what's actually mounted into containers; callers MUST mask it before display.

Source code in src/terok_sandbox/vault/store/db.py
def list_tokens(self) -> list[dict]:
    """Return every proxy-token row as a list of dicts.

    Read-only inventory for operator-facing CLI inspection
    (``terok vault list --include-tokens``).  The raw token value
    is included so the operator can cross-reference what's actually
    mounted into containers; callers MUST mask it before display.
    """
    rows = self._conn.execute(
        "SELECT token, scope, subject, credential_set, provider"
        " FROM proxy_tokens ORDER BY scope, subject, provider, token"
    ).fetchall()
    return [
        {
            "token": r[0],
            "scope": r[1],
            "subject": r[2],
            "credential_set": r[3],
            "provider": r[4],
        }
        for r in rows
    ]

revoke_tokens(scope, subject)

Revoke every phantom token bound to (scope, subject).

Returns the number of rows removed. The sandbox makes no claim about what subject identifies; callers (the orchestrator) pass whatever opaque label they used at create_token time.

Source code in src/terok_sandbox/vault/store/db.py
def revoke_tokens(self, scope: str, subject: str) -> int:
    """Revoke every phantom token bound to ``(scope, subject)``.

    Returns the number of rows removed.  The sandbox makes no claim
    about what ``subject`` identifies; callers (the orchestrator) pass
    whatever opaque label they used at
    [`create_token`][terok_sandbox.vault.store.db.CredentialDB.create_token]
    time.
    """
    cur = self._conn.execute(
        "DELETE FROM proxy_tokens WHERE scope = ? AND subject = ?",
        (scope, subject),
    )
    if not self._in_outer_tx:
        self._conn.commit()
    return cur.rowcount

close()

Close the database connection.

Source code in src/terok_sandbox/vault/store/db.py
def close(self) -> None:
    """Close the database connection."""
    self._conn.close()

__del__()

Best-effort close on garbage collection.

Source code in src/terok_sandbox/vault/store/db.py
def __del__(self) -> None:
    """Best-effort close on garbage collection."""
    try:
        self._conn.close()
    except Exception:  # noqa: BLE001  # nosec B110 — best-effort __del__ close on GC
        pass

ensure_credentials_schema(conn)

Create the credential / SSH-key / phantom-token tables if missing.

Idempotent — every statement is IF NOT EXISTS. Exposed at module level so every opener of the DB file runs it before issuing queries. Without this, a daemon that opens an empty DB on a fresh install (before any CLI command has touched the file) hits no such table: credentials on the first query and crashes the unit.

Source code in src/terok_sandbox/vault/store/migrations.py
def ensure_credentials_schema(conn: sqlite3.Connection) -> None:
    """Create the credential / SSH-key / phantom-token tables if missing.

    Idempotent — every statement is ``IF NOT EXISTS``.  Exposed at module
    level so every opener of the DB file runs it before issuing queries.
    Without this, a daemon that opens an empty DB on a fresh install
    (before any CLI command has touched the file) hits ``no such table:
    credentials`` on the first query and crashes the unit.
    """
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS credentials (
            credential_set TEXT NOT NULL,
            provider       TEXT NOT NULL,
            data           TEXT NOT NULL,
            PRIMARY KEY (credential_set, provider)
        );
        CREATE TABLE IF NOT EXISTS ssh_keys (
            id           INTEGER PRIMARY KEY AUTOINCREMENT,
            key_type     TEXT    NOT NULL CHECK (key_type IN ('ed25519','rsa')),
            private_der  BLOB    NOT NULL,
            public_blob  BLOB    NOT NULL,
            comment      TEXT    NOT NULL DEFAULT '',
            fingerprint  TEXT    NOT NULL UNIQUE,
            created_at   TEXT    NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS ssh_key_assignments (
            scope        TEXT    NOT NULL,
            key_id       INTEGER NOT NULL REFERENCES ssh_keys(id) ON DELETE CASCADE,
            assigned_at  TEXT    NOT NULL DEFAULT (datetime('now')),
            PRIMARY KEY (scope, key_id)
        );
        CREATE TABLE IF NOT EXISTS proxy_tokens (
            token          TEXT PRIMARY KEY,
            scope          TEXT NOT NULL,
            subject        TEXT NOT NULL,
            credential_set TEXT NOT NULL,
            provider       TEXT NOT NULL
        );
    """)
    conn.commit()

migrate_credential_db_schema(conn)

Walk legacy credential-DB rows forward to the current schema.

Tracked via PRAGMA user_version so the whole function is a no-op on already-upgraded DBs. Each current < N branch handles one forward step; the final PRAGMA user_version set commits the whole upgrade in one go.

Exposed at module level so every opener of the DB file (CredentialDB for writers, _TokenDB in the vault daemon for readers) runs it before issuing queries — otherwise a daemon that restarts before any CLI command has touched the DB would hit "no such column: …" on a freshly-upgraded host.

The cryptography import is scoped to the v0 → v1 branch so already-migrated DBs (the common case) don't pay an import cost, and the storage module keeps tach-clean at import time.

Source code in src/terok_sandbox/vault/store/migrations.py
def migrate_credential_db_schema(conn: sqlite3.Connection) -> None:
    """Walk legacy credential-DB rows forward to the current schema.

    Tracked via ``PRAGMA user_version`` so the whole function is a no-op
    on already-upgraded DBs.  Each ``current < N`` branch handles one
    forward step; the final ``PRAGMA user_version`` set commits the
    whole upgrade in one go.

    Exposed at module level so every opener of the DB file
    ([`CredentialDB`][terok_sandbox.vault.store.db.CredentialDB] for
    writers, ``_TokenDB`` in the vault daemon for readers) runs it
    before issuing queries — otherwise a daemon that restarts before any
    CLI command has touched the DB would hit "no such column: …" on a
    freshly-upgraded host.

    The ``cryptography`` import is scoped to the v0 → v1 branch so
    already-migrated DBs (the common case) don't pay an import cost,
    and the storage module keeps tach-clean at import time.
    """
    (current,) = conn.execute("PRAGMA user_version").fetchone()
    if current >= SCHEMA_VERSION:
        return

    if current < 1:
        _migrate_v0_to_v1(conn)

    if current < 2:
        _migrate_v1_to_v2(conn)

    conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
    conn.commit()

open_credential_db_with_source(db_path, *, passphrase_file=None, systemd_creds_file=None, use_keyring=False, passphrase_command=None, config_fallback=None, prompt_on_tty=False)

Same as open_credential_db but also returns which tier the passphrase came from.

Lets a TUI/CLI status display label the unlocked vault by its source without re-walking the chain itself.

Source code in src/terok_sandbox/vault/store/db.py
def open_credential_db_with_source(
    db_path: Path,
    *,
    passphrase_file: Path | None = None,
    systemd_creds_file: Path | None = None,
    use_keyring: bool = False,
    passphrase_command: str | None = None,
    config_fallback: str | None = None,
    prompt_on_tty: bool = False,
) -> tuple[CredentialDB, PassphraseSource]:
    """Same as [`open_credential_db`][terok_sandbox.vault.store.db.open_credential_db]
    but also returns which tier the passphrase came from.

    Lets a TUI/CLI status display label the unlocked vault by its source
    without re-walking the chain itself.
    """
    from .encryption import resolve_passphrase_with_source  # noqa: PLC0415

    passphrase, source = resolve_passphrase_with_source(
        passphrase_file=passphrase_file,
        systemd_creds_file=systemd_creds_file,
        use_keyring=use_keyring,
        passphrase_command=passphrase_command,
        config_fallback=config_fallback,
        prompt_on_tty=prompt_on_tty,
    )
    if passphrase is None or source is None:
        raise NoPassphraseError(f"no SQLCipher passphrase available for {db_path}")
    return CredentialDB(db_path, passphrase=passphrase), source

open_credential_db(db_path, *, passphrase_file=None, systemd_creds_file=None, use_keyring=False, passphrase_command=None, config_fallback=None, prompt_on_tty=False)

Open the credential DB, resolving the passphrase via the runtime chain.

Walks: passphrase_file (tmpfs session-unlock) → systemd_creds_file (sealed credential decrypted via systemd-creds(1)) → OS keyring (when use_keyring) → passphrase_command (operator-supplied helper, e.g. pass show … / op read …) → config_fallback → (when prompt_on_tty and a TTY is attached) interactive prompt. CLI consumers pass prompt_on_tty=True; daemons leave it False so they fail fast instead of blocking on stdin.

Source code in src/terok_sandbox/vault/store/db.py
def open_credential_db(
    db_path: Path,
    *,
    passphrase_file: Path | None = None,
    systemd_creds_file: Path | None = None,
    use_keyring: bool = False,
    passphrase_command: str | None = None,
    config_fallback: str | None = None,
    prompt_on_tty: bool = False,
) -> CredentialDB:
    """Open the credential DB, resolving the passphrase via the runtime chain.

    Walks: *passphrase_file* (tmpfs session-unlock) → *systemd_creds_file*
    (sealed credential decrypted via ``systemd-creds(1)``) → OS keyring
    (when *use_keyring*) → *passphrase_command* (operator-supplied
    helper, e.g. ``pass show …`` / ``op read …``) → *config_fallback*
    → (when *prompt_on_tty* and a TTY is attached) interactive prompt.
    CLI consumers pass ``prompt_on_tty=True``; daemons leave it
    ``False`` so they fail fast instead of blocking on stdin.
    """
    db, _source = open_credential_db_with_source(
        db_path,
        passphrase_file=passphrase_file,
        systemd_creds_file=systemd_creds_file,
        use_keyring=use_keyring,
        passphrase_command=passphrase_command,
        config_fallback=config_fallback,
        prompt_on_tty=prompt_on_tty,
    )
    return db