Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Ignore split offsets when the last split offset is past the file length #8860

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,16 @@ public ByteBuffer keyMetadata() {

@Override
public List<Long> splitOffsets() {
if (splitOffsets == null || splitOffsets.length == 0) {
return null;
}

// If the last split offset is past the file size this means the split offsets are corrupted and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doubt] wondering if throwing an exception or having a pre-condition would be helpful to identify the buggy writer ?

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean throwing at read time (i.e. instead of returning null, we throw)? If so, I don't think we want to do that because that's essentially unnecessarily breaking future readers and split offsets are optional anyways; this approach takes the stance of detecting the corruption and making sure read logic doesn't leverage the corrupted metadata.

If you mean throwing at the time of writing the manifest entry (a precondition check in the constructor of BaseFile), I went back and forth on this but I think the problem there is let's say someone upgrades. When some process is performed which rewrites a set of files (including some corrupted entries) it would fail due to the precondition. The benefit is it would prevent spreading the previous corruption which is nice, but at the cost of failing operations. Considering again the corrupted split offsets will be ignored at read time anyways, failing at write time seems needless.

To prevent spreading previous corrupted state, at the time of writing the manifest if the corruption is detected the split offsets could be recomputed (a sort of "fix-up" process). This requires more investigation though, not sure how feasible it is and the perf implications (e.g. for Parquet we'd need to go through the block metadata again)

let me know what you think!

Copy link
Contributor

@singhpk234 singhpk234 Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you !

I was mostly coming from the point of view of a buggy writer (that didn't use core-lib as we expose split offsets via ParquetMetadata or purpose fully passed wrong offsets) which has already committed this metadata. Such writers will never be caught because we will be silently skipping the malformed offsets, was wondering if having a warning log then, during reads so that we could let the readers know of the corruptions and reads would be a bit un-optimized, so that it could help in backtracking the buggy writer, thoughts ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like Split Offsets being ordered in ascending order is not being enforced during reads either, we just swallow, we should be fine in this case as well then :P

      if (splitOffsets != null && ArrayUtil.isStrictlyAscending(splitOffsets)) {
        return () ->
            new OffsetsAwareSplitScanTaskIterator<>(
                self(), length(), splitOffsets, this::newSplitTask);
      } else {
        return () ->
            new FixedSizeSplitScanTaskIterator<>(
                self(), length(), targetSplitSize, this::newSplitTask);
      }
    }

// should not be used
if (splitOffsets[splitOffsets.length - 1] >= fileSizeInBytes) {
amogh-jahagirdar marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

return ArrayUtil.toUnmodifiableLongList(splitOffsets);
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class TableTestBase {
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2") // easy way to set partition data for now
.withRecordCount(1)
.withSplitOffsets(ImmutableList.of(2L, 2_000_000L))
.withSplitOffsets(ImmutableList.of(2L, 8L))
.build();
static final DeleteFile FILE_C2_DELETES =
FileMetadata.deleteFileBuilder(SPEC)
Expand All @@ -129,7 +129,7 @@ public class TableTestBase {
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=3") // easy way to set partition data for now
.withRecordCount(1)
.withSplitOffsets(ImmutableList.of(3L, 3_000L, 3_000_000L))
.withSplitOffsets(ImmutableList.of(0L, 3L, 6L))
.build();
static final DeleteFile FILE_D2_DELETES =
FileMetadata.deleteFileBuilder(SPEC)
Expand Down
17 changes: 17 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
Expand Down Expand Up @@ -159,4 +160,20 @@ public void testDeleteFilePositions() throws IOException {
}
}
}

@Test
public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException {
DataFile invalidOffset =
DataFiles.builder(SPEC)
.withPath("/path/to/invalid-offsets.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.withSplitOffsets(ImmutableList.of(2L, 1000L)) // Offset 1000 is out of bounds
.build();
ManifestFile manifest = writeManifest(1000L, invalidOffset);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
DataFile file = Iterables.getOnlyElement(reader);
Assertions.assertThat(file.splitOffsets()).isNull();
}
}
}