From b23ea1377be4cc45eefaf6e272432942bc24c60d Mon Sep 17 00:00:00 2001 From: Pucheng Yang Date: Thu, 12 Oct 2023 16:40:50 -0700 Subject: [PATCH 1/5] Add spec_id back to data file --- pyiceberg/manifest.py | 10 ++++++++++ tests/avro/test_file.py | 2 ++ tests/test_integration_manifest.py | 1 + 3 files changed, 13 insertions(+) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 92ca300f6d..734eed8c3d 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -182,6 +182,7 @@ def __repr__(self) -> str: doc="Splittable offsets", ), NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"), + NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"), ), 2: StructType( NestedField( @@ -277,6 +278,13 @@ def __repr__(self) -> str: required=False, doc="ID representing sort order for this file", ), + NestedField( + field_id=141, + name="spec_id", + field_type=IntegerType(), + required=False, + doc="ID representing spec id for this file", + ), ), } @@ -346,6 +354,7 @@ class DataFile(Record): "split_offsets", "equality_ids", "sort_order_id", + "spec_id", ) content: DataFileContent file_path: str @@ -363,6 +372,7 @@ class DataFile(Record): split_offsets: Optional[List[int]] equality_ids: Optional[List[int]] sort_order_id: Optional[int] + spec_id: Optional[int] def __setattr__(self, name: str, value: Any) -> None: """Assign a key/value to a DataFile.""" diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 518026cc4f..528baca5c1 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -135,6 +135,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: split_offsets=[4, 133697593], equality_ids=[], sort_order_id=4, + spec_id=3, ) entry = ManifestEntry( status=ManifestEntryStatus.ADDED, @@ -256,6 +257,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in split_offsets=[4, 133697593], equality_ids=[], sort_order_id=4, + spec_id=3, ) if format_version == 1: data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE diff --git a/tests/test_integration_manifest.py b/tests/test_integration_manifest.py index 475e0d40a6..34b20f271d 100644 --- a/tests/test_integration_manifest.py +++ b/tests/test_integration_manifest.py @@ -101,6 +101,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: split_offsets=entry.data_file.split_offsets, equality_ids=entry.data_file.equality_ids, sort_order_id=entry.data_file.sort_order_id, + spec_id=entry.data_file.spec_id, ) wrapped_entry_v2 = ManifestEntry(*entry.record_fields()) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug From 176adf49ade2e9e516efa0553804ac9363d03533 Mon Sep 17 00:00:00 2001 From: Pucheng Yang Date: Fri, 13 Oct 2023 08:49:35 -0700 Subject: [PATCH 2/5] address comments --- pyiceberg/manifest.py | 28 +++++++++++++--------------- tests/avro/test_file.py | 1 - 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 734eed8c3d..d83c96ee51 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -182,7 +182,6 @@ def __repr__(self) -> str: doc="Splittable offsets", ), NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"), - NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"), ), 2: StructType( NestedField( @@ -278,13 +277,6 @@ def __repr__(self) -> str: required=False, doc="ID representing sort order for this file", ), - NestedField( - field_id=141, - name="spec_id", - field_type=IntegerType(), - required=False, - doc="ID representing spec id for this file", - ), ), } @@ -592,7 +584,7 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent}, ) as reader: return [ - _inherit_sequence_number(entry, self) + _inherit_from_manifest(entry, self) for entry in reader if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] @@ -617,18 +609,21 @@ def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]: yield from reader -def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry: - """Inherits the sequence numbers. +def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry: + """Inherits below properties from manifest file: + - sequence numbers. + - partition spec id. - More information in the spec: https://iceberg.apache.org/spec/#sequence-number-inheritance + More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance Args: - entry: The manifest entry that has null sequence numbers. - manifest: The manifest that has a sequence number. + entry: The manifest entry. + manifest: The manifest file. Returns: - The manifest entry with the sequence numbers set. + The manifest entry with properties inherited. """ + # Inherit sequence numbers. # The snapshot_id is required in V1, inherit with V2 when null if entry.snapshot_id is None: entry.snapshot_id = manifest.added_snapshot_id @@ -644,6 +639,9 @@ def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> Ma # Only available in V2, always 0 in V1 entry.file_sequence_number = manifest.sequence_number + # Inherit partition spec id. + entry.data_file.spec_id = manifest.partition_spec_id + return entry diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 528baca5c1..7d9c11679e 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -135,7 +135,6 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: split_offsets=[4, 133697593], equality_ids=[], sort_order_id=4, - spec_id=3, ) entry = ManifestEntry( status=ManifestEntryStatus.ADDED, From 44e67f0872e13d37a7e0f3ca14a81656176e121c Mon Sep 17 00:00:00 2001 From: Pucheng Yang Date: Fri, 13 Oct 2023 11:40:55 -0700 Subject: [PATCH 3/5] fix lint --- pyiceberg/manifest.py | 9 ++++++--- tests/test_integration_manifest.py | 7 ++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index d83c96ee51..fdebfee5dc 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -610,9 +610,12 @@ def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]: def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry: - """Inherits below properties from manifest file: - - sequence numbers. - - partition spec id. + """ + Inherits properties from manifest file. + + The properties that will be inherited are: + - sequence numbers + - partition spec id. More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance diff --git a/tests/test_integration_manifest.py b/tests/test_integration_manifest.py index 34b20f271d..b6c4798d9e 100644 --- a/tests/test_integration_manifest.py +++ b/tests/test_integration_manifest.py @@ -26,11 +26,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.io.pyarrow import PyArrowFileIO -from pyiceberg.manifest import ( - DataFile, - ManifestEntry, - write_manifest, -) +from pyiceberg.manifest import DataFile, ManifestEntry, _inherit_from_manifest, write_manifest from pyiceberg.table import Table from pyiceberg.utils.lazydict import LazyDict @@ -105,6 +101,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: ) wrapped_entry_v2 = ManifestEntry(*entry.record_fields()) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug + wrapped_entry_v2.snapshot_id = test_manifest_file.added_snapshot_id with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" output = PyArrowFileIO().new_output(tmp_avro_file) From 359a93beba2a336e5f21a020ee4249173d55d401 Mon Sep 17 00:00:00 2001 From: Pucheng Yang Date: Fri, 13 Oct 2023 11:46:38 -0700 Subject: [PATCH 4/5] stage all changes so far --- tests/test_integration_manifest.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_integration_manifest.py b/tests/test_integration_manifest.py index b6c4798d9e..801cf734a8 100644 --- a/tests/test_integration_manifest.py +++ b/tests/test_integration_manifest.py @@ -26,7 +26,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.io.pyarrow import PyArrowFileIO -from pyiceberg.manifest import DataFile, ManifestEntry, _inherit_from_manifest, write_manifest +from pyiceberg.manifest import DataFile, ManifestEntry, write_manifest from pyiceberg.table import Table from pyiceberg.utils.lazydict import LazyDict @@ -101,7 +101,6 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: ) wrapped_entry_v2 = ManifestEntry(*entry.record_fields()) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug - wrapped_entry_v2.snapshot_id = test_manifest_file.added_snapshot_id with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" output = PyArrowFileIO().new_output(tmp_avro_file) From 0ac7ebfcd004c202c40fca88c1b04c63b3a1cbeb Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 13 Oct 2023 21:18:00 +0200 Subject: [PATCH 5/5] Fix tests --- tests/test_integration_manifest.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_integration_manifest.py b/tests/test_integration_manifest.py index 801cf734a8..40a57cea22 100644 --- a/tests/test_integration_manifest.py +++ b/tests/test_integration_manifest.py @@ -101,6 +101,10 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: ) wrapped_entry_v2 = ManifestEntry(*entry.record_fields()) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug + wrapped_entry_v2_dict = todict(wrapped_entry_v2) + # This one should not be written + del wrapped_entry_v2_dict['data_file']['spec_id'] + with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" output = PyArrowFileIO().new_output(tmp_avro_file) @@ -119,4 +123,4 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: it = iter(r) fa_entry = next(it) - assert fa_entry == todict(wrapped_entry_v2) + assert fa_entry == wrapped_entry_v2_dict