Skip to content

Commit

Permalink
Make IndexStoreListener a pluggable interface
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Nov 7, 2024
1 parent aa5c39b commit ca32a2f
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 29 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
15 changes: 1 addition & 14 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -1412,18 +1413,4 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;

import java.util.Collections;
import java.util.List;

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
final class CompositeIndexStoreListener implements IndexStoreListener {
private final List<IndexStoreListener> listeners;
private final static Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class);

public CompositeIndexStoreListener(List<IndexStoreListener> listeners) {
this.listeners = Collections.unmodifiableList(listeners);
}

@Override
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeShardPathDeleted(shardId, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e);

Check warning on line 57 in server/src/main/java/org/opensearch/index/store/IndexStoreListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/IndexStoreListener.java#L56-L57

Added lines #L56 - L57 were not covered by tests
}
}
}

@Override
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeIndexPathDeleted(index, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e);

Check warning on line 68 in server/src/main/java/org/opensearch/index/store/IndexStoreListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/IndexStoreListener.java#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -33,7 +34,7 @@
*
* @opensearch.internal
*/
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
public class FileCacheCleaner implements IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final Provider<FileCache> fileCacheProvider;
Expand Down
22 changes: 20 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
Expand Down Expand Up @@ -548,10 +549,27 @@ protected Node(
*/
this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
Stream<IndexStoreListener> indexStoreListenerStream = pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexStoreListener)
.filter(Optional::isPresent)
.map(Optional::get);
// FileCache is only initialized on search nodes, so we only create FileCacheCleaner on search nodes as well
if (DiscoveryNode.isSearchNode(settings) == false) {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
nodeEnvironment = new NodeEnvironment(
settings,
environment,
new IndexStoreListener.CompositeIndexStoreListener(indexStoreListenerStream.collect(Collectors.toList()))
);
} else {
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
nodeEnvironment = new NodeEnvironment(
settings,
environment,
new IndexStoreListener.CompositeIndexStoreListener(
Stream.concat(indexStoreListenerStream, Stream.of(new FileCacheCleaner(this::fileCache)))
.collect(Collectors.toList())
)
);
}
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.indices.recovery.RecoveryState;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
* A plugin that provides alternative directory implementations.
Expand Down Expand Up @@ -105,4 +107,11 @@ interface RecoveryStateFactory {
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.emptyMap();
}

/**
* The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion
*/
default Optional<IndexStoreListener> getIndexStoreListener() {
return Optional.empty();
}
}
42 changes: 31 additions & 11 deletions server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.node.Node;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.NodeRoles;
Expand Down Expand Up @@ -360,24 +361,39 @@ protected void doRun() throws Exception {
}

public void testIndexStoreListener() throws Exception {
final AtomicInteger shardCounter = new AtomicInteger(0);
final AtomicInteger indexCounter = new AtomicInteger(0);
final AtomicInteger shardCounter1 = new AtomicInteger(0);
final AtomicInteger shardCounter2 = new AtomicInteger(0);
final AtomicInteger indexCounter1 = new AtomicInteger(0);
final AtomicInteger indexCounter2 = new AtomicInteger(0);
final Index index = new Index("foo", "fooUUID");
final ShardId shardId = new ShardId(index, 0);
final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
final IndexStoreListener listener1 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter.incrementAndGet();
shardCounter1.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter.incrementAndGet();
indexCounter1.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(listener);
final IndexStoreListener listener2 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter2.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter2.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2)));

for (Path path : env.indexPaths(index)) {
Files.createDirectories(path.resolve("0"));
Expand All @@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N
for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path.resolve("0")));
}
assertEquals(0, shardCounter.get());
assertEquals(0, shardCounter1.get());
assertEquals(0, shardCounter2.get());

env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path.resolve("0")));
}
assertEquals(1, shardCounter.get());
assertEquals(1, shardCounter1.get());
assertEquals(1, shardCounter2.get());

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path));
}
assertEquals(0, indexCounter.get());
assertEquals(0, indexCounter1.get());
assertEquals(0, indexCounter2.get());

env.deleteIndexDirectorySafe(index, 5000, idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path));
}
assertEquals(1, indexCounter.get());
assertEquals(1, indexCounter1.get());
assertEquals(1, indexCounter2.get());
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
env.close();
}
Expand Down Expand Up @@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(Settings.EMPTY);
}

public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException {
Settings build = buildEnvSettings(Settings.EMPTY);
return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
}
Expand Down

0 comments on commit ca32a2f

Please sign in to comment.