From 7bdb2b095d55a821eae6023a966d373af55e5fe7 Mon Sep 17 00:00:00 2001 From: gang_ye Date: Mon, 28 Aug 2023 09:31:20 -0700 Subject: [PATCH] fix style --- .../iceberg/flink/sink/shuffle/AggregatedStatistics.java | 6 +++--- .../flink/sink/shuffle/DataStatisticsCoordinator.java | 7 +++---- .../iceberg/flink/sink/shuffle/DataStatisticsOperator.java | 5 ++++- .../iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java | 3 +-- .../iceberg/flink/sink/shuffle/DataStatisticsUtil.java | 7 ++++--- .../sink/shuffle/TestAggregatedStatisticsTracker.java | 6 ++++-- 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatistics.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatistics.java index 4a02c9651ed7..157f04b8b0ed 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatistics.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatistics.java @@ -24,9 +24,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** - * AggregatedStatistics is used by {@link DataStatisticsCoordinator} to collect {@link DataStatistics} - * from {@link DataStatisticsOperator} subtasks for specific checkpoint. It stores the merged {@link - * DataStatistics} result from all reported subtasks. + * AggregatedStatistics is used by {@link DataStatisticsCoordinator} to collect {@link + * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific checkpoint. It stores + * the merged {@link DataStatistics} result from all reported subtasks. */ class AggregatedStatistics, S> implements Serializable { diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java index ff1a64483049..1e85c8902c50 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java @@ -46,10 +46,9 @@ /** * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all - * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated - * data statistics back to {@link DataStatisticsOperator}. In the end a custom - * partitioner will distribute traffic based on the aggregated data statistics to improve data - * clustering. + * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated data + * statistics back to {@link DataStatisticsOperator}. In the end a custom partitioner will + * distribute traffic based on the aggregated data statistics to improve data clustering. */ @Internal class DataStatisticsCoordinator, S> implements OperatorCoordinator { diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java index c49f12b5a49e..d00d5d2e5aa9 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java @@ -115,7 +115,10 @@ public void handleOperatorEvent(OperatorEvent event) { "Operator %s subtask %s received unexpected operator event %s", operatorName, subtaskIndex, event.getClass())); DataStatisticsEvent statisticsEvent = (DataStatisticsEvent) event; - LOG.info("Operator {} received global data event from coordinator checkpoint {}", operatorName, statisticsEvent.checkpointId()); + LOG.info( + "Operator {} received global data event from coordinator checkpoint {}", + operatorName, + statisticsEvent.checkpointId()); globalStatistics = DataStatisticsUtil.deserializeDataStatistics( statisticsEvent.statisticsBytes(), statisticsSerializer); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java index 323aee17d39f..889e85112e16 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOrRecord.java @@ -43,8 +43,7 @@ class DataStatisticsOrRecord, S> implements Seria private DataStatisticsOrRecord(DataStatistics statistics, RowData record) { Preconditions.checkArgument( - record != null ^ statistics != null, - "DataStatistics or record, not neither or both"); + record != null ^ statistics != null, "DataStatistics or record, not neither or both"); this.statistics = statistics; this.record = record; } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsUtil.java index 051805327578..2737b1346f0f 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsUtil.java @@ -77,9 +77,10 @@ static , S> byte[] serializeAggregatedStatistics( } @SuppressWarnings("unchecked") - static , S> AggregatedStatistics deserializeAggregatedStatistics( - byte[] bytes, TypeSerializer> statisticsSerializer) - throws IOException { + static , S> + AggregatedStatistics deserializeAggregatedStatistics( + byte[] bytes, TypeSerializer> statisticsSerializer) + throws IOException { ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytes); ObjectInputStream in = new ObjectInputStream(bytesIn); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java index 71ca00c8d6ac..34e1c21633f8 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.java @@ -43,7 +43,8 @@ public class TestAggregatedStatisticsTracker { private final TypeSerializer rowSerializer = new RowDataSerializer(rowType); private final TypeSerializer>> statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer(rowSerializer); - private AggregatedStatisticsTracker> aggregatedStatisticsTracker; + private AggregatedStatisticsTracker> + aggregatedStatisticsTracker; @Before public void before() throws Exception { @@ -98,7 +99,8 @@ public void receiveOlderDataStatisticEventTest() { Assert.assertNull( aggregatedStatisticsTracker.receiveDataStatisticEventAndCheckCompletion( 1, checkpoint1Subtask1DataStatisticEvent)); - // Receive event from old checkpoint, aggregatedStatisticsAggregatorTracker lastCompletedAggregator + // Receive event from old checkpoint, aggregatedStatisticsAggregatorTracker + // lastCompletedAggregator // and inProgressAggregator won't be updated Assert.assertEquals(2, aggregatedStatisticsTracker.inProgressStatistics().checkpointId()); }