Skip to content

Commit

Permalink
Changing indicesOptions to lenient for _list/shards
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Nov 19, 2024
1 parent 4663f2b commit b6a8c8b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
Expand All @@ -30,7 +29,6 @@
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.search.SearchService.NO_TIMEOUT;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TransportCatShardsActionIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -132,26 +130,14 @@ public void onFailure(Exception e) {

public void testCatShardsSuccessWithPaginationWithClosedIndices() throws InterruptedException, ExecutionException {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
final int numIndices = 3;
final int numShards = 5;
final int numReplicas = 2;
final int pageSize = numIndices * numReplicas * (numShards + 1);
internalCluster().startDataOnlyNodes(3);
createIndex(
"test-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build()
);
createIndex(
"test-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build()
);
createIndex(
"test-3",
Expand All @@ -162,39 +148,13 @@ public void testCatShardsSuccessWithPaginationWithClosedIndices() throws Interru
.build()
);
ensureGreen();

// close index test-3
client().admin().indices().close(Requests.closeIndexRequest("test-3")).get();

ClusterStateResponse clusterStateResponse = client().admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setIndices("test-3")
.get();
assertThat(clusterStateResponse.getState().metadata().index("test-3").getState(), equalTo(IndexMetadata.State.CLOSE));

final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
shardsRequest.setPageParams(new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize));
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals("test-3")));
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
latch.countDown();
}
});
latch.await();
ActionFuture<CatShardsResponse> response = client().execute(CatShardsAction.INSTANCE, shardsRequest);
assertTrue(response.get().getResponseShards().stream().anyMatch(shard -> shard.getIndexName().equals("test-3")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public void onResponse(ClusterStateResponse clusterStateResponse) {
return;
}
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
if (paginationStrategy != null) {
// Use lenient indices options for paginated queries, which would silently
// ignore closed concrete indices for fetching stats instead of throwing out an error.
indicesStatsRequest.indicesOptions(IndicesOptions.lenientExpandOpenAndForbidClosed());
}
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
Expand Down Expand Up @@ -149,9 +154,7 @@ public void onFailure(Exception e) {
}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams)
? null
: new ShardPaginationStrategy(pageParams, clusterStateResponse.getState(), IndicesOptions.strictExpandOpenAndForbidClosed());
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
}

private void validateRequestLimit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.action.pagination;

import org.opensearch.OpenSearchParseException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -38,23 +37,14 @@ public class ShardPaginationStrategy implements PaginationStrategy<ShardRouting>
private PageData pageData;

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) {
this(pageParams, clusterState, null);
}

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState, IndicesOptions indicesOptions) {
ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken());
// Get list of indices metadata sorted by their creation time and filtered by the last sent index
List<IndexMetadata> filteredIndices = PaginationStrategy.getSortedIndexMetadata(
List<IndexMetadata> filteredIndices = getEligibleIndices(
clusterState,
getMetadataFilter(
pageParams.getSort(),
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime,
indicesOptions
),
PageParams.PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR
pageParams.getSort(),
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime
);

// Get the list of shards and indices belonging to current page.
this.pageData = getPageData(
filteredIndices,
Expand All @@ -64,31 +54,39 @@ public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState,
);
}

private static Predicate<IndexMetadata> getMetadataFilter(
private static List<IndexMetadata> getEligibleIndices(
ClusterState clusterState,
String sortOrder,
String lastIndexName,
Long lastIndexCreationTime,
IndicesOptions indicesOptions
Long lastIndexCreationTime
) {
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
return indexStateFilter(indicesOptions);
return PaginationStrategy.getSortedIndexMetadata(
clusterState,
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
);
} else {
return PaginationStrategy.getSortedIndexMetadata(
clusterState,
getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime),
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
);
}
}

private static Predicate<IndexMetadata> getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) {
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
return indexMetadata -> true;
}
return indexNameFilter(lastIndexName).or(
IndexPaginationStrategy.getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime)
).and(indexStateFilter(indicesOptions));
);
}

private static Predicate<IndexMetadata> indexNameFilter(String lastIndexName) {
return metadata -> metadata.getIndex().getName().equals(lastIndexName);
}

private static Predicate<IndexMetadata> indexStateFilter(IndicesOptions indicesOptions) {
if (Objects.isNull(indicesOptions) || !indicesOptions.forbidClosedIndices()) {
return metadata -> true;
}
return metadata -> metadata.getState().equals(IndexMetadata.State.OPEN);
}

/**
* Will be used to get the list of shards and respective indices to which they belong,
* which are to be displayed in a page.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ public enum Option {
EnumSet.of(Option.ALLOW_NO_INDICES, Option.IGNORE_UNAVAILABLE),
EnumSet.of(WildcardStates.OPEN, WildcardStates.CLOSED, WildcardStates.HIDDEN)
);
public static final IndicesOptions LENIENT_EXPAND_OPEN_FORBID_CLOSED = new IndicesOptions(
EnumSet.of(Option.ALLOW_NO_INDICES, Option.IGNORE_UNAVAILABLE, Option.FORBID_CLOSED_INDICES),
EnumSet.of(WildcardStates.OPEN)
);
public static final IndicesOptions STRICT_EXPAND_OPEN_CLOSED = new IndicesOptions(
EnumSet.of(Option.ALLOW_NO_INDICES),
EnumSet.of(WildcardStates.OPEN, WildcardStates.CLOSED)
Expand Down Expand Up @@ -654,6 +658,15 @@ public static IndicesOptions lenientExpandHidden() {
return LENIENT_EXPAND_OPEN_CLOSED_HIDDEN;
}

/**
* @return indices options that ignores unavailable indices and forbids closed indices (not return error if explicitly queried),
* expands wildcards to all open and closed indices and allows that no indices are resolved
* from wildcard expressions (not returning an error).
*/
public static IndicesOptions lenientExpandOpenAndForbidClosed() {
return LENIENT_EXPAND_OPEN_FORBID_CLOSED;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -394,45 +393,6 @@ public void testRetrieveShardsWhenLastIndexGetsDeletedAndReCreated() {
assertNull(strategy.getResponseToken().getNextToken());
}

/**
* Validates strategy filters out CLOSED indices, if forbidClosed() indices options are provided.
*/
public void testNoClosedIndicesReturnedByStrategy() {
final int pageSize = DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1);
ClusterState clusterState = getRandomClusterState(List.of(0, 1, 2, 3, 4, 5));
// Add 2 closed indices to cluster state
clusterState = addIndexToClusterState(
clusterState,
6,
DEFAULT_NUMBER_OF_SHARDS,
DEFAULT_NUMBER_OF_REPLICAS,
IndexMetadata.State.CLOSE
);
clusterState = addIndexToClusterState(
clusterState,
7,
DEFAULT_NUMBER_OF_SHARDS,
DEFAULT_NUMBER_OF_REPLICAS,
IndexMetadata.State.CLOSE
);
List<ShardRouting> shardRoutings = new ArrayList<>();
List<String> indices = new ArrayList<>();
String requestedToken = null;
ShardPaginationStrategy strategy;
do {
PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize);
strategy = new ShardPaginationStrategy(pageParams, clusterState, IndicesOptions.strictExpandOpenAndForbidClosed());
requestedToken = strategy.getResponseToken().getNextToken();
shardRoutings.addAll(strategy.getRequestedEntities());
indices.addAll(strategy.getRequestedIndices());
} while (requestedToken != null);
// assert that the closed indices do not appear in the response
assertFalse(indices.contains(TEST_INDEX_PREFIX + 6));
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 6)));
assertFalse(indices.contains(TEST_INDEX_PREFIX + 7));
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 7)));
}

public void testCreatingShardStrategyPageTokenWithRequestedTokenNull() {
try {
new ShardPaginationStrategy.ShardStrategyToken(null);
Expand Down Expand Up @@ -518,40 +478,11 @@ private ClusterState addIndexToClusterState(
final int numShards,
final int numReplicas,
final long creationTime
) {
return addIndexToClusterState(clusterState, indexNumber, numShards, numReplicas, creationTime, IndexMetadata.State.OPEN);
}

private ClusterState addIndexToClusterState(
ClusterState clusterState,
final int indexNumber,
final int numShards,
final int numReplicas,
final IndexMetadata.State state
) {
return addIndexToClusterState(
clusterState,
indexNumber,
numShards,
numReplicas,
Instant.now().plus(indexNumber, ChronoUnit.SECONDS).toEpochMilli(),
state
);
}

private ClusterState addIndexToClusterState(
ClusterState clusterState,
final int indexNumber,
final int numShards,
final int numReplicas,
final long creationTime,
final IndexMetadata.State state
) {
IndexMetadata indexMetadata = IndexMetadata.builder(TEST_INDEX_PREFIX + indexNumber)
.settings(settings(Version.CURRENT).put(SETTING_CREATION_DATE, creationTime))
.numberOfShards(numShards)
.numberOfReplicas(numReplicas)
.state(state)
.build();
IndexRoutingTable.Builder indexRoutingTableBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
indexMetadata
Expand Down

0 comments on commit b6a8c8b

Please sign in to comment.