From 9a29a8e31c2bc85fecca135a5e5a8c2514065b56 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Mon, 16 Oct 2023 17:48:51 +0200 Subject: [PATCH 1/7] initial concatenation with variable length chunks --- kerchunk/combine.py | 42 +++++++++++++++++++++++---- kerchunk/tests/test_combine_concat.py | 25 ++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/kerchunk/combine.py b/kerchunk/combine.py index 4426d445..f43fcc23 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -566,6 +566,8 @@ def concatenate_arrays( Input reference sets, maybe generated by ``kerchunk.zarr.single_zarr`` storage_options: dict | None To create the filesystems, such at target/remote protocol and target/remote options + axis + Axis to concatenate along. key_seperator: str "." or "/", how the zarr keys are stored path: str or None @@ -589,6 +591,7 @@ def _replace(l: list, i: int, v) -> list: return l n_files = len(files) + chunk_type = None # None, "fixed", "variable" chunks_offset = 0 for i, fn in enumerate(files): @@ -596,11 +599,19 @@ def _replace(l: list, i: int, v) -> list: zarray = ujson.load(fs.open(f"{path}.zarray")) shape = zarray["shape"] chunks = zarray["chunks"] - n_chunks, rem = divmod(shape[axis], chunks[axis]) - n_chunks += rem > 0 + + if isinstance(chunks[axis], int): + n_chunks, rem = divmod(shape[axis], chunks[axis]) + n_chunks += rem > 0 + else: + n_chunks = len(chunks[axis]) + rem = 0 + # Can this ever work?
+ # rem = sum(chunks[axis]) - shape[axis] if i == 0: base_shape = _replace(shape, axis, None) + chunk_type = "fixed" if isinstance(chunks[axis], int) else "variable" base_chunks = chunks # result_* are modified in-place result_zarray = zarray @@ -610,6 +621,13 @@ def _replace(l: list, i: int, v) -> list: out[name] = fs.references[name] else: result_shape[axis] += shape[axis] + if chunk_type == "fixed": + assert isinstance(chunks[axis], int) + elif chunk_type == "variable": + assert isinstance(chunks[axis], list) + result_zarray["chunks"][axis] = ( + result_zarray["chunks"][axis] + chunks[axis] + ) # Safety checks if check_arrays: @@ -620,10 +638,22 @@ def _replace(l: list, i: int, v) -> list: raise ValueError( f"Incompatible array shape at index {i}. Expected {expected_shape}, got {shape}." ) - if chunks != base_chunks: - raise ValueError( - f"Incompatible array chunks at index {i}. Expected {base_chunks}, got {chunks}." - ) + if chunk_type == "fixed": + if chunks != base_chunks: + raise ValueError( + f"Incompatible array chunks at index {i}. Expected {base_chunks}, got {chunks}." + ) + else: + assert chunk_type == "variable" + base_chunks_to_compare = [ + v for i, v in enumerate(base_chunks) if i != axis + ] + chunks_to_compare = [v for i, v in enumerate(chunks) if i != axis] + # TODO: deduplicate + if chunks_to_compare != base_chunks_to_compare: + raise ValueError( + f"Incompatible array chunks at index {i}. Expected {base_chunks}, got {chunks}." + ) if i < (n_files - 1) and rem != 0: raise ValueError( f"Array at index {i} has irregular chunking at its boundary. 
" diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index 3f7ff823..083f1c36 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -139,3 +139,28 @@ def test_fail_irregular_chunk_boundaries(tmpdir): with pytest.raises(ValueError, match=r"Array at index 0 has irregular chunking.*"): kerchunk.combine.concatenate_arrays([ref1, ref2], path="x", check_arrays=True) + + +def test_variable_length_chunks(tmpdir): + fn1 = f"{tmpdir}/out1.zarr" + fn2 = f"{tmpdir}/out2.zarr" + x1 = np.arange(10) + x2 = np.arange(10, 20) + g1 = zarr.open(fn1) + g1.create_dataset("x", data=x1, chunks=([2, 2, 2, 2, 2],)) + g2 = zarr.open(fn2) + g2.create_dataset("x", data=x2, chunks=([3, 3, 3, 1],)) + + ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) + ref2 = kerchunk.zarr.single_zarr(fn2, inline=0) + + out = kerchunk.combine.concatenate_arrays( + [ref1, ref2], + path="x", + check_arrays=True, # allow_varible_chunks=True + ) + mapper = fsspec.get_mapper("reference://", fo=out) + g_result = zarr.open(mapper) + + assert g_result["x"].chunks == ([2, 2, 2, 2, 2, 3, 3, 3, 1],) + np.testing.assert_array_equal(g_result["x"][:], np.arange(20)) From 844ff1089ffe1abf4cb6d27d78693772fbf57645 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Mon, 16 Oct 2023 17:57:54 +0200 Subject: [PATCH 2/7] add zarr dep to requirements --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a04d8aa1..107fa87c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ fsspec numcodecs<=0.11 numpy ujson -zarr +zarr @ git+https://github.com/martindurant/zarr.git@varchunk From b7ed8f964dda03a432dccd5a2ac4f042ac10a993 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Mon, 16 Oct 2023 18:16:31 +0200 Subject: [PATCH 3/7] Start expansion of test suite --- kerchunk/tests/test_combine_concat.py | 54 ++++++++++++++++++--------- 1 file changed, 37 insertions(+), 17 
deletions(-) diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index 083f1c36..f3196dc9 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -141,26 +141,46 @@ def test_fail_irregular_chunk_boundaries(tmpdir): kerchunk.combine.concatenate_arrays([ref1, ref2], path="x", check_arrays=True) -def test_variable_length_chunks(tmpdir): - fn1 = f"{tmpdir}/out1.zarr" - fn2 = f"{tmpdir}/out2.zarr" - x1 = np.arange(10) - x2 = np.arange(10, 20) - g1 = zarr.open(fn1) - g1.create_dataset("x", data=x1, chunks=([2, 2, 2, 2, 2],)) - g2 = zarr.open(fn2) - g2.create_dataset("x", data=x2, chunks=([3, 3, 3, 1],)) - - ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) - ref2 = kerchunk.zarr.single_zarr(fn2, inline=0) +@pytest.mark.parametrize( + "arrays,axis,expected_chunks", + [ + ( + [ + (np.arange(10), ([2, 2, 2, 2, 2],)), + (np.arange(10, 20), ([3, 3, 3, 1],)), + ], + 0, + ([2, 2, 2, 2, 2, 3, 3, 3, 1],), + ), + ( + [ + (np.broadcast_to(np.arange(6), (10, 6)), ([5, 5], [6])), + (np.broadcast_to(np.arange(7, 10), (10, 3)), ([5, 5], [3])), + ], + 1, + ([5, 5], [6, 3]), + ), + ], +) +def test_variable_length_chunks(tmpdir, arrays, axis, expected_chunks): + fns = [] + refs = [] + for i, (x, chunks) in enumerate(arrays): + fn = f"{tmpdir}/out{i}.zarr" + g = zarr.open(fn) + g.create_dataset("x", data=x, chunks=chunks) + fns.append(fn) + ref = kerchunk.zarr.single_zarr(fn, inline=0) + refs.append(ref) out = kerchunk.combine.concatenate_arrays( - [ref1, ref2], - path="x", - check_arrays=True, # allow_varible_chunks=True + refs, axis=axis, path="x", check_arrays=True ) + mapper = fsspec.get_mapper("reference://", fo=out) g_result = zarr.open(mapper) - assert g_result["x"].chunks == ([2, 2, 2, 2, 2, 3, 3, 3, 1],) - np.testing.assert_array_equal(g_result["x"][:], np.arange(20)) + assert g_result["x"].chunks == expected_chunks + np.testing.assert_array_equal( + g_result["x"][:], np.concatenate([a for a, _ in 
arrays], axis=axis) + ) From dd11b865c38713d5ca6a5ac28f6a1945d92284f8 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Mon, 16 Oct 2023 18:27:42 +0200 Subject: [PATCH 4/7] Simplify tests by using zarr arrays as input --- kerchunk/tests/test_combine_concat.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index f3196dc9..4c2829e5 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -146,16 +146,20 @@ def test_fail_irregular_chunk_boundaries(tmpdir): [ ( [ - (np.arange(10), ([2, 2, 2, 2, 2],)), - (np.arange(10, 20), ([3, 3, 3, 1],)), + zarr.array(np.arange(10), chunks=([2, 2, 2, 2, 2],)), + zarr.array(np.arange(10, 20), chunks=([3, 3, 3, 1],)), ], 0, ([2, 2, 2, 2, 2, 3, 3, 3, 1],), ), ( [ - (np.broadcast_to(np.arange(6), (10, 6)), ([5, 5], [6])), - (np.broadcast_to(np.arange(7, 10), (10, 3)), ([5, 5], [3])), + zarr.array( + np.broadcast_to(np.arange(6), (10, 6)), chunks=([5, 5], [6]) + ), + zarr.array( + np.broadcast_to(np.arange(7, 10), (10, 3)), chunks=([5, 5], [3]) + ), ], 1, ([5, 5], [6, 3]), @@ -165,10 +169,10 @@ def test_fail_irregular_chunk_boundaries(tmpdir): def test_variable_length_chunks(tmpdir, arrays, axis, expected_chunks): fns = [] refs = [] - for i, (x, chunks) in enumerate(arrays): + for i, x in enumerate(arrays): fn = f"{tmpdir}/out{i}.zarr" g = zarr.open(fn) - g.create_dataset("x", data=x, chunks=chunks) + g.create_dataset("x", data=x, chunks=x.chunks) fns.append(fn) ref = kerchunk.zarr.single_zarr(fn, inline=0) refs.append(ref) @@ -182,5 +186,5 @@ def test_variable_length_chunks(tmpdir, arrays, axis, expected_chunks): assert g_result["x"].chunks == expected_chunks np.testing.assert_array_equal( - g_result["x"][:], np.concatenate([a for a, _ in arrays], axis=axis) + g_result["x"][...], np.concatenate([a[...] 
for a in arrays], axis=axis) ) From 0af250900425e67191bbb4843e11835c2a364042 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Mon, 16 Oct 2023 20:21:41 +0200 Subject: [PATCH 5/7] Add mismatched chunk failure test --- kerchunk/tests/test_combine_concat.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index 4c2829e5..063b393c 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -166,7 +166,7 @@ def test_fail_irregular_chunk_boundaries(tmpdir): ), ], ) -def test_variable_length_chunks(tmpdir, arrays, axis, expected_chunks): +def test_variable_length_chunks_success(tmpdir, arrays, axis, expected_chunks): fns = [] refs = [] for i, x in enumerate(arrays): @@ -188,3 +188,26 @@ def test_variable_length_chunks(tmpdir, arrays, axis, expected_chunks): np.testing.assert_array_equal( g_result["x"][...], np.concatenate([a[...] for a in arrays], axis=axis) ) + + +def test_variable_length_chunks_mismatch_chunk_failure(tmpdir): + arrays = [ + zarr.array(np.arange(12).reshape(4, 3), chunks=([2, 2], [1, 2])), + zarr.array(np.arange(12, 24).reshape(4, 3), chunks=([4], [2, 1])), + ] + axis = 0 + + fns = [] + refs = [] + for i, x in enumerate(arrays): + fn = f"{tmpdir}/out{i}.zarr" + g = zarr.open(fn) + g.create_dataset("x", data=x, chunks=x.chunks) + fns.append(fn) + ref = kerchunk.zarr.single_zarr(fn, inline=0) + refs.append(ref) + + with pytest.raises(ValueError): + kerchunk.combine.concatenate_arrays( + refs, axis=axis, path="x", check_arrays=True + ) From b9041ad8a316391360d32404d9e8aeb7039f95e3 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Tue, 31 Oct 2023 14:11:50 +0100 Subject: [PATCH 6/7] Better inference of variable chunking from inputs --- kerchunk/combine.py | 30 +++++++++++++++--- kerchunk/tests/test_combine_concat.py | 44 ++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 6 deletions(-) 
diff --git a/kerchunk/combine.py b/kerchunk/combine.py index f43fcc23..26c97385 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -591,18 +591,38 @@ def _replace(l: list, i: int, v) -> list: return l n_files = len(files) - chunk_type = None # None, "fixed", "variable" + fss = [ + fsspec.filesystem("reference", fo=fn, **(storage_options or {})) for fn in files + ] + zarrays = [ujson.load(fs.open(f"{path}.zarray")) for fs in fss] + + # Determine chunk type + _prev_chunks = zarrays[0]["chunks"][axis] + for zarray in zarrays: + axis_chunks = zarray["chunks"][axis] + if isinstance(axis_chunks, list) or (axis_chunks != _prev_chunks): + chunk_type = "variable" + break + _prev_chunks = axis_chunks + else: + chunk_type = "fixed" chunks_offset = 0 - for i, fn in enumerate(files): - fs = fsspec.filesystem("reference", fo=fn, **(storage_options or {})) - zarray = ujson.load(fs.open(f"{path}.zarray")) + for i, (fs, zarray) in enumerate(zip(fss, zarrays)): shape = zarray["shape"] chunks = zarray["chunks"] if isinstance(chunks[axis], int): n_chunks, rem = divmod(shape[axis], chunks[axis]) n_chunks += rem > 0 + if chunk_type == "variable": + if rem != 0: + raise ValueError( + "Cannot handle padded chunks when creating variably chunked arrays. " + f"{i}-th array has {rem} padding for the last chunk." 
+ ) + assert rem == 0 + chunks[axis] = [chunks[axis] for _ in range(n_chunks)] else: n_chunks = len(chunks[axis]) rem = 0 @@ -611,7 +631,7 @@ def _replace(l: list, i: int, v) -> list: if i == 0: base_shape = _replace(shape, axis, None) - chunk_type = "fixed" if isinstance(chunks[axis], int) else "variable" + # chunk_type = "fixed" if isinstance(chunks[axis], int) else "variable" base_chunks = chunks # result_* are modified in-place result_zarray = zarray diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index 063b393c..a89f3d3d 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -103,7 +103,10 @@ def test_fail_chunks(tmpdir): ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) ref2 = kerchunk.zarr.single_zarr(fn2, inline=0) - with pytest.raises(ValueError, match=r"Incompatible array chunks at index 1.*"): + with pytest.raises( + ValueError, + match=r"Cannot handle padded chunks when creating variably chunked arrays.", + ): kerchunk.combine.concatenate_arrays([ref1, ref2], path="x", check_arrays=True) @@ -164,6 +167,22 @@ def test_fail_irregular_chunk_boundaries(tmpdir): 1, ([5, 5], [6, 3]), ), + ( + [ + zarr.array(np.arange(10), chunks=(5,)), + zarr.array(np.arange(10, 20), chunks=([3, 7],)), + ], + 0, + ([5, 5, 3, 7],), + ), + ( # Inferring variable chunking from fixed inputs + [ + zarr.array(np.arange(12), chunks=(6,)), + zarr.array(np.arange(12, 24), chunks=(4,)), + ], + 0, + ([6, 6, 4, 4, 4],), + ), ], ) def test_variable_length_chunks_success(tmpdir, arrays, axis, expected_chunks): @@ -211,3 +230,26 @@ def test_variable_length_chunks_mismatch_chunk_failure(tmpdir): kerchunk.combine.concatenate_arrays( refs, axis=axis, path="x", check_arrays=True ) + + +def test_fixed_to_variable_padding_failure(tmpdir): + arrays = [ + zarr.array(np.arange(12), chunks=(6,)), + zarr.array(np.arange(12, 24), chunks=(8,)), + ] + axis = 0 + + fns = [] + refs = [] + for i, x in enumerate(arrays): + fn 
= f"{tmpdir}/out{i}.zarr" + g = zarr.open(fn) + g.create_dataset("x", data=x, chunks=x.chunks) + fns.append(fn) + ref = kerchunk.zarr.single_zarr(fn, inline=0) + refs.append(ref) + + with pytest.raises(ValueError): + kerchunk.combine.concatenate_arrays( + refs, axis=axis, path="x", check_arrays=True + ) From ae48a748097631069d05d26f3606e9df44da6bf9 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Tue, 31 Oct 2023 14:17:59 +0100 Subject: [PATCH 7/7] Remove commented line --- kerchunk/combine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kerchunk/combine.py b/kerchunk/combine.py index 26c97385..199c7360 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -631,7 +631,6 @@ def _replace(l: list, i: int, v) -> list: if i == 0: base_shape = _replace(shape, axis, None) - # chunk_type = "fixed" if isinstance(chunks[axis], int) else "variable" base_chunks = chunks # result_* are modified in-place result_zarray = zarray