Skip to content

Commit

Permalink
Make IndexStoreListener a pluggable interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Deng committed Nov 6, 2024
1 parent 4213cc2 commit b50c474
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 34 deletions.
15 changes: 1 addition & 14 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -1412,18 +1413,4 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;

import java.util.List;

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
final class CompositeIndexStoreListener implements IndexStoreListener {
private final List<IndexStoreListener> listeners;
private final Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class);

public CompositeIndexStoreListener(List<IndexStoreListener> listeners) {
this.listeners = listeners;
}

public void addListener(IndexStoreListener listener) {
this.listeners.add(listener);
}

@Override
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeShardPathDeleted(shardId, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e);
}
}
}

@Override
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeIndexPathDeleted(index, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -33,7 +34,7 @@
*
* @opensearch.internal
*/
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
public class FileCacheCleaner implements IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final Provider<FileCache> fileCacheProvider;
Expand Down
17 changes: 11 additions & 6 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
Expand Down Expand Up @@ -279,7 +280,6 @@
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -548,11 +548,16 @@ protected Node(
*/
this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
if (DiscoveryNode.isSearchNode(settings) == false) {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
} else {
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
}
IndexStoreListener.CompositeIndexStoreListener compositeIndexStoreListener = new IndexStoreListener.CompositeIndexStoreListener(
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexStoreListener)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList())
);
compositeIndexStoreListener.addListener(new FileCacheCleaner(this::fileCache));
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.indices.recovery.RecoveryState;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
* A plugin that provides alternative directory implementations.
Expand Down Expand Up @@ -105,4 +107,11 @@ interface RecoveryStateFactory {
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.emptyMap();
}

/**
* The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion
*/
default Optional<IndexStoreListener> getIndexStoreListener() {
return Optional.empty();
}
}
46 changes: 33 additions & 13 deletions server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.node.Node;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.NodeRoles;
Expand All @@ -65,14 +66,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.test.NodeRoles.nonClusterManagerNode;
import static org.opensearch.test.NodeRoles.nonDataNode;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.opensearch.test.NodeRoles.nonClusterManagerNode;
import static org.opensearch.test.NodeRoles.nonDataNode;

@LuceneTestCase.SuppressFileSystems("ExtrasFS") // TODO: fix test to allow extras
public class NodeEnvironmentTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -360,24 +361,39 @@ protected void doRun() throws Exception {
}

public void testIndexStoreListener() throws Exception {
final AtomicInteger shardCounter = new AtomicInteger(0);
final AtomicInteger indexCounter = new AtomicInteger(0);
final AtomicInteger shardCounter1 = new AtomicInteger(0);
final AtomicInteger shardCounter2 = new AtomicInteger(0);
final AtomicInteger indexCounter1 = new AtomicInteger(0);
final AtomicInteger indexCounter2 = new AtomicInteger(0);
final Index index = new Index("foo", "fooUUID");
final ShardId shardId = new ShardId(index, 0);
final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
final IndexStoreListener listener1 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter1.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter1.incrementAndGet();
}
};
final IndexStoreListener listener2 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter.incrementAndGet();
shardCounter2.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter.incrementAndGet();
indexCounter2.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(listener);
final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2)));

for (Path path : env.indexPaths(index)) {
Files.createDirectories(path.resolve("0"));
Expand All @@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N
for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path.resolve("0")));
}
assertEquals(0, shardCounter.get());
assertEquals(0, shardCounter1.get());
assertEquals(0, shardCounter2.get());

env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path.resolve("0")));
}
assertEquals(1, shardCounter.get());
assertEquals(1, shardCounter1.get());
assertEquals(1, shardCounter2.get());

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path));
}
assertEquals(0, indexCounter.get());
assertEquals(0, indexCounter1.get());
assertEquals(0, indexCounter2.get());

env.deleteIndexDirectorySafe(index, 5000, idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path));
}
assertEquals(1, indexCounter.get());
assertEquals(1, indexCounter1.get());
assertEquals(1, indexCounter2.get());
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
env.close();
}
Expand Down Expand Up @@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(Settings.EMPTY);
}

public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException {
Settings build = buildEnvSettings(Settings.EMPTY);
return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
}
Expand Down

0 comments on commit b50c474

Please sign in to comment.