From 0a40ca82b3c302a2f6ca3ab9f624586b89bbdfc8 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Sun, 18 Feb 2024 17:20:25 +0800 Subject: [PATCH] [FLINK-34457] Rename options for latency tracking --- .../configuration/StateBackendOptions.java | 21 ++++-- .../StateLatencyTrackOptions.java | 68 +++++++++++++++++++ .../metrics/LatencyTrackingStateConfig.java | 20 +++--- .../runtime/state/StateBackendTestBase.java | 4 +- .../LatencyTrackingStateConfigTest.java | 20 ++++-- .../metrics/LatencyTrackingStateTestBase.java | 8 +-- ...legateEmbeddedRocksDBStateBackendTest.java | 4 +- ...ChangelogDelegateFileStateBackendTest.java | 4 +- .../ChangelogDelegateHashMapTest.java | 4 +- ...angelogDelegateMemoryStateBackendTest.java | 4 +- 10 files changed, 123 insertions(+), 34 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java index 2778c0f59bd6bf..deec87c5cd48a9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java @@ -18,11 +18,13 @@ package org.apache.flink.configuration; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; /** A collection of all configuration options that relate to state backend. */ +@PublicEvolving public class StateBackendOptions { // ------------------------------------------------------------------------ @@ -63,7 +65,9 @@ public class StateBackendOptions { .text("Recognized shortcut names are 'hashmap' and 'rocksdb'.") .build()); - @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + /** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_ENABLED} instead. */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption LATENCY_TRACK_ENABLED = ConfigOptions.key("state.backend.latency-track.keyed-state-enabled") .booleanType() @@ -71,7 +75,9 @@ public class StateBackendOptions { .withDescription( "Whether to track latency of keyed state operations, e.g value state put/get/clear."); - @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + /** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_SAMPLE_INTERVAL} instead. */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption LATENCY_TRACK_SAMPLE_INTERVAL = ConfigOptions.key("state.backend.latency-track.sample-interval") .intType() @@ -82,7 +88,9 @@ public class StateBackendOptions { + "The default value is 100, which means we would track the latency every 100 access requests.", LATENCY_TRACK_ENABLED.key())); - @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + /** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_HISTORY_SIZE} instead. */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption LATENCY_TRACK_HISTORY_SIZE = ConfigOptions.key("state.backend.latency-track.history-size") .intType() @@ -90,7 +98,12 @@ public class StateBackendOptions { .withDescription( "Defines the number of measured latencies to maintain at each state access operation."); - @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + /** + * @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_STATE_NAME_AS_VARIABLE} + * instead. + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption LATENCY_TRACK_STATE_NAME_AS_VARIABLE = ConfigOptions.key("state.backend.latency-track.state-name-as-variable") .booleanType() diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java new file mode 100644 index 00000000000000..6d75b8969c7056 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; + +/** + * A collection of all configuration options that relate to the latency tracking for state access. + */ +@PublicEvolving +public class StateLatencyTrackOptions { + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + public static final ConfigOption LATENCY_TRACK_ENABLED = + ConfigOptions.key("state.latency-track.keyed-state-enabled") + .booleanType() + .defaultValue(false) + .withDeprecatedKeys("state.backend.latency-track.keyed-state-enabled") + .withDescription( + "Whether to track latency of keyed state operations, e.g value state put/get/clear."); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + public static final ConfigOption LATENCY_TRACK_SAMPLE_INTERVAL = + ConfigOptions.key("state.latency-track.sample-interval") + .intType() + .defaultValue(100) + .withDeprecatedKeys("state.backend.latency-track.sample-interval") + .withDescription( + String.format( + "The sample interval of latency track once '%s' is enabled. " + + "The default value is 100, which means we would track the latency every 100 access requests.", + LATENCY_TRACK_ENABLED.key())); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + public static final ConfigOption LATENCY_TRACK_HISTORY_SIZE = + ConfigOptions.key("state.latency-track.history-size") + .intType() + .defaultValue(128) + .withDeprecatedKeys("state.backend.latency-track.history-size") + .withDescription( + "Defines the number of measured latencies to maintain at each state access operation."); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING) + public static final ConfigOption LATENCY_TRACK_STATE_NAME_AS_VARIABLE = + ConfigOptions.key("state.latency-track.state-name-as-variable") + .booleanType() + .defaultValue(true) + .withDeprecatedKeys("state.backend.latency-track.state-name-as-variable") + .withDescription( + "Whether to expose state name as a variable if tracking latency."); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java index d4425e14c30583..9c918040d37dc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.Preconditions; @@ -86,12 +86,13 @@ public static Builder newBuilder() { public static class Builder implements Serializable { private static final long serialVersionUID = 1L; - private boolean enabled = StateBackendOptions.LATENCY_TRACK_ENABLED.defaultValue(); + private boolean enabled = StateLatencyTrackOptions.LATENCY_TRACK_ENABLED.defaultValue(); private int sampleInterval = - StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue(); - private int historySize = StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue(); + StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue(); + private int historySize = + StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue(); private boolean stateNameAsVariable = - StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue(); + StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue(); private MetricGroup metricGroup; public Builder setEnabled(boolean enabled) { @@ -120,12 +121,13 @@ public Builder setMetricGroup(MetricGroup metricGroup) { } public Builder configure(ReadableConfig config) { - this.setEnabled(config.get(StateBackendOptions.LATENCY_TRACK_ENABLED)) + this.setEnabled(config.get(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED)) .setSampleInterval( - config.get(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL)) - .setHistorySize(config.get(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE)) + config.get(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL)) + .setHistorySize(config.get(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE)) .setStateNameAsVariable( - config.get(StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE)); + config.get( + StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE)); return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 1cec488d3891e6..86389a00f2c372 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -48,7 +48,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; @@ -277,7 +277,7 @@ protected MetricGroup getMetricGroup() { void testEnableStateLatencyTracking() throws Exception { ConfigurableStateBackend stateBackend = getStateBackend(); Configuration config = new Configuration(); - config.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); + config.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); StateBackend configuredBackend = stateBackend.configure(config, Thread.currentThread().getContextClassLoader()); KeyGroupRange groupRange = new KeyGroupRange(0, 1); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java index 4e5ac17ccd300f..5c3d88f27bd78e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state.metrics; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.jupiter.api.Test; @@ -46,11 +46,17 @@ void testDefaultEnabledLatencyTrackingStateConfig() { .build(); assertThat(latencyTrackingStateConfig.isEnabled()).isTrue(); assertThat(latencyTrackingStateConfig.getSampleInterval()) - .isEqualTo((int) StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue()); + .isEqualTo( + (int) + StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL + .defaultValue()); assertThat(latencyTrackingStateConfig.getHistorySize()) - .isEqualTo((long) StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue()); + .isEqualTo( + (long) StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue()); assertThat(latencyTrackingStateConfig.isStateNameAsVariable()) - .isEqualTo(StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue()); + .isEqualTo( + StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE + .defaultValue()); } @Test @@ -72,9 +78,9 @@ void testSetLatencyTrackingStateConfig() { void testConfigureFromReadableConfig() { LatencyTrackingStateConfig.Builder builder = LatencyTrackingStateConfig.newBuilder(); Configuration configuration = new Configuration(); - configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); - configuration.set(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 10); - configuration.set(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE, 500); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 10); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE, 500); LatencyTrackingStateConfig latencyTrackingStateConfig = builder.configure(configuration) .setMetricGroup(new UnregisteredMetricsGroup()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java index 8054fa1fbf043a..c03968033232d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; @@ -58,11 +58,11 @@ protected AbstractKeyedStateBackend createKeyedBackend(TypeSerializer keyS KeyGroupRange keyGroupRange = new KeyGroupRange(0, 127); int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups(); Configuration configuration = new Configuration(); - configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); - configuration.set(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL, SAMPLE_INTERVAL); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, SAMPLE_INTERVAL); // use a very large value to not let metrics data overridden. int historySize = 1000_000; - configuration.set(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE, historySize); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE, historySize); HashMapStateBackend stateBackend = new HashMapStateBackend() .configure(configuration, Thread.currentThread().getContextClassLoader()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java index 909c3c61704801..e2e11497565f7c 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendTest; import org.apache.flink.runtime.execution.Environment; @@ -105,7 +105,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); Configuration configuration = new Configuration(); - configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); StateBackend stateBackend = getStateBackend() .configure(configuration, Thread.currentThread().getContextClassLoader()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java index e306f8451f8edc..84f5dbe77e4757 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -101,7 +101,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); Configuration configuration = new Configuration(); - configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); StateBackend stateBackend = getStateBackend() .configure(configuration, Thread.currentThread().getContextClassLoader()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java index 7de866250d6a72..bf01fe521904a6 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; @@ -92,7 +92,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); Configuration configuration = new Configuration(); - configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); StateBackend stateBackend = getStateBackend() .configure(configuration, Thread.currentThread().getContextClassLoader()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java index ebcd5a4804b5ca..91b7c3baab87e3 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -101,7 +101,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); Configuration configuration = new Configuration(); - configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true); + configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); StateBackend stateBackend = getStateBackend() .configure(configuration, Thread.currentThread().getContextClassLoader());