diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6e2fed76afa..b26b1405f0c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1155,7 +1155,11 @@ public enum Property {
@Experimental
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION,
- "The interval at which to check for dead compactors.", "2.1.0");
+ "The interval at which to check for dead compactors.", "2.1.0"),
+ COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME(
+ "compaction.coordinator.wait.time.job.request.max", "2m", PropertyType.TIMEDURATION,
+ "The maximum amount of time the coordinator will wait for a requested job from the job queue.",
+ "4.0.0");
private final String key;
private final String defaultValue;
diff --git a/pom.xml b/pom.xml
index dedaefcc6d0..1dde7404e49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -925,6 +925,8 @@
org.glassfish.jersey.ext:jersey-bean-validation:jar:*
org.glassfish.jersey.inject:jersey-hk2:jar:*
org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:jar:*
+
+ org.glassfish.jersey.containers:jersey-container-servlet:jar:*
org.powermock:powermock-api-easymock:jar:*
com.github.spotbugs:spotbugs-annotations:jar:*
diff --git a/server/base/pom.xml b/server/base/pom.xml
index c20b3e6edce..27bffec6853 100644
--- a/server/base/pom.xml
+++ b/server/base/pom.xml
@@ -35,6 +35,14 @@
com.beust
jcommander
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
com.github.ben-manes.caffeine
caffeine
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java
new file mode 100644
index 00000000000..a1ecea607c2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.rest;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.server.rest.ThriftSerializer.ENCODED;
+import static org.apache.accumulo.server.rest.ThriftSerializer.TYPE;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Jackson deserializer for thrift objects that delegates the deserialization of thrift to
+ * {@link TDeserializer}. It handles previously encoded serialized objects from
+ * {@link ThriftSerializer}
+ */
+public class ThriftDeserializer> extends JsonDeserializer {
+ @Override
+ public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ JsonNode tree = p.readValueAsTree();
+
+ try {
+ var thriftClassName = tree.get(TYPE).asText();
+ var encoded = tree.get(ENCODED).asText();
+
+ Constructor constructor = getThriftClass(thriftClassName).getDeclaredConstructor();
+ T obj = constructor.newInstance();
+ deserialize(obj, encoded);
+
+ return obj;
+ } catch (ReflectiveOperationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Class getThriftClass(String className) throws ClassNotFoundException {
+ var clazz = Class.forName(className, false, ThriftDeserializer.class.getClassLoader());
+ // Note: This check is important to prevent potential security issues
+ // We don't want to allow arbitrary classes to be loaded
+ if (!TBase.class.isAssignableFrom(clazz)) {
+ throw new IllegalArgumentException("Class " + clazz + " is not assignable to TBase");
+ }
+ return (Class) clazz;
+ }
+
+ // TODO: It doesn't seem like TDeserializer is thread safe, is there a way
+ // to prevent creating a new deserializer for every object?
+ private static > void deserialize(T obj, String json) throws IOException {
+ try {
+ final TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+ deserializer.deserialize(obj, json, UTF_8.name());
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java
new file mode 100644
index 00000000000..ce2233cc6e7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.rest;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize(using = ThriftSerializer.class)
+@JsonDeserialize(using = ThriftDeserializer.class)
+public abstract class ThriftMixIn {
+
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java
new file mode 100644
index 00000000000..3534091775d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.rest;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+/**
+ * Jackson serializer for thrift objects that delegates the serialization of thrift to
+ * {@link TSerializer} and also includes the serialized type for the deserializer to use
+ */
+public class ThriftSerializer> extends JsonSerializer {
+
+ static final String TYPE = "type";
+ static final String ENCODED = "encoded";
+
+ @Override
+ public void serialize(T value, JsonGenerator gen, SerializerProvider serializers)
+ throws IOException {
+ gen.writeStartObject();
+ gen.writeObjectField(TYPE, value.getClass());
+ gen.writeStringField(ENCODED, serialize(value));
+ gen.writeEndObject();
+ }
+
+ // TODO: It doesn't seem like TSerializer is thread safe, is there a way
+ // to prevent creating a new serializer for every object?
+ private static > String serialize(T obj) throws IOException {
+ try {
+ final TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(obj);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java b/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java
new file mode 100644
index 00000000000..a54e51d91c0
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.rest.request;
+
+import org.apache.accumulo.core.clientImpl.thrift.TInfo;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+
+public class GetCompactionJobRequest {
+
+ private TInfo tinfo;
+ private TCredentials credentials;
+ private String groupName;
+ private String compactorAddress;
+ private String externalCompactionId;
+
+ public GetCompactionJobRequest() {}
+
+ public GetCompactionJobRequest(TInfo tinfo, TCredentials credentials, String groupName,
+ String compactorAddress, String externalCompactionId) {
+ this.tinfo = tinfo;
+ this.credentials = credentials;
+ this.groupName = groupName;
+ this.compactorAddress = compactorAddress;
+ this.externalCompactionId = externalCompactionId;
+ }
+
+ public TInfo getTinfo() {
+ return tinfo;
+ }
+
+ public void setTinfo(TInfo tinfo) {
+ this.tinfo = tinfo;
+ }
+
+ public TCredentials getCredentials() {
+ return credentials;
+ }
+
+ public void setCredentials(TCredentials credentials) {
+ this.credentials = credentials;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+ public String getCompactorAddress() {
+ return compactorAddress;
+ }
+
+ public void setCompactorAddress(String compactorAddress) {
+ this.compactorAddress = compactorAddress;
+ }
+
+ public String getExternalCompactionId() {
+ return externalCompactionId;
+ }
+
+ public void setExternalCompactionId(String externalCompactionId) {
+ this.externalCompactionId = externalCompactionId;
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
index de3725f2525..9cfb9b89870 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
@@ -103,7 +103,7 @@ public static final class SystemToken extends PasswordToken {
*/
public SystemToken() {}
- private SystemToken(byte[] systemPassword) {
+ public SystemToken(byte[] systemPassword) {
super(systemPassword);
}
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
index 7779271cefb..2e7a9ceb9f1 100644
--- a/server/compactor/pom.xml
+++ b/server/compactor/pom.xml
@@ -30,6 +30,10 @@
accumulo-compactor
Apache Accumulo Compactor
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
com.google.auto.service
auto-service
@@ -71,6 +75,22 @@
org.apache.zookeeper
zookeeper
+
+ org.eclipse.jetty
+ jetty-client
+
+
+ org.eclipse.jetty
+ jetty-http
+
+
+ org.eclipse.jetty
+ jetty-io
+
+
+ org.eclipse.jetty
+ jetty-util
+
org.slf4j
slf4j-api
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 463937e9ba3..8fb26ac0688 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.compactor;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_ENTRIES_READ;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_ENTRIES_WRITTEN;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK;
@@ -26,6 +27,10 @@
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
@@ -46,6 +51,7 @@
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -57,6 +63,7 @@
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -66,6 +73,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -89,6 +97,7 @@
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
@@ -116,18 +125,32 @@
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rest.ThriftMixIn;
+import org.apache.accumulo.server.rest.request.GetCompactionJobRequest;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
+import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
+import org.eclipse.jetty.client.util.BasicAuthentication;
+import org.eclipse.jetty.client.util.SPNEGOAuthentication;
+import org.eclipse.jetty.client.util.StringRequestContent;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.io.ClientConnector;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
@@ -470,8 +493,10 @@ protected TNextCompactionJob getNextJob(Supplier uuid) throws RetriesExcee
ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get());
LOG.trace("Attempting to get next job, eci = {}", eci);
currentCompactionId.set(eci);
- return coordinatorClient.getCompactionJob(TraceUtil.traceInfo(),
- getContext().rpcCreds(), this.getResourceGroup(),
+
+ JsonCoordinatorService jsonClient = new JsonCoordinatorService(getContext());
+ return jsonClient.getCompactionJob(TraceUtil.traceInfo(), getContext().rpcCreds(),
+ this.getResourceGroup(),
ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()),
eci.toString());
} catch (Exception e) {
@@ -1006,4 +1031,171 @@ public String getRunningCompactionId(TInfo tinfo, TCredentials credentials)
}
}
+ protected static class JsonCoordinatorService implements CompactionCoordinatorService.Iface {
+ private final ServerContext context;
+ private final ObjectMapper mapper;
+ private final URI ccBaseUri;
+ private final Duration timeout;
+
+ private static final String GET_COMPACTION_JOB_ENDPOINT = "get-compaction-job";
+
+ protected JsonCoordinatorService(ServerContext context) throws TTransportException {
+ this.context = context;
+ var coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context);
+ if (coordinatorHost.isEmpty()) {
+ throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+ }
+ try {
+ String scheme = context.getThriftServerType() == ThriftServerType.SSL ? "https" : "http";
+ this.ccBaseUri =
+ new URL(scheme + "://" + coordinatorHost.orElseThrow().getHost() + ":8999/rest/cc/")
+ .toURI();
+ } catch (MalformedURLException | URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ mapper = new ObjectMapper();
+ // Register the custom serializer/deserializer to handle thrift classes
+ mapper.addMixIn(TBase.class, ThriftMixIn.class);
+
+ // TODO: We should add a property for this, for now just make the timeout 10 seconds longer
+ // than the coordinator job request timeout so the compactor will get back the empty job
+ // response under normal cases before timing out the http request
+ this.timeout = Duration
+ .ofMillis(context.getConfiguration()
+ .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME))
+ .plus(Duration.ofSeconds(10));
+ }
+
+ @Override
+ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials,
+ String groupName, String compactor, String externalCompactionId) throws TException {
+
+ var cjr = new GetCompactionJobRequest(tinfo, credentials, groupName, compactor,
+ externalCompactionId);
+
+ var requestUri = ccBaseUri.resolve("./" + GET_COMPACTION_JOB_ENDPOINT);
+ LOG.info("requesturi {}", requestUri);
+
+ // TODO: reuse client between requests
+ // For this prototype use the Jetty Http client as it was quick to set up
+ // For a real server we'd want to make sure we are re-using the client and set
+ // things like thread pooling so we could look at configuring the Jetty client
+ // or using something else like the Apache HTTP 5 client could be a better choice
+ HttpClient httpClient = null;
+
+ try {
+ httpClient = newHttpClient(requestUri);
+ httpClient.start();
+
+ ContentResponse response =
+ httpClient.POST(requestUri).timeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
+ .headers(headers -> headers.put(HttpHeader.CONTENT_TYPE, "application/json"))
+ .body(new StringRequestContent(mapper.writeValueAsString(cjr))).send();
+
+ // If not 200 we have an error, such as 401
+ Preconditions.checkState(response.getStatus() == 200, "Response status code is %s",
+ response.getStatus());
+
+ return mapper.readValue(response.getContentAsString(), TNextCompactionJob.class);
+ } catch (Exception e) {
+ // TODO: This is wrapping errors for the http client in TException so that we can retry
+ // The current Retry logic only handles TException as it was meant for thrift so this
+ // was a quick way to re-use that logic for HTTP client issues (timeouts, etc)
+ // We would obviously need to clean this up if we plan to stick with REST for real
+ throw new TException(e);
+ } finally {
+ try {
+ // TODO: re use the client, don't re-create each time and start/stop
+ httpClient.stop();
+ } catch (Exception e) {
+ LOG.debug(e.getMessage(), e);
+ }
+ }
+ }
+
+ private HttpClient newHttpClient(URI requestUri) {
+ final HttpClient httpClient;
+
+ if (context.getThriftServerType() == ThriftServerType.SSL) {
+ SslContextFactory.Client sslContextFactory = getSslClient();
+ ClientConnector clientConnector = new ClientConnector();
+ clientConnector.setSslContextFactory(sslContextFactory);
+ httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));
+ } else if (context.getThriftServerType() == ThriftServerType.SASL) {
+ httpClient = new HttpClient();
+ var principal = context.getConfiguration().get(Property.GENERAL_KERBEROS_PRINCIPAL);
+ SPNEGOAuthentication authentication = new SPNEGOAuthentication(requestUri);
+ authentication.setUserName(principal);
+ authentication.setUserKeyTabPath(java.nio.file.Path
+ .of(context.getConfiguration().getPath(Property.GENERAL_KERBEROS_KEYTAB)));
+ authentication.setServiceName("accumulo");
+ httpClient.getAuthenticationStore().addAuthentication(authentication);
+ } else {
+ httpClient = new HttpClient();
+ var creds = context.getCredentials();
+ BasicAuthentication authentication =
+ new BasicAuthentication(requestUri, "accumulo", creds.getPrincipal(),
+ new String(((PasswordToken) creds.getToken()).getPassword(), UTF_8));
+ httpClient.getAuthenticationStore().addAuthentication(authentication);
+ }
+ return httpClient;
+ }
+
+ private SslContextFactory.Client getSslClient() {
+ SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
+ SslConnectionParams sslParams = context.getClientSslParams();
+ sslContextFactory.setKeyStorePath(sslParams.getKeyStorePath());
+ sslContextFactory.setKeyStorePassword(sslParams.getKeyStorePass());
+ sslContextFactory.setKeyStoreType(sslParams.getKeyStoreType());
+ sslContextFactory.setTrustStorePath(sslParams.getTrustStorePath());
+ sslContextFactory.setTrustStorePassword(sslParams.getTrustStorePass());
+ sslContextFactory.setTrustStoreType(sslParams.getTrustStoreType());
+
+ // TODO: Disable hostname checks for testing
+ sslContextFactory.setEndpointIdentificationAlgorithm(null);
+ sslContextFactory.setHostnameVerifier((hostname, session) -> true);
+
+ return sslContextFactory;
+ }
+
+ // The following methods were not implemented but could be moved to Json as well if we
+ // decide to go with Jetty and REST
+ @Override
+ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
+ String externalCompactionId, TKeyExtent extent, TCompactionStats stats) throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
+ String externalCompactionId, TCompactionStatusUpdate status, long timestamp)
+ throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
+ TKeyExtent extent) throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials)
+ throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials)
+ throws TException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
+ throws TException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
}
diff --git a/server/manager/pom.xml b/server/manager/pom.xml
index cb52ea5681f..ee921ca4f5a 100644
--- a/server/manager/pom.xml
+++ b/server/manager/pom.xml
@@ -31,6 +31,10 @@
Apache Accumulo Manager Server
The manager server for Apache Accumulo for load balancing and other system-wide operations.
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
com.github.ben-manes.caffeine
caffeine
@@ -64,6 +68,18 @@
io.opentelemetry
opentelemetry-context
+
+ jakarta.inject
+ jakarta.inject-api
+
+
+ jakarta.servlet
+ jakarta.servlet-api
+
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+
org.apache.accumulo
accumulo-core
@@ -100,6 +116,48 @@
org.checkerframework
checker-qual
+
+ org.eclipse.jetty
+ jetty-security
+
+
+ org.eclipse.jetty
+ jetty-server
+
+
+ org.eclipse.jetty
+ jetty-servlet
+
+
+ org.eclipse.jetty
+ jetty-util
+
+
+ org.glassfish.hk2
+ hk2-api
+
+
+
+ org.glassfish.jersey.containers
+ jersey-container-servlet
+
+
+ org.glassfish.jersey.containers
+ jersey-container-servlet-core
+
+
+ org.glassfish.jersey.core
+ jersey-common
+
+
+ org.glassfish.jersey.core
+ jersey-server
+
+
+ org.glassfish.jersey.media
+ jersey-media-json-jackson
+
org.slf4j
slf4j-api
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0be61f19bc3..11157df7882 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -28,6 +28,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
@@ -58,6 +59,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import jakarta.inject.Singleton;
+
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.admin.CompactionConfig;
@@ -122,6 +125,7 @@
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
+import org.apache.accumulo.manager.http.EmbeddedRpcWebServer;
import org.apache.accumulo.manager.metrics.BalancerMetrics;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
@@ -162,6 +166,13 @@
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.hk2.api.Factory;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,6 +180,7 @@
import com.google.common.collect.Comparators;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
+import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -217,6 +229,7 @@ public class Manager extends AbstractServer
ServiceLock managerLock = null;
private TServer clientService = null;
+ private EmbeddedRpcWebServer restClientService;
protected volatile TabletBalancer tabletBalancer;
private final BalancerEnvironment balancerEnvironment;
private final BalancerMetrics balancerMetrics = new BalancerMetrics();
@@ -332,6 +345,12 @@ synchronized void setManagerState(final ManagerState newState) {
Manager.this.nextEvent.event("stopped event loop");
}, 100L, 1000L, MILLISECONDS);
ThreadPools.watchNonCriticalScheduledTask(future);
+ final var restFuture = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ // This frees the main thread and will cause the manager to exit
+ restClientService.stop();
+ Manager.this.nextEvent.event("stopped event loop");
+ }, 100L, 1000L, MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(restFuture);
break;
case HAVE_LOCK:
if (isUpgrading()) {
@@ -1134,6 +1153,30 @@ public void run() {
clientService = sa.server;
log.info("Started Manager client service at {}", sa.address);
+ int restPort = 8999;
+ try {
+ restClientService = new EmbeddedRpcWebServer(this, restPort);
+ restClientService.addServlet(getRestServlet(), "/rest/*");
+ restClientService.start();
+
+ if (!restClientService.isRunning()) {
+ throw new RuntimeException("Unable to start rpc http server on port: " + restPort);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to start embedded web server " + getHostname(), e);
+ }
+
+ String advertiseHost = getHostname();
+ if (advertiseHost.equals("0.0.0.0")) {
+ try {
+ advertiseHost = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ log.error("Unable to get hostname", e);
+ }
+ }
+ HostAndPort restHostAndPort = HostAndPort.fromParts(advertiseHost, restPort);
+ log.info("Started Manager rpc rest client service at {}", restHostAndPort);
+
// block until we can obtain the ZK lock for the manager
ServiceLockData sld;
try {
@@ -1313,14 +1356,14 @@ boolean canSuspendTablets() {
throw new IllegalStateException("Exception updating manager lock", e);
}
- while (!clientService.isServing()) {
+ while (!clientService.isServing() || !restClientService.isRunning()) {
sleepUninterruptibly(100, MILLISECONDS);
}
// The manager is fully initialized. Clients are allowed to connect now.
managerInitialized.set(true);
- while (clientService.isServing()) {
+ while (clientService.isServing() && restClientService.isRunning()) {
sleepUninterruptibly(500, MILLISECONDS);
}
log.info("Shutting down fate.");
@@ -1866,4 +1909,34 @@ private Map> getFateRefs() {
Preconditions.checkState(fateRefs != null, "Unexpected null fate references map");
return fateRefs;
}
+
+ public static class ManagerFactory extends AbstractBinder implements Factory {
+
+ private final Manager manager;
+
+ public ManagerFactory(Manager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public Manager provide() {
+ return manager;
+ }
+
+ @Override
+ public void dispose(Manager instance) {}
+
+ @Override
+ protected void configure() {
+ bindFactory(this).to(Manager.class).in(Singleton.class);
+ }
+ }
+
+ private ServletHolder getRestServlet() {
+ final ResourceConfig rc = new ResourceConfig().packages("org.apache.accumulo.manager.http.rest")
+ .register(new ManagerFactory(this))
+ .register(new LoggingFeature(java.util.logging.Logger.getLogger(this.getClass().getName())))
+ .register(JacksonFeature.class);
+ return new ServletHolder(new ServletContainer(rc));
+ }
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 7a0b275994b..51fae7f3ade 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.compaction.coordinator;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
@@ -50,6 +51,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -137,6 +139,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,6 +198,7 @@ public class CompactionCoordinator
private final int jobQueueInitialSize;
private volatile long coordinatorStartTime;
+ private final long maxJobRequestWaitTime;
public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
AtomicReference