Source code for hubvault.server.routes.writes

"""
Write-route factory for :mod:`hubvault.server`.

This module exposes the authenticated mutation endpoints used by the remote
client and frontend write flows. Upload-like operations use a two-step
``commit-plan`` / ``commit`` protocol so callers can avoid stale preflight
results and reduce transferred bytes through exact-copy and chunk-reuse fast
paths.

The module contains:

* :func:`create_writes_router` - Build the ``/api/v1/write`` router
"""

import json

from ..auth import build_write_auth_dependency
from ..deps import build_repo_api_getter
from ..schemas import normalize_commit_manifest_request
from ..serde import encode_commit_info, encode_merge_result
from ..uploads import apply_commit_manifest, plan_commit_manifest
from ...errors import HubVaultValidationError


def _normalize_json_object(payload, endpoint_name: str) -> dict:
    """
    Normalize one route payload that must be a JSON object.

    :param payload: Raw route payload
    :type payload: object
    :param endpoint_name: Endpoint label used in validation messages
    :type endpoint_name: str
    :return: Normalized JSON object payload
    :rtype: dict
    :raises HubVaultValidationError: Raised when the payload is not an object.
    """

    if not isinstance(payload, dict):
        raise HubVaultValidationError("%s request body must be a JSON object." % (endpoint_name,))
    return payload


def _require_string_field(payload: dict, field_name: str, endpoint_name: str) -> str:
    """
    Require one string field from a route payload.

    :param payload: Normalized JSON object payload
    :type payload: dict
    :param field_name: Required field name
    :type field_name: str
    :param endpoint_name: Endpoint label used in validation messages
    :type endpoint_name: str
    :return: Normalized string field
    :rtype: str
    :raises HubVaultValidationError: Raised when the field is missing or not a
        string.
    """

    value = payload.get(field_name)
    if not isinstance(value, str):
        raise HubVaultValidationError("%s.%s must be a string." % (endpoint_name, field_name))
    return value


def _optional_string_field(payload: dict, field_name: str, endpoint_name: str):
    """
    Normalize one optional string field from a route payload.

    :param payload: Normalized JSON object payload
    :type payload: dict
    :param field_name: Optional field name
    :type field_name: str
    :param endpoint_name: Endpoint label used in validation messages
    :type endpoint_name: str
    :return: Normalized string value or ``None``
    :rtype: Optional[str]
    :raises HubVaultValidationError: Raised when the field is not a string.
    """

    value = payload.get(field_name)
    if value is None:
        return None
    if not isinstance(value, str):
        raise HubVaultValidationError("%s.%s must be a string." % (endpoint_name, field_name))
    return value


def _optional_bool_field(payload: dict, field_name: str, endpoint_name: str, default: bool = False) -> bool:
    """
    Normalize one optional boolean field from a route payload.

    :param payload: Normalized JSON object payload
    :type payload: dict
    :param field_name: Optional field name
    :type field_name: str
    :param endpoint_name: Endpoint label used in validation messages
    :type endpoint_name: str
    :param default: Default value when the field is missing
    :type default: bool
    :return: Normalized boolean field
    :rtype: bool
    :raises HubVaultValidationError: Raised when the field is not boolean.
    """

    value = payload.get(field_name)
    if value is None:
        return bool(default)
    if not isinstance(value, bool):
        raise HubVaultValidationError("%s.%s must be a boolean." % (endpoint_name, field_name))
    return value


async def _parse_commit_apply_payload(request) -> tuple:
    """
    Parse a write-commit apply payload from JSON or multipart form data.

    :param request: Incoming FastAPI request
    :type request: fastapi.Request
    :return: Tuple of ``(payload, uploads)``
    :rtype: tuple
    :raises HubVaultValidationError: Raised when the payload is malformed.
    """

    content_type = str(request.headers.get("content-type", ""))
    if content_type.startswith("multipart/form-data"):
        try:
            form = await request.form()
        except ValueError as err:
            raise HubVaultValidationError("Invalid multipart payload: %s." % (err,))
        manifest_text = form.get("manifest")
        if not isinstance(manifest_text, str):
            raise HubVaultValidationError("multipart form field 'manifest' must contain JSON text.")
        try:
            payload = json.loads(manifest_text)
        except ValueError as err:
            raise HubVaultValidationError("multipart form field 'manifest' must contain valid JSON: %s." % (err,))
        uploads = {}
        for field_name, value in form.items():
            if field_name == "manifest":
                continue
            read = getattr(value, "read", None)
            if read is None:
                continue
            uploads[field_name] = bytes(await read())
        return payload, uploads

    try:
        return await request.json(), {}
    except ValueError as err:
        raise HubVaultValidationError("Request body must contain valid JSON: %s." % (err,))


[docs] def create_writes_router(*, api=None, api_factory=None, authorizer): """ Build the write router for the server app. :param api: Optional repository API reused by the router :type api: Optional[hubvault.api.HubVaultApi] :param api_factory: Optional zero-argument factory returning one fresh repository API per request :type api_factory: Optional[Callable[[], hubvault.api.HubVaultApi]] :param authorizer: Shared token authorizer :type authorizer: hubvault.server.auth.TokenAuthorizer :return: Router exposing authenticated write endpoints :rtype: fastapi.APIRouter :raises hubvault.optional.MissingOptionalDependencyError: Raised when the API extra is not installed. :raises TypeError: Raised when both ``api`` and ``api_factory`` are provided or when neither input is provided. """ from ...optional import import_optional_dependency fastapi = import_optional_dependency( "fastapi", extra="api", feature="server write routes", missing_names={"starlette", "pydantic"}, ) APIRouter = fastapi.APIRouter Body = fastapi.Body Depends = fastapi.Depends Request = fastapi.Request router = APIRouter(prefix="/api/v1/write", tags=["write"]) get_api = build_repo_api_getter(api=api, api_factory=api_factory) require_write = build_write_auth_dependency(authorizer) @router.post("/commit-plan") def commit_plan(payload=Body(...), auth=Depends(require_write)): """ Plan one write-commit upload session. :param payload: Raw write-manifest payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible upload plan :rtype: dict """ del auth return plan_commit_manifest(get_api(), normalize_commit_manifest_request(payload)) @router.post("/commit") async def create_commit(request: Request, auth=Depends(require_write)): """ Apply one previously planned write-commit upload session. :param request: Incoming HTTP request :type request: fastapi.Request :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible commit metadata :rtype: dict """ del auth payload, uploads = await _parse_commit_apply_payload(request) commit_info = apply_commit_manifest(get_api(), normalize_commit_manifest_request(payload), uploads) return encode_commit_info(commit_info) @router.post("/branches") def create_branch(payload=Body(...), auth=Depends(require_write)): """ Create one branch ref. :param payload: Raw branch-create payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible success marker :rtype: dict """ del auth data = _normalize_json_object(payload, "create_branch") get_api().create_branch( branch=_require_string_field(data, "branch", "create_branch"), revision=_optional_string_field(data, "revision", "create_branch"), exist_ok=_optional_bool_field(data, "exist_ok", "create_branch", default=False), ) return {"ok": True} @router.delete("/branches/{branch:path}") def delete_branch(branch: str, auth=Depends(require_write)): """ Delete one branch ref. :param branch: Branch name to delete :type branch: str :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible success marker :rtype: dict """ del auth get_api().delete_branch(branch=branch) return {"ok": True} @router.post("/tags") def create_tag(payload=Body(...), auth=Depends(require_write)): """ Create one lightweight tag. :param payload: Raw tag-create payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible success marker :rtype: dict """ del auth data = _normalize_json_object(payload, "create_tag") get_api().create_tag( tag=_require_string_field(data, "tag", "create_tag"), tag_message=_optional_string_field(data, "tag_message", "create_tag"), revision=_optional_string_field(data, "revision", "create_tag"), exist_ok=_optional_bool_field(data, "exist_ok", "create_tag", default=False), ) return {"ok": True} @router.delete("/tags/{tag:path}") def delete_tag(tag: str, auth=Depends(require_write)): """ Delete one lightweight tag. :param tag: Tag name to delete :type tag: str :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible success marker :rtype: dict """ del auth get_api().delete_tag(tag=tag) return {"ok": True} @router.post("/merge") def merge(payload=Body(...), auth=Depends(require_write)): """ Merge one source revision into a target branch. :param payload: Raw merge payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible merge result :rtype: dict """ del auth data = _normalize_json_object(payload, "merge") result = get_api().merge( source_revision=_require_string_field(data, "source_revision", "merge"), target_revision=_optional_string_field(data, "target_revision", "merge"), parent_commit=_optional_string_field(data, "parent_commit", "merge"), commit_message=_optional_string_field(data, "commit_message", "merge"), commit_description=_optional_string_field(data, "commit_description", "merge"), ) return encode_merge_result(result) @router.post("/reset-ref") def reset_ref(payload=Body(...), auth=Depends(require_write)): """ Reset one branch ref to a target revision. :param payload: Raw reset-ref payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible commit metadata :rtype: dict """ del auth data = _normalize_json_object(payload, "reset_ref") result = get_api().reset_ref( _require_string_field(data, "ref_name", "reset_ref"), to_revision=_require_string_field(data, "to_revision", "reset_ref"), ) return encode_commit_info(result) @router.post("/delete-file") def delete_file(payload=Body(...), auth=Depends(require_write)): """ Delete one file path through the public write API. :param payload: Raw delete-file payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible commit metadata :rtype: dict """ del auth data = _normalize_json_object(payload, "delete_file") result = get_api().delete_file( _require_string_field(data, "path_in_repo", "delete_file"), revision=_optional_string_field(data, "revision", "delete_file"), commit_message=_optional_string_field(data, "commit_message", "delete_file"), commit_description=_optional_string_field(data, "commit_description", "delete_file"), parent_commit=_optional_string_field(data, "parent_commit", "delete_file"), ) return encode_commit_info(result) @router.post("/delete-folder") def delete_folder(payload=Body(...), auth=Depends(require_write)): """ Delete one folder subtree through the public write API. :param payload: Raw delete-folder payload :type payload: object :param auth: Resolved caller authorization context :type auth: hubvault.server.auth.AuthContext :return: JSON-compatible commit metadata :rtype: dict """ del auth data = _normalize_json_object(payload, "delete_folder") result = get_api().delete_folder( _require_string_field(data, "path_in_repo", "delete_folder"), revision=_optional_string_field(data, "revision", "delete_folder"), commit_message=_optional_string_field(data, "commit_message", "delete_folder"), commit_description=_optional_string_field(data, "commit_description", "delete_folder"), parent_commit=_optional_string_field(data, "parent_commit", "delete_folder"), ) return encode_commit_info(result) return router