From 65a8bd4253fbca1b809b4e97701fecdc86d1eaac Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 22 Aug 2024 21:06:20 -0500 Subject: [PATCH] Zarr-v3 Consolidated Metadata Implements the optional Consolidated Metadata feature of zarr-v3. --- src/zarr/api/asynchronous.py | 51 +++++++- src/zarr/codecs/crc32c_.py | 7 +- src/zarr/core/array.py | 8 +- src/zarr/core/common.py | 23 ++++ src/zarr/core/group.py | 71 ++++++++++- src/zarr/core/metadata.py | 23 +--- tests/v3/test_metadata.py | 227 +++++++++++++++++++++++++++++++++++ 7 files changed, 374 insertions(+), 36 deletions(-) diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index ad89584b44..0dd2e76f02 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import dataclasses import warnings from collections.abc import Iterable from typing import Any, Literal, Union, cast @@ -12,8 +13,14 @@ from zarr.core.array import Array, AsyncArray from zarr.core.buffer import NDArrayLike from zarr.core.chunk_key_encodings import ChunkKeyEncoding -from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat -from zarr.core.group import AsyncGroup +from zarr.core.common import ( + JSON, + AccessModeLiteral, + ChunkCoords, + MemoryOrder, + ZarrFormat, +) +from zarr.core.group import AsyncGroup, ConsolidatedMetadata from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata from zarr.store import ( StoreLike, @@ -126,8 +133,38 @@ def _default_zarr_version() -> ZarrFormat: return 3 -async def consolidate_metadata(*args: Any, **kwargs: Any) -> AsyncGroup: - raise NotImplementedError +async def consolidate_metadata(store: StoreLike) -> AsyncGroup: + """ + Consolidate the metadata of all nodes in a hierarchy. + + Upon completion, the metadata of the root node in the Zarr hierarchy will be + updated to include all the metadata of child nodes. + + Parameters + ---------- + store: StoreLike + The store-like object whose metadata you wish to consolidate. + + Returns + ------- + group: AsyncGroup + The group, with the ``consolidated_metadata`` field set to include + the metadata of each child node. + """ + group = await AsyncGroup.open(store) + members = dict([x async for x in group.members(recursive=True)]) + members_metadata = {} + + members_metadata = {k: v.metadata for k, v in members.items()} + + consolidated_metadata = ConsolidatedMetadata(metadata=members_metadata) + metadata = dataclasses.replace(group.metadata, consolidated_metadata=consolidated_metadata) + group = dataclasses.replace( + group, + metadata=metadata, + ) + await group._save_metadata() + return group async def copy(*args: Any, **kwargs: Any) -> tuple[int, int, int]: @@ -229,7 +266,7 @@ async def open( async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup: - raise NotImplementedError + return await open_group(*args, **kwargs) async def save( @@ -703,7 +740,9 @@ async def create( ) else: warnings.warn( - "dimension_separator is not yet implemented", RuntimeWarning, stacklevel=2 + "dimension_separator is not yet implemented", + RuntimeWarning, + stacklevel=2, ) if write_empty_chunks: warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2) diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index d40b95ef0c..f9e9ce4d1a 100644 --- a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -1,9 +1,10 @@ from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast import numpy as np +import typing_extensions from crc32c import crc32c from zarr.abc.codec import BytesBytesCodec @@ -37,7 +38,7 @@ async def _decode_single( crc32_bytes = data[-4:] inner_bytes = data[:-4] - computed_checksum = np.uint32(crc32c(inner_bytes)).tobytes() + computed_checksum = np.uint32(crc32c(cast(typing_extensions.Buffer, inner_bytes))).tobytes() stored_checksum = bytes(crc32_bytes) if computed_checksum != stored_checksum: raise ValueError( @@ -52,7 +53,7 @@ async def _encode_single( ) -> Buffer | None: data = chunk_bytes.as_numpy_array() # Calculate the checksum and "cast" it to a numpy array - checksum = np.array([crc32c(data)], dtype=np.uint32) + checksum = np.array([crc32c(cast(typing_extensions.Buffer, data))], dtype=np.uint32) # Append the checksum (as bytes) to the data return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("b"))) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 3738b0859b..ce5dcd2b2e 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -372,9 +372,15 @@ async def open( else: # V3 arrays are comprised of a zarr.json object assert zarr_json_bytes is not None + zarr_metadata = json.loads(zarr_json_bytes.to_bytes()) + if zarr_metadata.get("node_type") != "array": + # This KeyError is load bearing for `open`. That currently tries + # to open the node as an `array` and then falls back to opening + # as a group. + raise KeyError return cls( store_path=store_path, - metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes.to_bytes())), + metadata=ArrayV3Metadata.from_dict(zarr_metadata), ) @property diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index aaa30cfcb8..4f53c27434 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -16,6 +16,8 @@ overload, ) +import numcodecs + if TYPE_CHECKING: from collections.abc import Awaitable, Callable, Iterator @@ -167,3 +169,24 @@ def parse_order(data: Any) -> Literal["C", "F"]: if data in ("C", "F"): return cast(Literal["C", "F"], data) raise ValueError(f"Expected one of ('C', 'F'), got {data} instead.") + + +def _json_convert(o: Any) -> Any: + if isinstance(o, np.dtype): + return str(o) + if np.isscalar(o): + # convert numpy scalar to python type, and pass + # python types through + out = getattr(o, "item", lambda: o)() + if isinstance(out, complex): + # python complex types are not JSON serializable, so we use the + # serialization defined in the zarr v3 spec + return [out.real, out.imag] + return out + if isinstance(o, Enum): + return o.name + # this serializes numcodecs compressors + # todo: implement to_dict for codecs + elif isinstance(o, numcodecs.abc.Codec): + config: dict[str, Any] = o.get_config() + return config diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index 4becaf940b..126a8ba94c 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -25,8 +25,10 @@ ZGROUP_JSON, ChunkCoords, ZarrFormat, + _json_convert, ) from zarr.core.config import config +from zarr.core.metadata import ArrayMetadata, ArrayV3Metadata from zarr.core.sync import SyncMixin, sync from zarr.store import StoreLike, StorePath, make_store_path from zarr.store.common import ensure_no_existing_node @@ -77,10 +79,57 @@ def _parse_async_node(node: AsyncArray | AsyncGroup) -> Array | Group: raise TypeError(f"Unknown node type, got {type(node)}") +@dataclass(frozen=True) +class ConsolidatedMetadata: + metadata: dict[str, ArrayMetadata | GroupMetadata] + kind: Literal["inline"] = "inline" + must_understand: Literal[False] = False + + def to_dict(self) -> dict[str, JSON]: + return { + "kind": self.kind, + "must_understand": self.must_understand, + "metadata": {k: v.to_dict() for k, v in self.metadata.items()}, + } + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> ConsolidatedMetadata: + data = dict(data) + raw_metadata = data.get("metadata") + if not isinstance(raw_metadata, dict): + raise TypeError("Unexpected type for 'metadata'") + + elif not raw_metadata: + raise ValueError("Must specify metadata") + + metadata: dict[str, ArrayMetadata | GroupMetadata] + if raw_metadata: + metadata = {} + for k, v in raw_metadata.items(): + if not isinstance(v, dict): + raise TypeError(f"Invalid value for metadata items. key={k}, type={type(v)}") + + node_type = v.get("node_type", None) + if node_type == "group": + metadata[k] = GroupMetadata.from_dict(v) + elif node_type == "array": + metadata[k] = ArrayV3Metadata.from_dict(v) + else: + raise ValueError(f"Invalid node_type: '{node_type}'") + # assert data["kind"] == "inline" + if data["kind"] != "inline": + raise ValueError + + if data["must_understand"] is not False: + raise ValueError + return cls(metadata=metadata) + + @dataclass(frozen=True) class GroupMetadata(Metadata): attributes: dict[str, Any] = field(default_factory=dict) zarr_format: ZarrFormat = 3 + consolidated_metadata: ConsolidatedMetadata | None = None node_type: Literal["group"] = field(default="group", init=False) def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: @@ -88,7 +137,7 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: if self.zarr_format == 3: return { ZARR_JSON: prototype.buffer.from_bytes( - json.dumps(self.to_dict(), indent=json_indent).encode() + json.dumps(self.to_dict(), default=_json_convert, indent=json_indent).encode() ) } else: @@ -101,20 +150,33 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: ), } - def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3): + def __init__( + self, + attributes: dict[str, Any] | None = None, + zarr_format: ZarrFormat = 3, + consolidated_metadata: ConsolidatedMetadata | None = None, + ): attributes_parsed = parse_attributes(attributes) zarr_format_parsed = parse_zarr_format(zarr_format) object.__setattr__(self, "attributes", attributes_parsed) object.__setattr__(self, "zarr_format", zarr_format_parsed) + object.__setattr__(self, "consolidated_metadata", consolidated_metadata) @classmethod def from_dict(cls, data: dict[str, Any]) -> GroupMetadata: + data = dict(data) assert data.pop("node_type", None) in ("group", None) + consolidated_metadata = data.pop("consolidated_metadata", None) + if consolidated_metadata: + data["consolidated_metadata"] = ConsolidatedMetadata.from_dict(consolidated_metadata) return cls(**data) def to_dict(self) -> dict[str, Any]: - return asdict(self) + result = asdict(replace(self, consolidated_metadata=None)) + if self.consolidated_metadata: + result["consolidated_metadata"] = self.consolidated_metadata.to_dict() + return result @dataclass(frozen=True) @@ -497,7 +559,8 @@ async def members( # as opposed to a prefix, in the store under the prefix associated with this group # in which case `key` cannot be the name of a sub-array or sub-group. logger.warning( - "Object at %s is not recognized as a component of a Zarr hierarchy.", key + "Object at %s is not recognized as a component of a Zarr hierarchy.", + key, ) async def contains(self, member: str) -> bool: diff --git a/src/zarr/core/metadata.py b/src/zarr/core/metadata.py index d541e43205..0f3c444d28 100644 --- a/src/zarr/core/metadata.py +++ b/src/zarr/core/metadata.py @@ -20,7 +20,6 @@ if TYPE_CHECKING: from typing_extensions import Self -import numcodecs.abc from zarr.core.array_spec import ArraySpec from zarr.core.common import ( @@ -30,6 +29,7 @@ ZATTRS_JSON, ChunkCoords, ZarrFormat, + _json_convert, parse_dtype, parse_named_configuration, parse_shapelike, @@ -252,27 +252,6 @@ def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: return self.chunk_key_encoding.encode_chunk_key(chunk_coords) def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: - def _json_convert(o: Any) -> Any: - if isinstance(o, np.dtype): - return str(o) - if np.isscalar(o): - # convert numpy scalar to python type, and pass - # python types through - out = getattr(o, "item", lambda: o)() - if isinstance(out, complex): - # python complex types are not JSON serializable, so we use the - # serialization defined in the zarr v3 spec - return [out.real, out.imag] - return out - if isinstance(o, Enum): - return o.name - # this serializes numcodecs compressors - # todo: implement to_dict for codecs - elif isinstance(o, numcodecs.abc.Codec): - config: dict[str, Any] = o.get_config() - return config - raise TypeError - json_indent = config.get("json_indent") return { ZARR_JSON: prototype.buffer.from_bytes( diff --git a/tests/v3/test_metadata.py b/tests/v3/test_metadata.py index e69de29bb2..dab5ecfde2 100644 --- a/tests/v3/test_metadata.py +++ b/tests/v3/test_metadata.py @@ -0,0 +1,227 @@ +import numpy as np + +import zarr.api.synchronous +from zarr.abc.store import Store +from zarr.api.asynchronous import ( + AsyncGroup, + consolidate_metadata, + group, + open, + open_consolidated, +) +from zarr.core.array import ArrayV3Metadata +from zarr.core.group import ConsolidatedMetadata, GroupMetadata + + +async def test_consolidated(memory_store: Store) -> None: + # TODO: Figure out desired keys in + # TODO: variety in the hierarchies + # More nesting + # arrays under arrays + # single array + # etc. + g = await group(store=memory_store, attributes={"foo": "bar"}) + await g.create_array(path="air", shape=(1, 2, 3)) + await g.create_array(path="lat", shape=(1,)) + await g.create_array(path="lon", shape=(2,)) + await g.create_array(path="time", shape=(3,)) + + child = await g.create_group("child", attributes={"key": "child"}) + await child.create_array("array", shape=(4, 4), attributes={"key": "child"}) + + grandchild = await child.create_group("grandchild", attributes={"key": "grandchild"}) + await grandchild.create_array("array", shape=(4, 4), attributes={"key": "grandchild"}) + + await consolidate_metadata(memory_store) + group2 = await AsyncGroup.open(memory_store) + + array_metadata = { + "attributes": {}, + "chunk_key_encoding": { + "configuration": {"separator": "/"}, + "name": "default", + }, + "codecs": ({"configuration": {"endian": "little"}, "name": "bytes"},), + "data_type": np.dtype("float64"), + "fill_value": np.float64(0.0), + "node_type": "array", + # "shape": (1, 2, 3), + "zarr_format": 3, + } + + expected = GroupMetadata( + attributes={"foo": "bar"}, + consolidated_metadata=ConsolidatedMetadata( + kind="inline", + must_understand=False, + metadata={ + "air": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1, 2, 3), + "chunk_grid": { + "configuration": {"chunk_shape": (1, 2, 3)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lat": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1,), + "chunk_grid": { + "configuration": {"chunk_shape": (1,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lon": ArrayV3Metadata.from_dict( + { + **{"shape": (2,)}, + "chunk_grid": { + "configuration": {"chunk_shape": (2,)}, + "name": "regular", + }, + **array_metadata, + } + ), + "time": ArrayV3Metadata.from_dict( + { + **{ + "shape": (3,), + "chunk_grid": { + "configuration": {"chunk_shape": (3,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "child": GroupMetadata(attributes={"key": "child"}), + "child/array": ArrayV3Metadata.from_dict( + { + **array_metadata, + **{ + "attributes": {"key": "child"}, + "shape": (4, 4), + "chunk_grid": { + "configuration": {"chunk_shape": (4, 4)}, + "name": "regular", + }, + }, + } + ), + "child/grandchild": GroupMetadata(attributes={"key": "grandchild"}), + "child/grandchild/array": ArrayV3Metadata.from_dict( + { + **array_metadata, + **{ + "attributes": {"key": "grandchild"}, + "shape": (4, 4), + "chunk_grid": { + "configuration": {"chunk_shape": (4, 4)}, + "name": "regular", + }, + }, + } + ), + }, + ), + ) + assert group2.metadata == expected + group3 = await open(store=memory_store) + assert group3.metadata == expected + + group4 = await open_consolidated(store=memory_store) + assert group4.metadata == expected + + +def test_consolidated_sync(memory_store): + g = zarr.api.synchronous.group(store=memory_store, attributes={"foo": "bar"}) + g.create_array(name="air", shape=(1, 2, 3)) + g.create_array(name="lat", shape=(1,)) + g.create_array(name="lon", shape=(2,)) + g.create_array(name="time", shape=(3,)) + + zarr.api.synchronous.consolidate_metadata(memory_store) + group2 = zarr.api.synchronous.Group.open(memory_store) + + array_metadata = { + "attributes": {}, + "chunk_key_encoding": { + "configuration": {"separator": "/"}, + "name": "default", + }, + "codecs": ({"configuration": {"endian": "little"}, "name": "bytes"},), + "data_type": np.dtype("float64"), + "fill_value": np.float64(0.0), + "node_type": "array", + # "shape": (1, 2, 3), + "zarr_format": 3, + } + + expected = GroupMetadata( + attributes={"foo": "bar"}, + consolidated_metadata=ConsolidatedMetadata( + kind="inline", + must_understand=False, + metadata={ + "air": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1, 2, 3), + "chunk_grid": { + "configuration": {"chunk_shape": (1, 2, 3)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lat": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1,), + "chunk_grid": { + "configuration": {"chunk_shape": (1,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lon": ArrayV3Metadata.from_dict( + { + **{"shape": (2,)}, + "chunk_grid": { + "configuration": {"chunk_shape": (2,)}, + "name": "regular", + }, + **array_metadata, + } + ), + "time": ArrayV3Metadata.from_dict( + { + **{ + "shape": (3,), + "chunk_grid": { + "configuration": {"chunk_shape": (3,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + }, + ), + ) + assert group2.metadata == expected + group3 = zarr.api.synchronous.open(store=memory_store) + assert group3.metadata == expected + + group4 = zarr.api.synchronous.open_consolidated(store=memory_store) + assert group4.metadata == expected