Skip to content

Commit

Permalink
[FLINK-34457] Rename options for latency tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Feb 18, 2024
1 parent 050503c commit 5b571ef
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 69 deletions.
4 changes: 2 additions & 2 deletions docs/content.zh/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,9 @@ Please refer to the [Debugging Classloading Docs]({{< ref "docs/ops/debugging/de

{{< generated/expert_state_backends_section >}}

### State Backends Latency Tracking Options
### State Latency Tracking Options

{{< generated/state_backend_latency_tracking_section >}}
{{< generated/state_latency_tracking_section >}}

### Advanced RocksDB State Backends Options

Expand Down
4 changes: 2 additions & 2 deletions docs/content/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,9 @@ Please refer to the [Debugging Classloading Docs]({{< ref "docs/ops/debugging/de

{{< generated/expert_state_backends_section >}}

### State Backends Latency Tracking Options
### State Latency Tracking Options

{{< generated/state_backend_latency_tracking_section >}}
{{< generated/state_latency_tracking_section >}}

### Advanced RocksDB State Backends Options

Expand Down
24 changes: 0 additions & 24 deletions docs/layouts/shortcodes/generated/state_backend_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.latency-track.history-size</h5></td>
<td style="word-wrap: break-word;">128</td>
<td>Integer</td>
<td>Defines the number of measured latencies to maintain at each state access operation.</td>
</tr>
<tr>
<td><h5>state.backend.latency-track.keyed-state-enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to track latency of keyed state operations, e.g value state put/get/clear.</td>
</tr>
<tr>
<td><h5>state.backend.latency-track.sample-interval</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The sample interval of latency track once 'state.backend.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests.</td>
</tr>
<tr>
<td><h5>state.backend.latency-track.state-name-as-variable</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to expose state name as a variable if tracking latency.</td>
</tr>
<tr>
<td><h5>state.backend.type</h5></td>
<td style="word-wrap: break-word;">"hashmap"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@
</thead>
<tbody>
<tr>
<td><h5>state.backend.latency-track.history-size</h5></td>
<td><h5>state.latency-track.history-size</h5></td>
<td style="word-wrap: break-word;">128</td>
<td>Integer</td>
<td>Defines the number of measured latencies to maintain at each state access operation.</td>
</tr>
<tr>
<td><h5>state.backend.latency-track.keyed-state-enabled</h5></td>
<td><h5>state.latency-track.keyed-state-enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to track latency of keyed state operations, e.g value state put/get/clear.</td>
</tr>
<tr>
<td><h5>state.backend.latency-track.sample-interval</h5></td>
<td><h5>state.latency-track.sample-interval</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The sample interval of latency track once 'state.backend.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests.</td>
<td>The sample interval of latency track once 'state.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests.</td>
</tr>
<tr>
<td><h5>state.backend.latency-track.state-name-as-variable</h5></td>
<td><h5>state.latency-track.state-name-as-variable</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to expose state name as a variable if tracking latency.</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.latency-track.history-size</h5></td>
<td style="word-wrap: break-word;">128</td>
<td>Integer</td>
<td>Defines the number of measured latencies to maintain at each state access operation.</td>
</tr>
<tr>
<td><h5>state.latency-track.keyed-state-enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to track latency of keyed state operations, e.g value state put/get/clear.</td>
</tr>
<tr>
<td><h5>state.latency-track.sample-interval</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The sample interval of latency track once 'state.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests.</td>
</tr>
<tr>
<td><h5>state.latency-track.state-name-as-variable</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to expose state name as a variable if tracking latency.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public static final class Sections {

public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb";

public static final String STATE_BACKEND_LATENCY_TRACKING =
"state_backend_latency_tracking";
public static final String STATE_LATENCY_TRACKING = "state_latency_tracking";

public static final String STATE_BACKEND_CHANGELOG = "state_backend_changelog";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

/** A collection of all configuration options that relate to state backend. */
@PublicEvolving
public class StateBackendOptions {

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -63,15 +65,19 @@ public class StateBackendOptions {
.text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
.build());

@Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
/** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_ENABLED} instead. */
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Boolean> LATENCY_TRACK_ENABLED =
ConfigOptions.key("state.backend.latency-track.keyed-state-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to track latency of keyed state operations, e.g value state put/get/clear.");

@Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
/** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_SAMPLE_INTERVAL} instead. */
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Integer> LATENCY_TRACK_SAMPLE_INTERVAL =
ConfigOptions.key("state.backend.latency-track.sample-interval")
.intType()
Expand All @@ -82,15 +88,22 @@ public class StateBackendOptions {
+ "The default value is 100, which means we would track the latency every 100 access requests.",
LATENCY_TRACK_ENABLED.key()));

@Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
/** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_HISTORY_SIZE} instead. */
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Integer> LATENCY_TRACK_HISTORY_SIZE =
ConfigOptions.key("state.backend.latency-track.history-size")
.intType()
.defaultValue(128)
.withDescription(
"Defines the number of measured latencies to maintain at each state access operation.");

@Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
/**
* @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_STATE_NAME_AS_VARIABLE}
* instead.
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Boolean> LATENCY_TRACK_STATE_NAME_AS_VARIABLE =
ConfigOptions.key("state.backend.latency-track.state-name-as-variable")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;

/**
* A collection of all configuration options that relate to the latency tracking for state access.
*/
@PublicEvolving
public class StateLatencyTrackOptions {

@Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
public static final ConfigOption<Boolean> LATENCY_TRACK_ENABLED =
ConfigOptions.key("state.latency-track.keyed-state-enabled")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys("state.backend.latency-track.keyed-state-enabled")
.withDescription(
"Whether to track latency of keyed state operations, e.g value state put/get/clear.");

@Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
public static final ConfigOption<Integer> LATENCY_TRACK_SAMPLE_INTERVAL =
ConfigOptions.key("state.latency-track.sample-interval")
.intType()
.defaultValue(100)
.withDeprecatedKeys("state.backend.latency-track.sample-interval")
.withDescription(
String.format(
"The sample interval of latency track once '%s' is enabled. "
+ "The default value is 100, which means we would track the latency every 100 access requests.",
LATENCY_TRACK_ENABLED.key()));

@Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
public static final ConfigOption<Integer> LATENCY_TRACK_HISTORY_SIZE =
ConfigOptions.key("state.latency-track.history-size")
.intType()
.defaultValue(128)
.withDeprecatedKeys("state.backend.latency-track.history-size")
.withDescription(
"Defines the number of measured latencies to maintain at each state access operation.");

@Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
public static final ConfigOption<Boolean> LATENCY_TRACK_STATE_NAME_AS_VARIABLE =
ConfigOptions.key("state.latency-track.state-name-as-variable")
.booleanType()
.defaultValue(true)
.withDeprecatedKeys("state.backend.latency-track.state-name-as-variable")
.withDescription(
"Whether to expose state name as a variable if tracking latency.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -86,12 +86,13 @@ public static Builder newBuilder() {
public static class Builder implements Serializable {
private static final long serialVersionUID = 1L;

private boolean enabled = StateBackendOptions.LATENCY_TRACK_ENABLED.defaultValue();
private boolean enabled = StateLatencyTrackOptions.LATENCY_TRACK_ENABLED.defaultValue();
private int sampleInterval =
StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue();
private int historySize = StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue();
StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue();
private int historySize =
StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue();
private boolean stateNameAsVariable =
StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue();
StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue();
private MetricGroup metricGroup;

public Builder setEnabled(boolean enabled) {
Expand Down Expand Up @@ -120,12 +121,13 @@ public Builder setMetricGroup(MetricGroup metricGroup) {
}

public Builder configure(ReadableConfig config) {
this.setEnabled(config.get(StateBackendOptions.LATENCY_TRACK_ENABLED))
this.setEnabled(config.get(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED))
.setSampleInterval(
config.get(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL))
.setHistorySize(config.get(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE))
config.get(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL))
.setHistorySize(config.get(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE))
.setStateNameAsVariable(
config.get(StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE));
config.get(
StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
Expand Down Expand Up @@ -277,7 +277,7 @@ protected MetricGroup getMetricGroup() {
void testEnableStateLatencyTracking() throws Exception {
ConfigurableStateBackend stateBackend = getStateBackend();
Configuration config = new Configuration();
config.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
config.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
StateBackend configuredBackend =
stateBackend.configure(config, Thread.currentThread().getContextClassLoader());
KeyGroupRange groupRange = new KeyGroupRange(0, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.state.metrics;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import org.junit.jupiter.api.Test;
Expand All @@ -46,11 +46,17 @@ void testDefaultEnabledLatencyTrackingStateConfig() {
.build();
assertThat(latencyTrackingStateConfig.isEnabled()).isTrue();
assertThat(latencyTrackingStateConfig.getSampleInterval())
.isEqualTo((int) StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue());
.isEqualTo(
(int)
StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL
.defaultValue());
assertThat(latencyTrackingStateConfig.getHistorySize())
.isEqualTo((long) StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue());
.isEqualTo(
(long) StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue());
assertThat(latencyTrackingStateConfig.isStateNameAsVariable())
.isEqualTo(StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue());
.isEqualTo(
StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE
.defaultValue());
}

@Test
Expand All @@ -72,9 +78,9 @@ void testSetLatencyTrackingStateConfig() {
void testConfigureFromReadableConfig() {
LatencyTrackingStateConfig.Builder builder = LatencyTrackingStateConfig.newBuilder();
Configuration configuration = new Configuration();
configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
configuration.set(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 10);
configuration.set(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE, 500);
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 10);
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE, 500);
LatencyTrackingStateConfig latencyTrackingStateConfig =
builder.configure(configuration)
.setMetricGroup(new UnregisteredMetricsGroup())
Expand Down
Loading

0 comments on commit 5b571ef

Please sign in to comment.