diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java index 8570c3d398..d54bbe0f10 100644 --- a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java +++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalOpenSearchCluster.java @@ -55,6 +55,9 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.cluster.health.ClusterHealthStatus; @@ -67,6 +70,7 @@ import org.opensearch.plugins.Plugin; import org.opensearch.test.framework.certificate.TestCertificates; import org.opensearch.test.framework.cluster.ClusterManager.NodeSettings; +import org.opensearch.threadpool.ThreadPoolStats; import org.opensearch.transport.BindTransportException; import static java.util.Objects.requireNonNull; @@ -219,7 +223,26 @@ public boolean isStarted() { return started; } - public void stop() { + public void stop() throws IOException { + Client client = clientNode().getInternalNodeClient(); + AdminClient adminClient = client.admin(); + + final NodesStatsResponse nodesStatsResponse = adminClient.cluster() + .nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName())) + .actionGet(); + + for (NodeStats stats : nodesStatsResponse.getNodes()) { + for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) { + if ("management".equals(stat.getName())) { + continue; + } + if (stat.getActive() > 0) { + log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive()); + throw new IOException("Thread pool " + stat.getName() + " has " + stat.getActive() + " active threads"); + } + } + } + List> stopFutures = new ArrayList<>(); for (Node node : nodes) { stopFutures.add(node.stop(2, TimeUnit.SECONDS)); @@ -227,7 +250,7 @@ public void stop() { CompletableFuture.allOf(stopFutures.toArray(CompletableFuture[]::new)).join(); } - public void destroy() { + public void destroy() throws IOException { try { stop(); nodes.clear();