diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java index 2437eb14b287..bc1808ee77da 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -18,24 +18,18 @@ */ package org.apache.iceberg.flink.source; -import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.io.Serializable; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -63,15 +57,12 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.HadoopTableResource; -import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -325,11 +316,6 @@ public void testThrottling() throws Exception { // Insert the first data into the table dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2)); - // Check that the read the non-blocked data - // The first RECORD_NUM_FOR_2_SPLITS should be read - // 1 or more from the runaway reader should be arrived depending on thread scheduling - waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); - // Get the drift metric, wait for it to be created and reach the expected state // (100 min - 20 min - 0 min) // Also this validates that the WatermarkAlignment is working @@ -346,12 +332,6 @@ public void testThrottling() throws Exception { // Add some old records with 2 splits, so even if the blocked gets one split, the other reader // one gets one as well dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4)); - // The records received will highly depend on scheduling - // We minimally get 3 records from the non-blocked reader - // We might get 1 record from the blocked reader (as part of the previous batch - - // file_1-recordTs_100) - // We might get 3 records form the non-blocked reader if it gets both new splits - waitForRecords(resultIterator, 3); // Get the drift metric, wait for it to be created and reach the expected state (100 min - 20 // min - 15 min) @@ -362,8 +342,6 @@ public void testThrottling() throws Exception { // Add some new records which should unblock the throttled reader dataAppender.appendToTable(batch5); - // We should get all the records at this point - waitForRecords(resultIterator, 6); // Wait for the new drift to decrease below the allowed drift to signal the normal state Awaitility.await() @@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) { return record; } - protected void assertRecords( - Collection expectedRecords, CloseableIterator iterator) throws Exception { - Set expected = - expectedRecords.stream() - .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e)) - .collect(Collectors.toSet()); - Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size())); - } - - protected Set waitForRecords(CloseableIterator iterator, int num) { - Set received = Sets.newHashSetWithExpectedSize(num); - assertThat( - CompletableFuture.supplyAsync( - () -> { - int count = 0; - while (count < num && iterator.hasNext()) { - received.add(iterator.next()); - count++; - } - - if (count < num) { - throw new IllegalStateException(String.format("Fail to get %d records.", num)); - } - - return true; - })) - .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); - - return received; - } - private Optional> findAlignmentDriftMetric(JobID jobID, long withValue) { String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT; return reporter.findMetrics(jobID, metricsName).values().stream() diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java index 2437eb14b287..bc1808ee77da 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -18,24 +18,18 @@ */ package org.apache.iceberg.flink.source; -import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.io.Serializable; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -63,15 +57,12 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.HadoopTableResource; -import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -325,11 +316,6 @@ public void testThrottling() throws Exception { // Insert the first data into the table dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2)); - // Check that the read the non-blocked data - // The first RECORD_NUM_FOR_2_SPLITS should be read - // 1 or more from the runaway reader should be arrived depending on thread scheduling - waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); - // Get the drift metric, wait for it to be created and reach the expected state // (100 min - 20 min - 0 min) // Also this validates that the WatermarkAlignment is working @@ -346,12 +332,6 @@ public void testThrottling() throws Exception { // Add some old records with 2 splits, so even if the blocked gets one split, the other reader // one gets one as well dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4)); - // The records received will highly depend on scheduling - // We minimally get 3 records from the non-blocked reader - // We might get 1 record from the blocked reader (as part of the previous batch - - // file_1-recordTs_100) - // We might get 3 records form the non-blocked reader if it gets both new splits - waitForRecords(resultIterator, 3); // Get the drift metric, wait for it to be created and reach the expected state (100 min - 20 // min - 15 min) @@ -362,8 +342,6 @@ public void testThrottling() throws Exception { // Add some new records which should unblock the throttled reader dataAppender.appendToTable(batch5); - // We should get all the records at this point - waitForRecords(resultIterator, 6); // Wait for the new drift to decrease below the allowed drift to signal the normal state Awaitility.await() @@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) { return record; } - protected void assertRecords( - Collection expectedRecords, CloseableIterator iterator) throws Exception { - Set expected = - expectedRecords.stream() - .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e)) - .collect(Collectors.toSet()); - Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size())); - } - - protected Set waitForRecords(CloseableIterator iterator, int num) { - Set received = Sets.newHashSetWithExpectedSize(num); - assertThat( - CompletableFuture.supplyAsync( - () -> { - int count = 0; - while (count < num && iterator.hasNext()) { - received.add(iterator.next()); - count++; - } - - if (count < num) { - throw new IllegalStateException(String.format("Fail to get %d records.", num)); - } - - return true; - })) - .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); - - return received; - } - private Optional> findAlignmentDriftMetric(JobID jobID, long withValue) { String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT; return reporter.findMetrics(jobID, metricsName).values().stream() diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java index 2437eb14b287..bc1808ee77da 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -18,24 +18,18 @@ */ package org.apache.iceberg.flink.source; -import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.io.Serializable; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -63,15 +57,12 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.HadoopTableResource; -import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -325,11 +316,6 @@ public void testThrottling() throws Exception { // Insert the first data into the table dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2)); - // Check that the read the non-blocked data - // The first RECORD_NUM_FOR_2_SPLITS should be read - // 1 or more from the runaway reader should be arrived depending on thread scheduling - waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1); - // Get the drift metric, wait for it to be created and reach the expected state // (100 min - 20 min - 0 min) // Also this validates that the WatermarkAlignment is working @@ -346,12 +332,6 @@ public void testThrottling() throws Exception { // Add some old records with 2 splits, so even if the blocked gets one split, the other reader // one gets one as well dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4)); - // The records received will highly depend on scheduling - // We minimally get 3 records from the non-blocked reader - // We might get 1 record from the blocked reader (as part of the previous batch - - // file_1-recordTs_100) - // We might get 3 records form the non-blocked reader if it gets both new splits - waitForRecords(resultIterator, 3); // Get the drift metric, wait for it to be created and reach the expected state (100 min - 20 // min - 15 min) @@ -362,8 +342,6 @@ public void testThrottling() throws Exception { // Add some new records which should unblock the throttled reader dataAppender.appendToTable(batch5); - // We should get all the records at this point - waitForRecords(resultIterator, 6); // Wait for the new drift to decrease below the allowed drift to signal the normal state Awaitility.await() @@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) { return record; } - protected void assertRecords( - Collection expectedRecords, CloseableIterator iterator) throws Exception { - Set expected = - expectedRecords.stream() - .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e)) - .collect(Collectors.toSet()); - Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size())); - } - - protected Set waitForRecords(CloseableIterator iterator, int num) { - Set received = Sets.newHashSetWithExpectedSize(num); - assertThat( - CompletableFuture.supplyAsync( - () -> { - int count = 0; - while (count < num && iterator.hasNext()) { - received.add(iterator.next()); - count++; - } - - if (count < num) { - throw new IllegalStateException(String.format("Fail to get %d records.", num)); - } - - return true; - })) - .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); - - return received; - } - private Optional> findAlignmentDriftMetric(JobID jobID, long withValue) { String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT; return reporter.findMetrics(jobID, metricsName).values().stream()