Skip to content

Commit

Permalink
Flink: Fix TestIcebergSourceWithWatermarkExtractor flakiness (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored and devangjhabakh committed Apr 22, 2024
1 parent 53bb7fa commit eda07c5
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 93 deletions.
3 changes: 3 additions & 0 deletions flink/v1.16/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink116.connector.test.utils
Expand Down Expand Up @@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
Expand Down Expand Up @@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -78,25 +78,23 @@
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
private static final int PARALLELISM = 4;
private static final String SOURCE_NAME = "IcebergSource";
private static final int RECORD_NUM_FOR_2_SPLITS = 200;
private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(
reporter.addToConfiguration(
// disable classloader check as Avro may cache class in the serializers.
new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)))
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.withHaLeadershipControl()
.build());

Expand Down Expand Up @@ -273,23 +271,40 @@ public void apply(
public void testThrottling() throws Exception {
GenericAppenderHelper dataAppender = appender();

// Generate records with the following pattern:
// - File 1 - Later records (Watermark 6000000)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
// - File 2 - First records (Watermark 0)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch =
// Generate records in advance

// File 1 - Later records (Watermark 6.000.000 - 100 min)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
List<Record> batch1 =
ImmutableList.of(
generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
dataAppender.appendToTable(batch);

batch = Lists.newArrayListWithCapacity(100);
// File 2 - First records (Watermark 0 - 0 min)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch2 = Lists.newArrayListWithCapacity(100);
for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
}

dataAppender.appendToTable(batch);
// File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch3 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));

// File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch4 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));

// File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
List<Record> batch5 =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
Expand All @@ -304,6 +319,11 @@ public void testThrottling() throws Exception {

try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);

// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that the read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
Expand All @@ -325,18 +345,7 @@ public void testThrottling() throws Exception {

// Add some old records with 2 splits, so even if the blocked gets one split, the other reader
// one gets one as well
List<Record> newBatch1 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));
List<Record> newBatch2 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));
dataAppender.appendToTable(
dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
Expand All @@ -352,10 +361,7 @@ public void testThrottling() throws Exception {
.until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));

// Add some new records which should unblock the throttled reader
batch =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
dataAppender.appendToTable(batch);
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

Expand Down
3 changes: 3 additions & 0 deletions flink/v1.17/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink117.connector.test.utils
Expand Down Expand Up @@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
Expand Down Expand Up @@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -77,22 +79,25 @@
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
private static final int PARALLELISM = 4;
private static final String SOURCE_NAME = "IcebergSource";
private static final int RECORD_NUM_FOR_2_SPLITS = 200;
private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.setConfiguration(
reporter.addToConfiguration(
new Configuration().set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true)))
.withHaLeadershipControl()
.build());

Expand Down Expand Up @@ -269,23 +274,40 @@ public void apply(
public void testThrottling() throws Exception {
GenericAppenderHelper dataAppender = appender();

// Generate records with the following pattern:
// - File 1 - Later records (Watermark 6000000)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
// - File 2 - First records (Watermark 0)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch =
// Generate records in advance

// File 1 - Later records (Watermark 6.000.000 - 100 min)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
List<Record> batch1 =
ImmutableList.of(
generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
dataAppender.appendToTable(batch);

batch = Lists.newArrayListWithCapacity(100);
// File 2 - First records (Watermark 0 - 0 min)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch2 = Lists.newArrayListWithCapacity(100);
for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
}

dataAppender.appendToTable(batch);
// File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch3 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));

// File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch4 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));

// File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
List<Record> batch5 =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
Expand All @@ -300,6 +322,11 @@ public void testThrottling() throws Exception {

try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);

// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that the read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
Expand All @@ -321,18 +348,7 @@ public void testThrottling() throws Exception {

// Add some old records with 2 splits, so even if the blocked gets one split, the other reader
// one gets one as well
List<Record> newBatch1 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));
List<Record> newBatch2 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));
dataAppender.appendToTable(
dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
Expand All @@ -348,10 +364,7 @@ public void testThrottling() throws Exception {
.until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));

// Add some new records which should unblock the throttled reader
batch =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
dataAppender.appendToTable(batch);
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

Expand Down
3 changes: 3 additions & 0 deletions flink/v1.18/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink118.connector.test.utils
Expand Down Expand Up @@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
Expand Down Expand Up @@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
Expand Down
Loading

0 comments on commit eda07c5

Please sign in to comment.