# Source code for hubvault.server.auth

"""
Token parsing and authorization helpers for :mod:`hubvault.server`.

This module keeps bearer-token parsing and access-level checks independent from
the FastAPI app factory so the authorization policy can be tested without API
extras installed.

The module contains:

* :class:`AuthContext` - Resolved token identity for one request
* :class:`TokenAuthorizer` - Read/write token resolver
* :func:`parse_request_token` - Header parser for supported token inputs
"""

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AuthContext:
    """
    Immutable record of the token identity resolved for a single request.

    :param access: Access level string, currently ``"ro"`` or ``"rw"``
    :type access: str
    :param token: Original bearer token text
    :type token: str
    """

    # Field order is part of the interface: callers may construct positionally.
    access: str
    token: str

    @property
    def can_write(self) -> bool:
        """
        Report whether this context is allowed to perform write operations.

        :return: ``True`` only when :attr:`access` equals ``"rw"``
        :rtype: bool
        """
        level = self.access
        return level == "rw"
def parse_request_token(
    authorization: Optional[str] = None,
    x_hubvault_token: Optional[str] = None,
    query_token: Optional[str] = None,
) -> Optional[str]:
    """
    Pick the request token out of the supported header and query inputs.

    Lookup order:

    1. ``X-HubVault-Token`` - when this header is non-empty it is final: its
       stripped value is returned, or ``None`` if only whitespace remains.
       No other input is consulted in that case.
    2. ``Authorization`` - used only when its scheme is ``Bearer``
       (case-insensitive) and the credential part is non-blank.
    3. ``token`` query-string value - last resort for browser resource URLs
       that cannot attach authorization headers.

    :param authorization: Raw ``Authorization`` header value
    :type authorization: Optional[str]
    :param x_hubvault_token: Raw ``X-HubVault-Token`` header value
    :type x_hubvault_token: Optional[str]
    :param query_token: Raw ``token`` query-string value
    :type query_token: Optional[str]
    :return: Stripped token string, or ``None`` when no usable token is present
    :rtype: Optional[str]
    """
    # A present direct header wins outright, even if it strips down to nothing.
    if x_hubvault_token:
        return x_hubvault_token.strip() or None

    if authorization:
        scheme, _, credentials = authorization.partition(" ")
        if scheme.strip().lower() == "bearer":
            bearer = credentials.strip()
            if bearer:
                return bearer

    # Every remaining path (no header, wrong scheme, blank bearer value)
    # falls back to the query-string token.
    if query_token:
        return query_token.strip() or None
    return None
class TokenAuthorizer:
    """
    Resolve read-only and read-write API tokens.

    Each token collection is frozen at construction time. A bare ``str``
    passed in place of a collection is treated as one single token rather
    than being iterated character by character.

    :param token_ro: Read-only token values
    :type token_ro: Iterable[str]
    :param token_rw: Read-write token values
    :type token_rw: Iterable[str]
    """

    def __init__(self, token_ro, token_rw) -> None:
        """
        Build one token authorizer from normalized token collections.

        :param token_ro: Read-only token values
        :type token_ro: Iterable[str]
        :param token_rw: Read-write token values
        :type token_rw: Iterable[str]
        :return: ``None``.
        :rtype: None
        """
        self._token_ro = self._freeze_tokens(token_ro)
        self._token_rw = self._freeze_tokens(token_rw)

    @staticmethod
    def _freeze_tokens(tokens):
        """
        Convert one token collection into an immutable set.

        :param tokens: Token collection, or one bare token string
        :type tokens: Iterable[str]
        :return: Frozen set of token strings
        :rtype: frozenset
        """
        # ``frozenset("secret")`` would yield the individual characters, so any
        # single character of the secret would pass the membership check in
        # :meth:`resolve`. Wrap a bare string so it stays one token instead.
        if isinstance(tokens, str):
            return frozenset((tokens,)) if tokens else frozenset()
        return frozenset(tokens)

    def resolve(self, token: Optional[str]) -> AuthContext:
        """
        Resolve one raw token into an :class:`AuthContext`.

        Read-write membership is checked first, so a token present in both
        collections resolves with ``"rw"`` access.

        :param token: Raw token string extracted from the request
        :type token: Optional[str]
        :return: Resolved authorization context
        :rtype: AuthContext
        :raises PermissionError: Raised when the token is missing or invalid.
        """
        if not token:
            raise PermissionError("Missing authentication token.")
        if token in self._token_rw:
            return AuthContext(access="rw", token=token)
        if token in self._token_ro:
            return AuthContext(access="ro", token=token)
        raise PermissionError("Invalid authentication token.")

    def require_write(self, context: AuthContext) -> AuthContext:
        """
        Ensure the current token grants write access.

        :param context: Previously resolved authorization context
        :type context: AuthContext
        :return: The same authorization context when write access is allowed
        :rtype: AuthContext
        :raises PermissionError: Raised when the token is read-only.
        """
        if not context.can_write:
            raise PermissionError("Write access is required for this operation.")
        return context
def build_read_auth_dependency(authorizer: TokenAuthorizer):
    """
    Create a FastAPI dependency that enforces read access.

    The returned dependency accepts the token from the ``Authorization``
    header, the ``X-HubVault-Token`` header, or the ``token`` query-string
    parameter, and answers with HTTP 401 when the token cannot be resolved.

    :param authorizer: Token authorizer shared by the server app
    :type authorizer: TokenAuthorizer
    :return: FastAPI dependency callable that returns :class:`AuthContext`
    :rtype: Callable[..., Awaitable[AuthContext]]
    :raises hubvault.optional.MissingOptionalDependencyError: Raised when the
        API extra is not installed.
    """
    # Imported lazily so this module stays importable without the API extra.
    from ..optional import import_optional_dependency

    fastapi = import_optional_dependency(
        "fastapi",
        extra="api",
        feature="server authentication dependencies",
        missing_names={"starlette", "pydantic"},
    )
    Header = fastapi.Header
    HTTPException = fastapi.HTTPException
    Query = fastapi.Query
    status = fastapi.status

    async def _dependency(
        authorization: Optional[str] = Header(default=None),
        x_hubvault_token: Optional[str] = Header(default=None, alias="X-HubVault-Token"),
        token: Optional[str] = Query(default=None),
    ) -> AuthContext:
        """
        Resolve read access for one request.

        :param authorization: Raw bearer authorization header
        :type authorization: Optional[str]
        :param x_hubvault_token: Raw direct token header
        :type x_hubvault_token: Optional[str]
        :param token: Raw ``token`` query-string value
        :type token: Optional[str]
        :return: Resolved authorization context
        :rtype: AuthContext
        :raises fastapi.HTTPException: Raised when the token is missing or invalid.
        """
        # ``token`` must keep its name (it is the query parameter), so the
        # parsed result gets its own local instead of rebinding the parameter.
        request_token = parse_request_token(
            authorization=authorization,
            x_hubvault_token=x_hubvault_token,
            query_token=token,
        )
        try:
            return authorizer.resolve(request_token)
        except PermissionError as err:
            # Chain explicitly so the original failure stays attached as the cause.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=str(err),
            ) from err

    return _dependency
def build_write_auth_dependency(authorizer: TokenAuthorizer):
    """
    Create a FastAPI dependency that enforces write access.

    Only the ``Authorization`` and ``X-HubVault-Token`` headers are consulted;
    the query-string token is not accepted for write operations. A token that
    cannot be resolved yields HTTP 401, while a resolved token without write
    access yields HTTP 403.

    :param authorizer: Token authorizer shared by the server app
    :type authorizer: TokenAuthorizer
    :return: FastAPI dependency callable that returns :class:`AuthContext`
    :rtype: Callable[..., Awaitable[AuthContext]]
    :raises hubvault.optional.MissingOptionalDependencyError: Raised when the
        API extra is not installed.
    """
    # Imported lazily so this module stays importable without the API extra.
    from ..optional import import_optional_dependency

    fastapi = import_optional_dependency(
        "fastapi",
        extra="api",
        feature="server authentication dependencies",
        missing_names={"starlette", "pydantic"},
    )
    Header = fastapi.Header
    HTTPException = fastapi.HTTPException
    status = fastapi.status

    async def _dependency(
        authorization: Optional[str] = Header(default=None),
        x_hubvault_token: Optional[str] = Header(default=None, alias="X-HubVault-Token"),
    ) -> AuthContext:
        """
        Resolve write access for one request.

        :param authorization: Raw bearer authorization header
        :type authorization: Optional[str]
        :param x_hubvault_token: Raw direct token header
        :type x_hubvault_token: Optional[str]
        :return: Resolved authorization context with write access
        :rtype: AuthContext
        :raises fastapi.HTTPException: Raised when authentication fails or the
            token is read-only.
        """
        request_token = parse_request_token(
            authorization=authorization,
            x_hubvault_token=x_hubvault_token,
        )

        # The status code is decided by WHICH step failed, not by inspecting
        # the wording of the error message: authentication failures are 401,
        # authorization (insufficient access) failures are 403.
        try:
            context = authorizer.resolve(request_token)
        except PermissionError as err:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=str(err),
            ) from err

        try:
            return authorizer.require_write(context)
        except PermissionError as err:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=str(err),
            ) from err

    return _dependency