diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 2537f5172b8b..5ff796e95827 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -314,12 +314,12 @@ void update(ContentFile<?> file, Snapshot snapshot) {
           this.dataFileSizeInBytes += file.fileSizeInBytes();
           break;
         case POSITION_DELETES:
-          this.posDeleteRecordCount = file.recordCount();
+          this.posDeleteRecordCount += file.recordCount();
           this.posDeleteFileCount += 1;
           this.specId = file.specId();
           break;
         case EQUALITY_DELETES:
-          this.eqDeleteRecordCount = file.recordCount();
+          this.eqDeleteRecordCount += file.recordCount();
           this.eqDeleteFileCount += 1;
           this.specId = file.specId();
           break;
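
The core change above is the whole fix: Partition.update() previously assigned each delete file's record count ("="), so a partition's position- and equality-delete record totals reflected only the last delete file processed, while the corresponding file counts kept accumulating via "+= 1". Switching to "+=" makes the record totals accumulate across all delete files, consistent with the file counts. Below is a minimal sketch of the behavior difference, using hypothetical stand-in types (FileKind, PartitionStats) rather than Iceberg's actual ContentFile and Partition classes:

public class PartitionStatsSketch {
  enum FileKind {
    POSITION_DELETES,
    EQUALITY_DELETES
  }

  static class PartitionStats {
    long posDeleteRecordCount;
    int posDeleteFileCount;
    long eqDeleteRecordCount;
    int eqDeleteFileCount;

    void update(FileKind kind, long recordCount) {
      switch (kind) {
        case POSITION_DELETES:
          this.posDeleteRecordCount += recordCount; // was '=' before this patch
          this.posDeleteFileCount += 1;
          break;
        case EQUALITY_DELETES:
          this.eqDeleteRecordCount += recordCount; // was '=' before this patch
          this.eqDeleteFileCount += 1;
          break;
      }
    }
  }

  public static void main(String[] args) {
    PartitionStats stats = new PartitionStats();
    stats.update(FileKind.POSITION_DELETES, 1L); // first delete file, 1 record
    stats.update(FileKind.POSITION_DELETES, 1L); // second delete file, 1 record
    // Prints "2 records in 2 files"; with '=', it printed the inconsistent
    // "1 records in 2 files" that the updated tests below guard against.
    System.out.println(
        stats.posDeleteRecordCount + " records in " + stats.posDeleteFileCount + " files");
  }
}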
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 143488176fce..3c52652748ef 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1602,9 +1602,15 @@ public void testPartitionsTableDeleteStats() {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);

     df1.select("id", "data")
         .write()
@@ -1624,8 +1630,9 @@ public void testPartitionsTableDeleteStats() {

     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();

@@ -1648,7 +1655,7 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public void testPartitionsTableDeleteStats() {
     }

     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1701,13 +1709,12 @@ public void testPartitionsTableDeleteStats() {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
-            .set("spec_id", 0)
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2240,22 +2247,26 @@ private DeleteFile writePositionDeletes(
   }

   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();

     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);

     return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
   }

-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
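
The test changes above (mirrored in the Spark 3.4 and 3.5 suites below) make the aggregation observable: each partition now holds three rows, the position-delete commit adds two delete files (positions 0 and 1 of the same data file), and the equality-delete commit adds two delete files keyed on distinct "data" values ("d" and "f"). The expected delete record counts of 2 can therefore pass only if the counts accumulate; the old assignment would report 1 record alongside 2 files. Parameterizing the helpers (a position for writePosDeleteFile, a data value for writeEqDeleteFile) is what lets each call produce a distinct delete file, and switching the equality-delete schema from "id" to "data" keeps the two files targeting different rows. As a hedged spot-check, not part of the patch, the same aggregated columns can be read from the partitions metadata table; the table name db.tbl and the ready-made SparkSession are assumptions:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionsTableSpotCheck {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("partitions-spot-check").getOrCreate();
    // One row per partition, with the delete stats asserted in the test above.
    Dataset<Row> partitions = spark.read().format("iceberg").load("db.tbl.partitions");
    partitions
        .select(
            "partition",
            "record_count",
            "position_delete_record_count",
            "position_delete_file_count",
            "equality_delete_record_count",
            "equality_delete_file_count")
        .show();
    spark.stop();
  }
}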
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 486713e52e30..ebd0933a9566 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1600,9 +1600,15 @@ public void testPartitionsTableDeleteStats() {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);

     df1.select("id", "data")
         .write()
@@ -1622,8 +1628,9 @@ public void testPartitionsTableDeleteStats() {

     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();

@@ -1646,7 +1653,7 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1662,13 +1669,13 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1682,8 +1689,9 @@ public void testPartitionsTableDeleteStats() {
     }

     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1699,12 +1707,12 @@ public void testPartitionsTableDeleteStats() {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2237,22 +2245,26 @@ private DeleteFile writePositionDeletes(
   }

   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();

     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);

     return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
   }

-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 29ccba5a27c7..4f585eee51f4 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1602,9 +1602,15 @@ public void testPartitionsTableDeleteStats() {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);

     df1.select("id", "data")
         .write()
@@ -1624,8 +1630,9 @@ public void testPartitionsTableDeleteStats() {

     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();

@@ -1648,7 +1655,7 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public void testPartitionsTableDeleteStats() {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public void testPartitionsTableDeleteStats() {
     }

     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1701,12 +1709,12 @@ public void testPartitionsTableDeleteStats() {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2262,22 +2270,26 @@ private DeleteFile writePositionDeletes(
   }

   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();

     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);

     return writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
   }

-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,