Skip to content

Commit

Permalink
Only check on destroy
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Dec 5, 2024
1 parent aba5f7d commit de9ecf1
Showing 1 changed file with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,52 +223,52 @@ public boolean isStarted() {
return started;
}

public void stop() throws IOException {
public void stop(boolean checkActiveThreads) throws IOException {
Client client = clientNode().getInternalNodeClient();
AdminClient adminClient = client.admin();

// TODO Try the following code 3 times to see if there are active threads
if (checkActiveThreads) {
int maxRetries = 3;
int retryCount = 0;
boolean threadsActive = true;

int maxRetries = 3;
int retryCount = 0;
boolean threadsActive = true;

while (retryCount < maxRetries && threadsActive) {
try {
threadsActive = false;
final NodesStatsResponse nodesStatsResponse = adminClient.cluster()
.nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()))
.actionGet();

for (NodeStats stats : nodesStatsResponse.getNodes()) {
for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) {
if ("management".equals(stat.getName())) {
continue;
while (retryCount < maxRetries && threadsActive) {
try {
threadsActive = false;
final NodesStatsResponse nodesStatsResponse = adminClient.cluster()
.nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()))
.actionGet();

for (NodeStats stats : nodesStatsResponse.getNodes()) {
for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) {
if ("management".equals(stat.getName())) {
continue;
}
if (stat.getActive() > 0) {
log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive());
threadsActive = true;
break;
}
}
if (stat.getActive() > 0) {
log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive());
threadsActive = true;
if (threadsActive) {
break;
}
}
if (threadsActive) {
break;
}
}

if (threadsActive && retryCount < maxRetries - 1) {
// Add a small delay between retries
Thread.sleep(3000);
if (threadsActive && retryCount < maxRetries - 1) {
// Add a small delay between retries
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting between retries", e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting between retries", e);
retryCount++;
}
retryCount++;
}

if (threadsActive) {
throw new IOException("Thread pools still have active threads after " + maxRetries + " attempts");
if (threadsActive) {
throw new IOException("Thread pools still have active threads after " + maxRetries + " attempts");
}
}

List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
Expand All @@ -280,7 +280,7 @@ public void stop() throws IOException {

public void destroy() throws IOException {
try {
stop();
stop(true);
nodes.clear();
} finally {
try {
Expand Down Expand Up @@ -325,7 +325,7 @@ private void retry() throws Exception {
throw new RuntimeException("Detected port collisions for cluster manager node. Giving up.");
}

stop();
stop(false);

this.nodes.clear();
this.seedHosts = null;
Expand Down

0 comments on commit de9ecf1

Please sign in to comment.