From e591dccac715dfc7a225056ef7f38bfa7febf2ac Mon Sep 17 00:00:00 2001 From: Izer Onadim Date: Tue, 31 Oct 2023 09:07:04 +0000 Subject: [PATCH 1/4] Add failing test for timestamp unit coercion --- tests/io_components/test_write.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/io_components/test_write.py b/tests/io_components/test_write.py index 326551b9..67bf3e0c 100644 --- a/tests/io_components/test_write.py +++ b/tests/io_components/test_write.py @@ -3,6 +3,7 @@ import pandas as pd import pytest +from packaging import version from plateau.core.dataset import DatasetMetadata from plateau.core.index import ExplicitSecondaryIndex @@ -117,3 +118,33 @@ def test_raise_if_dataset_exists(store_factory, dataset_function): raise_if_dataset_exists(dataset_uuid="ThisDoesNotExist", store=store_factory) with pytest.raises(RuntimeError): raise_if_dataset_exists(dataset_uuid=dataset_function.uuid, store=store_factory) + + +@pytest.mark.skipif( + version.parse(pd.__version__) < version.parse("2"), + reason="Timestamp unit coercion is only relevant in pandas >= 2", +) +def test_coerce_schema_timestamp_units(store): + date = pd.Timestamp(2000, 1, 1) + + mps = [ + MetaPartition(label="one", data=pd.DataFrame({"a": date, "b": [date]})), + MetaPartition( + label="two", + data=pd.DataFrame({"a": date.as_unit("ns"), "b": [date.as_unit("ns")]}), + ), + ] + + try: + # Expect this not to fail even though the metapartitions have different + # timestamp units, because all units should be coerced to nanoseconds. + store_dataset_from_partitions( + partition_list=mps, + dataset_uuid="dataset_uuid", + store=store, + dataset_metadata={"some": "metadata"}, + ) + except ValueError as e: + pytest.fail( + f"Expected no error when storing partitions with different timestamp units, but got this error: {e}" + ) From 3766a0402abc1f6367f318388c0694869468bf96 Mon Sep 17 00:00:00 2001 From: Izer Onadim Date: Tue, 31 Oct 2023 09:09:33 +0000 Subject: [PATCH 2/4] Coerce the timestamp units of MetaPartition schemas before saving --- plateau/io_components/write.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/plateau/io_components/write.py b/plateau/io_components/write.py index 4721b33e..a33b29ce 100644 --- a/plateau/io_components/write.py +++ b/plateau/io_components/write.py @@ -1,6 +1,7 @@ from functools import partial from typing import Dict, Iterable, List, Optional, cast +import pyarrow as pa from minimalkv import KeyValueStore from plateau.core import naming @@ -126,6 +127,17 @@ def persist_common_metadata( return result +# Currently we only support nanosecond timestamps. +def coerce_schema_timestamps(wrapper: SchemaWrapper) -> SchemaWrapper: + schema = wrapper.internal() + fields = [] + for field in schema: + if field.type in [pa.timestamp("s"), pa.timestamp("ms"), pa.timestamp("us")]: + field = pa.field(field.name, pa.timestamp("ns")) + fields.append(field) + return SchemaWrapper(pa.schema(fields, schema.metadata), wrapper.origin) + + def store_dataset_from_partitions( partition_list, store: StoreInput, @@ -161,7 +173,7 @@ def store_dataset_from_partitions( for mp in partition_list: if mp.schema: - schemas.add(mp.schema) + schemas.add(coerce_schema_timestamps(mp.schema)) dataset_builder.schema = persist_common_metadata( schemas=schemas, From 7f3af631c9020268b04523add748601750fdc189 Mon Sep 17 00:00:00 2001 From: Izer Onadim Date: Tue, 31 Oct 2023 09:33:30 +0000 Subject: [PATCH 3/4] Add change log entry --- CHANGES.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 19aa97d2..dbe9055c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,13 @@ Changelog ========= +Plateau 4.2.1 (2023-10-31) +========================== + +* Add support for pandas 2.1 +* Fix a bug to do with timestamp dtype conversion +* Add timestamp unit coercion as Plateau currently only supports nanosecond units on timestamps + Plateau 4.2.0 (2023-10-10) ========================== From 3fb34b2ee0d708892732ede1369b9d1c223881bc Mon Sep 17 00:00:00 2001 From: Izer Onadim Date: Tue, 31 Oct 2023 11:59:33 +0000 Subject: [PATCH 4/4] Improve timestamp unit coercion test --- tests/io_components/test_write.py | 42 ++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/tests/io_components/test_write.py b/tests/io_components/test_write.py index 67bf3e0c..c9c2423c 100644 --- a/tests/io_components/test_write.py +++ b/tests/io_components/test_write.py @@ -9,6 +9,7 @@ from plateau.core.index import ExplicitSecondaryIndex from plateau.core.testing import TIME_TO_FREEZE_ISO from plateau.io_components.metapartition import MetaPartition +from plateau.io_components.read import dispatch_metapartitions from plateau.io_components.write import ( raise_if_dataset_exists, store_dataset_from_partitions, @@ -127,7 +128,7 @@ def test_raise_if_dataset_exists(store_factory, dataset_function): def test_coerce_schema_timestamp_units(store): date = pd.Timestamp(2000, 1, 1) - mps = [ + mps_original = [ MetaPartition(label="one", data=pd.DataFrame({"a": date, "b": [date]})), MetaPartition( label="two", @@ -135,16 +136,29 @@ def test_coerce_schema_timestamp_units(store): ), ] - try: - # Expect this not to fail even though the metapartitions have different - # timestamp units, because all units should be coerced to nanoseconds. - store_dataset_from_partitions( - partition_list=mps, - dataset_uuid="dataset_uuid", - store=store, - dataset_metadata={"some": "metadata"}, - ) - except ValueError as e: - pytest.fail( - f"Expected no error when storing partitions with different timestamp units, but got this error: {e}" - ) + mps = map( + lambda mp: mp.store_dataframes(store, dataset_uuid="dataset_uuid"), mps_original + ) + + # Expect this not to fail even though the metapartitions have different + # timestamp units, because all units should be coerced to nanoseconds. + dataset = store_dataset_from_partitions( + partition_list=mps, + dataset_uuid="dataset_uuid", + store=store, + dataset_metadata={"some": "metadata"}, + ) + + # Ensure the dataset can be loaded properly + stored_dataset = DatasetMetadata.load_from_store("dataset_uuid", store) + assert dataset == stored_dataset + + mps = dispatch_metapartitions("dataset_uuid", store) + mps_loaded = map(lambda mp: mp.load_dataframes(store), mps) + + # Ensure the values and dtypes of the loaded datasets are correct + for mp in mps_loaded: + assert mp.data["a"].dtype == "datetime64[ns]" + assert mp.data["b"].dtype == "datetime64[ns]" + assert mp.data["a"][0] == date + assert mp.data["b"][0] == date