Core, Spark: Correct the delete record count for PartitionTable (apac…
ConeyLiu authored and geruh committed Jan 25, 2024
1 parent 8be52a6 commit be65e0b
Showing 4 changed files with 89 additions and 54 deletions.
Changed file 1 of 4: core/src/main/java/org/apache/iceberg/PartitionsTable.java (2 additions & 2 deletions)
@@ -314,12 +314,12 @@ void update(ContentFile<?> file, Snapshot snapshot) {
         this.dataFileSizeInBytes += file.fileSizeInBytes();
         break;
       case POSITION_DELETES:
-        this.posDeleteRecordCount = file.recordCount();
+        this.posDeleteRecordCount += file.recordCount();
         this.posDeleteFileCount += 1;
         this.specId = file.specId();
         break;
       case EQUALITY_DELETES:
-        this.eqDeleteRecordCount = file.recordCount();
+        this.eqDeleteRecordCount += file.recordCount();
         this.eqDeleteFileCount += 1;
         this.specId = file.specId();
         break;
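The one-operator change above (= to +=) is the entire core fix: PartitionsTable builds its per-partition stats by calling update once for every content file, so plain assignment left each delete counter holding only the last file's record count rather than the running total. A minimal sketch of the corrected accumulation, using hypothetical stand-in types rather than the real PartitionsTable internals:

// Illustrative sketch only; Content, File, and PartitionStats are
// hypothetical stand-ins for Iceberg's internal types.
import java.util.List;

public class DeleteStatsSketch {
  enum Content { DATA, POSITION_DELETES, EQUALITY_DELETES }

  record File(Content content, long recordCount) {}

  static class PartitionStats {
    long posDeleteRecordCount;
    int posDeleteFileCount;

    void update(File file) {
      if (file.content() == Content.POSITION_DELETES) {
        // Before the fix this used `=`, so only the last file's count survived.
        posDeleteRecordCount += file.recordCount();
        posDeleteFileCount += 1;
      }
    }
  }

  public static void main(String[] args) {
    PartitionStats stats = new PartitionStats();
    // Two one-row position-delete files in the same partition, as in the tests below.
    List<File> files =
        List.of(new File(Content.POSITION_DELETES, 1L), new File(Content.POSITION_DELETES, 1L));
    files.forEach(stats::update);
    // Prints "2 records in 2 files"; the old assignment reported 1 record.
    System.out.println(
        stats.posDeleteRecordCount + " records in " + stats.posDeleteFileCount + " files");
  }
}

The test changes that follow commit two position-delete files and two equality-delete files per partition precisely so that this difference becomes observable.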
Changed file 2 of 4:
@@ -1602,9 +1602,15 @@ public void testPartitionsTableDeleteStats() {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);
 
     df1.select("id", "data")
         .write()
@@ -1624,8 +1630,9 @@ public void testPartitionsTableDeleteStats() {
 
     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
@@ -1648,7 +1655,7 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public void testPartitionsTableDeleteStats() {
     }
 
     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1701,13 +1709,12 @@ public void testPartitionsTableDeleteStats() {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
-            .set("spec_id", 0)
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2240,22 +2247,26 @@ private DeleteFile writePositionDeletes(
   }
 
   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();
 
     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);
 
     return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
   }
 
-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
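The assertions above read these counters back through the partitions metadata table, which the test loads with loadTable(tableIdentifier, "partitions"). As a usage sketch (not part of this commit; the table name db.sample and the local Spark session are assumptions, and an Iceberg catalog must already be configured), the same columns can be inspected with Spark SQL:

// Hedged usage sketch: query the partitions metadata table to see the
// per-partition delete stats asserted in the test. "db.sample" is a
// hypothetical table name.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionsTableCheck {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("partitions-check").getOrCreate();
    Dataset<Row> partitions =
        spark.sql(
            "SELECT partition, record_count, "
                + "position_delete_record_count, position_delete_file_count, "
                + "equality_delete_record_count, equality_delete_file_count "
                + "FROM db.sample.partitions");
    // After the fix, each delete file contributes its recordCount to the
    // partition totals: two one-row delete files show as 2 records / 2 files.
    partitions.show(false);
    spark.stop();
  }
}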
Changed file 3 of 4:
@@ -1600,9 +1600,15 @@ public void testPartitionsTableDeleteStats() {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);
 
     df1.select("id", "data")
         .write()
@@ -1622,8 +1628,9 @@ public void testPartitionsTableDeleteStats() {
 
     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
@@ -1646,7 +1653,7 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1662,13 +1669,13 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1682,8 +1689,9 @@ public void testPartitionsTableDeleteStats() {
     }
 
     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1699,12 +1707,12 @@ public void testPartitionsTableDeleteStats() {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2237,22 +2245,26 @@ private DeleteFile writePositionDeletes(
   }
 
   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();
 
     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);
 
     return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
   }
 
-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
Changed file 4 of 4:
@@ -1602,9 +1602,15 @@ public void testPartitionsTableDeleteStats() {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);
 
     df1.select("id", "data")
         .write()
@@ -1624,8 +1630,9 @@ public void testPartitionsTableDeleteStats() {
 
     // test position deletes
    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
@@ -1648,7 +1655,7 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public void testPartitionsTableDeleteStats() {
     }
 
     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
actual =
Expand All @@ -1701,12 +1709,12 @@ public void testPartitionsTableDeleteStats() {
0,
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("record_count", 3L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 1L) // should be incremented now
.set("equality_delete_file_count", 1) // should be incremented now
.set("equality_delete_record_count", 2L) // should be incremented now
.set("equality_delete_file_count", 2) // should be incremented now
.set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
@@ -2262,22 +2270,26 @@ private DeleteFile writePositionDeletes(
   }
 
   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();
 
     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);
 
     return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
   }
 
-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
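For readers less familiar with format-version-2 deletes, the reworked helpers exercise both flavors: writePosDeleteFile marks a concrete (file, position) pair, while writeEqDeleteFile marks every row whose data column equals a given value. A toy illustration of that semantic difference, in plain Java with a hypothetical Row type and no Iceberg APIs:

// Illustrative sketch only: how position deletes and equality deletes
// select rows. Row is a hypothetical stand-in for stored records.
import java.util.List;

public class DeleteFlavorsSketch {
  record Row(long pos, int id, String data) {}

  public static void main(String[] args) {
    List<Row> rows = List.of(new Row(0, 2, "d"), new Row(1, 2, "e"), new Row(2, 2, "f"));

    // Position deletes at positions 0 and 1, like writePosDeleteFile(table, 0/1).
    List<Row> afterPosDeletes =
        rows.stream().filter(r -> r.pos() != 0 && r.pos() != 1).toList();
    System.out.println("after position deletes: " + afterPosDeletes);

    // Equality deletes on data in {"d", "f"}, like writeEqDeleteFile(table, "d"/"f").
    List<Row> afterEqDeletes =
        rows.stream().filter(r -> !r.data().equals("d") && !r.data().equals("f")).toList();
    System.out.println("after equality deletes: " + afterEqDeletes);
  }
}

Either flavor increments the partition's delete file count by one per file and, after this fix, its delete record count by the file's row count.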
