diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java index 1e85c8902c50..1899de9e1b2d 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java @@ -185,7 +185,9 @@ private void sendDataStatisticsToSubtasks( return null; }, - String.format("Failed to send global data statistics for checkpoint %d", checkpointId)); + String.format( + "Failed to send operator %s coordinator global data statistics for checkpoint %d", + operatorName, checkpointId)); } @Override diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsTracker.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsTracker.java new file mode 100644 index 000000000000..5691811b380f --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsTracker.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.HashSet; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * GlobalStatisticsTracker is used by {@link DataStatisticsCoordinator} to track the in progress + * {@link GlobalStatistics} received from {@link DataStatisticsOperator} subtasks for specific + * checkpoint. + */ +@Internal +class GlobalStatisticsTracker, S> { + private static final Logger LOG = LoggerFactory.getLogger(GlobalStatisticsTracker.class); + private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 90; + private final String operatorName; + private final TypeSerializer> statisticsSerializer; + private final int parallelism; + private volatile GlobalStatistics inProgressStatistics; + private final Set inProgressSubtaskSet; + + GlobalStatisticsTracker( + String operatorName, + TypeSerializer> statisticsSerializer, + int parallelism) { + this.operatorName = operatorName; + this.statisticsSerializer = statisticsSerializer; + this.parallelism = parallelism; + this.inProgressSubtaskSet = new HashSet<>(); + } + + GlobalStatistics receiveDataStatisticEventAndCheckCompletion( + int subtask, DataStatisticsEvent event) { + long checkpointId = event.checkpointId(); + + if (inProgressStatistics != null && inProgressStatistics.checkpointId() > checkpointId) { + LOG.debug( + "Expect data statistics for operator {} checkpoint {}, but receive event from older checkpoint {}. Ignore it.", + operatorName, + inProgressStatistics.checkpointId(), + checkpointId); + return null; + } + + GlobalStatistics completedStatistics = null; + if (inProgressStatistics != null && inProgressStatistics.checkpointId() < checkpointId) { + if ((double) inProgressSubtaskSet.size() / parallelism * 100 + >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) { + completedStatistics = inProgressStatistics; + LOG.info( + "Received data statistics from {} subtasks out of total {} for operator {} at checkpoint {}. " + + "Complete data statistics aggregation as it is more than the threshold of {} percentage", + inProgressSubtaskSet.size(), + parallelism, + operatorName, + inProgressStatistics.checkpointId(), + EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE); + } else { + LOG.info( + "Received data statistics from {} subtasks out of total {} for operator {} at checkpoint {}. " + + "Aborting the incomplete aggregation for checkpoint {} " + + "and starting a new data statistics for checkpoint {}", + inProgressSubtaskSet.size(), + parallelism, + operatorName, + inProgressStatistics.checkpointId(), + EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE, + checkpointId); + } + + inProgressStatistics = null; + inProgressSubtaskSet.clear(); + } + + if (inProgressStatistics == null) { + inProgressStatistics = new GlobalStatistics<>(checkpointId, statisticsSerializer); + inProgressSubtaskSet.clear(); + } + + if (!inProgressSubtaskSet.add(subtask)) { + LOG.debug( + "Ignore duplicated data statistics from operator {} subtask {} for checkpoint {}.", + operatorName, + subtask, + checkpointId); + } else { + inProgressStatistics.mergeDataStatistic( + operatorName, + event.checkpointId(), + DataStatisticsUtil.deserializeDataStatistics( + event.statisticsBytes(), statisticsSerializer)); + } + + if (inProgressSubtaskSet.size() == parallelism) { + completedStatistics = inProgressStatistics; + LOG.info( + "Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator {}.", + parallelism, + operatorName, + inProgressStatistics.checkpointId(), + completedStatistics.dataStatistics()); + inProgressStatistics = new GlobalStatistics<>(checkpointId + 1, statisticsSerializer); + inProgressSubtaskSet.clear(); + } + + return completedStatistics; + } + + @VisibleForTesting + GlobalStatistics inProgressStatistics() { + return inProgressStatistics; + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java index 34e1c21633f8..da20b013d211 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java @@ -74,7 +74,7 @@ public void receiveNewerDataStatisticEvent() { aggregatedStatisticsTracker.receiveDataStatisticEventAndCheckCompletion( 0, checkpoint2Subtask0DataStatisticEvent)); - // Checkpoint 2 is newer than checkpoint1, thus drop inProgressAggregator for checkpoint1 + // Checkpoint 2 is newer than checkpoint1, thus dropping in progress statistics for checkpoint1 Assert.assertEquals(2, aggregatedStatisticsTracker.inProgressStatistics().checkpointId()); } @@ -96,12 +96,11 @@ public void receiveOlderDataStatisticEventTest() { DataStatisticsEvent> checkpoint1Subtask1DataStatisticEvent = DataStatisticsEvent.create(1, checkpoint1Subtask1DataStatistic, statisticsSerializer); + // Receive event from old checkpoint, aggregatedStatisticsAggregatorTracker won't return + // completed statistics and in progress statistics won't be updated Assert.assertNull( aggregatedStatisticsTracker.receiveDataStatisticEventAndCheckCompletion( 1, checkpoint1Subtask1DataStatisticEvent)); - // Receive event from old checkpoint, aggregatedStatisticsAggregatorTracker - // lastCompletedAggregator - // and inProgressAggregator won't be updated Assert.assertEquals(2, aggregatedStatisticsTracker.inProgressStatistics().checkpointId()); } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.java index 008725eca27f..8616c58a238a 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.java @@ -129,7 +129,7 @@ public void testCheckpointAndReset() throws Exception { (DataStatisticsCoordinator>) coordinator.getInternalCoordinator(); Assert.assertNotEquals( - "The restored shuffle coordinator should be a different instance", + "The restored coordinator should be a different instance", restoredDataStatisticsCoordinator, dataStatisticsCoordinator); // Verify restored data statistics