Skip to content

Commit

Permalink
Assert no active threads
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Dec 3, 2024
1 parent 64f4d5b commit 692ef32
Showing 1 changed file with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
import org.apache.logging.log4j.Logger;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand All @@ -67,6 +70,7 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.test.framework.certificate.TestCertificates;
import org.opensearch.test.framework.cluster.ClusterManager.NodeSettings;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.BindTransportException;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -219,15 +223,34 @@ public boolean isStarted() {
return started;
}

public void stop() {
public void stop() throws IOException {
Client client = clientNode().getInternalNodeClient();
AdminClient adminClient = client.admin();

final NodesStatsResponse nodesStatsResponse = adminClient.cluster()
.nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()))
.actionGet();

for (NodeStats stats : nodesStatsResponse.getNodes()) {
for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) {
if ("management".equals(stat.getName())) {
continue;
}
if (stat.getActive() > 0) {
log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive());
throw new IOException("Thread pool " + stat.getName() + " has " + stat.getActive() + " active threads");
}
}
}

List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
for (Node node : nodes) {
stopFutures.add(node.stop(2, TimeUnit.SECONDS));
}
CompletableFuture.allOf(stopFutures.toArray(CompletableFuture[]::new)).join();
}

public void destroy() {
public void destroy() throws IOException {
try {
stop();
nodes.clear();
Expand Down

0 comments on commit 692ef32

Please sign in to comment.