From 5d0d76f9ed71caee190d728d89bfa16c8df43857 Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Sat, 16 Dec 2023 08:41:36 +0100
Subject: [PATCH] Flink: Fix TestIcebergSourceWithWatermarkExtractor flakiness

---
 flink/v1.18/build.gradle                       |  3 +
 ...stIcebergSourceWithWatermarkExtractor.java  | 72 ++++++++++---------
 2 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/flink/v1.18/build.gradle b/flink/v1.18/build.gradle
index 55578d3b117d..c08ae5d8cc1f 100644
--- a/flink/v1.18/build.gradle
+++ b/flink/v1.18/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       // These artifacts are shaded and included in the orc-core fat jar
       exclude group: 'com.google.protobuf', module: 'protobuf-java'
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
+      exclude group: 'org.slf4j'
     }
 
     testImplementation libs.flink118.connector.test.utils
@@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'com.tdunning', module: 'json'
       exclude group: 'javax.transaction', module: 'transaction-api'
       exclude group: 'com.zaxxer', module: 'HikariCP'
+      exclude group: 'org.slf4j'
     }
 
     testImplementation libs.awaitility
@@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
      exclude group: 'com.tdunning', module: 'json'
      exclude group: 'javax.transaction', module: 'transaction-api'
      exclude group: 'com.zaxxer', module: 'HikariCP'
+     exclude group: 'org.slf4j'
    }
 
    integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {

diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index aa4b9cd79e55..7428d30f4261 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -40,11 +40,11 @@
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -78,7 +78,6 @@ import org.junit.rules.TemporaryFolder;
 
 public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
-  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
   private static final int PARALLELISM = 4;
   private static final String SOURCE_NAME = "IcebergSource";
   private static final int RECORD_NUM_FOR_2_SPLITS = 200;
@@ -86,6 +85,8 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+  private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+
   @Rule
   public final MiniClusterWithClientResource miniClusterResource =
       new MiniClusterWithClientResource(
@@ -93,10 +94,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
           new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(1)
               .setNumberSlotsPerTaskManager(PARALLELISM)
               .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-              .setConfiguration(
-                  reporter.addToConfiguration(
-                      // disable classloader check as Avro may cache class in the serializers.
-                      new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)))
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
               .withHaLeadershipControl()
               .build());
@@ -273,23 +271,40 @@ public void apply(
   public void testThrottling() throws Exception {
     GenericAppenderHelper dataAppender = appender();
 
-    // Generate records with the following pattern:
-    // - File 1 - Later records (Watermark 6000000)
-    //   - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
-    // - File 2 - First records (Watermark 0)
-    //   - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
-    //   - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
-    List<Record> batch =
+    // Generate records in advance
+
+    // File 1 - Later records (Watermark 6.000.000 - 100 min)
+    //   - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    List<Record> batch1 =
         ImmutableList.of(
             generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
-    dataAppender.appendToTable(batch);
-    batch = Lists.newArrayListWithCapacity(100);
+    // File 2 - First records (Watermark 0 - 0 min)
+    //   - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //   - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    List<Record> batch2 = Lists.newArrayListWithCapacity(100);
     for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
-      batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+      batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
     }
 
-    dataAppender.appendToTable(batch);
+    // File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
+    List<Record> batch3 =
+        ImmutableList.of(
+            generateRecord(15, "file_3-recordTs_15"),
+            generateRecord(16, "file_3-recordTs_16"),
+            generateRecord(17, "file_3-recordTs_17"));
+
+    // File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
+    List<Record> batch4 =
+        ImmutableList.of(
+            generateRecord(15, "file_4-recordTs_15"),
+            generateRecord(16, "file_4-recordTs_16"),
+            generateRecord(17, "file_4-recordTs_17"));
+
+    // File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
+    List<Record> batch5 =
+        ImmutableList.of(
+            generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(2);
@@ -304,6 +319,11 @@ public void testThrottling() throws Exception {
 
     try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
       JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
+      CommonTestUtils.waitForAllTaskRunning(
+          miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);
+
+      // Insert the first data into the table
+      dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));
 
       // Check that the read the non-blocked data
       // The first RECORD_NUM_FOR_2_SPLITS should be read
@@ -325,18 +345,7 @@ public void testThrottling() throws Exception {
 
       // Add some old records with 2 splits, so even if the blocked gets one split, the other reader
      // one gets one as well
-      List<Record> newBatch1 =
-          ImmutableList.of(
-              generateRecord(15, "file_3-recordTs_15"),
-              generateRecord(16, "file_3-recordTs_16"),
-              generateRecord(17, "file_3-recordTs_17"));
-      List<Record> newBatch2 =
-          ImmutableList.of(
-              generateRecord(15, "file_4-recordTs_15"),
-              generateRecord(16, "file_4-recordTs_16"),
-              generateRecord(17, "file_4-recordTs_17"));
-      dataAppender.appendToTable(
-          dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
+      dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
       // The records received will highly depend on scheduling
       // We minimally get 3 records from the non-blocked reader
       // We might get 1 record from the blocked reader (as part of the previous batch -
@@ -352,10 +361,7 @@ public void testThrottling() throws Exception {
           .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));
 
       // Add some new records which should unblock the throttled reader
-      batch =
-          ImmutableList.of(
-              generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
-      dataAppender.appendToTable(batch);
+      dataAppender.appendToTable(batch5);
 
       // We should get all the records at this point
       waitForRecords(resultIterator, 6);