Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
gang_ye committed Aug 28, 2023
1 parent 6107ac3 commit 7bdb2b0
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* AggregatedStatistics is used by {@link DataStatisticsCoordinator} to collect {@link DataStatistics}
* from {@link DataStatisticsOperator} subtasks for specific checkpoint. It stores the merged {@link
* DataStatistics} result from all reported subtasks.
* AggregatedStatistics is used by {@link DataStatisticsCoordinator} to collect {@link
* DataStatistics} from {@link DataStatisticsOperator} subtasks for specific checkpoint. It stores
* the merged {@link DataStatistics} result from all reported subtasks.
*/
class AggregatedStatistics<D extends DataStatistics<D, S>, S> implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@
/**
* DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
* DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
* subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated
* data statistics back to {@link DataStatisticsOperator}. In the end a custom
* partitioner will distribute traffic based on the aggregated data statistics to improve data
* clustering.
* subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated data
* statistics back to {@link DataStatisticsOperator}. In the end a custom partitioner will
* distribute traffic based on the aggregated data statistics to improve data clustering.
*/
@Internal
class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements OperatorCoordinator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ public void handleOperatorEvent(OperatorEvent event) {
"Operator %s subtask %s received unexpected operator event %s",
operatorName, subtaskIndex, event.getClass()));
DataStatisticsEvent<D, S> statisticsEvent = (DataStatisticsEvent<D, S>) event;
LOG.info("Operator {} received global data event from coordinator checkpoint {}", operatorName, statisticsEvent.checkpointId());
LOG.info(
"Operator {} received global data event from coordinator checkpoint {}",
operatorName,
statisticsEvent.checkpointId());
globalStatistics =
DataStatisticsUtil.deserializeDataStatistics(
statisticsEvent.statisticsBytes(), statisticsSerializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ class DataStatisticsOrRecord<D extends DataStatistics<D, S>, S> implements Seria

private DataStatisticsOrRecord(DataStatistics<D, S> statistics, RowData record) {
Preconditions.checkArgument(
record != null ^ statistics != null,
"DataStatistics or record, not neither or both");
record != null ^ statistics != null, "DataStatistics or record, not neither or both");
this.statistics = statistics;
this.record = record;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ static <D extends DataStatistics<D, S>, S> byte[] serializeAggregatedStatistics(
}

@SuppressWarnings("unchecked")
static <D extends DataStatistics<D, S>, S> AggregatedStatistics<D, S> deserializeAggregatedStatistics(
byte[] bytes, TypeSerializer<DataStatistics<D, S>> statisticsSerializer)
throws IOException {
static <D extends DataStatistics<D, S>, S>
AggregatedStatistics<D, S> deserializeAggregatedStatistics(
byte[] bytes, TypeSerializer<DataStatistics<D, S>> statisticsSerializer)
throws IOException {
ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytes);
ObjectInputStream in = new ObjectInputStream(bytesIn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class TestAggregatedStatisticsTracker {
private final TypeSerializer<RowData> rowSerializer = new RowDataSerializer(rowType);
private final TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>>
statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer(rowSerializer);
private AggregatedStatisticsTracker<MapDataStatistics, Map<RowData, Long>> aggregatedStatisticsTracker;
private AggregatedStatisticsTracker<MapDataStatistics, Map<RowData, Long>>
aggregatedStatisticsTracker;

@Before
public void before() throws Exception {
Expand Down Expand Up @@ -98,7 +99,8 @@ public void receiveOlderDataStatisticEventTest() {
Assert.assertNull(
aggregatedStatisticsTracker.receiveDataStatisticEventAndCheckCompletion(
1, checkpoint1Subtask1DataStatisticEvent));
// Receive event from old checkpoint, aggregatedStatisticsAggregatorTracker lastCompletedAggregator
// Receive event from old checkpoint, aggregatedStatisticsAggregatorTracker
// lastCompletedAggregator
// and inProgressAggregator won't be updated
Assert.assertEquals(2, aggregatedStatisticsTracker.inProgressStatistics().checkpointId());
}
Expand Down

0 comments on commit 7bdb2b0

Please sign in to comment.