diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 92ca300f6d..fdebfee5dc 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -346,6 +346,7 @@ class DataFile(Record): "split_offsets", "equality_ids", "sort_order_id", + "spec_id", ) content: DataFileContent file_path: str @@ -363,6 +364,7 @@ class DataFile(Record): split_offsets: Optional[List[int]] equality_ids: Optional[List[int]] sort_order_id: Optional[int] + spec_id: Optional[int] def __setattr__(self, name: str, value: Any) -> None: """Assign a key/value to a DataFile.""" @@ -582,7 +584,7 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent}, ) as reader: return [ - _inherit_sequence_number(entry, self) + _inherit_from_manifest(entry, self) for entry in reader if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] @@ -607,18 +609,24 @@ def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]: yield from reader -def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry: - """Inherits the sequence numbers. +def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry: + """ + Inherits properties from manifest file. + + The properties that will be inherited are: + - sequence numbers + - partition spec id. - More information in the spec: https://iceberg.apache.org/spec/#sequence-number-inheritance + More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance Args: - entry: The manifest entry that has null sequence numbers. - manifest: The manifest that has a sequence number. + entry: The manifest entry. + manifest: The manifest file. Returns: - The manifest entry with the sequence numbers set. + The manifest entry with properties inherited. """ + # Inherit sequence numbers. # The snapshot_id is required in V1, inherit with V2 when null if entry.snapshot_id is None: entry.snapshot_id = manifest.added_snapshot_id @@ -634,6 +642,9 @@ def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> Ma # Only available in V2, always 0 in V1 entry.file_sequence_number = manifest.sequence_number + # Inherit partition spec id. + entry.data_file.spec_id = manifest.partition_spec_id + return entry diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 518026cc4f..7d9c11679e 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -256,6 +256,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in split_offsets=[4, 133697593], equality_ids=[], sort_order_id=4, + spec_id=3, ) if format_version == 1: data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE diff --git a/tests/test_integration_manifest.py b/tests/test_integration_manifest.py index 475e0d40a6..40a57cea22 100644 --- a/tests/test_integration_manifest.py +++ b/tests/test_integration_manifest.py @@ -26,11 +26,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.io.pyarrow import PyArrowFileIO -from pyiceberg.manifest import ( - DataFile, - ManifestEntry, - write_manifest, -) +from pyiceberg.manifest import DataFile, ManifestEntry, write_manifest from pyiceberg.table import Table from pyiceberg.utils.lazydict import LazyDict @@ -101,9 +97,14 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: split_offsets=entry.data_file.split_offsets, equality_ids=entry.data_file.equality_ids, sort_order_id=entry.data_file.sort_order_id, + spec_id=entry.data_file.spec_id, ) wrapped_entry_v2 = ManifestEntry(*entry.record_fields()) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug + wrapped_entry_v2_dict = todict(wrapped_entry_v2) + # This one should not be written + del wrapped_entry_v2_dict['data_file']['spec_id'] + with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" output = PyArrowFileIO().new_output(tmp_avro_file) @@ -122,4 +123,4 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: it = iter(r) fa_entry = next(it) - assert fa_entry == todict(wrapped_entry_v2) + assert fa_entry == wrapped_entry_v2_dict