Skip to content

credential_extractors

credential_extractors

Per-provider credential extractors for the auth interceptor.

Each extractor reads a vendor-specific credential file from a temporary auth container mount and returns a normalized dict suitable for storage in :class:~terok_sandbox.CredentialDB. The dict must contain at least one of access_token, token, or key — the credential proxy server uses these fields to inject the real auth header.

All extractors are pure functions: Path → dict. They raise ValueError if the file is missing, malformed, or empty.

extract_claude_oauth(base_dir)

Extract Claude credentials — OAuth tokens or API key.

Claude stores OAuth data in .credentials.json under claudeAiOauth.token.{accessToken, refreshToken}. If the user authenticated with an API key instead, config.json contains {"api_key": "..."}. Both paths are checked.

Source code in src/terok_agent/credential_extractors.py
def extract_claude_oauth(base_dir: Path) -> dict:
    """Extract Claude credentials — OAuth tokens or API key.

    Claude stores OAuth data in ``.credentials.json`` under
    ``claudeAiOauth.token.{accessToken, refreshToken}``.  If the user
    authenticated with an API key instead, ``config.json`` contains
    ``{"api_key": "..."}``.  Both paths are checked.
    """
    # Try OAuth first (.credentials.json)
    # Structure: {"claudeAiOauth": {"accessToken": "...", "refreshToken": "...", ...}}
    cred_file = base_dir / ".credentials.json"
    data = _try_read_json(cred_file)
    if data is not None:
        oauth = data.get("claudeAiOauth", {})
        if isinstance(oauth, dict):
            access_token = oauth.get("accessToken")
            if access_token:
                # Claude Code is a JS app: expiresAt is milliseconds since
                # epoch (Date.now() convention).  Values > 1e12 are
                # unambiguously ms; convert to POSIX seconds so the proxy
                # refresh check (time.time()) works correctly.
                expires_at_raw = oauth.get("expiresAt")
                expires_at: float | None = None
                if isinstance(expires_at_raw, (int, float)) and not isinstance(
                    expires_at_raw, bool
                ):
                    expires_at = expires_at_raw / 1000 if expires_at_raw > 1e12 else expires_at_raw
                return {
                    "type": "oauth",
                    "access_token": access_token,
                    "refresh_token": oauth.get("refreshToken", ""),
                    "expires_at": expires_at,
                    "scopes": oauth.get("scopes", ""),
                    "subscription_type": oauth.get("subscriptionType"),
                    "rate_limit_tier": oauth.get("rateLimitTier"),
                }

    # Fall back to API key (config.json)
    config_file = base_dir / "config.json"
    config_data = _try_read_json(config_file)
    if config_data is not None:
        api_key = config_data.get("api_key")
        if api_key:
            return {"type": "api_key", "key": api_key}

    raise ValueError(
        f"No Claude credentials found in {base_dir} "
        "(checked .credentials.json for OAuth, config.json for API key)"
    )

extract_codex_oauth(base_dir)

Extract Codex (OpenAI) OAuth tokens from auth.json.

Source code in src/terok_agent/credential_extractors.py
def extract_codex_oauth(base_dir: Path) -> dict:
    """Extract Codex (OpenAI) OAuth tokens from ``auth.json``."""
    cred_file = base_dir / "auth.json"
    data = _try_read_json(cred_file)
    if data is None:
        raise ValueError(f"Codex credential file not found or unreadable: {cred_file}")

    tokens = _expect_mapping(data.get("tokens", {}), context=f"{cred_file}:tokens")
    access_token = tokens.get("access_token")
    if not access_token:
        raise ValueError("Codex credential file has no access_token")

    return {
        "type": "oauth",
        "access_token": access_token,
        "refresh_token": tokens.get("refresh_token", ""),
    }

extract_api_key_env(base_dir, filename='.env', var_name='')

Extract an API key from a dotenv-style file (e.g. Vibe's .env).

Looks for VAR_NAME=value lines. If var_name is empty, takes the first non-comment, non-empty value.

Source code in src/terok_agent/credential_extractors.py
def extract_api_key_env(base_dir: Path, filename: str = ".env", var_name: str = "") -> dict:
    """Extract an API key from a dotenv-style file (e.g. Vibe's ``.env``).

    Looks for ``VAR_NAME=value`` lines.  If *var_name* is empty, takes
    the first non-comment, non-empty value.
    """
    env_file = base_dir / filename
    try:
        text = env_file.read_text(encoding="utf-8")
    except OSError:
        raise ValueError(f"Env file not found or unreadable: {env_file}")

    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "=" not in line:
            continue
        key, _, value = line.partition("=")
        if var_name and key.strip() != var_name:
            continue
        value = value.strip().strip("'\"")
        if value:
            return {"type": "api_key", "key": value}

    raise ValueError(f"No API key found in {env_file}")

extract_json_api_key(base_dir, filename='config.json')

Extract an API key from a JSON config file (blablador, kisski).

Expects {"api_key": "..."} at the top level.

Source code in src/terok_agent/credential_extractors.py
def extract_json_api_key(base_dir: Path, filename: str = "config.json") -> dict:
    """Extract an API key from a JSON config file (blablador, kisski).

    Expects ``{"api_key": "..."}`` at the top level.
    """
    cred_file = base_dir / filename
    data = _try_read_json(cred_file)
    if data is None:
        raise ValueError(f"JSON config not found or unreadable: {cred_file}")
    key = data.get("api_key")
    if not key:
        raise ValueError(f"No api_key field in {cred_file}")

    return {"type": "api_key", "key": key}

extract_gh_token(base_dir)

Extract GitHub token from hosts.yml.

The gh CLI stores tokens per host under github.com.oauth_token.

Source code in src/terok_agent/credential_extractors.py
def extract_gh_token(base_dir: Path) -> dict:
    """Extract GitHub token from ``hosts.yml``.

    The gh CLI stores tokens per host under ``github.com.oauth_token``.
    """
    hosts_file = base_dir / "hosts.yml"
    from ruamel.yaml import YAML

    yaml = YAML(typ="safe")
    try:
        data = yaml.load(hosts_file)
    except OSError:
        raise ValueError(f"GitHub hosts file not found or unreadable: {hosts_file}")
    if not isinstance(data, dict):
        raise ValueError(f"Unexpected hosts.yml format: {type(data)}")

    # Try github.com first, then remaining hosts (deduplicated)
    ordered = ["github.com"] + [h for h in data if h != "github.com"]
    for host in ordered:
        host_data = data.get(host, {})
        if isinstance(host_data, dict):
            token = host_data.get("oauth_token")
            if token:
                return {"type": "oauth_token", "token": token, "host": host}

    raise ValueError("No oauth_token found in hosts.yml")

extract_glab_token(base_dir)

Extract GitLab token from config.yml.

The glab CLI stores tokens per host under hosts.<host>.token.

Source code in src/terok_agent/credential_extractors.py
def extract_glab_token(base_dir: Path) -> dict:
    """Extract GitLab token from ``config.yml``.

    The glab CLI stores tokens per host under ``hosts.<host>.token``.
    """
    config_file = base_dir / "config.yml"
    from ruamel.yaml import YAML

    yaml = YAML(typ="safe")
    try:
        data = yaml.load(config_file)
    except OSError:
        raise ValueError(f"GitLab config not found or unreadable: {config_file}")
    if not isinstance(data, dict):
        raise ValueError(f"Unexpected config.yml format: {type(data)}")

    hosts = _expect_mapping(data.get("hosts", {}), context=f"{config_file}:hosts")
    for host, host_data in hosts.items():
        if isinstance(host_data, dict):
            token = host_data.get("token")
            if token:
                return {"type": "pat", "token": token, "host": host}

    raise ValueError("No token found in glab config.yml")

extract_credential(provider, base_dir)

Run the appropriate extractor for provider against base_dir.

Raises ValueError if no extractor is registered or extraction fails.

Source code in src/terok_agent/credential_extractors.py
def extract_credential(provider: str, base_dir: Path) -> dict:
    """Run the appropriate extractor for *provider* against *base_dir*.

    Raises ``ValueError`` if no extractor is registered or extraction fails.
    """
    entry = EXTRACTORS.get(provider)
    if entry is None:
        raise ValueError(f"No credential extractor for provider {provider!r}")
    fn, *args = entry
    return fn(base_dir, *args)