Skip to content

Commit

Permalink
Core, Flink, Spark, KafkaConnect: Remove usage of deprecated path API (
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar authored Dec 11, 2024
1 parent ff81344 commit da53495
Show file tree
Hide file tree
Showing 126 changed files with 535 additions and 533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ public FileContent content() {

@Override
public CharSequence path() {
return deleteFile.path();
return deleteFile.location();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V1Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public FileContent content() {

@Override
public CharSequence path() {
return wrapped.path();
return wrapped.location();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ public FileContent content() {

@Override
public CharSequence path() {
return wrapped.path();
return wrapped.location();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V3Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public FileContent content() {

@Override
public CharSequence path() {
return wrapped.path();
return wrapped.location();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testReplaceManifestsSeparate() {

// cluster by path will split the manifest into two

table.rewriteManifests().clusterBy(file -> file.location()).commit();
table.rewriteManifests().clusterBy(ContentFile::location).commit();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ private void assertDeletes(FileScanTask task, DeleteFile... expectedDeleteFiles)
}

private CharSequenceSet deletePaths(FileScanTask task) {
return CharSequenceSet.of(Iterables.transform(task.deletes(), ContentFile::path));
return CharSequenceSet.of(Iterables.transform(task.deletes(), ContentFile::location));
}

private List<FileScanTask> planTasks() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void testInsertDuplicatedKey() throws IOException {

// Check records in the data file.
DataFile dataFile = result.dataFiles()[0];
assertThat(readRecordsAsList(table.schema(), dataFile.path()))
assertThat(readRecordsAsList(table.schema(), dataFile.location()))
.isEqualTo(
ImmutableList.of(
createRecord(1, "aaa"),
Expand All @@ -192,13 +192,13 @@ public void testInsertDuplicatedKey() throws IOException {

// Check records in the pos-delete file.
Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.path()))
assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
.isEqualTo(
ImmutableList.of(
posRecord.copy("file_path", dataFile.path(), "pos", 0L),
posRecord.copy("file_path", dataFile.path(), "pos", 1L),
posRecord.copy("file_path", dataFile.path(), "pos", 2L),
posRecord.copy("file_path", dataFile.path(), "pos", 3L)));
posRecord.copy("file_path", dataFile.location(), "pos", 0L),
posRecord.copy("file_path", dataFile.location(), "pos", 1L),
posRecord.copy("file_path", dataFile.location(), "pos", 2L),
posRecord.copy("file_path", dataFile.location(), "pos", 3L)));
}

@TestTemplate
Expand Down Expand Up @@ -226,13 +226,13 @@ public void testUpsertSameRow() throws IOException {

// Check records in the data file.
DataFile dataFile = result.dataFiles()[0];
assertThat(readRecordsAsList(table.schema(), dataFile.path()))
assertThat(readRecordsAsList(table.schema(), dataFile.location()))
.isEqualTo(ImmutableList.of(record, record));

// Check records in the pos-delete file.
DeleteFile posDeleteFile = result.deleteFiles()[0];
assertThat(readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.path()))
.isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.path(), "pos", 0L)));
assertThat(readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.location()))
.isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));

deltaWriter =
createTaskWriter(eqDeleteFieldIds, eqDeleteRowSchema, DeleteGranularity.PARTITION);
Expand Down Expand Up @@ -312,24 +312,24 @@ public void testUpsertData() throws IOException {

// Check records in the data file.
DataFile dataFile = result.dataFiles()[0];
assertThat(readRecordsAsList(table.schema(), dataFile.path()))
assertThat(readRecordsAsList(table.schema(), dataFile.location()))
.isEqualTo(
ImmutableList.of(
createRecord(5, "aaa"), createRecord(6, "aaa"), createRecord(7, "ccc")));

// Check records in the eq-delete file.
DeleteFile eqDeleteFile = result.deleteFiles()[0];
assertThat(eqDeleteFile.content()).isEqualTo(FileContent.EQUALITY_DELETES);
assertThat(readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()))
assertThat(readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.location()))
.isEqualTo(
ImmutableList.of(keyFunc.apply("aaa"), keyFunc.apply("ccc"), keyFunc.apply("bbb")));

// Check records in the pos-delete file.
DeleteFile posDeleteFile = result.deleteFiles()[1];
Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
assertThat(posDeleteFile.content()).isEqualTo(FileContent.POSITION_DELETES);
assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.path()))
.isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.path(), "pos", 0L)));
assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
.isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
}

@TestTemplate
Expand Down Expand Up @@ -397,15 +397,15 @@ public void testUpsertDataWithFullRowSchema() throws IOException {

// Check records in the data file.
DataFile dataFile = result.dataFiles()[0];
assertThat(readRecordsAsList(table.schema(), dataFile.path()))
assertThat(readRecordsAsList(table.schema(), dataFile.location()))
.isEqualTo(
ImmutableList.of(
createRecord(5, "aaa"), createRecord(6, "aaa"), createRecord(7, "ccc")));

// Check records in the eq-delete file.
DeleteFile eqDeleteFile = result.deleteFiles()[0];
assertThat(eqDeleteFile.content()).isEqualTo(FileContent.EQUALITY_DELETES);
assertThat(readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()))
assertThat(readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.location()))
.isEqualTo(
ImmutableList.of(
createRecord(3, "aaa"), createRecord(4, "ccc"), createRecord(2, "bbb")));
Expand All @@ -414,8 +414,8 @@ public void testUpsertDataWithFullRowSchema() throws IOException {
DeleteFile posDeleteFile = result.deleteFiles()[1];
Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
assertThat(posDeleteFile.content()).isEqualTo(FileContent.POSITION_DELETES);
assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.path()))
.isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.path(), "pos", 0L)));
assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
.isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
Schema tableSchema,
Schema requestedSchema,
InputFilesDecryptor inputFilesDecryptor) {
super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
super(task.file().location(), task.deletes(), tableSchema, requestedSchema);
this.requiredRowType = FlinkSchemaUtil.convert(requiredSchema());
this.asStructLike = new RowDataWrapper(requiredRowType, requiredSchema().asStruct());
this.inputFilesDecryptor = inputFilesDecryptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private String toString(Collection<FileScanTask> files) {
.map(
fileScanTask ->
MoreObjects.toStringHelper(fileScanTask)
.add("file", fileScanTask.file().path().toString())
.add("file", fileScanTask.file().location())
.add("start", fileScanTask.start())
.add("length", fileScanTask.length())
.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType)
for (FileScanTask scanTask : fileScanTasks) {
long recordCountInFile = scanTask.file().recordCount();

String[] splitFilePath = scanTask.file().path().toString().split("/");
String[] splitFilePath = scanTask.file().location().split("/");
// Filename example: 00007-0-a7d3a29a-33e9-4740-88f4-0f494397d60c-00001.parquet
// Writer ID: .......^^^^^
String filename = splitFilePath[splitFilePath.length - 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,6 @@ public void testDeleteStats() throws Exception {
new String(
deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
assumeThat(fromStat).isEqualTo(dataFile.path().toString());
assumeThat(fromStat).isEqualTo(dataFile.location());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testCloseTwice() throws IOException {

FileSystem fs = FileSystem.get(CONF);
for (DataFile dataFile : dataFiles) {
assertThat(fs.exists(new Path(dataFile.path().toString()))).isTrue();
assertThat(fs.exists(new Path(dataFile.location()))).isTrue();
}
}
}
Expand All @@ -133,7 +133,7 @@ public void testAbort() throws IOException {

FileSystem fs = FileSystem.get(CONF);
for (DataFile dataFile : dataFiles) {
assertThat(fs.exists(new Path(dataFile.path().toString()))).isFalse();
assertThat(fs.exists(new Path(dataFile.location()))).isFalse();
}
}
}
Expand All @@ -155,7 +155,7 @@ public void testCompleteFiles() throws IOException {

FileSystem fs = FileSystem.get(CONF);
for (DataFile dataFile : dataFiles) {
assertThat(fs.exists(new Path(dataFile.path().toString()))).isTrue();
assertThat(fs.exists(new Path(dataFile.location()))).isTrue();
}

AppendFiles appendFiles = table.newAppend();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,9 @@ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {
assertThat(split.task().files()).hasSize(2);
Set<String> discoveredFiles =
split.task().files().stream()
.map(fileScanTask -> fileScanTask.file().path().toString())
.map(fileScanTask -> fileScanTask.file().location())
.collect(Collectors.toSet());
Set<String> expectedFiles =
ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
Set<String> expectedFiles = ImmutableSet.of(dataFile1.location(), dataFile2.location());
assertThat(discoveredFiles).containsExactlyInAnyOrderElementsOf(expectedFiles);

IcebergEnumeratorPosition lastPosition = initialResult.toPosition();
Expand Down Expand Up @@ -244,10 +243,10 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio
assertThat(split.task().files()).hasSize(1);
Set<String> discoveredFiles =
split.task().files().stream()
.map(fileScanTask -> fileScanTask.file().path().toString())
.map(fileScanTask -> fileScanTask.file().location())
.collect(Collectors.toSet());
// should discover dataFile2 appended in snapshot2
Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
Set<String> expectedFiles = ImmutableSet.of(dataFile2.location());
assertThat(discoveredFiles).containsExactlyElementsOf(expectedFiles);

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
Expand Down Expand Up @@ -316,11 +315,10 @@ public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Except
assertThat(split.task().files()).hasSize(2);
Set<String> discoveredFiles =
split.task().files().stream()
.map(fileScanTask -> fileScanTask.file().path().toString())
.map(fileScanTask -> fileScanTask.file().location())
.collect(Collectors.toSet());
// should discover files appended in both snapshot1 and snapshot2
Set<String> expectedFiles =
ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
Set<String> expectedFiles = ImmutableSet.of(dataFile1.location(), dataFile2.location());
assertThat(discoveredFiles).containsExactlyInAnyOrderElementsOf(expectedFiles);

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
Expand Down Expand Up @@ -406,10 +404,10 @@ public void testIncrementalFromSnapshotId() throws Exception {
assertThat(split.task().files()).hasSize(1);
Set<String> discoveredFiles =
split.task().files().stream()
.map(fileScanTask -> fileScanTask.file().path().toString())
.map(fileScanTask -> fileScanTask.file().location())
.collect(Collectors.toSet());
// should discover dataFile2 appended in snapshot2
Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
Set<String> expectedFiles = ImmutableSet.of(dataFile2.location());
assertThat(discoveredFiles).containsExactlyElementsOf(expectedFiles);

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
Expand Down Expand Up @@ -489,10 +487,10 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception {
assertThat(split.task().files()).hasSize(1);
Set<String> discoveredFiles =
split.task().files().stream()
.map(fileScanTask -> fileScanTask.file().path().toString())
.map(fileScanTask -> fileScanTask.file().location())
.collect(Collectors.toSet());
// should discover dataFile2 appended in snapshot2
Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
Set<String> expectedFiles = ImmutableSet.of(dataFile2.location());
assertThat(discoveredFiles).containsExactlyElementsOf(expectedFiles);

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
Expand Down Expand Up @@ -529,12 +527,12 @@ public void testMaxPlanningSnapshotCount() throws Exception {
ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
// should discover dataFile1 appended in snapshot1
verifyMaxPlanningSnapshotCountResult(
secondResult, null, snapshot1, ImmutableSet.of(dataFile1.path().toString()));
secondResult, null, snapshot1, ImmutableSet.of(dataFile1.location()));

ContinuousEnumerationResult thirdResult = splitPlanner.planSplits(secondResult.toPosition());
// should discover dataFile2 appended in snapshot2
verifyMaxPlanningSnapshotCountResult(
thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.path().toString()));
thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.location()));
}

@Test
Expand Down Expand Up @@ -670,7 +668,7 @@ private void verifyMaxPlanningSnapshotCountResult(
assertThat(split.task().files()).hasSize(1);
Set<String> discoveredFiles =
split.task().files().stream()
.map(fileScanTask -> fileScanTask.file().path().toString())
.map(fileScanTask -> fileScanTask.file().location())
.collect(Collectors.toSet());
assertThat(discoveredFiles).containsExactlyElementsOf(expectedFiles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
Schema tableSchema,
Schema requestedSchema,
InputFilesDecryptor inputFilesDecryptor) {
super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
super(task.file().location(), task.deletes(), tableSchema, requestedSchema);
this.requiredRowType = FlinkSchemaUtil.convert(requiredSchema());
this.asStructLike = new RowDataWrapper(requiredRowType, requiredSchema().asStruct());
this.inputFilesDecryptor = inputFilesDecryptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private String toString(Collection<FileScanTask> files) {
.map(
fileScanTask ->
MoreObjects.toStringHelper(fileScanTask)
.add("file", fileScanTask.file().path().toString())
.add("file", fileScanTask.file().location())
.add("start", fileScanTask.start())
.add("length", fileScanTask.length())
.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,11 +646,11 @@ public void testSetCurrentAndCherryPickSnapshotId() {
private void validateTableFiles(Table tbl, DataFile... expectedFiles) {
tbl.refresh();
Set<CharSequence> expectedFilePaths =
Arrays.stream(expectedFiles).map(DataFile::path).collect(Collectors.toSet());
Arrays.stream(expectedFiles).map(DataFile::location).collect(Collectors.toSet());
Set<CharSequence> actualFilePaths =
StreamSupport.stream(tbl.newScan().planFiles().spliterator(), false)
.map(FileScanTask::file)
.map(ContentFile::path)
.map(ContentFile::location)
.collect(Collectors.toSet());
assertThat(actualFilePaths).as("Files should match").isEqualTo(expectedFilePaths);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public static void assertEquals(ContentFile<?> expected, ContentFile<?> actual)
assertThat(actual).isNotNull();
assertThat(actual.specId()).as("SpecId").isEqualTo(expected.specId());
assertThat(actual.content()).as("Content").isEqualTo(expected.content());
assertThat(actual.path()).as("Path").isEqualTo(expected.path());
assertThat(actual.location()).as("Location").isEqualTo(expected.location());
assertThat(actual.format()).as("Format").isEqualTo(expected.format());
assertThat(actual.partition().size())
.as("Partition size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
assertThat(dataFilesRewrote).hasSize(2);
// the biggest file do not be rewrote
List rewroteDataFileNames =
dataFilesRewrote.stream().map(ContentFile::path).collect(Collectors.toList());
dataFilesRewrote.stream().map(ContentFile::location).collect(Collectors.toList());
assertThat(rewroteDataFileNames).contains(file.getAbsolutePath());

// Assert the table records as expected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private TableTestStats extractPartitionResults(TableSchemaType tableSchemaType)
for (FileScanTask scanTask : fileScanTasks) {
long recordCountInFile = scanTask.file().recordCount();

String[] splitFilePath = scanTask.file().path().toString().split("/");
String[] splitFilePath = scanTask.file().location().split("/");
// Filename example: 00007-0-a7d3a29a-33e9-4740-88f4-0f494397d60c-00001.parquet
// Writer ID: .......^^^^^
String filename = splitFilePath[splitFilePath.length - 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,6 @@ public void testDeleteStats() throws Exception {
new String(
deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
assumeThat(fromStat).isEqualTo(dataFile.path().toString());
assumeThat(fromStat).isEqualTo(dataFile.location());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void testDeleteStats() throws Exception {
new String(
deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
assumeThat(fromStat).isEqualTo(dataFile.path().toString());
assumeThat(fromStat).isEqualTo(dataFile.location());
}

protected void testChangeLogs(
Expand Down
Loading

0 comments on commit da53495

Please sign in to comment.