Zarr-v3 Consolidated Metadata
Implements the optional Consolidated Metadata feature of zarr-v3.
TomAugspurger committed Aug 25, 2024
1 parent 8ee89f4 commit 65a8bd4
Showing 7 changed files with 374 additions and 36 deletions.
51 changes: 45 additions & 6 deletions src/zarr/api/asynchronous.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import dataclasses
import warnings
from collections.abc import Iterable
from typing import Any, Literal, Union, cast
@@ -12,8 +13,14 @@
from zarr.core.array import Array, AsyncArray
from zarr.core.buffer import NDArrayLike
from zarr.core.chunk_key_encodings import ChunkKeyEncoding
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat
from zarr.core.group import AsyncGroup
from zarr.core.common import (
JSON,
AccessModeLiteral,
ChunkCoords,
MemoryOrder,
ZarrFormat,
)
from zarr.core.group import AsyncGroup, ConsolidatedMetadata
from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
from zarr.store import (
StoreLike,
@@ -126,8 +133,38 @@ def _default_zarr_version() -> ZarrFormat:
return 3


async def consolidate_metadata(*args: Any, **kwargs: Any) -> AsyncGroup:
raise NotImplementedError
async def consolidate_metadata(store: StoreLike) -> AsyncGroup:
"""
Consolidate the metadata of all nodes in a hierarchy.
Upon completion, the metadata of the root node in the Zarr hierarchy will be
updated to include all the metadata of child nodes.
Parameters
----------
store: StoreLike
The store-like object whose metadata you wish to consolidate.
Returns
-------
group: AsyncGroup
The group, with the ``consolidated_metadata`` field set to include
the metadata of each child node.
"""
group = await AsyncGroup.open(store)
members = dict([x async for x in group.members(recursive=True)])
members_metadata = {k: v.metadata for k, v in members.items()}

consolidated_metadata = ConsolidatedMetadata(metadata=members_metadata)
metadata = dataclasses.replace(group.metadata, consolidated_metadata=consolidated_metadata)
group = dataclasses.replace(
group,
metadata=metadata,
)
await group._save_metadata()
return group
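Not part of the diff: a minimal usage sketch of the new API. The store construction and array creation are assumptions about the surrounding codebase (MemoryStore, the async group helper, and create_array are not shown in this commit), so treat the exact signatures as illustrative rather than confirmed.

import asyncio

import zarr.api.asynchronous as async_api
from zarr.store import MemoryStore  # assumed in-memory store on this branch

async def main() -> None:
    # Build a small hierarchy: a root group with one child array.
    store = MemoryStore(mode="w")
    root = await async_api.group(store=store)
    await root.create_array("a", shape=(10,), dtype="float64")

    # Walk the hierarchy and persist every child's metadata into the
    # root zarr.json under the "consolidated_metadata" key.
    group = await async_api.consolidate_metadata(store)
    print(group.metadata.consolidated_metadata)

asyncio.run(main())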


async def copy(*args: Any, **kwargs: Any) -> tuple[int, int, int]:
@@ -229,7 +266,7 @@ async def open(


async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup:
raise NotImplementedError
return await open_group(*args, **kwargs)


async def save(
@@ -703,7 +740,9 @@ async def create(
)
else:
warnings.warn(
"dimension_separator is not yet implemented", RuntimeWarning, stacklevel=2
"dimension_separator is not yet implemented",
RuntimeWarning,
stacklevel=2,
)
if write_empty_chunks:
warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2)
7 changes: 4 additions & 3 deletions src/zarr/codecs/crc32c_.py
@@ -1,9 +1,10 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import numpy as np
import typing_extensions
from crc32c import crc32c

from zarr.abc.codec import BytesBytesCodec
@@ -37,7 +38,7 @@ async def _decode_single(
crc32_bytes = data[-4:]
inner_bytes = data[:-4]

computed_checksum = np.uint32(crc32c(inner_bytes)).tobytes()
computed_checksum = np.uint32(crc32c(cast(typing_extensions.Buffer, inner_bytes))).tobytes()
stored_checksum = bytes(crc32_bytes)
if computed_checksum != stored_checksum:
raise ValueError(
@@ -52,7 +53,7 @@ async def _encode_single(
) -> Buffer | None:
data = chunk_bytes.as_numpy_array()
# Calculate the checksum and "cast" it to a numpy array
checksum = np.array([crc32c(data)], dtype=np.uint32)
checksum = np.array([crc32c(cast(typing_extensions.Buffer, data))], dtype=np.uint32)
# Append the checksum (as bytes) to the data
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("b")))
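The framing this codec implements is simple enough to show standalone. A minimal sketch of the append-and-verify scheme, assuming a little-endian host (so np.uint32(...).tobytes() yields the 4-byte little-endian trailer) and using only numpy and the crc32c package:

import numpy as np
from crc32c import crc32c

def encode(payload: bytes) -> bytes:
    # Append the 4-byte CRC32C of the payload as a trailer.
    return payload + np.uint32(crc32c(payload)).tobytes()

def decode(data: bytes) -> bytes:
    # Split off the trailer, recompute, and compare before returning.
    payload, stored = data[:-4], data[-4:]
    computed = np.uint32(crc32c(payload)).tobytes()
    if computed != stored:
        raise ValueError("Stored and computed checksum do not match")
    return payload

assert decode(encode(b"some chunk bytes")) == b"some chunk bytes"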

8 changes: 7 additions & 1 deletion src/zarr/core/array.py
@@ -372,9 +372,15 @@ async def open(
else:
# V3 arrays are comprised of a zarr.json object
assert zarr_json_bytes is not None
zarr_metadata = json.loads(zarr_json_bytes.to_bytes())
if zarr_metadata.get("node_type") != "array":
# This KeyError is load-bearing for `open`, which currently tries to
# open the node as an array and then falls back to opening it as a
# group.
raise KeyError
return cls(
store_path=store_path,
metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes.to_bytes())),
metadata=ArrayV3Metadata.from_dict(zarr_metadata),
)
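A hypothetical sketch of the fallback this enables; the actual dispatch in open() lives outside this diff, so the helper below is illustrative only:

from zarr.core.array import AsyncArray
from zarr.core.group import AsyncGroup

async def _open_node(store):  # hypothetical helper, not in this commit
    try:
        # AsyncArray.open now raises KeyError when zarr.json says the
        # node is not an array ...
        return await AsyncArray.open(store)
    except KeyError:
        # ... which lets callers fall back to opening it as a group.
        return await AsyncGroup.open(store)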

@property
23 changes: 23 additions & 0 deletions src/zarr/core/common.py
@@ -16,6 +16,8 @@
overload,
)

import numcodecs

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterator

@@ -167,3 +169,24 @@ def parse_order(data: Any) -> Literal["C", "F"]:
if data in ("C", "F"):
return cast(Literal["C", "F"], data)
raise ValueError(f"Expected one of ('C', 'F'), got {data} instead.")


def _json_convert(o: Any) -> Any:
if isinstance(o, np.dtype):
return str(o)
if np.isscalar(o):
# convert numpy scalar to python type, and pass
# python types through
out = getattr(o, "item", lambda: o)()
if isinstance(out, complex):
# python complex types are not JSON serializable, so we use the
# serialization defined in the zarr v3 spec
return [out.real, out.imag]
return out
if isinstance(o, Enum):
return o.name
# this serializes numcodecs compressors
# todo: implement to_dict for codecs
elif isinstance(o, numcodecs.abc.Codec):
config: dict[str, Any] = o.get_config()
return config
raise TypeError
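A standalone illustration of what the relocated helper handles when passed to json.dumps as default; this trimmed copy drops the numcodecs branch so it runs with just the stdlib and numpy:

import json
from enum import Enum
from typing import Any

import numpy as np

def _json_convert(o: Any) -> Any:
    # Trimmed copy of the helper above, minus the numcodecs branch.
    if isinstance(o, np.dtype):
        return str(o)
    if np.isscalar(o):
        out = getattr(o, "item", lambda: o)()
        if isinstance(out, complex):
            # Zarr v3 serializes complex values as [real, imag].
            return [out.real, out.imag]
        return out
    if isinstance(o, Enum):
        return o.name
    raise TypeError

print(json.dumps(
    {"dtype": np.dtype("float64"), "fill": np.int32(5), "z": np.complex128(1 + 2j)},
    default=_json_convert,
))
# {"dtype": "float64", "fill": 5, "z": [1.0, 2.0]}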
71 changes: 67 additions & 4 deletions src/zarr/core/group.py
@@ -25,8 +25,10 @@
ZGROUP_JSON,
ChunkCoords,
ZarrFormat,
_json_convert,
)
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV3Metadata
from zarr.core.sync import SyncMixin, sync
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.common import ensure_no_existing_node
@@ -77,18 +79,65 @@ def _parse_async_node(node: AsyncArray | AsyncGroup) -> Array | Group:
raise TypeError(f"Unknown node type, got {type(node)}")


@dataclass(frozen=True)
class ConsolidatedMetadata:
metadata: dict[str, ArrayMetadata | GroupMetadata]
kind: Literal["inline"] = "inline"
must_understand: Literal[False] = False

def to_dict(self) -> dict[str, JSON]:
return {
"kind": self.kind,
"must_understand": self.must_understand,
"metadata": {k: v.to_dict() for k, v in self.metadata.items()},
}

@classmethod
def from_dict(cls, data: dict[str, JSON]) -> ConsolidatedMetadata:
data = dict(data)
raw_metadata = data.get("metadata")
if not isinstance(raw_metadata, dict):
raise TypeError("Unexpected type for 'metadata'")

elif not raw_metadata:
raise ValueError("Must specify metadata")

metadata: dict[str, ArrayMetadata | GroupMetadata]
if raw_metadata:
metadata = {}
for k, v in raw_metadata.items():
if not isinstance(v, dict):
raise TypeError(f"Invalid value for metadata items. key={k}, type={type(v)}")

node_type = v.get("node_type", None)
if node_type == "group":
metadata[k] = GroupMetadata.from_dict(v)
elif node_type == "array":
metadata[k] = ArrayV3Metadata.from_dict(v)
else:
raise ValueError(f"Invalid node_type: '{node_type}'")
# assert data["kind"] == "inline"
if data["kind"] != "inline":
raise ValueError

if data["must_understand"] is not False:
raise ValueError
return cls(metadata=metadata)
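For reference, the payload shape from_dict accepts, mirroring to_dict above; the child entries here are abbreviated, and a real array entry would carry a full ArrayV3Metadata document:

consolidated = {
    "kind": "inline",
    "must_understand": False,
    "metadata": {
        "child/group": {"node_type": "group", "zarr_format": 3, "attributes": {}},
        # "child/array": {"node_type": "array", ...full ArrayV3Metadata dict...},
    },
}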


@dataclass(frozen=True)
class GroupMetadata(Metadata):
attributes: dict[str, Any] = field(default_factory=dict)
zarr_format: ZarrFormat = 3
consolidated_metadata: ConsolidatedMetadata | None = None
node_type: Literal["group"] = field(default="group", init=False)

def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
json_indent = config.get("json_indent")
if self.zarr_format == 3:
return {
ZARR_JSON: prototype.buffer.from_bytes(
json.dumps(self.to_dict(), indent=json_indent).encode()
json.dumps(self.to_dict(), default=_json_convert, indent=json_indent).encode()
)
}
else:
@@ -101,20 +150,33 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
),
}

def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3):
def __init__(
self,
attributes: dict[str, Any] | None = None,
zarr_format: ZarrFormat = 3,
consolidated_metadata: ConsolidatedMetadata | None = None,
):
attributes_parsed = parse_attributes(attributes)
zarr_format_parsed = parse_zarr_format(zarr_format)

object.__setattr__(self, "attributes", attributes_parsed)
object.__setattr__(self, "zarr_format", zarr_format_parsed)
object.__setattr__(self, "consolidated_metadata", consolidated_metadata)

@classmethod
def from_dict(cls, data: dict[str, Any]) -> GroupMetadata:
data = dict(data)
assert data.pop("node_type", None) in ("group", None)
consolidated_metadata = data.pop("consolidated_metadata", None)
if consolidated_metadata:
data["consolidated_metadata"] = ConsolidatedMetadata.from_dict(consolidated_metadata)
return cls(**data)

def to_dict(self) -> dict[str, Any]:
return asdict(self)
result = asdict(replace(self, consolidated_metadata=None))
if self.consolidated_metadata:
result["consolidated_metadata"] = self.consolidated_metadata.to_dict()
return result
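Assuming the classes exactly as defined in this diff, a round-trip through from_dict and to_dict looks like:

gm = GroupMetadata.from_dict(
    {
        "zarr_format": 3,
        "node_type": "group",
        "attributes": {},
        "consolidated_metadata": {
            "kind": "inline",
            "must_understand": False,
            "metadata": {"g": {"node_type": "group", "zarr_format": 3, "attributes": {}}},
        },
    }
)
# The nested dict is parsed into ConsolidatedMetadata, and to_dict
# re-serializes it (asdict is applied with the field blanked out first).
assert isinstance(gm.consolidated_metadata, ConsolidatedMetadata)
assert gm.to_dict()["consolidated_metadata"]["metadata"]["g"]["node_type"] == "group"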


@dataclass(frozen=True)
@@ -497,7 +559,8 @@ async def members(
# as opposed to a prefix, in the store under the prefix associated with this group
# in which case `key` cannot be the name of a sub-array or sub-group.
logger.warning(
"Object at %s is not recognized as a component of a Zarr hierarchy.", key
"Object at %s is not recognized as a component of a Zarr hierarchy.",
key,
)

async def contains(self, member: str) -> bool:
23 changes: 1 addition & 22 deletions src/zarr/core/metadata.py
@@ -20,7 +20,6 @@
if TYPE_CHECKING:
from typing_extensions import Self

import numcodecs.abc

from zarr.core.array_spec import ArraySpec
from zarr.core.common import (
@@ -30,6 +29,7 @@
ZATTRS_JSON,
ChunkCoords,
ZarrFormat,
_json_convert,
parse_dtype,
parse_named_configuration,
parse_shapelike,
@@ -252,27 +252,6 @@ def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
return self.chunk_key_encoding.encode_chunk_key(chunk_coords)

def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
def _json_convert(o: Any) -> Any:
if isinstance(o, np.dtype):
return str(o)
if np.isscalar(o):
# convert numpy scalar to python type, and pass
# python types through
out = getattr(o, "item", lambda: o)()
if isinstance(out, complex):
# python complex types are not JSON serializable, so we use the
# serialization defined in the zarr v3 spec
return [out.real, out.imag]
return out
if isinstance(o, Enum):
return o.name
# this serializes numcodecs compressors
# todo: implement to_dict for codecs
elif isinstance(o, numcodecs.abc.Codec):
config: dict[str, Any] = o.get_config()
return config
raise TypeError

json_indent = config.get("json_indent")
return {
ZARR_JSON: prototype.buffer.from_bytes(
