From ced897ca699d4e91ac8da6ac26bec107ee64ee74 Mon Sep 17 00:00:00 2001 From: Tom Tanaka <43331405+tomtongue@users.noreply.github.com> Date: Wed, 3 Apr 2024 19:28:29 +0900 Subject: [PATCH] Core, Data, Flink: Migrate TableTestBase related classes to JUnit5 (#10080) --- .../iceberg/util/TestTableScanUtil.java | 8 +- .../iceberg/io/TestPartitioningWriters.java | 95 +++++++++--------- .../iceberg/io/TestPositionDeltaWriters.java | 36 +++---- .../iceberg/io/TestRollingFileWriters.java | 49 +++++----- .../flink/sink/TestDeltaTaskWriter.java | 97 ++++++++----------- .../flink/sink/TestIcebergFilesCommitter.java | 55 +++++------ .../source/TestStreamingMonitorFunction.java | 16 ++- .../source/TestStreamingReaderOperator.java | 36 ++++--- .../flink/sink/TestDeltaTaskWriter.java | 97 ++++++++----------- .../flink/sink/TestIcebergFilesCommitter.java | 55 +++++------ .../source/TestStreamingMonitorFunction.java | 16 ++- .../source/TestStreamingReaderOperator.java | 36 ++++--- .../flink/sink/TestDeltaTaskWriter.java | 97 ++++++++----------- .../flink/sink/TestIcebergFilesCommitter.java | 55 +++++------ .../source/TestStreamingMonitorFunction.java | 16 ++- .../source/TestStreamingReaderOperator.java | 36 ++++--- 16 files changed, 382 insertions(+), 418 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java index d868fba90190..eb713a4d2e0b 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java @@ -19,6 +19,7 @@ package org.apache.iceberg.util; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Arrays; import java.util.Collections; @@ -49,7 +50,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -163,13 +163,13 @@ public void testTaskGroupPlanningCorruptedOffset() { TableScanUtil.planTaskGroups(CloseableIterable.withNoopClose(baseFileScanTasks), 1, 1, 0)) { for (FileScanTask fileScanTask : task.tasks()) { DataFile taskDataFile = fileScanTask.file(); - Assertions.assertThat(taskDataFile.splitOffsets()).isNull(); + assertThat(taskDataFile.splitOffsets()).isNull(); taskCount++; } } // 10 tasks since the split offsets are ignored and there are 1 byte splits for a 10 byte file - Assertions.assertThat(taskCount).isEqualTo(10); + assertThat(taskCount).isEqualTo(10); } @Test @@ -280,7 +280,7 @@ public void testTaskGroupPlanningByPartition() { ImmutableList.of( taskWithPartition(SPEC1, PARTITION1, 128), taskWithPartition(SPEC2, PARTITION2, 128)); - Assertions.assertThatThrownBy( + assertThatThrownBy( () -> TableScanUtil.planTaskGroups(tasks2, 128, 10, 4, SPEC2.partitionType())) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Cannot find field"); diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 88d3c16c2dca..8dc031314eda 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -19,6 +19,7 @@ package org.apache.iceberg.io; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -38,8 +39,6 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.util.StructLikeSet; -import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -72,7 +71,7 @@ protected FileFormat format() { @BeforeEach public void setupTable() throws Exception { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); - Assert.assertTrue(tableDir.delete()); // created during table creation + assertThat(tableDir.delete()).isTrue(); // created during table creation this.metadataDir = new File(tableDir, "metadata"); this.table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -86,10 +85,10 @@ public void testClusteredDataWriterNoRecords() throws IOException { new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.close(); - Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).isEmpty(); writer.close(); - Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).isEmpty(); } @TestTemplate @@ -111,7 +110,7 @@ public void testClusteredDataWriterMultiplePartitions() throws IOException { writer.close(); DataWriteResult result = writer.result(); - Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size()); + assertThat(result.dataFiles()).hasSize(3); RowDelta rowDelta = table.newRowDelta(); result.dataFiles().forEach(rowDelta::addRows); @@ -120,7 +119,7 @@ public void testClusteredDataWriterMultiplePartitions() throws IOException { List expectedRows = ImmutableList.of( toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "bbb"), toRow(4, "bbb"), toRow(5, "ccc")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate @@ -139,8 +138,7 @@ public void testClusteredDataWriterOutOfOrderPartitions() throws IOException { writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb")); writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc")); - Assertions.assertThatThrownBy( - () -> writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"))) + assertThatThrownBy(() -> writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"))) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Encountered records that belong to already closed files") .hasMessageEndingWith("partition 'data=aaa' in spec " + spec); @@ -159,14 +157,14 @@ public void testClusteredEqualityDeleteWriterNoRecords() throws IOException { writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); } @TestTemplate @@ -222,17 +220,16 @@ public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException writer.close(); DeleteWriteResult result = writer.result(); - Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size()); - Assert.assertEquals( - "Must not reference data files", 0, writer.result().referencedDataFiles().size()); - Assert.assertFalse("Must not reference data files", writer.result().referencesDataFiles()); + assertThat(result.deleteFiles()).hasSize(3); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); RowDelta rowDelta = table.newRowDelta(); result.deleteFiles().forEach(rowDelta::addDeletes); rowDelta.commit(); List expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "bbb"), toRow(13, "ccc")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate @@ -265,13 +262,13 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")); writer.write(toRow(6, "ddd"), identitySpec, partitionKey(identitySpec, "ddd")); - Assertions.assertThatThrownBy( + assertThatThrownBy( () -> writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"))) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Encountered records that belong to already closed files") .hasMessageEndingWith("partition 'data=ccc' in spec " + identitySpec); - Assertions.assertThatThrownBy(() -> writer.write(toRow(7, "aaa"), unpartitionedSpec, null)) + assertThatThrownBy(() -> writer.write(toRow(7, "aaa"), unpartitionedSpec, null)) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Encountered records that belong to already closed files") .hasMessageEndingWith("spec []"); @@ -297,14 +294,14 @@ private void checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity delet writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); } @TestTemplate @@ -373,17 +370,16 @@ private void checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity d writer.close(); DeleteWriteResult result = writer.result(); - Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size()); - Assert.assertEquals( - "Must reference 3 data files", 3, writer.result().referencedDataFiles().size()); - Assert.assertTrue("Must reference data files", writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).hasSize(3); + assertThat(writer.result().referencedDataFiles()).hasSize(3); + assertThat(writer.result().referencesDataFiles()).isTrue(); RowDelta rowDelta = table.newRowDelta(); result.deleteFiles().forEach(rowDelta::addDeletes); rowDelta.commit(); List expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "bbb"), toRow(13, "ccc")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate @@ -433,7 +429,7 @@ private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions( identitySpec, partitionKey(identitySpec, "ddd")); - Assertions.assertThatThrownBy( + assertThatThrownBy( () -> { PositionDelete positionDelete = positionDelete("file-5.parquet", 1L, null); writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc")); @@ -442,7 +438,7 @@ private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions( .hasMessageContaining("Encountered records that belong to already closed files") .hasMessageEndingWith("partition 'data=ccc' in spec " + identitySpec); - Assertions.assertThatThrownBy( + assertThatThrownBy( () -> { PositionDelete positionDelete = positionDelete("file-1.parquet", 3L, null); writer.write(positionDelete, unpartitionedSpec, null); @@ -516,10 +512,10 @@ public void testFanoutDataWriterNoRecords() throws IOException { new FanoutDataWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.close(); - Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).isEmpty(); writer.close(); - Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).isEmpty(); } @TestTemplate @@ -541,7 +537,7 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException { writer.close(); DataWriteResult result = writer.result(); - Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size()); + assertThat(result.dataFiles()).hasSize(3); RowDelta rowDelta = table.newRowDelta(); result.dataFiles().forEach(rowDelta::addRows); @@ -550,7 +546,7 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException { List expectedRows = ImmutableList.of( toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "bbb"), toRow(4, "bbb"), toRow(5, "ccc")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate @@ -571,14 +567,14 @@ private void checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity dele writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); } @TestTemplate @@ -657,17 +653,16 @@ private void checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords( writer.close(); DeleteWriteResult result = writer.result(); - Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size()); - Assert.assertEquals( - "Must reference 3 data files", 3, writer.result().referencedDataFiles().size()); - Assert.assertTrue("Must reference data files", writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).hasSize(3); + assertThat(writer.result().referencedDataFiles()).hasSize(3); + assertThat(writer.result().referencesDataFiles()).isTrue(); RowDelta rowDelta = table.newRowDelta(); result.deleteFiles().forEach(rowDelta::addDeletes); rowDelta.commit(); List expectedRows = ImmutableList.of(toRow(12, "bbb")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate diff --git a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java index 2de7bf7324c0..177982a59cb3 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.io; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -34,7 +36,6 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.util.StructLikeSet; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -67,7 +68,7 @@ protected FileFormat format() { @BeforeEach public void setupTable() throws Exception { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); - Assert.assertTrue(tableDir.delete()); // created during table creation + assertThat(tableDir.delete()).isTrue(); // created during table creation this.metadataDir = new File(tableDir, "metadata"); this.table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -94,9 +95,9 @@ public void testPositionDeltaWithOneDataWriter() throws IOException { DeleteFile[] deleteFiles = result.deleteFiles(); CharSequence[] referencedDataFiles = result.referencedDataFiles(); - Assert.assertEquals("Must be 1 data files", 1, dataFiles.length); - Assert.assertEquals("Must be no delete files", 0, deleteFiles.length); - Assert.assertEquals("Must not reference data files", 0, referencedDataFiles.length); + assertThat(dataFiles).hasSize(1); + assertThat(deleteFiles).isEmpty(); + assertThat(referencedDataFiles).isEmpty(); } @TestTemplate @@ -121,9 +122,9 @@ public void testPositionDeltaInsertOnly() throws IOException { DeleteFile[] deleteFiles = result.deleteFiles(); CharSequence[] referencedDataFiles = result.referencedDataFiles(); - Assert.assertEquals("Must be 1 data files", 1, dataFiles.length); - Assert.assertEquals("Must be no delete files", 0, deleteFiles.length); - Assert.assertEquals("Must not reference data files", 0, referencedDataFiles.length); + assertThat(dataFiles).hasSize(1); + assertThat(deleteFiles).isEmpty(); + assertThat(referencedDataFiles).isEmpty(); RowDelta rowDelta = table.newRowDelta(); for (DataFile dataFile : dataFiles) { @@ -132,7 +133,7 @@ public void testPositionDeltaInsertOnly() throws IOException { rowDelta.commit(); List expectedRows = ImmutableList.of(toRow(1, "aaa")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate @@ -177,10 +178,9 @@ public void testPositionDeltaDeleteOnly() throws IOException { DeleteFile[] deleteFiles = result.deleteFiles(); CharSequence[] referencedDataFiles = result.referencedDataFiles(); - Assert.assertEquals("Must be 0 data files", 0, dataFiles.length); - Assert.assertEquals("Must be 2 delete files", 2, deleteFiles.length); - Assert.assertEquals("Must reference 2 data files", 2, referencedDataFiles.length); - + assertThat(dataFiles).isEmpty(); + assertThat(deleteFiles).hasSize(2); + assertThat(referencedDataFiles).hasSize(2); RowDelta rowDelta = table.newRowDelta(); for (DeleteFile deleteFile : deleteFiles) { rowDelta.addDeletes(deleteFile); @@ -188,7 +188,7 @@ public void testPositionDeltaDeleteOnly() throws IOException { rowDelta.commit(); List expectedRows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "bbb")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } @TestTemplate @@ -234,9 +234,9 @@ public void testPositionDeltaMultipleSpecs() throws IOException { DeleteFile[] deleteFiles = result.deleteFiles(); CharSequence[] referencedDataFiles = result.referencedDataFiles(); - Assert.assertEquals("Must be 1 data files", 1, dataFiles.length); - Assert.assertEquals("Must be 2 delete files", 2, deleteFiles.length); - Assert.assertEquals("Must reference 2 data files", 2, referencedDataFiles.length); + assertThat(dataFiles).hasSize(1); + assertThat(deleteFiles).hasSize(2); + assertThat(referencedDataFiles).hasSize(2); RowDelta rowDelta = table.newRowDelta(); for (DataFile dataFile : dataFiles) { @@ -249,6 +249,6 @@ public void testPositionDeltaMultipleSpecs() throws IOException { List expectedRows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "bbb"), toRow(10, "ccc")); - Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } } diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java index f6d5d41b72b3..6407fd0cbf70 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.io; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -33,7 +35,6 @@ import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -74,7 +75,7 @@ protected FileFormat format() { @BeforeEach public void setupTable() throws Exception { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); - Assert.assertTrue(tableDir.delete()); // created during table creation + assertThat(tableDir.delete()).isTrue(); // created during table creation this.metadataDir = new File(tableDir, "metadata"); @@ -97,10 +98,10 @@ public void testRollingDataWriterNoRecords() throws IOException { writerFactory, fileFactory, table.io(), DEFAULT_FILE_SIZE, table.spec(), partition); writer.close(); - Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).isEmpty(); writer.close(); - Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).isEmpty(); } @TestTemplate @@ -122,7 +123,7 @@ public void testRollingDataWriterSplitData() throws IOException { // call close again to ensure it is idempotent writer.close(); - Assert.assertEquals(4, writer.result().dataFiles().size()); + assertThat(writer.result().dataFiles()).hasSize(4); } @TestTemplate @@ -136,14 +137,14 @@ public void testRollingEqualityDeleteWriterNoRecords() throws IOException { writerFactory, fileFactory, table.io(), DEFAULT_FILE_SIZE, table.spec(), partition); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); } @TestTemplate @@ -168,10 +169,9 @@ public void testRollingEqualityDeleteWriterSplitDeletes() throws IOException { // call close again to ensure it is idempotent writer.close(); - DeleteWriteResult result = writer.result(); - Assert.assertEquals(4, result.deleteFiles().size()); - Assert.assertEquals(0, result.referencedDataFiles().size()); - Assert.assertFalse(result.referencesDataFiles()); + assertThat(writer.result().deleteFiles()).hasSize(4); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); } @TestTemplate @@ -182,14 +182,14 @@ public void testRollingPositionDeleteWriterNoRecords() throws IOException { writerFactory, fileFactory, table.io(), DEFAULT_FILE_SIZE, table.spec(), partition); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); writer.close(); - Assert.assertEquals(0, writer.result().deleteFiles().size()); - Assert.assertEquals(0, writer.result().referencedDataFiles().size()); - Assert.assertFalse(writer.result().referencesDataFiles()); + assertThat(writer.result().deleteFiles()).isEmpty(); + assertThat(writer.result().referencedDataFiles()).isEmpty(); + assertThat(writer.result().referencesDataFiles()).isFalse(); } @TestTemplate @@ -212,9 +212,8 @@ public void testRollingPositionDeleteWriterSplitDeletes() throws IOException { // call close again to ensure it is idempotent writer.close(); - DeleteWriteResult result = writer.result(); - Assert.assertEquals(4, result.deleteFiles().size()); - Assert.assertEquals(1, result.referencedDataFiles().size()); - Assert.assertTrue(result.referencesDataFiles()); + assertThat(writer.result().deleteFiles()).hasSize(4); + assertThat(writer.result().referencedDataFiles()).hasSize(1); + assertThat(writer.result().referencesDataFiles()).isTrue(); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index f780c6135bee..21f3ee2c655a 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateAfter; import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateBefore; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; @@ -66,8 +67,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeSet; -import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -90,7 +89,7 @@ protected static List parameters() { @BeforeEach public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); - Assert.assertTrue(tableDir.delete()); // created by table create + assertThat(tableDir.delete()).isTrue(); // created by table create this.metadataDir = new File(tableDir, "metadata"); } @@ -132,18 +131,17 @@ private void testCdcEvents(boolean partitioned) throws IOException { writer.write(createDelete(3, "ccc")); // 1 pos-delete and 1 eq-delete. WriteResult result = writer.complete(); - Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(partitioned ? 7 : 1); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records.", - expectedRowSet( - createRecord(1, "eee"), - createRecord(2, "ddd"), - createRecord(4, "fff"), - createRecord(5, "ggg")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet( + createRecord(1, "eee"), + createRecord(2, "ddd"), + createRecord(4, "fff"), + createRecord(5, "ggg"))); // Start the 2nd transaction. writer = taskWriterFactory.create(); @@ -160,14 +158,13 @@ private void testCdcEvents(boolean partitioned) throws IOException { writer.write(createDelete(4, "fff")); // 1 eq-delete. result = writer.complete(); - Assert.assertEquals(partitioned ? 2 : 1, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(partitioned ? 2 : 1); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet(createRecord(1, "eee"), createRecord(5, "iii"), createRecord(6, "hhh")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet(createRecord(1, "eee"), createRecord(5, "iii"), createRecord(6, "hhh"))); } @TestTemplate @@ -194,11 +191,11 @@ private void testWritePureEqDeletes(boolean partitioned) throws IOException { writer.write(createDelete(3, "ccc")); WriteResult result = writer.complete(); - Assert.assertEquals(0, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).isEmpty(); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals("Should have no record", expectedRowSet(), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet()); } @TestTemplate @@ -232,14 +229,11 @@ private void testAbort(boolean partitioned) throws IOException { .filter(p -> p.toFile().isFile()) .filter(p -> !p.toString().endsWith(".crc")) .collect(Collectors.toList()); - Assert.assertEquals( - "Should have expected file count, but files are: " + files, - partitioned ? 4 : 2, - files.size()); + assertThat(files).hasSize(partitioned ? 4 : 2); writer.abort(); for (Path file : files) { - Assert.assertFalse(Files.exists(file)); + assertThat(file).doesNotExist(); } } @@ -268,14 +262,13 @@ public void testPartitionedTableWithDataAsKey() throws IOException { writer.write(createInsert(4, "ccc")); WriteResult result = writer.complete(); - Assert.assertEquals(3, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(3); + assertThat(result.deleteFiles()).hasSize(1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet(createRecord(2, "aaa"), createRecord(3, "bbb"), createRecord(4, "ccc")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet(createRecord(2, "aaa"), createRecord(3, "bbb"), createRecord(4, "ccc"))); // Start the 2nd transaction. writer = taskWriterFactory.create(); @@ -284,18 +277,17 @@ public void testPartitionedTableWithDataAsKey() throws IOException { writer.write(createDelete(7, "ccc")); // 1 eq-delete. result = writer.complete(); - Assert.assertEquals(2, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(2); + assertThat(result.deleteFiles()).hasSize(1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet( - createRecord(2, "aaa"), - createRecord(5, "aaa"), - createRecord(3, "bbb"), - createRecord(6, "bbb")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet( + createRecord(2, "aaa"), + createRecord(5, "aaa"), + createRecord(3, "bbb"), + createRecord(6, "bbb"))); } @TestTemplate @@ -312,15 +304,12 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException { writer.write(createDelete(2, "aaa")); // 1 pos-delete. WriteResult result = writer.complete(); - Assert.assertEquals(1, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); - Assert.assertEquals( - Sets.newHashSet(FileContent.POSITION_DELETES), - Sets.newHashSet(result.deleteFiles()[0].content())); + assertThat(result.dataFiles()).hasSize(1); + assertThat(result.deleteFiles()).hasSize(1); + assertThat(result.deleteFiles()[0].content()).isEqualTo(FileContent.POSITION_DELETES); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", expectedRowSet(createRecord(1, "aaa")), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(createRecord(1, "aaa"))); } @TestTemplate @@ -361,10 +350,10 @@ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException { WriteResult result = writer.complete(); // One data file - Assertions.assertThat(result.dataFiles().length).isEqualTo(1); + assertThat(result.dataFiles()).hasSize(1); // One eq delete file + one pos delete file - Assertions.assertThat(result.deleteFiles().length).isEqualTo(2); - Assertions.assertThat( + assertThat(result.deleteFiles()).hasSize(2); + assertThat( Arrays.stream(result.deleteFiles()) .map(ContentFile::content) .collect(Collectors.toSet())) @@ -376,7 +365,7 @@ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException { int cutPrecisionNano = start.getNano() / 1000000 * 1000000; expectedRecord.setField("ts", start.withNano(cutPrecisionNano)); - Assertions.assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord)); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord)); } private void commitTransaction(WriteResult result) { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 4459f224afe6..06b6bc9a977b 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS; import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -75,8 +76,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.ThreadPools; -import org.junit.Assert; -import org.junit.Assume; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -111,7 +110,7 @@ public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned()); @@ -217,9 +216,8 @@ public void testCommitTxn() throws Exception { SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch); assertSnapshotSize(i); assertMaxCommittedCheckpointId(jobID, operatorId, i); - Assert.assertEquals( - TestIcebergFilesCommitter.class.getName(), - SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); + assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) + .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } } } @@ -715,9 +713,8 @@ public void testBoundedStream() throws Exception { SimpleDataUtil.assertTableRows(table, tableRows, branch); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE); - Assert.assertEquals( - TestIcebergFilesCommitter.class.getName(), - SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); + assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) + .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } } @@ -745,16 +742,16 @@ public void testFlinkManifests() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); - Assert.assertEquals( - "File name should have the expected pattern.", - String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), - manifestPath.getFileName().toString()); + assertThat(manifestPath.getFileName()) + .asString() + .isEqualTo( + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1)); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( createTestingManifestFile(manifestPath), table.io(), table.specs()); - Assert.assertEquals(1, dataFiles.size()); + assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 @@ -767,7 +764,9 @@ public void testFlinkManifests() throws Exception { @TestTemplate public void testDeleteFiles() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(2); long timestamp = 0; long checkpoint = 10; @@ -792,16 +791,16 @@ public void testDeleteFiles() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); - Assert.assertEquals( - "File name should have the expected pattern.", - String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), - manifestPath.getFileName().toString()); + assertThat(manifestPath.getFileName()) + .asString() + .isEqualTo( + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1)); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( createTestingManifestFile(manifestPath), table.io(), table.specs()); - Assert.assertEquals(1, dataFiles.size()); + assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 @@ -836,7 +835,9 @@ public void testDeleteFiles() throws Exception { @TestTemplate public void testCommitTwoCheckpointsInSingleTxn() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(2); long timestamp = 0; long checkpoint = 10; @@ -882,8 +883,7 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch); assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); assertFlinkManifests(0); - Assert.assertEquals( - "Should have committed 2 txn.", 2, ImmutableList.copyOf(table.snapshots()).size()); + assertThat(table.snapshots()).hasSize(2); } } @@ -1047,10 +1047,7 @@ private List assertFlinkManifests(int expectedCount) throws IOException { Files.list(flinkManifestFolder.toPath()) .filter(p -> !p.toString().endsWith(".crc")) .collect(Collectors.toList()); - Assert.assertEquals( - String.format("Expected %s flink manifests, but the list is: %s", expectedCount, manifests), - expectedCount, - manifests.size()); + assertThat(manifests).hasSize(expectedCount); return manifests; } @@ -1084,12 +1081,12 @@ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId( table, jobID.toString(), operatorID.toHexString(), branch); - Assert.assertEquals(expectedId, actualId); + assertThat(actualId).isEqualTo(expectedId); } private void assertSnapshotSize(int expectedSnapshotSize) { table.refresh(); - Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size()); + assertThat(table.snapshots()).hasSize(expectedSnapshotSize); } private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index a612617835b0..f7b13598bc2a 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -51,9 +52,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; -import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -78,7 +77,7 @@ protected static List parameters() { public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -249,7 +248,7 @@ public void testInvalidMaxPlanningSnapshotCount() { .monitorInterval(Duration.ofMillis(100)) .maxPlanningSnapshotCount(0) .build(); - Assertions.assertThatThrownBy(() -> createFunction(scanContext1)) + assertThatThrownBy(() -> createFunction(scanContext1)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("The max-planning-snapshot-count must be greater than zero"); @@ -259,7 +258,7 @@ public void testInvalidMaxPlanningSnapshotCount() { .maxPlanningSnapshotCount(-10) .build(); - Assertions.assertThatThrownBy(() -> createFunction(scanContext2)) + assertThatThrownBy(() -> createFunction(scanContext2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("The max-planning-snapshot-count must be greater than zero"); } @@ -282,7 +281,7 @@ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception { FlinkInputSplit[] expectedSplits = FlinkSplitPlanner.planInputSplits(table, scanContext, ThreadPools.getWorkerPool()); - Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length); + assertThat(expectedSplits).hasSize(9); // This covers three cases that maxPlanningSnapshotCount is less than, equal or greater than the // total splits number @@ -306,10 +305,7 @@ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception { function.monitorAndForwardSplits(); if (maxPlanningSnapshotCount < 10) { - Assert.assertEquals( - "Should produce same splits as max-planning-snapshot-count", - maxPlanningSnapshotCount, - sourceContext.splits.size()); + assertThat(sourceContext.splits).hasSize(maxPlanningSnapshotCount); } } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java index f96426a59a2d..1606ee9f9648 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink.source; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -49,7 +51,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -73,7 +74,7 @@ protected static List parameters() { public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -84,7 +85,7 @@ public void testProcessAllRecords() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(10); List splits = generateSplits(); - Assert.assertEquals("Should have 10 splits", 10, splits.size()); + assertThat(splits).hasSize(10); try (OneInputStreamOperatorTestHarness harness = createReader()) { harness.setup(); @@ -98,7 +99,7 @@ public void testProcessAllRecords() throws Exception { harness.processElement(splits.get(i), -1); // Run the mail-box once to read all records from the given split. - Assert.assertTrue("Should processed 1 split", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should processed 1 split").isTrue(); // Assert the output has expected elements. expected.addAll(expectedRecords.get(i)); @@ -115,7 +116,7 @@ public void testTriggerCheckpoint() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(3); List splits = generateSplits(); - Assert.assertEquals("Should have 3 splits", 3, splits.size()); + assertThat(splits).hasSize(3); long timestamp = 0; try (OneInputStreamOperatorTestHarness harness = createReader()) { @@ -131,17 +132,18 @@ public void testTriggerCheckpoint() throws Exception { // Trigger snapshot state, it will start to work once all records from split0 are read. processor.getMainMailboxExecutor().execute(() -> harness.snapshot(1, 3), "Trigger snapshot"); - Assert.assertTrue("Should have processed the split0", processor.runMailboxStep()); - Assert.assertTrue( - "Should have processed the snapshot state action", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split0").isTrue(); + assertThat(processor.runMailboxStep()) + .as("Should have processed the snapshot state action") + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA); // Read records from split1. - Assert.assertTrue("Should have processed the split1", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split1").isTrue(); // Read records from split2. - Assert.assertTrue("Should have processed the split2", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split2").isTrue(); TestHelpers.assertRecords( readOutputValues(harness), Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA); @@ -153,7 +155,7 @@ public void testCheckpointRestore() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(15); List splits = generateSplits(); - Assert.assertEquals("Should have 10 splits", 15, splits.size()); + assertThat(splits).hasSize(15); OperatorSubtaskState state; List expected = Lists.newArrayList(); @@ -170,7 +172,9 @@ public void testCheckpointRestore() throws Exception { SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); for (int i = 0; i < 5; i++) { expected.addAll(expectedRecords.get(i)); - Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } @@ -190,7 +194,9 @@ public void testCheckpointRestore() throws Exception { for (int i = 5; i < 10; i++) { expected.addAll(expectedRecords.get(i)); - Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } @@ -200,7 +206,9 @@ public void testCheckpointRestore() throws Exception { expected.addAll(expectedRecords.get(i)); harness.processElement(splits.get(i), 1); - Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index f780c6135bee..21f3ee2c655a 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateAfter; import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateBefore; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; @@ -66,8 +67,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeSet; -import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -90,7 +89,7 @@ protected static List parameters() { @BeforeEach public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); - Assert.assertTrue(tableDir.delete()); // created by table create + assertThat(tableDir.delete()).isTrue(); // created by table create this.metadataDir = new File(tableDir, "metadata"); } @@ -132,18 +131,17 @@ private void testCdcEvents(boolean partitioned) throws IOException { writer.write(createDelete(3, "ccc")); // 1 pos-delete and 1 eq-delete. WriteResult result = writer.complete(); - Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(partitioned ? 7 : 1); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records.", - expectedRowSet( - createRecord(1, "eee"), - createRecord(2, "ddd"), - createRecord(4, "fff"), - createRecord(5, "ggg")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet( + createRecord(1, "eee"), + createRecord(2, "ddd"), + createRecord(4, "fff"), + createRecord(5, "ggg"))); // Start the 2nd transaction. writer = taskWriterFactory.create(); @@ -160,14 +158,13 @@ private void testCdcEvents(boolean partitioned) throws IOException { writer.write(createDelete(4, "fff")); // 1 eq-delete. result = writer.complete(); - Assert.assertEquals(partitioned ? 2 : 1, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(partitioned ? 2 : 1); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet(createRecord(1, "eee"), createRecord(5, "iii"), createRecord(6, "hhh")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet(createRecord(1, "eee"), createRecord(5, "iii"), createRecord(6, "hhh"))); } @TestTemplate @@ -194,11 +191,11 @@ private void testWritePureEqDeletes(boolean partitioned) throws IOException { writer.write(createDelete(3, "ccc")); WriteResult result = writer.complete(); - Assert.assertEquals(0, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).isEmpty(); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals("Should have no record", expectedRowSet(), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet()); } @TestTemplate @@ -232,14 +229,11 @@ private void testAbort(boolean partitioned) throws IOException { .filter(p -> p.toFile().isFile()) .filter(p -> !p.toString().endsWith(".crc")) .collect(Collectors.toList()); - Assert.assertEquals( - "Should have expected file count, but files are: " + files, - partitioned ? 4 : 2, - files.size()); + assertThat(files).hasSize(partitioned ? 4 : 2); writer.abort(); for (Path file : files) { - Assert.assertFalse(Files.exists(file)); + assertThat(file).doesNotExist(); } } @@ -268,14 +262,13 @@ public void testPartitionedTableWithDataAsKey() throws IOException { writer.write(createInsert(4, "ccc")); WriteResult result = writer.complete(); - Assert.assertEquals(3, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(3); + assertThat(result.deleteFiles()).hasSize(1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet(createRecord(2, "aaa"), createRecord(3, "bbb"), createRecord(4, "ccc")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet(createRecord(2, "aaa"), createRecord(3, "bbb"), createRecord(4, "ccc"))); // Start the 2nd transaction. writer = taskWriterFactory.create(); @@ -284,18 +277,17 @@ public void testPartitionedTableWithDataAsKey() throws IOException { writer.write(createDelete(7, "ccc")); // 1 eq-delete. result = writer.complete(); - Assert.assertEquals(2, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(2); + assertThat(result.deleteFiles()).hasSize(1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet( - createRecord(2, "aaa"), - createRecord(5, "aaa"), - createRecord(3, "bbb"), - createRecord(6, "bbb")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet( + createRecord(2, "aaa"), + createRecord(5, "aaa"), + createRecord(3, "bbb"), + createRecord(6, "bbb"))); } @TestTemplate @@ -312,15 +304,12 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException { writer.write(createDelete(2, "aaa")); // 1 pos-delete. WriteResult result = writer.complete(); - Assert.assertEquals(1, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); - Assert.assertEquals( - Sets.newHashSet(FileContent.POSITION_DELETES), - Sets.newHashSet(result.deleteFiles()[0].content())); + assertThat(result.dataFiles()).hasSize(1); + assertThat(result.deleteFiles()).hasSize(1); + assertThat(result.deleteFiles()[0].content()).isEqualTo(FileContent.POSITION_DELETES); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", expectedRowSet(createRecord(1, "aaa")), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(createRecord(1, "aaa"))); } @TestTemplate @@ -361,10 +350,10 @@ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException { WriteResult result = writer.complete(); // One data file - Assertions.assertThat(result.dataFiles().length).isEqualTo(1); + assertThat(result.dataFiles()).hasSize(1); // One eq delete file + one pos delete file - Assertions.assertThat(result.deleteFiles().length).isEqualTo(2); - Assertions.assertThat( + assertThat(result.deleteFiles()).hasSize(2); + assertThat( Arrays.stream(result.deleteFiles()) .map(ContentFile::content) .collect(Collectors.toSet())) @@ -376,7 +365,7 @@ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException { int cutPrecisionNano = start.getNano() / 1000000 * 1000000; expectedRecord.setField("ts", start.withNano(cutPrecisionNano)); - Assertions.assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord)); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord)); } private void commitTransaction(WriteResult result) { diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 4459f224afe6..06b6bc9a977b 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS; import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -75,8 +76,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.ThreadPools; -import org.junit.Assert; -import org.junit.Assume; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -111,7 +110,7 @@ public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned()); @@ -217,9 +216,8 @@ public void testCommitTxn() throws Exception { SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch); assertSnapshotSize(i); assertMaxCommittedCheckpointId(jobID, operatorId, i); - Assert.assertEquals( - TestIcebergFilesCommitter.class.getName(), - SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); + assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) + .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } } } @@ -715,9 +713,8 @@ public void testBoundedStream() throws Exception { SimpleDataUtil.assertTableRows(table, tableRows, branch); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE); - Assert.assertEquals( - TestIcebergFilesCommitter.class.getName(), - SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); + assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) + .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } } @@ -745,16 +742,16 @@ public void testFlinkManifests() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); - Assert.assertEquals( - "File name should have the expected pattern.", - String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), - manifestPath.getFileName().toString()); + assertThat(manifestPath.getFileName()) + .asString() + .isEqualTo( + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1)); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( createTestingManifestFile(manifestPath), table.io(), table.specs()); - Assert.assertEquals(1, dataFiles.size()); + assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 @@ -767,7 +764,9 @@ public void testFlinkManifests() throws Exception { @TestTemplate public void testDeleteFiles() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(2); long timestamp = 0; long checkpoint = 10; @@ -792,16 +791,16 @@ public void testDeleteFiles() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); - Assert.assertEquals( - "File name should have the expected pattern.", - String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), - manifestPath.getFileName().toString()); + assertThat(manifestPath.getFileName()) + .asString() + .isEqualTo( + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1)); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( createTestingManifestFile(manifestPath), table.io(), table.specs()); - Assert.assertEquals(1, dataFiles.size()); + assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 @@ -836,7 +835,9 @@ public void testDeleteFiles() throws Exception { @TestTemplate public void testCommitTwoCheckpointsInSingleTxn() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(2); long timestamp = 0; long checkpoint = 10; @@ -882,8 +883,7 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch); assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); assertFlinkManifests(0); - Assert.assertEquals( - "Should have committed 2 txn.", 2, ImmutableList.copyOf(table.snapshots()).size()); + assertThat(table.snapshots()).hasSize(2); } } @@ -1047,10 +1047,7 @@ private List assertFlinkManifests(int expectedCount) throws IOException { Files.list(flinkManifestFolder.toPath()) .filter(p -> !p.toString().endsWith(".crc")) .collect(Collectors.toList()); - Assert.assertEquals( - String.format("Expected %s flink manifests, but the list is: %s", expectedCount, manifests), - expectedCount, - manifests.size()); + assertThat(manifests).hasSize(expectedCount); return manifests; } @@ -1084,12 +1081,12 @@ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId( table, jobID.toString(), operatorID.toHexString(), branch); - Assert.assertEquals(expectedId, actualId); + assertThat(actualId).isEqualTo(expectedId); } private void assertSnapshotSize(int expectedSnapshotSize) { table.refresh(); - Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size()); + assertThat(table.snapshots()).hasSize(expectedSnapshotSize); } private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 451e3552b9ac..9c4f476b02b4 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -51,9 +52,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; -import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -78,7 +77,7 @@ protected static List parameters() { public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -250,7 +249,7 @@ public void testInvalidMaxPlanningSnapshotCount() { .maxPlanningSnapshotCount(0) .build(); - Assertions.assertThatThrownBy(() -> createFunction(scanContext1)) + assertThatThrownBy(() -> createFunction(scanContext1)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("The max-planning-snapshot-count must be greater than zero"); @@ -260,7 +259,7 @@ public void testInvalidMaxPlanningSnapshotCount() { .maxPlanningSnapshotCount(-10) .build(); - Assertions.assertThatThrownBy(() -> createFunction(scanContext2)) + assertThatThrownBy(() -> createFunction(scanContext2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("The max-planning-snapshot-count must be greater than zero"); } @@ -283,7 +282,7 @@ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception { FlinkInputSplit[] expectedSplits = FlinkSplitPlanner.planInputSplits(table, scanContext, ThreadPools.getWorkerPool()); - Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length); + assertThat(expectedSplits).hasSize(9); // This covers three cases that maxPlanningSnapshotCount is less than, equal or greater than the // total splits number @@ -307,10 +306,7 @@ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception { function.monitorAndForwardSplits(); if (maxPlanningSnapshotCount < 10) { - Assert.assertEquals( - "Should produce same splits as max-planning-snapshot-count", - maxPlanningSnapshotCount, - sourceContext.splits.size()); + assertThat(sourceContext.splits).hasSize(maxPlanningSnapshotCount); } } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java index f96426a59a2d..1606ee9f9648 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink.source; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -49,7 +51,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -73,7 +74,7 @@ protected static List parameters() { public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -84,7 +85,7 @@ public void testProcessAllRecords() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(10); List splits = generateSplits(); - Assert.assertEquals("Should have 10 splits", 10, splits.size()); + assertThat(splits).hasSize(10); try (OneInputStreamOperatorTestHarness harness = createReader()) { harness.setup(); @@ -98,7 +99,7 @@ public void testProcessAllRecords() throws Exception { harness.processElement(splits.get(i), -1); // Run the mail-box once to read all records from the given split. - Assert.assertTrue("Should processed 1 split", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should processed 1 split").isTrue(); // Assert the output has expected elements. expected.addAll(expectedRecords.get(i)); @@ -115,7 +116,7 @@ public void testTriggerCheckpoint() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(3); List splits = generateSplits(); - Assert.assertEquals("Should have 3 splits", 3, splits.size()); + assertThat(splits).hasSize(3); long timestamp = 0; try (OneInputStreamOperatorTestHarness harness = createReader()) { @@ -131,17 +132,18 @@ public void testTriggerCheckpoint() throws Exception { // Trigger snapshot state, it will start to work once all records from split0 are read. processor.getMainMailboxExecutor().execute(() -> harness.snapshot(1, 3), "Trigger snapshot"); - Assert.assertTrue("Should have processed the split0", processor.runMailboxStep()); - Assert.assertTrue( - "Should have processed the snapshot state action", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split0").isTrue(); + assertThat(processor.runMailboxStep()) + .as("Should have processed the snapshot state action") + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA); // Read records from split1. - Assert.assertTrue("Should have processed the split1", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split1").isTrue(); // Read records from split2. - Assert.assertTrue("Should have processed the split2", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split2").isTrue(); TestHelpers.assertRecords( readOutputValues(harness), Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA); @@ -153,7 +155,7 @@ public void testCheckpointRestore() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(15); List splits = generateSplits(); - Assert.assertEquals("Should have 10 splits", 15, splits.size()); + assertThat(splits).hasSize(15); OperatorSubtaskState state; List expected = Lists.newArrayList(); @@ -170,7 +172,9 @@ public void testCheckpointRestore() throws Exception { SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); for (int i = 0; i < 5; i++) { expected.addAll(expectedRecords.get(i)); - Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } @@ -190,7 +194,9 @@ public void testCheckpointRestore() throws Exception { for (int i = 5; i < 10; i++) { expected.addAll(expectedRecords.get(i)); - Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } @@ -200,7 +206,9 @@ public void testCheckpointRestore() throws Exception { expected.addAll(expectedRecords.get(i)); harness.processElement(splits.get(i), 1); - Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index f780c6135bee..21f3ee2c655a 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateAfter; import static org.apache.iceberg.flink.SimpleDataUtil.createUpdateBefore; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; @@ -66,8 +67,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeSet; -import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -90,7 +89,7 @@ protected static List parameters() { @BeforeEach public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); - Assert.assertTrue(tableDir.delete()); // created by table create + assertThat(tableDir.delete()).isTrue(); // created by table create this.metadataDir = new File(tableDir, "metadata"); } @@ -132,18 +131,17 @@ private void testCdcEvents(boolean partitioned) throws IOException { writer.write(createDelete(3, "ccc")); // 1 pos-delete and 1 eq-delete. WriteResult result = writer.complete(); - Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(partitioned ? 7 : 1); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records.", - expectedRowSet( - createRecord(1, "eee"), - createRecord(2, "ddd"), - createRecord(4, "fff"), - createRecord(5, "ggg")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet( + createRecord(1, "eee"), + createRecord(2, "ddd"), + createRecord(4, "fff"), + createRecord(5, "ggg"))); // Start the 2nd transaction. writer = taskWriterFactory.create(); @@ -160,14 +158,13 @@ private void testCdcEvents(boolean partitioned) throws IOException { writer.write(createDelete(4, "fff")); // 1 eq-delete. result = writer.complete(); - Assert.assertEquals(partitioned ? 2 : 1, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(partitioned ? 2 : 1); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet(createRecord(1, "eee"), createRecord(5, "iii"), createRecord(6, "hhh")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet(createRecord(1, "eee"), createRecord(5, "iii"), createRecord(6, "hhh"))); } @TestTemplate @@ -194,11 +191,11 @@ private void testWritePureEqDeletes(boolean partitioned) throws IOException { writer.write(createDelete(3, "ccc")); WriteResult result = writer.complete(); - Assert.assertEquals(0, result.dataFiles().length); - Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length); + assertThat(result.dataFiles()).isEmpty(); + assertThat(result.deleteFiles()).hasSize(partitioned ? 3 : 1); commitTransaction(result); - Assert.assertEquals("Should have no record", expectedRowSet(), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet()); } @TestTemplate @@ -232,14 +229,11 @@ private void testAbort(boolean partitioned) throws IOException { .filter(p -> p.toFile().isFile()) .filter(p -> !p.toString().endsWith(".crc")) .collect(Collectors.toList()); - Assert.assertEquals( - "Should have expected file count, but files are: " + files, - partitioned ? 4 : 2, - files.size()); + assertThat(files).hasSize(partitioned ? 4 : 2); writer.abort(); for (Path file : files) { - Assert.assertFalse(Files.exists(file)); + assertThat(file).doesNotExist(); } } @@ -268,14 +262,13 @@ public void testPartitionedTableWithDataAsKey() throws IOException { writer.write(createInsert(4, "ccc")); WriteResult result = writer.complete(); - Assert.assertEquals(3, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(3); + assertThat(result.deleteFiles()).hasSize(1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet(createRecord(2, "aaa"), createRecord(3, "bbb"), createRecord(4, "ccc")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet(createRecord(2, "aaa"), createRecord(3, "bbb"), createRecord(4, "ccc"))); // Start the 2nd transaction. writer = taskWriterFactory.create(); @@ -284,18 +277,17 @@ public void testPartitionedTableWithDataAsKey() throws IOException { writer.write(createDelete(7, "ccc")); // 1 eq-delete. result = writer.complete(); - Assert.assertEquals(2, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); + assertThat(result.dataFiles()).hasSize(2); + assertThat(result.deleteFiles()).hasSize(1); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", - expectedRowSet( - createRecord(2, "aaa"), - createRecord(5, "aaa"), - createRecord(3, "bbb"), - createRecord(6, "bbb")), - actualRowSet("*")); + assertThat(actualRowSet("*")) + .isEqualTo( + expectedRowSet( + createRecord(2, "aaa"), + createRecord(5, "aaa"), + createRecord(3, "bbb"), + createRecord(6, "bbb"))); } @TestTemplate @@ -312,15 +304,12 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException { writer.write(createDelete(2, "aaa")); // 1 pos-delete. WriteResult result = writer.complete(); - Assert.assertEquals(1, result.dataFiles().length); - Assert.assertEquals(1, result.deleteFiles().length); - Assert.assertEquals( - Sets.newHashSet(FileContent.POSITION_DELETES), - Sets.newHashSet(result.deleteFiles()[0].content())); + assertThat(result.dataFiles()).hasSize(1); + assertThat(result.deleteFiles()).hasSize(1); + assertThat(result.deleteFiles()[0].content()).isEqualTo(FileContent.POSITION_DELETES); commitTransaction(result); - Assert.assertEquals( - "Should have expected records", expectedRowSet(createRecord(1, "aaa")), actualRowSet("*")); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(createRecord(1, "aaa"))); } @TestTemplate @@ -361,10 +350,10 @@ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException { WriteResult result = writer.complete(); // One data file - Assertions.assertThat(result.dataFiles().length).isEqualTo(1); + assertThat(result.dataFiles()).hasSize(1); // One eq delete file + one pos delete file - Assertions.assertThat(result.deleteFiles().length).isEqualTo(2); - Assertions.assertThat( + assertThat(result.deleteFiles()).hasSize(2); + assertThat( Arrays.stream(result.deleteFiles()) .map(ContentFile::content) .collect(Collectors.toSet())) @@ -376,7 +365,7 @@ public void testEqualityColumnOnCustomPrecisionTSColumn() throws IOException { int cutPrecisionNano = start.getNano() / 1000000 * 1000000; expectedRecord.setField("ts", start.withNano(cutPrecisionNano)); - Assertions.assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord)); + assertThat(actualRowSet("*")).isEqualTo(expectedRowSet(expectedRecord)); } private void commitTransaction(WriteResult result) { diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 4459f224afe6..06b6bc9a977b 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS; import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -75,8 +76,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.ThreadPools; -import org.junit.Assert; -import org.junit.Assume; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -111,7 +110,7 @@ public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned()); @@ -217,9 +216,8 @@ public void testCommitTxn() throws Exception { SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch); assertSnapshotSize(i); assertMaxCommittedCheckpointId(jobID, operatorId, i); - Assert.assertEquals( - TestIcebergFilesCommitter.class.getName(), - SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); + assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) + .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } } } @@ -715,9 +713,8 @@ public void testBoundedStream() throws Exception { SimpleDataUtil.assertTableRows(table, tableRows, branch); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE); - Assert.assertEquals( - TestIcebergFilesCommitter.class.getName(), - SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); + assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) + .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } } @@ -745,16 +742,16 @@ public void testFlinkManifests() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); - Assert.assertEquals( - "File name should have the expected pattern.", - String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), - manifestPath.getFileName().toString()); + assertThat(manifestPath.getFileName()) + .asString() + .isEqualTo( + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1)); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( createTestingManifestFile(manifestPath), table.io(), table.specs()); - Assert.assertEquals(1, dataFiles.size()); + assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 @@ -767,7 +764,9 @@ public void testFlinkManifests() throws Exception { @TestTemplate public void testDeleteFiles() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(2); long timestamp = 0; long checkpoint = 10; @@ -792,16 +791,16 @@ public void testDeleteFiles() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); - Assert.assertEquals( - "File name should have the expected pattern.", - String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), - manifestPath.getFileName().toString()); + assertThat(manifestPath.getFileName()) + .asString() + .isEqualTo( + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1)); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( createTestingManifestFile(manifestPath), table.io(), table.specs()); - Assert.assertEquals(1, dataFiles.size()); + assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 @@ -836,7 +835,9 @@ public void testDeleteFiles() throws Exception { @TestTemplate public void testCommitTwoCheckpointsInSingleTxn() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(2); long timestamp = 0; long checkpoint = 10; @@ -882,8 +883,7 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch); assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); assertFlinkManifests(0); - Assert.assertEquals( - "Should have committed 2 txn.", 2, ImmutableList.copyOf(table.snapshots()).size()); + assertThat(table.snapshots()).hasSize(2); } } @@ -1047,10 +1047,7 @@ private List assertFlinkManifests(int expectedCount) throws IOException { Files.list(flinkManifestFolder.toPath()) .filter(p -> !p.toString().endsWith(".crc")) .collect(Collectors.toList()); - Assert.assertEquals( - String.format("Expected %s flink manifests, but the list is: %s", expectedCount, manifests), - expectedCount, - manifests.size()); + assertThat(manifests).hasSize(expectedCount); return manifests; } @@ -1084,12 +1081,12 @@ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId( table, jobID.toString(), operatorID.toHexString(), branch); - Assert.assertEquals(expectedId, actualId); + assertThat(actualId).isEqualTo(expectedId); } private void assertSnapshotSize(int expectedSnapshotSize) { table.refresh(); - Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size()); + assertThat(table.snapshots()).hasSize(expectedSnapshotSize); } private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 451e3552b9ac..9c4f476b02b4 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -51,9 +52,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; -import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -78,7 +77,7 @@ protected static List parameters() { public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -250,7 +249,7 @@ public void testInvalidMaxPlanningSnapshotCount() { .maxPlanningSnapshotCount(0) .build(); - Assertions.assertThatThrownBy(() -> createFunction(scanContext1)) + assertThatThrownBy(() -> createFunction(scanContext1)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("The max-planning-snapshot-count must be greater than zero"); @@ -260,7 +259,7 @@ public void testInvalidMaxPlanningSnapshotCount() { .maxPlanningSnapshotCount(-10) .build(); - Assertions.assertThatThrownBy(() -> createFunction(scanContext2)) + assertThatThrownBy(() -> createFunction(scanContext2)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("The max-planning-snapshot-count must be greater than zero"); } @@ -283,7 +282,7 @@ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception { FlinkInputSplit[] expectedSplits = FlinkSplitPlanner.planInputSplits(table, scanContext, ThreadPools.getWorkerPool()); - Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length); + assertThat(expectedSplits).hasSize(9); // This covers three cases that maxPlanningSnapshotCount is less than, equal or greater than the // total splits number @@ -307,10 +306,7 @@ public void testConsumeWithMaxPlanningSnapshotCount() throws Exception { function.monitorAndForwardSplits(); if (maxPlanningSnapshotCount < 10) { - Assert.assertEquals( - "Should produce same splits as max-planning-snapshot-count", - maxPlanningSnapshotCount, - sourceContext.splits.size()); + assertThat(sourceContext.splits).hasSize(maxPlanningSnapshotCount); } } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java index f96426a59a2d..1606ee9f9648 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink.source; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -49,7 +51,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -73,7 +74,7 @@ protected static List parameters() { public void setupTable() throws IOException { this.tableDir = Files.createTempDirectory(temp, "junit").toFile(); this.metadataDir = new File(tableDir, "metadata"); - Assert.assertTrue(tableDir.delete()); + assertThat(tableDir.delete()).isTrue(); // Construct the iceberg table. table = create(SCHEMA, PartitionSpec.unpartitioned()); @@ -84,7 +85,7 @@ public void testProcessAllRecords() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(10); List splits = generateSplits(); - Assert.assertEquals("Should have 10 splits", 10, splits.size()); + assertThat(splits).hasSize(10); try (OneInputStreamOperatorTestHarness harness = createReader()) { harness.setup(); @@ -98,7 +99,7 @@ public void testProcessAllRecords() throws Exception { harness.processElement(splits.get(i), -1); // Run the mail-box once to read all records from the given split. - Assert.assertTrue("Should processed 1 split", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should processed 1 split").isTrue(); // Assert the output has expected elements. expected.addAll(expectedRecords.get(i)); @@ -115,7 +116,7 @@ public void testTriggerCheckpoint() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(3); List splits = generateSplits(); - Assert.assertEquals("Should have 3 splits", 3, splits.size()); + assertThat(splits).hasSize(3); long timestamp = 0; try (OneInputStreamOperatorTestHarness harness = createReader()) { @@ -131,17 +132,18 @@ public void testTriggerCheckpoint() throws Exception { // Trigger snapshot state, it will start to work once all records from split0 are read. processor.getMainMailboxExecutor().execute(() -> harness.snapshot(1, 3), "Trigger snapshot"); - Assert.assertTrue("Should have processed the split0", processor.runMailboxStep()); - Assert.assertTrue( - "Should have processed the snapshot state action", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split0").isTrue(); + assertThat(processor.runMailboxStep()) + .as("Should have processed the snapshot state action") + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA); // Read records from split1. - Assert.assertTrue("Should have processed the split1", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split1").isTrue(); // Read records from split2. - Assert.assertTrue("Should have processed the split2", processor.runMailboxStep()); + assertThat(processor.runMailboxStep()).as("Should have processed the split2").isTrue(); TestHelpers.assertRecords( readOutputValues(harness), Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA); @@ -153,7 +155,7 @@ public void testCheckpointRestore() throws Exception { List> expectedRecords = generateRecordsAndCommitTxn(15); List splits = generateSplits(); - Assert.assertEquals("Should have 10 splits", 15, splits.size()); + assertThat(splits).hasSize(15); OperatorSubtaskState state; List expected = Lists.newArrayList(); @@ -170,7 +172,9 @@ public void testCheckpointRestore() throws Exception { SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); for (int i = 0; i < 5; i++) { expected.addAll(expectedRecords.get(i)); - Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } @@ -190,7 +194,9 @@ public void testCheckpointRestore() throws Exception { for (int i = 5; i < 10; i++) { expected.addAll(expectedRecords.get(i)); - Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } @@ -200,7 +206,9 @@ public void testCheckpointRestore() throws Exception { expected.addAll(expectedRecords.get(i)); harness.processElement(splits.get(i), 1); - Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); + assertThat(localMailbox.runMailboxStep()) + .as("Should have processed the split#" + i) + .isTrue(); TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } }