"""
Serialization helpers for :mod:`hubvault.remote`.
This module reconstructs public ``hubvault`` dataclasses and exception types
from server JSON payloads.
The module contains:
* :func:`decode_json_payload` - Normalize one decoded JSON payload
* :func:`decode_error_response` - Map one error payload back to a public exception
* :func:`decode_commit_change_info` - Decode one commit-diff entry
* :func:`decode_commit_detail_info` - Decode one commit-detail payload
"""
from datetime import datetime
from ..errors import (
ConflictError,
EntryNotFoundError,
HubVaultError,
HubVaultValidationError,
IntegrityError,
RepositoryAlreadyExistsError,
RepositoryNotFoundError,
RevisionNotFoundError,
UnsupportedPathError,
VerificationError,
)
from ..models import (
BlobLfsInfo,
BlobSecurityInfo,
CommitChangeInfo,
CommitDetailInfo,
CommitInfo,
CommitFileVersionInfo,
GcReport,
GitCommitInfo,
GitRefInfo,
GitRefs,
LastCommitInfo,
MergeConflict,
MergeResult,
ReflogEntry,
RepoFile,
RepoFolder,
RepoInfo,
SquashReport,
StorageOverview,
StorageSectionInfo,
VerifyReport,
)
from .errors import HubVaultRemoteAuthError, HubVaultRemoteProtocolError
_PUBLIC_ERROR_TYPES = {
error_type.__name__: error_type
for error_type in [
ConflictError,
EntryNotFoundError,
HubVaultValidationError,
IntegrityError,
RepositoryAlreadyExistsError,
RepositoryNotFoundError,
RevisionNotFoundError,
UnsupportedPathError,
VerificationError,
]
}
def _require_dict(payload, name: str) -> dict:
"""
Validate that one payload value is a JSON object.
:param payload: Raw decoded JSON payload
:type payload: object
:param name: Human-readable payload name
:type name: str
:return: JSON object payload
:rtype: dict
:raises HubVaultRemoteProtocolError: Raised when the payload is not an object.
"""
if not isinstance(payload, dict):
raise HubVaultRemoteProtocolError("%s must be a JSON object." % (name,))
return payload
def _parse_datetime(value, field_name: str) -> datetime:
"""
Parse one ISO-8601 datetime string.
:param value: Raw JSON field value
:type value: object
:param field_name: Field name used in validation messages
:type field_name: str
:return: Parsed datetime value
:rtype: datetime.datetime
:raises HubVaultRemoteProtocolError: Raised when the value is missing or invalid.
"""
if not isinstance(value, str):
raise HubVaultRemoteProtocolError("%s must be an ISO-8601 string." % (field_name,))
try:
return datetime.fromisoformat(value)
except ValueError as err:
raise HubVaultRemoteProtocolError("Invalid datetime for %s: %s" % (field_name, err))
def _decode_last_commit_info(payload):
"""
Decode optional last-commit metadata.
:param payload: Raw last-commit payload
:type payload: object
:return: Decoded last-commit metadata or ``None``
:rtype: Optional[LastCommitInfo]
"""
if payload is None:
return None
data = _require_dict(payload, "last_commit")
return LastCommitInfo(
oid=str(data["oid"]),
title=str(data["title"]),
date=_parse_datetime(data["date"], "last_commit.date"),
)
def _decode_blob_security_info(payload):
"""
Decode optional blob-security metadata.
:param payload: Raw blob-security payload
:type payload: object
:return: Decoded blob-security metadata or ``None``
:rtype: Optional[BlobSecurityInfo]
"""
if payload is None:
return None
data = _require_dict(payload, "security")
return BlobSecurityInfo(
safe=bool(data["safe"]),
status=str(data["status"]),
av_scan=data.get("av_scan"),
pickle_import_scan=data.get("pickle_import_scan"),
)
def _decode_blob_lfs_info(payload):
"""
Decode optional large-file metadata.
:param payload: Raw large-file payload
:type payload: object
:return: Decoded large-file metadata or ``None``
:rtype: Optional[BlobLfsInfo]
"""
if payload is None:
return None
data = _require_dict(payload, "lfs")
return BlobLfsInfo(
size=int(data["size"]),
sha256=str(data["sha256"]),
pointer_size=int(data["pointer_size"]),
)
[docs]
def decode_json_payload(payload):
"""
Return the decoded JSON payload unchanged for the skeleton stage.
:param payload: Decoded JSON-compatible payload
:type payload: object
:return: The same payload value
:rtype: object
"""
return payload
[docs]
def decode_error_response(payload, *, status_code: int):
"""
Map one error payload back to a public exception object.
:param payload: Raw decoded JSON payload
:type payload: object
:param status_code: HTTP status code attached to the response
:type status_code: int
:return: Public exception instance matching the server response
:rtype: Exception
"""
data = _require_dict(payload, "error response")
error_data = data.get("error")
if isinstance(error_data, dict):
message = str(error_data.get("message", "Remote request failed."))
error_type_name = str(error_data.get("type", ""))
else:
message = str(data.get("detail", "Remote request failed."))
error_type_name = ""
if status_code in {401, 403}:
return HubVaultRemoteAuthError(message)
error_type = _PUBLIC_ERROR_TYPES.get(error_type_name)
if error_type is not None:
return error_type(message)
return HubVaultRemoteProtocolError(message)
[docs]
def decode_repo_info(payload) -> RepoInfo:
"""
Decode repository metadata from JSON.
:param payload: Raw repository payload
:type payload: object
:return: Decoded repository metadata
:rtype: RepoInfo
"""
data = _require_dict(payload, "repo_info")
return RepoInfo(
repo_path=str(data["repo_path"]),
format_version=int(data["format_version"]),
default_branch=str(data["default_branch"]),
head=None if data.get("head") is None else str(data["head"]),
refs=[str(item) for item in data.get("refs", [])],
)
[docs]
def decode_commit_info(payload) -> CommitInfo:
"""
Decode one write-commit result from JSON.
:param payload: Raw commit payload
:type payload: object
:return: Decoded commit metadata
:rtype: CommitInfo
"""
data = _require_dict(payload, "commit info")
return CommitInfo(
commit_url=str(data["commit_url"]),
commit_message=str(data["commit_message"]),
commit_description=str(data.get("commit_description", "")),
oid=str(data["oid"]),
pr_url=None if data.get("pr_url") is None else str(data.get("pr_url")),
_url=None if data.get("_url") is None else str(data.get("_url")),
)
def _decode_commit_file_version_info(payload):
"""
Decode one optional commit-diff file-side payload.
:param payload: Raw file-side payload
:type payload: object
:return: Decoded file-side metadata or ``None``
:rtype: Optional[CommitFileVersionInfo]
"""
if payload is None:
return None
data = _require_dict(payload, "commit file version")
return CommitFileVersionInfo(
path=str(data["path"]),
size=int(data["size"]),
oid=str(data["oid"]),
blob_id=str(data["blob_id"]),
sha256=str(data["sha256"]),
)
[docs]
def decode_commit_change_info(payload) -> CommitChangeInfo:
"""
Decode one file-level commit change payload.
:param payload: Raw commit change payload
:type payload: object
:return: Decoded commit change metadata
:rtype: CommitChangeInfo
"""
data = _require_dict(payload, "commit change info")
return CommitChangeInfo(
path=str(data["path"]),
change_type=str(data["change_type"]),
old_file=_decode_commit_file_version_info(data.get("old_file")),
new_file=_decode_commit_file_version_info(data.get("new_file")),
is_binary=bool(data.get("is_binary")),
unified_diff=None if data.get("unified_diff") is None else str(data.get("unified_diff")),
)
[docs]
def decode_repo_entry(payload):
"""
Decode one repository file or folder entry.
:param payload: Raw repository entry payload
:type payload: object
:return: Decoded file or folder entry
:rtype: Union[RepoFile, RepoFolder]
:raises HubVaultRemoteProtocolError: Raised when the entry type is unsupported.
"""
data = _require_dict(payload, "repo entry")
entry_type = str(data.get("entry_type", ""))
if entry_type == "file":
return RepoFile(
path=str(data["path"]),
size=int(data["size"]),
blob_id=str(data["blob_id"]),
lfs=_decode_blob_lfs_info(data.get("lfs")),
last_commit=_decode_last_commit_info(data.get("last_commit")),
security=_decode_blob_security_info(data.get("security")),
oid=None if data.get("oid") is None else str(data.get("oid")),
sha256=None if data.get("sha256") is None else str(data.get("sha256")),
etag=None if data.get("etag") is None else str(data.get("etag")),
)
if entry_type == "folder":
return RepoFolder(
path=str(data["path"]),
tree_id=str(data["tree_id"]),
last_commit=_decode_last_commit_info(data.get("last_commit")),
)
raise HubVaultRemoteProtocolError("Unsupported repo entry type: %r." % (entry_type,))
[docs]
def decode_repo_entries(payload) -> list:
"""
Decode repository file and folder entries from JSON.
:param payload: Raw repository entries payload
:type payload: object
:return: Decoded repository entries
:rtype: list
"""
if not isinstance(payload, list):
raise HubVaultRemoteProtocolError("Repository entries must be a JSON array.")
return [decode_repo_entry(item) for item in payload]
[docs]
def decode_git_commit_info(payload) -> GitCommitInfo:
"""
Decode one commit-list entry from JSON.
:param payload: Raw commit-list payload
:type payload: object
:return: Decoded commit-list entry
:rtype: GitCommitInfo
"""
data = _require_dict(payload, "git commit info")
return GitCommitInfo(
commit_id=str(data["commit_id"]),
authors=[str(item) for item in data.get("authors", [])],
created_at=_parse_datetime(data["created_at"], "created_at"),
title=str(data["title"]),
message=str(data["message"]),
formatted_title=None if data.get("formatted_title") is None else str(data["formatted_title"]),
formatted_message=None if data.get("formatted_message") is None else str(data["formatted_message"]),
)
[docs]
def decode_git_commit_list(payload) -> list:
"""
Decode commit-list entries from JSON.
:param payload: Raw commit-list payload
:type payload: object
:return: Decoded commit-list entries
:rtype: list
"""
if not isinstance(payload, list):
raise HubVaultRemoteProtocolError("Commit list must be a JSON array.")
return [decode_git_commit_info(item) for item in payload]
[docs]
def decode_commit_detail_info(payload) -> CommitDetailInfo:
"""
Decode one commit-detail payload.
:param payload: Raw commit-detail payload
:type payload: object
:return: Decoded commit detail metadata
:rtype: CommitDetailInfo
"""
data = _require_dict(payload, "commit detail info")
parent_commit_ids = data.get("parent_commit_ids")
changes = data.get("changes")
if not isinstance(parent_commit_ids, list):
raise HubVaultRemoteProtocolError("parent_commit_ids must be a JSON array.")
if not isinstance(changes, list):
raise HubVaultRemoteProtocolError("changes must be a JSON array.")
return CommitDetailInfo(
commit=decode_git_commit_info(data["commit"]),
parent_commit_ids=[str(item) for item in parent_commit_ids],
compare_parent_commit_id=None
if data.get("compare_parent_commit_id") is None
else str(data.get("compare_parent_commit_id")),
changes=[decode_commit_change_info(item) for item in changes],
)
[docs]
def decode_git_ref_info(payload) -> GitRefInfo:
"""
Decode one git reference entry from JSON.
:param payload: Raw ref payload
:type payload: object
:return: Decoded git ref entry
:rtype: GitRefInfo
"""
data = _require_dict(payload, "git ref info")
return GitRefInfo(
name=str(data["name"]),
ref=str(data["ref"]),
target_commit=None if data.get("target_commit") is None else str(data.get("target_commit")),
)
[docs]
def decode_git_refs(payload) -> GitRefs:
"""
Decode branch and tag refs from JSON.
:param payload: Raw refs payload
:type payload: object
:return: Decoded refs collection
:rtype: GitRefs
"""
data = _require_dict(payload, "git refs")
pull_requests = data.get("pull_requests")
return GitRefs(
branches=[decode_git_ref_info(item) for item in data.get("branches", [])],
converts=[decode_git_ref_info(item) for item in data.get("converts", [])],
tags=[decode_git_ref_info(item) for item in data.get("tags", [])],
pull_requests=None if pull_requests is None else [decode_git_ref_info(item) for item in pull_requests],
)
[docs]
def decode_reflog_entry(payload) -> ReflogEntry:
"""
Decode one reflog entry from JSON.
:param payload: Raw reflog payload
:type payload: object
:return: Decoded reflog entry
:rtype: ReflogEntry
"""
data = _require_dict(payload, "reflog entry")
return ReflogEntry(
timestamp=_parse_datetime(data["timestamp"], "timestamp"),
ref_name=str(data["ref_name"]),
old_head=None if data.get("old_head") is None else str(data.get("old_head")),
new_head=None if data.get("new_head") is None else str(data.get("new_head")),
message=str(data["message"]),
checksum=str(data["checksum"]),
)
[docs]
def decode_reflog_entries(payload) -> list:
"""
Decode reflog entries from JSON.
:param payload: Raw reflog payload
:type payload: object
:return: Decoded reflog entries
:rtype: list
"""
if not isinstance(payload, list):
raise HubVaultRemoteProtocolError("Reflog entries must be a JSON array.")
return [decode_reflog_entry(item) for item in payload]
[docs]
def decode_merge_conflict(payload) -> MergeConflict:
"""
Decode one merge conflict from JSON.
:param payload: Raw conflict payload
:type payload: object
:return: Decoded merge conflict
:rtype: MergeConflict
"""
data = _require_dict(payload, "merge conflict")
return MergeConflict(
path=str(data["path"]),
conflict_type=str(data["conflict_type"]),
message=str(data["message"]),
base_oid=None if data.get("base_oid") is None else str(data.get("base_oid")),
target_oid=None if data.get("target_oid") is None else str(data.get("target_oid")),
source_oid=None if data.get("source_oid") is None else str(data.get("source_oid")),
related_path=None if data.get("related_path") is None else str(data.get("related_path")),
)
[docs]
def decode_merge_result(payload) -> MergeResult:
"""
Decode one structured merge result from JSON.
:param payload: Raw merge payload
:type payload: object
:return: Decoded merge result
:rtype: MergeResult
"""
data = _require_dict(payload, "merge result")
return MergeResult(
status=str(data["status"]),
target_revision=str(data["target_revision"]),
source_revision=str(data["source_revision"]),
base_commit=None if data.get("base_commit") is None else str(data.get("base_commit")),
target_head_before=None
if data.get("target_head_before") is None
else str(data.get("target_head_before")),
source_head=None if data.get("source_head") is None else str(data.get("source_head")),
head_after=None if data.get("head_after") is None else str(data.get("head_after")),
commit=None if data.get("commit") is None else decode_commit_info(data.get("commit")),
conflicts=[decode_merge_conflict(item) for item in data.get("conflicts", [])],
fast_forward=bool(data.get("fast_forward")),
created_commit=bool(data.get("created_commit")),
)
[docs]
def decode_verify_report(payload) -> VerifyReport:
"""
Decode one verification report from JSON.
:param payload: Raw verification payload
:type payload: object
:return: Decoded verification report
:rtype: VerifyReport
"""
data = _require_dict(payload, "verify report")
return VerifyReport(
ok=bool(data["ok"]),
checked_refs=[str(item) for item in data.get("checked_refs", [])],
warnings=[str(item) for item in data.get("warnings", [])],
errors=[str(item) for item in data.get("errors", [])],
)
[docs]
def decode_storage_section_info(payload) -> StorageSectionInfo:
"""
Decode one storage-section entry from JSON.
:param payload: Raw storage-section payload
:type payload: object
:return: Decoded storage-section entry
:rtype: StorageSectionInfo
"""
data = _require_dict(payload, "storage section")
return StorageSectionInfo(
name=str(data["name"]),
path=str(data["path"]),
total_size=int(data["total_size"]),
file_count=int(data["file_count"]),
reclaimable_size=int(data["reclaimable_size"]),
reclaim_strategy=str(data["reclaim_strategy"]),
notes=str(data["notes"]),
)
[docs]
def decode_storage_overview(payload) -> StorageOverview:
"""
Decode one storage-overview report from JSON.
:param payload: Raw storage-overview payload
:type payload: object
:return: Decoded storage overview
:rtype: StorageOverview
"""
data = _require_dict(payload, "storage overview")
return StorageOverview(
total_size=int(data["total_size"]),
reachable_size=int(data["reachable_size"]),
historical_retained_size=int(data["historical_retained_size"]),
reclaimable_gc_size=int(data["reclaimable_gc_size"]),
reclaimable_cache_size=int(data["reclaimable_cache_size"]),
reclaimable_temporary_size=int(data["reclaimable_temporary_size"]),
sections=[decode_storage_section_info(item) for item in data.get("sections", [])],
recommendations=[str(item) for item in data.get("recommendations", [])],
)
[docs]
def decode_gc_report(payload) -> GcReport:
"""
Decode one GC report from JSON.
:param payload: Raw GC payload
:type payload: object
:return: Decoded GC report
:rtype: GcReport
"""
data = _require_dict(payload, "gc report")
return GcReport(
dry_run=bool(data["dry_run"]),
checked_refs=[str(item) for item in data.get("checked_refs", [])],
reclaimed_size=int(data["reclaimed_size"]),
reclaimed_object_size=int(data["reclaimed_object_size"]),
reclaimed_chunk_size=int(data["reclaimed_chunk_size"]),
reclaimed_cache_size=int(data["reclaimed_cache_size"]),
reclaimed_temporary_size=int(data["reclaimed_temporary_size"]),
removed_file_count=int(data["removed_file_count"]),
notes=[str(item) for item in data.get("notes", [])],
)
[docs]
def decode_squash_report(payload) -> SquashReport:
"""
Decode one history-squash report from JSON.
:param payload: Raw squash payload
:type payload: object
:return: Decoded squash report
:rtype: SquashReport
"""
data = _require_dict(payload, "squash report")
return SquashReport(
ref_name=str(data["ref_name"]),
old_head=str(data["old_head"]),
new_head=str(data["new_head"]),
root_commit_before=str(data["root_commit_before"]),
rewritten_commit_count=int(data["rewritten_commit_count"]),
dropped_ancestor_count=int(data["dropped_ancestor_count"]),
blocking_refs=[str(item) for item in data.get("blocking_refs", [])],
gc_report=None if data.get("gc_report") is None else decode_gc_report(data.get("gc_report")),
)
[docs]
def decode_snapshot_plan(payload) -> dict:
"""
Decode a snapshot-plan manifest from JSON.
:param payload: Raw snapshot-plan payload
:type payload: object
:return: Normalized snapshot-plan manifest
:rtype: dict
"""
data = _require_dict(payload, "snapshot plan")
files = data.get("files", [])
if not isinstance(files, list):
raise HubVaultRemoteProtocolError("Snapshot plan files must be a JSON array.")
normalized_files = []
for item in files:
file_data = _require_dict(item, "snapshot plan file")
normalized_files.append(
{
"path": str(file_data["path"]),
"size": int(file_data["size"]),
"blob_id": str(file_data["blob_id"]),
"oid": None if file_data.get("oid") is None else str(file_data.get("oid")),
"sha256": None if file_data.get("sha256") is None else str(file_data.get("sha256")),
"etag": None if file_data.get("etag") is None else str(file_data.get("etag")),
"download_url": str(file_data["download_url"]),
}
)
return {
"revision": str(data["revision"]),
"resolved_revision": str(data["resolved_revision"]),
"head": None if data.get("head") is None else str(data.get("head")),
"allow_patterns": [str(item) for item in data.get("allow_patterns", [])],
"ignore_patterns": [str(item) for item in data.get("ignore_patterns", [])],
"files": normalized_files,
}