From f551a60d4e9f6c3f7a5c1ff3c5d283c0999a584d Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Thu, 24 Oct 2024 17:18:53 +0200 Subject: [PATCH] Steven's comments --- .../maintenance/api/ExpireSnapshots.java | 3 +- .../api/MaintenanceTaskBuilder.java | 31 +++--- .../maintenance/api/TableMaintenance.java | 41 +++++--- .../operator/DeleteFilesProcessor.java | 20 ++-- .../maintenance/operator/LockRemover.java | 22 ++++- .../operator/TableMaintenanceMetrics.java | 6 +- .../maintenance/operator/TriggerManager.java | 33 +++---- .../maintenance/api/TestExpireSnapshots.java | 79 +++++++++------ .../maintenance/api/TestTableMaintenance.java | 74 ++++++++------ .../MetricsReporterFactoryForTests.java | 65 ++++++++++--- .../operator/OperatorTestBase.java | 5 +- .../operator/TestDeleteFilesProcessor.java | 2 +- .../maintenance/operator/TestLockRemover.java | 96 +++++++++++++------ .../operator/TestTriggerManager.java | 50 +++++----- 14 files changed, 344 insertions(+), 183 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index c42195f9d6e8..9cde5cb173e1 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -112,7 +112,8 @@ DataStream append(DataStream trigger) { .transform( operatorName(DELETE_FILES_OPERATOR_NAME), TypeInformation.of(Void.class), - new DeleteFilesProcessor(name(), tableLoader().loadTable(), deleteBatchSize)) + new DeleteFilesProcessor( + index(), taskName(), tableLoader().loadTable(), deleteBatchSize)) .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) .setParallelism(parallelism()); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java index 647a29223020..3dffb59171e3 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -31,7 +31,8 @@ @SuppressWarnings("unchecked") public abstract class MaintenanceTaskBuilder> { private int index; - private String name; + private String taskName; + private String tableName; private TableLoader tableLoader; private String uidSuffix = null; private String slotSharingGroup = null; @@ -160,8 +161,12 @@ protected int index() { return index; } - protected String name() { - return name; + protected String taskName() { + return taskName; + } + + protected String tableName() { + return tableName; } protected TableLoader tableLoader() { @@ -190,21 +195,23 @@ TriggerEvaluator evaluator() { DataStream append( DataStream sourceStream, - int defaultTaskIndex, - String defaultTaskName, + int taskIndex, + String taskName, + String tableName, TableLoader newTableLoader, - String mainUidSuffix, - String mainSlotSharingGroup, + String defaultUidSuffix, + String defaultSlotSharingGroup, int mainParallelism) { - Preconditions.checkNotNull(defaultTaskName, "Task name should not be null"); + Preconditions.checkNotNull(taskName, "Task name should not be null"); Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null"); - this.index = defaultTaskIndex; - this.name = defaultTaskName; + this.index = taskIndex; + this.taskName = taskName; + this.tableName = tableName; this.tableLoader = newTableLoader; if (uidSuffix == null) { - uidSuffix = name + "_" + index + "_" + mainUidSuffix; + uidSuffix = this.taskName + "_" + index + "_" + defaultUidSuffix; } if (parallelism == null) { @@ -212,7 +219,7 @@ DataStream append( } if (slotSharingGroup == null) { - slotSharingGroup = mainSlotSharingGroup; + slotSharingGroup = defaultSlotSharingGroup; } return append(sourceStream); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 149ebdb0f813..ecd958fb219f 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -52,7 +52,7 @@ /** Creates the table maintenance graph. */ public class TableMaintenance { - static final String SOURCE_OPERATOR_NAME = "Monitor source"; + static final String SOURCE_OPERATOR_NAME_PREFIX = "Monitor source for "; static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; static final String FILTER_OPERATOR_NAME_PREFIX = "Filter "; @@ -217,8 +217,11 @@ public void append() throws IOException { } try (TableLoader loader = tableLoader.clone()) { + loader.open(); + String tableName = loader.loadTable().name(); DataStream triggers = - DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true) + DataStreamUtils.reinterpretAsKeyedStream( + changeStream(tableName, loader), unused -> true) .process( new TriggerManager( loader, @@ -240,18 +243,25 @@ public void append() throws IOException { // Add the specific tasks DataStream unioned = null; for (int i = 0; i < taskBuilders.size(); ++i) { - int finalIndex = i; + int taskIndex = i; DataStream filtered = triggers - .filter(t -> t.taskId() != null && t.taskId() == finalIndex) - .name(FILTER_OPERATOR_NAME_PREFIX + i) + .filter(t -> t.taskId() != null && t.taskId() == taskIndex) + .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex) .forceNonParallel() - .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix) + .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + uidSuffix) .slotSharingGroup(slotSharingGroup); - MaintenanceTaskBuilder builder = taskBuilders.get(i); + MaintenanceTaskBuilder builder = taskBuilders.get(taskIndex); DataStream result = builder.append( - filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism); + filtered, + taskIndex, + taskNames.get(taskIndex), + tableName, + loader, + uidSuffix, + slotSharingGroup, + parallelism); if (unioned == null) { unioned = result; } else { @@ -264,31 +274,32 @@ public void append() throws IOException { .transform( LOCK_REMOVER_OPERATOR_NAME, TypeInformation.of(Void.class), - new LockRemover(lockFactory, taskNames)) + new LockRemover(tableName, lockFactory, taskNames)) .forceNonParallel() .uid("lock-remover-" + uidSuffix) .slotSharingGroup(slotSharingGroup); } } - private DataStream changeStream(TableLoader loader) { + private DataStream changeStream(String tableName, TableLoader loader) { if (inputStream == null) { // Create a monitor source to provide the TableChange stream MonitorSource source = new MonitorSource( loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); - return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME) - .uid(SOURCE_OPERATOR_NAME + uidSuffix) + return env.fromSource( + source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME_PREFIX + tableName) + .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix) .slotSharingGroup(slotSharingGroup) .forceNonParallel(); } else { return inputStream.global(); } } - } - private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskId) { - return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId); + private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskIndex) { + return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskIndex); + } } @Internal diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java index c3ef059e9c46..dc7846c4c4d3 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -40,7 +40,8 @@ public class DeleteFilesProcessor extends AbstractStreamOperator implements OneInputStreamOperator { private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class); - private final String name; + private final String taskIndex; + private final String taskName; private final SupportsBulkOperations io; private final String tableName; private final Set filesToDelete = Sets.newHashSet(); @@ -49,8 +50,8 @@ public class DeleteFilesProcessor extends AbstractStreamOperator private transient Counter failedCounter; private transient Counter succeededCounter; - public DeleteFilesProcessor(String name, Table table, int batchSize) { - Preconditions.checkNotNull(name, "Name should no be null"); + public DeleteFilesProcessor(int taskIndex, String taskName, Table table, int batchSize) { + Preconditions.checkNotNull(taskName, "Task name should no be null"); Preconditions.checkNotNull(table, "Table should no be null"); FileIO fileIO = table.io(); @@ -59,7 +60,8 @@ public DeleteFilesProcessor(String name, Table table, int batchSize) { "%s doesn't support bulk delete", fileIO.getClass().getSimpleName()); - this.name = name; + this.taskIndex = String.valueOf(taskIndex); + this.taskName = taskName; this.io = (SupportsBulkOperations) fileIO; this.tableName = table.name(); this.batchSize = batchSize; @@ -70,12 +72,18 @@ public void open() throws Exception { this.failedCounter = getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex) .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); this.succeededCounter = getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex) .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java index f4cc0e8a0158..14d590162c8b 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java @@ -67,6 +67,7 @@ public class LockRemover extends AbstractStreamOperator implements OneInputStreamOperator { private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class); + private final String tableName; private final TriggerLockFactory lockFactory; private final List maintenanceTaskNames; @@ -77,12 +78,14 @@ public class LockRemover extends AbstractStreamOperator private transient TriggerLockFactory.Lock recoveryLock; private transient long lastProcessedTaskStartEpoch = 0L; - public LockRemover(TriggerLockFactory lockFactory, List maintenanceTaskNames) { + public LockRemover( + String tableName, TriggerLockFactory lockFactory, List maintenanceTaskNames) { Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); Preconditions.checkArgument( maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(), "Invalid maintenance task names: null or empty"); + this.tableName = tableName; this.lockFactory = lockFactory; this.maintenanceTaskNames = maintenanceTaskNames; } @@ -94,22 +97,31 @@ public void open() throws Exception { Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); - for (String name : maintenanceTaskNames) { + for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { succeededTaskResultCounters.add( getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) .counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER)); failedTaskResultCounters.add( getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) .counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER)); AtomicLong duration = new AtomicLong(0); taskLastRunDurationMs.add(duration); getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) .gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java index c57ed5092504..6147c3a5fd16 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java @@ -19,8 +19,10 @@ package org.apache.iceberg.flink.maintenance.operator; public class TableMaintenanceMetrics { - public static final String GROUP_KEY = "maintenanceTask"; - public static final String GROUP_VALUE_DEFAULT = "maintenanceTask"; + public static final String GROUP_KEY = "maintenance"; + public static final String TASK_NAME_KEY = "taskName"; + public static final String TASK_INDEX_KEY = "taskIndex"; + public static final String TABLE_NAME_KEY = "tableName"; // TriggerManager metrics public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered"; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java index cd20c6e011a3..a96e99d94299 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -125,30 +124,32 @@ public void open(Configuration parameters) throws Exception { this.rateLimiterTriggeredCounter = getRuntimeContext() .getMetricGroup() - .addGroup( - TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); this.concurrentRunThrottledCounter = getRuntimeContext() .getMetricGroup() - .addGroup( - TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) .counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED); this.nothingToTriggerCounter = getRuntimeContext() .getMetricGroup() - .addGroup( - TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); - this.triggerCounters = - maintenanceTaskNames.stream() - .map( - name -> - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) - .counter(TableMaintenanceMetrics.TRIGGERED)) - .collect(Collectors.toList()); + this.triggerCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); + for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { + triggerCounters.add( + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) + .counter(TableMaintenanceMetrics.TRIGGERED)); + } this.nextEvaluationTimeState = getRuntimeContext() diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java index a105f840adc9..f80129f966e1 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; +import java.util.List; import java.util.Set; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; @@ -69,14 +70,16 @@ void testExpireSnapshots() throws Exception { .append( infra.triggerStream(), 0, - DUMMY_NAME, + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, tableLoader(), "OTHER", StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, 1) .sinkTo(infra.sink()); - runAndWaitForSuccess(infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(3L)); + runAndWaitForSuccess( + infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(table.name(), 3L)); table.refresh(); assertThat(Sets.newHashSet(table.snapshots())).hasSize(1); @@ -99,7 +102,8 @@ void testFailure() throws Exception { .append( infra.triggerStream(), 0, - DUMMY_NAME, + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, @@ -131,16 +135,22 @@ void testFailure() throws Exception { // Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has // no max age of number of snapshots set, so no files are removed. MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder() + new ImmutableMap.Builder, Long>() .put( - DELETE_FILES_OPERATOR_NAME + "[0]." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER, + ImmutableList.of( + DELETE_FILES_OPERATOR_NAME + "[0]", + table.name(), + DUMMY_TASK_NAME, + "0", + DELETE_FILE_FAILED_COUNTER), 0L) .put( - DELETE_FILES_OPERATOR_NAME - + "[0]." - + DUMMY_NAME - + "." - + DELETE_FILE_SUCCEEDED_COUNTER, + ImmutableList.of( + DELETE_FILES_OPERATOR_NAME + "[0]", + table.name(), + DUMMY_TASK_NAME, + "0", + DELETE_FILE_SUCCEEDED_COUNTER), 0L) .build()); } @@ -153,7 +163,8 @@ void testUidAndSlotSharingGroup() { .append( infra.triggerStream(), 0, - DUMMY_NAME, + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, @@ -170,7 +181,8 @@ void testUidAndSlotSharingGroupUnset() { .append( infra.triggerStream(), 0, - DUMMY_NAME, + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, @@ -193,45 +205,50 @@ void testMetrics() throws Exception { .append( infra.triggerStream(), 0, - DUMMY_NAME, + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, tableLoader(), UID_SUFFIX, StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, 1) .sinkTo(infra.sink()); - runAndWaitForSuccess(infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(1L)); + runAndWaitForSuccess( + infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(table.name(), 1L)); // Check the metrics Awaitility.await() .untilAsserted( () -> MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder() + new ImmutableMap.Builder, Long>() .put( - DELETE_FILES_OPERATOR_NAME - + "[0]." - + DUMMY_NAME - + "." - + DELETE_FILE_FAILED_COUNTER, + ImmutableList.of( + DELETE_FILES_OPERATOR_NAME + "[0]", + table.name(), + DUMMY_TASK_NAME, + "0", + DELETE_FILE_FAILED_COUNTER), 0L) .put( - DELETE_FILES_OPERATOR_NAME - + "[0]." - + DUMMY_NAME - + "." - + DELETE_FILE_SUCCEEDED_COUNTER, + ImmutableList.of( + DELETE_FILES_OPERATOR_NAME + "[0]", + table.name(), + DUMMY_TASK_NAME, + "0", + DELETE_FILE_SUCCEEDED_COUNTER), 1L) .build())); } - private static boolean checkDeleteFinished(Long expectedDeleteNum) { + private static boolean checkDeleteFinished(String tableName, Long expectedDeleteNum) { return expectedDeleteNum.equals( MetricsReporterFactoryForTests.counter( - DELETE_FILES_OPERATOR_NAME - + "[0]." - + DUMMY_NAME - + "." - + DELETE_FILE_SUCCEEDED_COUNTER)); + ImmutableList.of( + DELETE_FILES_OPERATOR_NAME + "[0]", + tableName, + DUMMY_TASK_NAME, + "0", + DELETE_FILE_SUCCEEDED_COUNTER))); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java index f4c1f8380e89..0e4a72bd16f8 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java @@ -20,11 +20,10 @@ import static org.apache.iceberg.flink.SimpleDataUtil.createRowData; import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.LOCK_REMOVER_OPERATOR_NAME; -import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.SOURCE_OPERATOR_NAME; +import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX; import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.TRIGGER_MANAGER_OPERATOR_NAME; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER; -import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER; @@ -188,37 +187,58 @@ void testMetrics() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - LOCK_REMOVER_OPERATOR_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER) + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, + table.name(), + TASKS[0], + "0", + SUCCEEDED_TASK_COUNTER)) .equals(2L)); MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder() - .put(LOCK_REMOVER_OPERATOR_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER, 2L) - .put(LOCK_REMOVER_OPERATOR_NAME + "." + TASKS[0] + "." + FAILED_TASK_COUNTER, 0L) - .put(TRIGGER_MANAGER_OPERATOR_NAME + "." + TASKS[0] + "." + TRIGGERED, 2L) - .put(LOCK_REMOVER_OPERATOR_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER, 0L) - .put(LOCK_REMOVER_OPERATOR_NAME + "." + TASKS[1] + "." + FAILED_TASK_COUNTER, 1L) - .put(TRIGGER_MANAGER_OPERATOR_NAME + "." + TASKS[1] + "." + TRIGGERED, 1L) + new ImmutableMap.Builder, Long>() .put( - TRIGGER_MANAGER_OPERATOR_NAME - + "." - + GROUP_VALUE_DEFAULT - + "." - + NOTHING_TO_TRIGGER, + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, + table.name(), + TASKS[0], + "0", + SUCCEEDED_TASK_COUNTER), + 2L) + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, table.name(), TASKS[0], "0", FAILED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), TASKS[0], "0", TRIGGERED), + 2L) + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, + table.name(), + TASKS[1], + "1", + SUCCEEDED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, table.name(), TASKS[1], "1", FAILED_TASK_COUNTER), + 1L) + .put( + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), TASKS[1], "1", TRIGGERED), + 1L) + .put( + ImmutableList.of(TRIGGER_MANAGER_OPERATOR_NAME, table.name(), NOTHING_TO_TRIGGER), -1L) .put( - TRIGGER_MANAGER_OPERATOR_NAME - + "." - + GROUP_VALUE_DEFAULT - + "." - + CONCURRENT_RUN_THROTTLED, + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), CONCURRENT_RUN_THROTTLED), -1L) .put( - TRIGGER_MANAGER_OPERATOR_NAME - + "." - + GROUP_VALUE_DEFAULT - + "." - + RATE_LIMITER_TRIGGERED, + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), RATE_LIMITER_TRIGGERED), -1L) .build()); } @@ -382,7 +402,7 @@ private Transformation monitorSource() { // Some checks to make sure this is the transformation we are looking for assertThat(result).isInstanceOf(SourceTransformation.class); - assertThat(result.getName()).isEqualTo(SOURCE_OPERATOR_NAME); + assertThat(result.getName()).startsWith(SOURCE_OPERATOR_NAME_PREFIX); return result; } @@ -405,7 +425,7 @@ DataStream append(DataStream trigger) { return trigger .map(new DummyMaintenanceTask(success)) .name(name) - .uid(uidSuffix() + "-test-mapper-" + name) + .uid(uidSuffix() + "-test-mapper-" + name + "-" + id) .slotSharingGroup(slotSharingGroup()) .forceNonParallel(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java index 7a523035b7fb..ed66ff3df076 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -38,10 +39,24 @@ public class MetricsReporterFactoryForTests implements MetricReporterFactory { private static final TestMetricsReporter INSTANCE = new TestMetricsReporter(); - private static final Pattern FULL_METRIC_NAME = + private static final Pattern TASK_METRIC_NAME = Pattern.compile( "\\.taskmanager\\.[^.]+\\.[^.]+\\.([^.]+)\\.\\d+\\." + TableMaintenanceMetrics.GROUP_KEY + + "\\." + + TableMaintenanceMetrics.TABLE_NAME_KEY + + "\\.([^.]+)\\." + + TableMaintenanceMetrics.TASK_NAME_KEY + + "\\.([^.]+)\\." + + TableMaintenanceMetrics.TASK_INDEX_KEY + + "\\.([^.]+)\\.([^.]+)"); + + private static final Pattern MAIN_METRIC_NAME = + Pattern.compile( + "\\.taskmanager\\.[^.]+\\.[^.]+\\.([^.]+)\\.\\d+\\." + + TableMaintenanceMetrics.GROUP_KEY + + "\\." + + TableMaintenanceMetrics.TABLE_NAME_KEY + "\\.([^.]+)\\.([^.]+)"); private static Map counters = Maps.newConcurrentMap(); @@ -72,20 +87,26 @@ public static void reset() { gauges = Maps.newConcurrentMap(); } - public static Long counter(String name) { - return counterValues().get(name); + public static Long counter(List parts) { + return counterValues().get(longName(parts)); } - public static Long gauge(String name) { - return gaugeValues().get(name); + public static Long gauge(List parts) { + return gaugeValues().get(longName(parts)); } - public static void assertGauges(Map expected) { - assertThat(filter(gaugeValues(), expected)).isEqualTo(filter(expected, expected)); + public static void assertGauges(Map, Long> expected) { + Map transformed = + expected.entrySet().stream() + .collect(Collectors.toMap(k -> longName(k.getKey()), Map.Entry::getValue)); + assertThat(filter(gaugeValues(), transformed)).isEqualTo(filter(transformed, transformed)); } - public static void assertCounters(Map expected) { - assertThat(filter(counterValues(), expected)).isEqualTo(filter(expected, expected)); + public static void assertCounters(Map, Long> expected) { + Map transformed = + expected.entrySet().stream() + .collect(Collectors.toMap(k -> longName(k.getKey()), Map.Entry::getValue)); + assertThat(filter(counterValues(), transformed)).isEqualTo(filter(transformed, transformed)); } private static Map gaugeValues() { @@ -113,12 +134,30 @@ private static Map filter(Map original, Map parts) { + return parts.stream().map(s -> s.replaceAll("\\.", "_")).collect(Collectors.joining(".")); } private static class TestMetricsReporter implements MetricReporter { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 94c6b4c45500..5e05f40e53cf 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -73,7 +73,8 @@ public class OperatorTestBase { static final long EVENT_TIME = 10L; static final long EVENT_TIME_2 = 11L; - protected static final String DUMMY_NAME = "dummy"; + protected static final String DUMMY_TASK_NAME = "dummyTask"; + protected static final String DUMMY_TABLE_NAME = "dummyTable"; @RegisterExtension protected static final MiniClusterExtension MINI_CLUSTER_EXTENSION = @@ -91,7 +92,7 @@ public class OperatorTestBase { new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE); @BeforeEach - void before() throws IOException { + void before() { LOCK_FACTORY.open(); MetricsReporterFactoryForTests.reset(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java index 1160f6bff601..d70c4aafd59a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java @@ -90,7 +90,7 @@ private void deleteFile(TableLoader tableLoader, String fileName) throws Excepti tableLoader().open(); try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>( - new DeleteFilesProcessor(DUMMY_NAME, tableLoader.loadTable(), 10), + new DeleteFilesProcessor(0, DUMMY_TASK_NAME, tableLoader.loadTable(), 10), StringSerializer.INSTANCE)) { testHarness.open(); testHarness.processElement(fileName, System.currentTimeMillis()); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java index a6d78ed93682..f75c765df967 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java @@ -26,6 +26,7 @@ import java.io.File; import java.time.Duration; import java.util.Collection; +import java.util.List; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; @@ -46,6 +47,7 @@ import org.apache.flink.util.Collector; import org.apache.iceberg.flink.maintenance.api.TaskResult; import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.awaitility.Awaitility; @@ -74,9 +76,9 @@ void testProcess() throws Exception { source .dataStream() .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS))) + new LockRemover(DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS))) .setParallelism(1); JobClient jobClient = null; @@ -131,9 +133,9 @@ void testMetrics() throws Exception { source .dataStream() .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS))) + new LockRemover(DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS))) .setParallelism(1); JobClient jobClient = null; @@ -152,31 +154,57 @@ void testMetrics() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER) + ImmutableList.of( + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[1], + "1", + SUCCEEDED_TASK_COUNTER)) .equals(3L)); // Final check all the counters MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder() - .put(DUMMY_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER, 2L) - .put(DUMMY_NAME + "." + TASKS[0] + "." + FAILED_TASK_COUNTER, 1L) - .put(DUMMY_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER, 3L) - .put(DUMMY_NAME + "." + TASKS[1] + "." + FAILED_TASK_COUNTER, 0L) - .put(DUMMY_NAME + "." + TASKS[2] + "." + SUCCEEDED_TASK_COUNTER, 0L) - .put(DUMMY_NAME + "." + TASKS[2] + "." + FAILED_TASK_COUNTER, 0L) + new ImmutableMap.Builder, Long>() + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", SUCCEEDED_TASK_COUNTER), + 2L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", FAILED_TASK_COUNTER), + 1L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", SUCCEEDED_TASK_COUNTER), + 3L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", FAILED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", SUCCEEDED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", FAILED_TASK_COUNTER), + 0L) .build()); assertThat( MetricsReporterFactoryForTests.gauge( - DUMMY_NAME + "." + TASKS[0] + "." + LAST_RUN_DURATION_MS)) + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", LAST_RUN_DURATION_MS))) .isPositive(); assertThat( MetricsReporterFactoryForTests.gauge( - DUMMY_NAME + "." + TASKS[1] + "." + LAST_RUN_DURATION_MS)) + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", LAST_RUN_DURATION_MS))) .isGreaterThan(time); assertThat( MetricsReporterFactoryForTests.gauge( - DUMMY_NAME + "." + TASKS[2] + "." + LAST_RUN_DURATION_MS)) + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", LAST_RUN_DURATION_MS))) .isZero(); } finally { closeJobClient(jobClient); @@ -200,9 +228,10 @@ void testRecovery() throws Exception { .dataStream() .union(source2.dataStream()) .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS[0]))) + new LockRemover( + DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS[0]))) .setParallelism(1); JobClient jobClient = null; @@ -220,7 +249,12 @@ void testRecovery() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER) + ImmutableList.of( + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[0], + "0", + SUCCEEDED_TASK_COUNTER)) .equals(2L)); // We did not remove the recovery lock, as no watermark received from the other source @@ -242,20 +276,21 @@ private void processAndCheck(ManualSource source, TaskResult input) private void processAndCheck( ManualSource source, TaskResult input, String counterPrefix) { + List counterKey = + ImmutableList.of( + (counterPrefix != null ? counterPrefix : "") + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[input.taskIndex()], + String.valueOf(input.taskIndex()), + input.success() ? SUCCEEDED_TASK_COUNTER : FAILED_TASK_COUNTER); + Long counterValue = MetricsReporterFactoryForTests.counter(counterKey); + Long expected = counterValue != null ? counterValue + 1 : 1L; + source.sendRecord(input); source.sendWatermark(input.startEpoch()); - String counterName = - (counterPrefix != null ? counterPrefix : "") - .concat( - input.success() - ? DUMMY_NAME + "." + TASKS[input.taskIndex()] + "." + SUCCEEDED_TASK_COUNTER - : DUMMY_NAME + "." + TASKS[input.taskIndex()] + "." + FAILED_TASK_COUNTER); - Long counterValue = MetricsReporterFactoryForTests.counter(counterName); - Long expected = counterValue != null ? counterValue + 1 : 1L; - Awaitility.await() - .until(() -> expected.equals(MetricsReporterFactoryForTests.counter(counterName))); + .until(() -> expected.equals(MetricsReporterFactoryForTests.counter(counterKey))); } private static class TestingLockFactory implements TriggerLockFactory { @@ -389,9 +424,10 @@ public void flatMap( } }) .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS[0]))); + new LockRemover( + DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS[0]))); } } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java index 7c3418f942ec..a70d27279460 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.maintenance.operator; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; -import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; @@ -36,9 +35,11 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.awaitility.Awaitility; @@ -54,12 +55,14 @@ class TestTriggerManager extends OperatorTestBase { private long processingTime = 0L; private TriggerLockFactory.Lock lock; private TriggerLockFactory.Lock recoveringLock; + private String tableName; @BeforeEach void before() { - createTable(); + Table table = createTable(); this.lock = LOCK_FACTORY.createLock(); this.recoveringLock = LOCK_FACTORY.createRecoveryLock(); + this.tableName = table.name(); } @Test @@ -421,7 +424,7 @@ void testTriggerMetrics() throws Exception { .dataStream() .keyBy(unused -> true) .process(manager) - .name(DUMMY_NAME) + .name(DUMMY_TASK_NAME) .forceNonParallel() .sinkTo(sink); @@ -437,7 +440,7 @@ void testTriggerMetrics() throws Exception { () -> { Long notingCounter = MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER); + ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER)); return notingCounter != null && notingCounter.equals(1L); }); @@ -446,7 +449,8 @@ void testTriggerMetrics() throws Exception { // Wait until we receive the trigger assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED))) .isEqualTo(1L); lock.unlock(); @@ -458,20 +462,22 @@ void testTriggerMetrics() throws Exception { assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); lock.unlock(); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED))) .isEqualTo(2L); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + TASKS[1] + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", TRIGGERED))) .isEqualTo(1L); // Final check all the counters MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder() - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L) - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, -1L) - .put(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED, 2L) - .put(DUMMY_NAME + "." + TASKS[1] + "." + TRIGGERED, 1L) - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 1L) + new ImmutableMap.Builder, Long>() + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED), -1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED), -1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED), 2L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", TRIGGERED), 1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER), 1L) .build()); } finally { closeJobClient(jobClient); @@ -493,7 +499,7 @@ void testRateLimiterMetrics() throws Exception { .dataStream() .keyBy(unused -> true) .process(manager) - .name(DUMMY_NAME) + .name(DUMMY_TASK_NAME) .forceNonParallel() .sinkTo(sink); @@ -514,7 +520,7 @@ void testRateLimiterMetrics() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED) + ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED)) .equals(1L)); // Final check all the counters @@ -539,7 +545,7 @@ void testConcurrentRunMetrics() throws Exception { .dataStream() .keyBy(unused -> true) .process(manager) - .name(DUMMY_NAME) + .name(DUMMY_TASK_NAME) .forceNonParallel() .sinkTo(sink); @@ -557,7 +563,7 @@ void testConcurrentRunMetrics() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED) + ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED)) .equals(1L)); // Final check all the counters @@ -577,15 +583,15 @@ private static Stream parametersForTestRecovery() { private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) { MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder() + new ImmutableMap.Builder, Long>() .put( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, + ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED), rateLimiterTrigger) .put( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, + ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED), concurrentRunTrigger) - .put(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED, 1L) - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 0L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED), 1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER), 0L) .build()); }