[FLINK-36314][state/forst] Support state V1 interface in forst statebackend (apache#25349)
fredia authored Sep 20, 2024
1 parent 20e9826 commit 1ba8900
Showing 26 changed files with 5,503 additions and 13 deletions.
@@ -187,7 +187,16 @@ public StreamOperatorStateContext streamOperatorStateContext(
try {

// -------------- Keyed State Backend --------------
// TODO: Support KeyedStateBackend for AsyncKeyedStateBackend to unify the logic
keyedStatedBackend =
keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup,
managedMemoryFraction,
statsCollector,
StateBackend::createKeyedStateBackend);
if (stateBackend.supportsAsyncKeyedStateBackend()) {
asyncKeyedStateBackend =
keyedStatedBackend(
@@ -200,16 +209,6 @@ public StreamOperatorStateContext streamOperatorStateContext(
statsCollector,
StateBackend::createAsyncKeyedStateBackend);
} else {
keyedStatedBackend =
keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup,
managedMemoryFraction,
statsCollector,
StateBackend::createKeyedStateBackend);
if (keyedStatedBackend != null) {
asyncKeyedStateBackend =
new AsyncKeyedStateBackendAdaptor<>(keyedStatedBackend);
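Taken together, the two hunks above hoist creation of the synchronous (V1) keyed state backend out of the else branch so it always runs, then either create a native async backend or wrap the V1 one. A minimal sketch of the resulting control flow (arguments elided; identifiers as in the diff):

    keyedStatedBackend =
            keyedStatedBackend(/* serializer, restore state, ... */,
                    StateBackend::createKeyedStateBackend);
    if (stateBackend.supportsAsyncKeyedStateBackend()) {
        // Backends with native state V2 support (e.g. ForSt) build a real
        // async keyed state backend.
        asyncKeyedStateBackend =
                keyedStatedBackend(/* same arguments */,
                        StateBackend::createAsyncKeyedStateBackend);
    } else if (keyedStatedBackend != null) {
        // All other backends are adapted: the V1 backend is wrapped so the
        // async (V2) state API can still run on top of it.
        asyncKeyedStateBackend = new AsyncKeyedStateBackendAdaptor<>(keyedStatedBackend);
    }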
@@ -30,6 +30,7 @@

import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -300,6 +301,25 @@ public class ForStConfigurableOptions implements Serializable {
"If true, ForSt will use block-based filter instead of full filter, this only take effect when bloom filter is used. "
+ "The default value is 'false'.");

public static final ConfigOption<Long> COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES =
key("state.backend.forst.compaction.filter.query-time-after-num-entries")
.longType()
.defaultValue(1000L)
.withDescription(
"Number of state entries to process by compaction filter before updating current timestamp. "
+ "Updating the timestamp more often can improve cleanup speed, "
+ "but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'.");

public static final ConfigOption<Duration> COMPACT_FILTER_PERIODIC_COMPACTION_TIME =
key("state.backend.forst.compaction.filter.periodic-compaction-time")
.durationType()
.defaultValue(Duration.ofDays(30))
.withDescription(
"Periodic compaction could speed up expired state entries cleanup, especially for state"
+ " entries rarely accessed. Files older than this value will be picked up for compaction,"
+ " and re-written to the same level as they were before. It makes sure a file goes through"
+ " compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'.");

static final ConfigOption<?>[] CANDIDATE_CONFIGS =
new ConfigOption<?>[] {
// configurable DBOptions
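A hedged usage sketch for the two new options, set through a Configuration object (the values are illustrative, not recommendations):

    import java.time.Duration;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.forst.ForStConfigurableOptions;

    Configuration conf = new Configuration();
    // Refresh the compaction filter's cached timestamp every 5000 processed
    // entries instead of the default 1000 (fewer JNI calls, slower cleanup).
    conf.set(ForStConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES, 5000L);
    // Force files through the compaction filter at least once a week.
    conf.set(ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME, Duration.ofDays(7));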
@@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.forst;

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlUtils;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.FlinkCompactionFilter;
import org.rocksdb.FlinkCompactionFilter.FlinkCompactionFilterFactory;
import org.rocksdb.InfoLogLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;

/** RocksDB compaction filter utils for state with TTL. */
public class ForStDBTtlCompactFiltersManager {
private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactionFilter.class);

private final TtlTimeProvider ttlTimeProvider;

/** Registered compaction filter factories. */
private final LinkedHashMap<String, FlinkCompactionFilterFactory> compactionFilterFactories;

/** Created column family options. */
private final LinkedHashMap<String, ColumnFamilyOptions> columnFamilyOptionsMap;

/**
* Number of state entries to process by compaction filter before updating current timestamp.
*/
private final long queryTimeAfterNumEntries;

/**
* Periodic compaction could speed up expired state entries cleanup, especially for state
* entries rarely accessed. Files older than this value will be picked up for compaction, and
* re-written to the same level as they were before. It makes sure a file goes through
* compaction filters periodically. 0 means turning off periodic compaction.
*/
private final Duration periodicCompactionTime;

public ForStDBTtlCompactFiltersManager(
TtlTimeProvider ttlTimeProvider,
long queryTimeAfterNumEntries,
Duration periodicCompactionTime) {
this.ttlTimeProvider = ttlTimeProvider;
this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
this.periodicCompactionTime = periodicCompactionTime;
this.compactionFilterFactories = new LinkedHashMap<>();
this.columnFamilyOptionsMap = new LinkedHashMap<>();
}

public void setAndRegisterCompactFilterIfStateTtl(
@Nonnull RegisteredStateMetaInfoBase metaInfoBase,
@Nonnull ColumnFamilyOptions options) {

if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase =
(RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(
kvMetaInfoBase.getStateSerializer())) {
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
}
}
}

private void createAndSetCompactFilterFactory(
String stateName, @Nonnull ColumnFamilyOptions options) {

FlinkCompactionFilterFactory compactionFilterFactory =
new FlinkCompactionFilterFactory(
new TimeProviderWrapper(ttlTimeProvider), createRocksDbNativeLogger());
//noinspection resource
options.setCompactionFilterFactory(compactionFilterFactory);
compactionFilterFactories.put(stateName, compactionFilterFactory);
columnFamilyOptionsMap.put(stateName, options);
}

private static org.rocksdb.Logger createRocksDbNativeLogger() {
if (LOG.isDebugEnabled()) {
// options are always needed for org.rocksdb.Logger construction (no other constructor)
// the logger level gets configured from the options in native code
try (DBOptions opts = new DBOptions().setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) {
return new org.rocksdb.Logger(opts) {
@Override
protected void log(InfoLogLevel infoLogLevel, String logMsg) {
LOG.debug("RocksDB filter native code log: " + logMsg);
}
};
}
} else {
return null;
}
}

public void configCompactFilter(
@Nonnull StateDescriptor<?, ?> stateDesc, TypeSerializer<?> stateSerializer) {
StateTtlConfig ttlConfig = stateDesc.getTtlConfig();
if (ttlConfig.isEnabled() && ttlConfig.getCleanupStrategies().inRocksdbCompactFilter()) {
FlinkCompactionFilterFactory compactionFilterFactory =
compactionFilterFactories.get(stateDesc.getName());
Preconditions.checkNotNull(compactionFilterFactory);
long ttl = ttlConfig.getTimeToLive().toMillis();

ColumnFamilyOptions columnFamilyOptions =
columnFamilyOptionsMap.get(stateDesc.getName());
Preconditions.checkNotNull(columnFamilyOptions);

StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();

Duration periodicCompactionTime = this.periodicCompactionTime;
long queryTimeAfterNumEntries = this.queryTimeAfterNumEntries;

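// A cleanup strategy set on the state's own StateTtlConfig overrides the
// backend-wide defaults captured above.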
if (rocksdbCompactFilterCleanupStrategy != null) {
periodicCompactionTime =
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime();
queryTimeAfterNumEntries =
rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
}
if (periodicCompactionTime != null) {
columnFamilyOptions.setPeriodicCompactionSeconds(
periodicCompactionTime.getSeconds());
}

FlinkCompactionFilter.Config config;
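// Choose the filter variant: fixed-length list elements can be skipped in
// native code by offset arithmetic, variable-length lists need a Java
// ListElementFilter to locate element boundaries, and map/value states are
// handled natively.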
if (stateDesc instanceof ListStateDescriptor) {
TypeSerializer<?> elemSerializer =
((ListSerializer<?>) stateSerializer).getElementSerializer();
int len = elemSerializer.getLength();
if (len > 0) {
config =
FlinkCompactionFilter.Config.createForFixedElementList(
ttl,
queryTimeAfterNumEntries,
len + 1); // plus one byte for list element delimiter
} else {
config =
FlinkCompactionFilter.Config.createForList(
ttl,
queryTimeAfterNumEntries,
new ListElementFilterFactory<>(elemSerializer.duplicate()));
}
} else if (stateDesc instanceof MapStateDescriptor) {
config = FlinkCompactionFilter.Config.createForMap(ttl, queryTimeAfterNumEntries);
} else {
config = FlinkCompactionFilter.Config.createForValue(ttl, queryTimeAfterNumEntries);
}
compactionFilterFactory.configure(config);
}
}

private static class ListElementFilterFactory<T>
implements FlinkCompactionFilter.ListElementFilterFactory {
private final TypeSerializer<T> serializer;

private ListElementFilterFactory(TypeSerializer<T> serializer) {
this.serializer = serializer;
}

@Override
public FlinkCompactionFilter.ListElementFilter createListElementFilter() {
return new ListElementFilter<>(serializer);
}
}

private static class TimeProviderWrapper implements FlinkCompactionFilter.TimeProvider {
private final TtlTimeProvider ttlTimeProvider;

private TimeProviderWrapper(TtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}

@Override
public long currentTimestamp() {
return ttlTimeProvider.currentTimestamp();
}
}

private static class ListElementFilter<T> implements FlinkCompactionFilter.ListElementFilter {
private final TypeSerializer<T> serializer;
private DataInputDeserializer input;

private ListElementFilter(TypeSerializer<T> serializer) {
this.serializer = serializer;
this.input = new DataInputDeserializer();
}

@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
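// Scan the serialized list from the front: each expired element advances
// the offset, and the scan stops at the first unexpired one. The returned
// offset tells the native filter where the surviving suffix begins.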
input.setBuffer(bytes);
int lastElementOffset = 0;
while (input.available() > 0) {
try {
long timestamp = nextElementLastAccessTimestamp();
if (!TtlUtils.expired(timestamp, ttl, currentTimestamp)) {
break;
}
lastElementOffset = input.getPosition();
} catch (IOException e) {
throw new FlinkRuntimeException(
"Failed to deserialize list element for TTL compaction filter", e);
}
}
return lastElementOffset;
}

private long nextElementLastAccessTimestamp() throws IOException {
TtlValue<?> ttlValue = (TtlValue<?>) serializer.deserialize(input);
if (input.available() > 0) {
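// Skip the one-byte delimiter separating serialized list elements.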
input.skipBytesToRead(1);
}
return ttlValue.getLastAccessTimestamp();
}
}

public void disposeAndClearRegisteredCompactionFactories() {
for (FlinkCompactionFilterFactory factory : compactionFilterFactories.values()) {
IOUtils.closeQuietly(factory);
}
compactionFilterFactories.clear();
columnFamilyOptionsMap.clear();
}
}
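For orientation, a hypothetical wiring sketch of how a backend could drive this manager; only the manager's API comes from the class above, while metaInfo and stateDesc stand in for whatever the actual restore path provides:

    ForStDBTtlCompactFiltersManager ttlManager =
            new ForStDBTtlCompactFiltersManager(
                    TtlTimeProvider.DEFAULT, 1000L, Duration.ofDays(30));
    ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
    // Attaches a FlinkCompactionFilterFactory iff the state serializer is a
    // TTL-wrapping serializer.
    ttlManager.setAndRegisterCompactFilterIfStateTtl(metaInfo, cfOptions);
    // Later, once the state descriptor (and thus its TTL config) is known:
    ttlManager.configCompactFilter(stateDesc, metaInfo.getStateSerializer());
    // On backend disposal:
    ttlManager.disposeAndClearRegisteredCompactionFactories();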
@@ -32,6 +32,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

@@ -61,6 +62,8 @@ public class ForStDBWriteBatchWrapper implements AutoCloseable {
/** List of all objects that we need to close in close(). */
private final List<AutoCloseable> toClose;

/** Volatile flag flipped by {@link #markCancelled()} so in-flight writes can observe cancellation. */
private volatile boolean cancelled;

public ForStDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long writeBatchSize) {
this(rocksDB, null, 500, writeBatchSize);
}
@@ -157,4 +160,12 @@ private void flushIfNeeded() throws RocksDBException {
long getDataSize() {
return batch.getDataSize();
}

public void markCancelled() {
this.cancelled = true;
}

public Closeable getCancelCloseable() {
return this::markCancelled;
}
}
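A hedged usage sketch for the new cancellation hook; the cancelStreamRegistry variable is an assumption, and only markCancelled()/getCancelCloseable() come from this diff:

    ForStDBWriteBatchWrapper writeBatch =
            new ForStDBWriteBatchWrapper(db, writeBatchSize);
    // Closing the returned Closeable only flips the volatile `cancelled`
    // flag; it does not close the wrapper, so an in-flight bulk write can
    // notice cancellation between batches and stop early.
    cancelStreamRegistry.registerCloseable(writeBatch.getCancelCloseable());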