Skip to content

Commit

Permalink
Added Executor and increased timeout to help with stability (#301)
Browse files Browse the repository at this point in the history
* Added Executor and increased timeout to help with stability
  • Loading branch information
Patrick Duin authored Oct 24, 2023
1 parent 63142df commit 9238dfb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 7 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@

## [3.11.6] - 2023-10-24
### Fixed
- Switch to ExecutorService instead of the default `ForkJoinPool` for `MetastoreMappingImpl.isAvailable()` calls. Using `ForkJoinPool` may cause threads to wait on each other.
- Increased default `MetastoreMappingImpl.isAvailable()` timeout to `2000ms` (was `500ms`) to set a bit more conservative default.

## [3.11.5] - 2023-10-23
### Fixed
- Added timeout on `MetastoreMappingImpl.isAvailable()` calls to prevent long waits on unresponsive metastores.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2022 Expedia, Inc.
* Copyright (C) 2016-2023 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -247,6 +247,7 @@ public String toString() {
.add("metastoreTunnel", metastoreTunnel)
.add("accessControlType", accessControlType)
.add("writableDatabaseWhiteList", writableDatabaseWhitelist)
.add("latency", latency)
.add("status", status)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void open(HiveUgiArgs ugiArgs) {
// Wait before launching the next round of connection retries.
if (!isConnected && (retryDelaySeconds > 0) && ((attempt + 1) < retries)) {
try {
LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
LOG.debug("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
Thread.sleep(retryDelaySeconds * 1000);
} catch (InterruptedException ignore) {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -45,16 +47,16 @@ class MetaStoreMappingImpl implements MetaStoreMapping {
private final static Logger log = LoggerFactory.getLogger(MetaStoreMappingImpl.class);

// MilliSeconds
static final long DEFAULT_AVAILABILITY_TIMEOUT = 500;
static final long DEFAULT_AVAILABILITY_TIMEOUT = 2000;

private final String databasePrefix;
private final CloseableThriftHiveMetastoreIface client;
private final AccessControlHandler accessControlHandler;
private final String name;
private final long latency;
private final MetaStoreFilterHook metastoreFilter;

private final ConnectionType connectionType;
private final ExecutorService executor = Executors.newSingleThreadExecutor();

MetaStoreMappingImpl(
String databasePrefix,
Expand Down Expand Up @@ -112,6 +114,7 @@ public String getDatabasePrefix() {
@Override
public void close() throws IOException {
client.close();
executor.shutdownNow();
}

/**
Expand All @@ -130,11 +133,12 @@ public boolean isAvailable() {
log.error("Metastore Mapping {} unavailable", name, e);
return false;
}
});
}, executor);
long timeout = DEFAULT_AVAILABILITY_TIMEOUT + getLatency();
try {
return future.get(DEFAULT_AVAILABILITY_TIMEOUT + getLatency(), TimeUnit.MILLISECONDS);
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.info("Took too long to check availability of '" + name + "', assuming unavailable");
log.info("Took too long (>" + timeout + "ms) to check availability of '" + name + "', assuming unavailable");
future.cancel(true);
} catch (InterruptedException | ExecutionException e) {
log.error("Error while checking availability '" + name + "', assuming unavailable");
Expand Down

0 comments on commit 9238dfb

Please sign in to comment.