Source code for hubvault.server.schemas

"""
Request-normalization and small payload helpers for :mod:`hubvault.server`.

The current server phases keep request and response handling explicit instead
of introducing a large schema layer all at once. This module centralizes the
small validation helpers that readonly route modules need.

The module contains:

* :func:`normalize_paths_request` - Normalize path-selection request bodies
* :func:`normalize_snapshot_plan_request` - Normalize snapshot-plan request bodies
* :func:`normalize_commit_manifest_request` - Normalize write-commit manifests
* :func:`normalize_gc_request` - Normalize GC request bodies
* :func:`normalize_squash_history_request` - Normalize history-squash request bodies
* :func:`build_error_payload` - Build the stable JSON error wrapper
"""

from typing import Iterable, List, Optional, Sequence

from ..errors import HubVaultValidationError


def _normalize_pattern_list(values, field_name: str) -> List[str]:
    """
    Normalize one optional glob-pattern field.

    :param values: Raw pattern value or values
    :type values: Optional[Union[Sequence[str], str]]
    :param field_name: Request field name used in validation messages
    :type field_name: str
    :return: Normalized pattern list
    :rtype: List[str]
    :raises HubVaultValidationError: Raised when the input is not a string or
        list of strings.
    """

    if values is None:
        return []
    if isinstance(values, str):
        return [values]
    if isinstance(values, Sequence):
        normalized = []
        for item in values:
            if not isinstance(item, str):
                raise HubVaultValidationError("%s items must be strings." % (field_name,))
            normalized.append(item)
        return normalized
    raise HubVaultValidationError("%s must be a string or a list of strings." % (field_name,))


[docs] def normalize_paths_request(payload) -> List[str]: """ Normalize a ``paths-info`` request body. The body may be either a raw JSON array or an object carrying a ``paths`` field. :param payload: Raw decoded JSON payload :type payload: object :return: Normalized repo-relative path list :rtype: List[str] :raises HubVaultValidationError: Raised when the payload shape is invalid. """ values = payload if isinstance(payload, dict): values = payload.get("paths") if isinstance(values, str): return [values] if isinstance(values, Sequence): normalized = [] for item in values: if not isinstance(item, str): raise HubVaultValidationError("paths items must be strings.") normalized.append(item) return normalized raise HubVaultValidationError("Request body must be a path string, a path array, or an object with a 'paths' field.")
[docs] def normalize_snapshot_plan_request(payload) -> dict: """ Normalize a ``snapshot-plan`` request body. :param payload: Raw decoded JSON payload :type payload: object :return: Normalized snapshot-plan options :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if payload is None: payload = {} if not isinstance(payload, dict): raise HubVaultValidationError("Request body must be a JSON object.") return { "allow_patterns": _normalize_pattern_list(payload.get("allow_patterns"), "allow_patterns"), "ignore_patterns": _normalize_pattern_list(payload.get("ignore_patterns"), "ignore_patterns"), }
def _normalize_optional_string(value, field_name: str) -> Optional[str]: """ Normalize one optional string field. :param value: Raw field value :type value: object :param field_name: Field name used in validation messages :type field_name: str :return: Normalized string or ``None`` :rtype: Optional[str] :raises HubVaultValidationError: Raised when the field is not a string. """ if value is None: return None if not isinstance(value, str): raise HubVaultValidationError("%s must be a string." % (field_name,)) return value def _normalize_required_string(value, field_name: str) -> str: """ Normalize one required string field. :param value: Raw field value :type value: object :param field_name: Field name used in validation messages :type field_name: str :return: Normalized string :rtype: str :raises HubVaultValidationError: Raised when the field is not a string. """ normalized = _normalize_optional_string(value, field_name) if normalized is None: raise HubVaultValidationError("%s must be a string." % (field_name,)) return normalized def _normalize_bool(value, field_name: str, default: bool) -> bool: """ Normalize one boolean field. :param value: Raw field value :type value: object :param field_name: Field name used in validation messages :type field_name: str :param default: Default value when the field is missing :type default: bool :return: Normalized boolean value :rtype: bool :raises HubVaultValidationError: Raised when the field is not boolean. """ if value is None: return bool(default) if not isinstance(value, bool): raise HubVaultValidationError("%s must be a boolean." % (field_name,)) return value def _normalize_non_negative_int(value, field_name: str) -> int: """ Normalize one non-negative integer field. :param value: Raw field value :type value: object :param field_name: Field name used in validation messages :type field_name: str :return: Normalized integer value :rtype: int :raises HubVaultValidationError: Raised when the field is not a non-negative integer. """ if not isinstance(value, int) or isinstance(value, bool): raise HubVaultValidationError("%s must be a non-negative integer." % (field_name,)) if value < 0: raise HubVaultValidationError("%s must be a non-negative integer." % (field_name,)) return int(value) def _normalize_sha256_hex(value, field_name: str) -> str: """ Normalize one public SHA-256 field. :param value: Raw field value :type value: object :param field_name: Field name used in validation messages :type field_name: str :return: Bare hexadecimal SHA-256 digest :rtype: str :raises HubVaultValidationError: Raised when the field is not a string. """ normalized = _normalize_required_string(value, field_name).strip() if normalized.startswith("sha256:"): normalized = normalized[len("sha256:"):] return normalized def _normalize_chunk_descriptor(payload, index: int) -> dict: """ Normalize one chunk descriptor from a write manifest. :param payload: Raw chunk payload :type payload: object :param index: Chunk index used in validation messages :type index: int :return: Normalized chunk descriptor :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if not isinstance(payload, dict): raise HubVaultValidationError("operations[%d].chunks[%d] must be a JSON object." % (index, index)) return { "chunk_id": _normalize_required_string(payload.get("chunk_id"), "chunk_id"), "checksum": _normalize_required_string(payload.get("checksum"), "checksum"), "logical_offset": _normalize_non_negative_int(payload.get("logical_offset"), "logical_offset"), "logical_size": _normalize_non_negative_int(payload.get("logical_size"), "logical_size"), "stored_size": _normalize_non_negative_int(payload.get("stored_size"), "stored_size"), "compression": _normalize_required_string(payload.get("compression"), "compression"), } def _normalize_manifest_operation(payload, index: int) -> dict: """ Normalize one write-manifest operation. :param payload: Raw operation payload :type payload: object :param index: Operation index used in validation messages :type index: int :return: Normalized operation payload :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if not isinstance(payload, dict): raise HubVaultValidationError("operations[%d] must be a JSON object." % (index,)) operation_type = _normalize_required_string(payload.get("type"), "type") if operation_type == "add": chunks = payload.get("chunks") or [] if not isinstance(chunks, list): raise HubVaultValidationError("operations[%d].chunks must be a JSON array." % (index,)) return { "type": "add", "path_in_repo": _normalize_required_string(payload.get("path_in_repo"), "path_in_repo"), "size": _normalize_non_negative_int(payload.get("size"), "size"), "sha256": _normalize_sha256_hex(payload.get("sha256"), "sha256"), "chunks": [_normalize_chunk_descriptor(item, index) for item in chunks], } if operation_type == "delete": return { "type": "delete", "path_in_repo": _normalize_required_string(payload.get("path_in_repo"), "path_in_repo"), "is_folder": _normalize_bool(payload.get("is_folder"), "is_folder", default=False), } if operation_type == "copy": return { "type": "copy", "src_path_in_repo": _normalize_required_string(payload.get("src_path_in_repo"), "src_path_in_repo"), "path_in_repo": _normalize_required_string(payload.get("path_in_repo"), "path_in_repo"), "src_revision": _normalize_optional_string(payload.get("src_revision"), "src_revision"), } raise HubVaultValidationError("Unsupported write operation type: %s." % (operation_type,)) def _normalize_plan_operation(payload, index: int) -> dict: """ Normalize one planned write operation. :param payload: Raw plan operation payload :type payload: object :param index: Operation index used in validation messages :type index: int :return: Normalized plan operation payload :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if not isinstance(payload, dict): raise HubVaultValidationError("upload_plan.operations[%d] must be a JSON object." % (index,)) missing_chunks = payload.get("missing_chunks") or [] if not isinstance(missing_chunks, list): raise HubVaultValidationError("upload_plan.operations[%d].missing_chunks must be a JSON array." % (index,)) normalized_missing_chunks = [] for chunk_index, chunk_payload in enumerate(missing_chunks): if not isinstance(chunk_payload, dict): raise HubVaultValidationError( "upload_plan.operations[%d].missing_chunks[%d] must be a JSON object." % (index, chunk_index) ) normalized_missing_chunks.append( { "chunk_id": _normalize_required_string(chunk_payload.get("chunk_id"), "chunk_id"), "chunk_index": _normalize_non_negative_int(chunk_payload.get("chunk_index"), "chunk_index"), "field_name": _normalize_required_string(chunk_payload.get("field_name"), "field_name"), "logical_size": _normalize_non_negative_int(chunk_payload.get("logical_size"), "logical_size"), } ) return { "index": _normalize_non_negative_int(payload.get("index"), "index"), "type": _normalize_required_string(payload.get("type"), "type"), "path_in_repo": _normalize_optional_string(payload.get("path_in_repo"), "path_in_repo"), "strategy": _normalize_required_string(payload.get("strategy"), "strategy"), "field_name": _normalize_optional_string(payload.get("field_name"), "field_name"), "source_path_in_repo": _normalize_optional_string(payload.get("source_path_in_repo"), "source_path_in_repo"), "source_revision": _normalize_optional_string(payload.get("source_revision"), "source_revision"), "missing_chunks": normalized_missing_chunks, "reused_chunk_count": _normalize_non_negative_int( payload.get("reused_chunk_count", 0), "reused_chunk_count", ), "missing_chunk_count": _normalize_non_negative_int( payload.get("missing_chunk_count", len(normalized_missing_chunks)), "missing_chunk_count", ), }
[docs] def normalize_commit_manifest_request(payload) -> dict: """ Normalize a write-commit manifest or apply payload. :param payload: Raw decoded JSON payload :type payload: object :return: Normalized write manifest payload :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if not isinstance(payload, dict): raise HubVaultValidationError("Request body must be a JSON object.") operations = payload.get("operations") if not isinstance(operations, list): raise HubVaultValidationError("operations must be a JSON array.") upload_plan = payload.get("upload_plan") normalized_plan = None if upload_plan is not None: if not isinstance(upload_plan, dict): raise HubVaultValidationError("upload_plan must be a JSON object.") plan_operations = upload_plan.get("operations") if not isinstance(plan_operations, list): raise HubVaultValidationError("upload_plan.operations must be a JSON array.") normalized_plan = { "revision": _normalize_required_string(upload_plan.get("revision"), "upload_plan.revision"), "base_head": _normalize_optional_string(upload_plan.get("base_head"), "upload_plan.base_head"), "operations": [ _normalize_plan_operation(item, index) for index, item in enumerate(plan_operations) ], "statistics": dict(upload_plan.get("statistics") or {}), } return { "revision": _normalize_optional_string(payload.get("revision"), "revision"), "parent_commit": _normalize_optional_string(payload.get("parent_commit"), "parent_commit"), "commit_message": _normalize_required_string(payload.get("commit_message"), "commit_message"), "commit_description": _normalize_optional_string(payload.get("commit_description"), "commit_description"), "operations": [_normalize_manifest_operation(item, index) for index, item in enumerate(operations)], "upload_plan": normalized_plan, }
[docs] def normalize_gc_request(payload) -> dict: """ Normalize a GC request body. :param payload: Raw decoded JSON payload :type payload: object :return: Normalized GC options :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if payload is None: payload = {} if not isinstance(payload, dict): raise HubVaultValidationError("Request body must be a JSON object.") return { "dry_run": _normalize_bool(payload.get("dry_run"), "dry_run", default=False), "prune_cache": _normalize_bool(payload.get("prune_cache"), "prune_cache", default=True), }
[docs] def normalize_squash_history_request(payload) -> dict: """ Normalize a history-squash request body. :param payload: Raw decoded JSON payload :type payload: object :return: Normalized squash-history options :rtype: dict :raises HubVaultValidationError: Raised when the payload shape is invalid. """ if not isinstance(payload, dict): raise HubVaultValidationError("Request body must be a JSON object.") return { "ref_name": _normalize_required_string(payload.get("ref_name"), "ref_name"), "root_revision": _normalize_optional_string(payload.get("root_revision"), "root_revision"), "commit_message": _normalize_optional_string(payload.get("commit_message"), "commit_message"), "commit_description": _normalize_optional_string(payload.get("commit_description"), "commit_description"), "run_gc": _normalize_bool(payload.get("run_gc"), "run_gc", default=True), "prune_cache": _normalize_bool(payload.get("prune_cache"), "prune_cache", default=False), }
[docs] def build_error_payload(error_type: str, message: str) -> dict: """ Build the stable JSON error wrapper used by the server layer. :param error_type: Stable application error type name :type error_type: str :param message: Human-readable error message :type message: str :return: JSON-compatible error payload :rtype: dict """ return { "error": { "type": error_type, "message": message, } }