"""
Chunk planning helpers for :mod:`hubvault.storage`.
This module contains the chunk-planning logic used by the local repository
backend when a file is large enough to switch from whole-blob storage to
chunked pack storage. Phase 10 upgrades this path from fixed-size splitting to
FastCDC content-defined chunking and uses ``blake3`` as the planner's fast
digest.
The module contains:
* :class:`ChunkDescriptor` - Logical metadata for one chunk in a file
* :class:`ChunkPart` - Chunk descriptor paired with chunk payload bytes
* :class:`ChunkPlan` - Full chunk plan and LFS-style public metadata for a file
* :class:`ChunkStore` - Planner that splits bytes into content-defined chunks
* :func:`canonical_lfs_pointer` - Build canonical Git LFS pointer bytes
* :func:`git_blob_oid` - Compute a Git-compatible blob OID
Example::
>>> store = ChunkStore(chunk_size=256, min_chunk_size=64, max_chunk_size=1024)
>>> plan = store.plan_bytes(b"abcdefgh")
>>> sum(chunk.logical_size for chunk in plan.chunks)
8
>>> plan.etag == plan.sha256
True
"""
from blake3 import blake3
from dataclasses import dataclass
from fastcdc import fastcdc
from hashlib import sha1, sha256
from typing import Optional, Tuple
# Hash algorithm label used as the "<label>:" prefix of the chunk ids and
# checksums produced by ChunkStore.plan_bytes.
OBJECT_HASH = "sha256"
# Default target (average) chunk size for ChunkStore: 4 MiB.
DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024
# Minimum / maximum chunk sizes matching the default target size: one quarter
# of it (never below 64 bytes) and four times it.
DEFAULT_MIN_CHUNK_SIZE = max(64, DEFAULT_CHUNK_SIZE // 4)
DEFAULT_MAX_CHUNK_SIZE = DEFAULT_CHUNK_SIZE * 4
# Smallest minimum, average and maximum chunk sizes that ChunkStore.__init__
# accepts; lower values make it raise ValueError.
# NOTE(review): presumably these mirror the lower limits enforced by the
# fastcdc package itself -- confirm against the installed version.
FASTCDC_MIN_CHUNK_SIZE = 64
FASTCDC_MIN_AVG_SIZE = 256
FASTCDC_MIN_MAX_SIZE = 1024
[docs]
def sha256_hex(data: bytes) -> str:
    """
    Return the SHA-256 digest of ``data`` as lowercase hexadecimal text.

    :param data: Bytes to hash
    :type data: bytes
    :return: 64-character lowercase hexadecimal digest
    :rtype: str

    Example::

        >>> sha256_hex(b"abc")
        'ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad'
    """
    hasher = sha256()
    hasher.update(data)
    return hasher.hexdigest()
[docs]
def git_blob_oid(data: bytes) -> str:
    """
    Return the object id Git would assign to ``data`` stored as a blob.

    The digest is SHA-1 over the Git blob header (``blob <length>`` followed
    by a NUL byte) and then the payload itself.

    :param data: Blob payload bytes
    :type data: bytes
    :return: 40-character hexadecimal SHA-1 OID, without any prefix
    :rtype: str

    Example::

        >>> len(git_blob_oid(b"abc"))
        40
    """
    # Feed header and payload separately so the payload is never copied
    # just to prepend the header.
    hasher = sha1(b"blob %d\0" % len(data))
    hasher.update(data)
    return hasher.hexdigest()
[docs]
def canonical_lfs_pointer(file_sha256: str, size: int) -> bytes:
    """
    Render the canonical Git LFS pointer file for a logical file.

    The pointer has three newline-terminated lines: the spec version, the
    ``sha256`` object id and the file size.

    :param file_sha256: Raw hexadecimal SHA-256 digest of the logical file
    :type file_sha256: str
    :param size: Logical file size in bytes
    :type size: int
    :return: Canonical pointer payload bytes
    :rtype: bytes

    Example::

        >>> canonical_lfs_pointer("a" * 64, 5).startswith(b"version https://git-lfs.github.com/spec/v1\\n")
        True
    """
    digest_bytes = file_sha256.encode("utf-8")
    pointer_lines = (
        b"version https://git-lfs.github.com/spec/v1\n",
        b"oid sha256:%s\n" % (digest_bytes,),
        b"size %d\n" % (size,),
    )
    return b"".join(pointer_lines)
[docs]
@dataclass(frozen=True)
class ChunkDescriptor:
    """
    Immutable metadata record for one chunk of a larger logical file.

    :param chunk_id: Internal chunk identifier with an explicit hash prefix
    :type chunk_id: str
    :param checksum: Integrity checksum for the logical chunk payload
    :type checksum: str
    :param logical_offset: Byte offset of the chunk's first byte within the
        logical file
    :type logical_offset: int
    :param logical_size: Size of the chunk's logical payload in bytes
    :type logical_size: int
    :param stored_size: Size of the chunk as stored, in bytes
    :type stored_size: int
    :param compression: Storage compression label, defaults to ``"none"``
    :type compression: str, optional

    Example::

        >>> descriptor = ChunkDescriptor("sha256:" + "a" * 64, "sha256:" + "a" * 64, 0, 4, 4)
        >>> descriptor.logical_offset
        0
    """

    # Identity and integrity of the chunk payload.
    chunk_id: str
    checksum: str
    # Placement of the chunk inside the logical file.
    logical_offset: int
    logical_size: int
    # How the chunk is persisted.
    stored_size: int
    compression: str = "none"
[docs]
@dataclass(frozen=True)
class ChunkPart:
    """
    Immutable pairing of a chunk's metadata with the chunk's payload bytes.

    :param descriptor: Logical chunk metadata
    :type descriptor: ChunkDescriptor
    :param data: Chunk payload bytes
    :type data: bytes

    Example::

        >>> part = ChunkPart(ChunkDescriptor("sha256:" + "a" * 64, "sha256:" + "a" * 64, 0, 4, 4), b"data")
        >>> part.data
        b'data'
    """

    # Metadata describing the payload held in ``data``.
    descriptor: ChunkDescriptor
    # Raw payload bytes of the chunk.
    data: bytes
[docs]
@dataclass(frozen=True)
class ChunkPlan:
    """
    Immutable chunked-storage plan for a single logical file.

    :param logical_size: Logical file size in bytes
    :type logical_size: int
    :param sha256: Raw hexadecimal SHA-256 digest of the logical file
    :type sha256: str
    :param oid: Git blob OID of the canonical LFS pointer
    :type oid: str
    :param etag: Public ETag value, aligned with the file SHA-256 for LFS mode
    :type etag: str
    :param pointer_size: Size of the canonical LFS pointer in bytes
    :type pointer_size: int
    :param chunks: Ordered logical chunk descriptors
    :type chunks: Tuple[ChunkDescriptor, ...]
    :param parts: Ordered chunk payloads paired with descriptors
    :type parts: Tuple[ChunkPart, ...]

    Example::

        >>> store = ChunkStore(chunk_size=256, min_chunk_size=64, max_chunk_size=1024)
        >>> plan = store.plan_bytes(b"abcdef")
        >>> plan.pointer_size > 0
        True
    """

    # Whole-file metadata.
    logical_size: int
    sha256: str
    # Public, LFS-style metadata derived from the canonical pointer.
    oid: str
    etag: str
    pointer_size: int
    # Per-chunk views, in file order: descriptors alone and descriptors
    # paired with their payload bytes.
    chunks: Tuple[ChunkDescriptor, ...]
    parts: Tuple[ChunkPart, ...]
[docs]
class ChunkStore:
    """
    Build deterministic chunk plans for large file payloads.

    Payloads are split with the ``fastcdc`` content-defined chunker and every
    chunk is identified by the SHA-256 digest of its bytes.

    :param chunk_size: Target average chunk size in bytes, defaults to
        :data:`DEFAULT_CHUNK_SIZE`
    :type chunk_size: int, optional
    :param min_chunk_size: Minimum chunk size in bytes. When omitted it is
        derived from ``chunk_size`` as
        ``max(FASTCDC_MIN_CHUNK_SIZE, chunk_size // 4)``
    :type min_chunk_size: Optional[int], optional
    :param max_chunk_size: Maximum chunk size in bytes. When omitted it is
        derived from ``chunk_size`` as ``chunk_size * 4``
    :type max_chunk_size: Optional[int], optional
    :raises ValueError: Raised when the chunk-size settings are invalid.

    Example::

        >>> store = ChunkStore(chunk_size=256, min_chunk_size=64, max_chunk_size=1024)
        >>> store.chunk_size
        256
    """

    def __init__(
        self,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        min_chunk_size: Optional[int] = None,
        max_chunk_size: Optional[int] = None,
    ) -> None:
        """
        Initialize the chunk planner.

        :param chunk_size: Target average chunk size in bytes, defaults to
            :data:`DEFAULT_CHUNK_SIZE`
        :type chunk_size: int, optional
        :param min_chunk_size: Minimum chunk size in bytes. When omitted it
            is derived from ``chunk_size`` as
            ``max(FASTCDC_MIN_CHUNK_SIZE, chunk_size // 4)``
        :type min_chunk_size: Optional[int], optional
        :param max_chunk_size: Maximum chunk size in bytes. When omitted it
            is derived from ``chunk_size`` as ``chunk_size * 4``
        :type max_chunk_size: Optional[int], optional
        :return: ``None``.
        :rtype: None
        :raises ValueError: Raised when a size is below its ``FASTCDC_MIN_*``
            lower bound, or when ``min_chunk_size <= chunk_size <=
            max_chunk_size`` does not hold.

        Example::

            >>> ChunkStore(chunk_size=256, min_chunk_size=64, max_chunk_size=1024).chunk_size
            256
        """
        chunk_size = int(chunk_size)
        if chunk_size < FASTCDC_MIN_AVG_SIZE:
            raise ValueError("chunk_size must be >= %d" % FASTCDC_MIN_AVG_SIZE)
        self.chunk_size = chunk_size

        # Omitted bounds scale with the requested average size rather than
        # being fixed module-level defaults.
        if min_chunk_size is None:
            self.min_chunk_size = max(FASTCDC_MIN_CHUNK_SIZE, chunk_size // 4)
        else:
            self.min_chunk_size = int(min_chunk_size)
        if max_chunk_size is None:
            self.max_chunk_size = chunk_size * 4
        else:
            self.max_chunk_size = int(max_chunk_size)

        if self.min_chunk_size < FASTCDC_MIN_CHUNK_SIZE:
            raise ValueError("min_chunk_size must be >= %d" % FASTCDC_MIN_CHUNK_SIZE)
        if self.max_chunk_size < FASTCDC_MIN_MAX_SIZE:
            raise ValueError("max_chunk_size must be >= %d" % FASTCDC_MIN_MAX_SIZE)
        if self.max_chunk_size < self.chunk_size:
            raise ValueError("max_chunk_size must be >= chunk_size")
        if self.min_chunk_size > self.chunk_size:
            raise ValueError("min_chunk_size must be <= chunk_size")
        self.algorithm = "fastcdc"

    @staticmethod
    def _sha256_for_chunk(chunk, fast_hash, digest_cache):
        """Return the SHA-256 hex digest of ``chunk``, reusing it for repeated chunks."""
        # The fast hash only selects a bucket; a cached digest is reused only
        # after a full byte-for-byte comparison, so a fast-hash collision can
        # never yield a wrong SHA-256.
        bucket = digest_cache.setdefault((str(fast_hash), len(chunk)), [])
        for cached_chunk, cached_digest in bucket:
            if cached_chunk == chunk:
                return cached_digest
        digest = sha256_hex(chunk)
        bucket.append((chunk, digest))
        return digest

    def plan_bytes(self, data: bytes) -> ChunkPlan:
        """
        Split bytes into content-defined chunks and compute public metadata.

        The whole payload is hashed with SHA-256 and wrapped in a canonical
        LFS pointer; the plan's ``oid`` is the Git blob OID of that pointer
        and its ``etag`` equals the file SHA-256. Chunk boundaries come from
        ``fastcdc`` using this store's size settings.

        :param data: Logical file payload bytes
        :type data: bytes
        :return: Chunk plan with chunk descriptors and canonical LFS metadata
        :rtype: ChunkPlan
        :raises ValueError: Raised when ``data`` is not ``bytes`` or
            ``bytearray``.

        Example::

            >>> store = ChunkStore(chunk_size=256, min_chunk_size=64, max_chunk_size=1024)
            >>> plan = store.plan_bytes(b"abcdefgh")
            >>> sum(len(part.data) for part in plan.parts)
            8
        """
        if not isinstance(data, (bytes, bytearray)):
            raise ValueError("data must be bytes or bytearray")
        # Snapshot as immutable bytes so every chunk slice is stable even if
        # the caller later mutates a bytearray.
        payload = bytes(data)
        file_sha256 = sha256_hex(payload)
        pointer = canonical_lfs_pointer(file_sha256, len(payload))

        parts = []
        # (fast hash, length) -> [(chunk bytes, sha256 hex)]; lets repeated
        # chunks within one payload skip a second SHA-256 pass.
        digest_cache = {}
        boundaries = fastcdc(
            payload,
            min_size=self.min_chunk_size,
            avg_size=self.chunk_size,
            max_size=self.max_chunk_size,
            hf=blake3,
        )
        for boundary in boundaries:
            offset = int(boundary.offset)
            chunk = payload[offset:offset + int(boundary.length)]
            chunk_digest = self._sha256_for_chunk(chunk, boundary.hash, digest_cache)
            prefixed_digest = OBJECT_HASH + ":" + chunk_digest
            descriptor = ChunkDescriptor(
                chunk_id=prefixed_digest,
                checksum=prefixed_digest,
                logical_offset=offset,
                logical_size=len(chunk),
                # Chunks are planned uncompressed, so stored == logical size.
                stored_size=len(chunk),
                compression="none",
            )
            parts.append(ChunkPart(descriptor=descriptor, data=chunk))

        return ChunkPlan(
            logical_size=len(payload),
            sha256=file_sha256,
            oid=git_blob_oid(pointer),
            etag=file_sha256,
            pointer_size=len(pointer),
            chunks=tuple(part.descriptor for part in parts),
            parts=tuple(parts),
        )