From 8bcc1a89029d8f8d0bb2fa0b370ee42f83876b30 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Fri, 25 Oct 2024 12:59:08 -0400 Subject: [PATCH 1/2] Switch getCompactionJob RPC to be async using Jetty This change switches getCompactionJob() rpc call to be an async REST call using Jetty and Jersey so that we can use long polling and not block while waiting for jobs on the queue. Jetty is configured to use Jackson and a custom serializer/deserializer to handle Thrift objects being serialized to json. Authentication has been set up to work with username/password, SSL, and also Kerberos. Right now for the prototype the authentication is not quite complete and a couple shortcuts were taken to get it working with the IT set up so some more work needs to be done before it is finished. --- .../apache/accumulo/core/conf/Property.java | 6 +- pom.xml | 2 + server/base/pom.xml | 8 + .../server/rest/ThriftDeserializer.java | 83 ++++++ .../accumulo/server/rest/ThriftMixIn.java | 28 ++ .../server/rest/ThriftSerializer.java | 60 +++++ .../rest/request/GetCompactionJobRequest.java | 82 ++++++ .../server/security/SystemCredentials.java | 2 +- server/compactor/pom.xml | 20 ++ .../apache/accumulo/compactor/Compactor.java | 196 +++++++++++++- server/manager/pom.xml | 58 ++++ .../org/apache/accumulo/manager/Manager.java | 77 +++++- .../coordinator/CompactionCoordinator.java | 74 +++-- .../manager/http/EmbeddedRpcWebServer.java | 252 ++++++++++++++++++ .../rest/CompactionCoordinatorResource.java | 79 ++++++ .../http/rest/ThriftJacksonProvider.java | 51 ++++ .../compaction/CompactionCoordinatorTest.java | 47 +++- .../accumulo/harness/MiniClusterHarness.java | 6 + .../ExternalCompactionKerberosIT.java | 110 ++++++++ .../compaction/ExternalCompactionSslIT.java | 101 +++++++ .../ExternalCompactionTestUtils.java | 8 +- 21 files changed, 1307 insertions(+), 43 deletions(-) create mode 100644 server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java create mode 100644 server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java create mode 100644 server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java create mode 100644 server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java create mode 100644 test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 669d812f38e..8d5bb49df49 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1149,7 +1149,11 @@ public enum Property { @Experimental COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL( "compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION, - "The interval at which to check for dead compactors.", "2.1.0"); + "The interval at which to check for dead compactors.", "2.1.0"), + COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME( + "compaction.coordinator.wait.time.job.request.max", "2m", PropertyType.TIMEDURATION, + "The maximum amount of time the coordinator will wait for a requested job from the job queue.", + "4.0.0"); private final String key; private final String defaultValue; diff --git a/pom.xml b/pom.xml index dedaefcc6d0..1dde7404e49 100644 --- a/pom.xml +++ b/pom.xml @@ -925,6 +925,8 @@ org.glassfish.jersey.ext:jersey-bean-validation:jar:* org.glassfish.jersey.inject:jersey-hk2:jar:* org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:jar:* + + org.glassfish.jersey.containers:jersey-container-servlet:jar:* org.powermock:powermock-api-easymock:jar:* com.github.spotbugs:spotbugs-annotations:jar:* diff --git a/server/base/pom.xml b/server/base/pom.xml index c20b3e6edce..27bffec6853 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -35,6 +35,14 @@ com.beust jcommander + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + com.github.ben-manes.caffeine caffeine diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java new file mode 100644 index 00000000000..a1ecea607c2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.server.rest.ThriftSerializer.ENCODED; +import static org.apache.accumulo.server.rest.ThriftSerializer.TYPE; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TJSONProtocol; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Jackson deserializer for thrift objects that delegates the deserialization of thrift to + * {@link TDeserializer}. It handles previously encoded serialized objects from + * {@link ThriftSerializer} + */ +public class ThriftDeserializer> extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode tree = p.readValueAsTree(); + + try { + var thriftClassName = tree.get(TYPE).asText(); + var encoded = tree.get(ENCODED).asText(); + + Constructor constructor = getThriftClass(thriftClassName).getDeclaredConstructor(); + T obj = constructor.newInstance(); + deserialize(obj, encoded); + + return obj; + } catch (ReflectiveOperationException e) { + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + private Class getThriftClass(String className) throws ClassNotFoundException { + var clazz = Class.forName(className, false, ThriftDeserializer.class.getClassLoader()); + // Note: This check is important to prevent potential security issues + // We don't want to allow arbitrary classes to be loaded + if (!TBase.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("Class " + clazz + " is not assignable to TBase"); + } + return (Class) clazz; + } + + // TODO: It doesn't seem like TDeserializer is thread safe, is there a way + // to prevent creating a new deserializer for every object? + private static > void deserialize(T obj, String json) throws IOException { + try { + final TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); + deserializer.deserialize(obj, json, UTF_8.name()); + } catch (TException e) { + throw new IOException(e); + } + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java new file mode 100644 index 00000000000..ce2233cc6e7 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@JsonSerialize(using = ThriftSerializer.class) +@JsonDeserialize(using = ThriftDeserializer.class) +public abstract class ThriftMixIn { + +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java new file mode 100644 index 00000000000..3534091775d --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest; + +import java.io.IOException; + +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +/** + * Jackson serializer for thrift objects that delegates the serialization of thrift to + * {@link TSerializer} and also includes the serialized type for the deserializer to use + */ +public class ThriftSerializer> extends JsonSerializer { + + static final String TYPE = "type"; + static final String ENCODED = "encoded"; + + @Override + public void serialize(T value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + gen.writeStartObject(); + gen.writeObjectField(TYPE, value.getClass()); + gen.writeStringField(ENCODED, serialize(value)); + gen.writeEndObject(); + } + + // TODO: It doesn't seem like TSerializer is thread safe, is there a way + // to prevent creating a new serializer for every object? + private static > String serialize(T obj) throws IOException { + try { + final TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(obj); + } catch (TException e) { + throw new IOException(e); + } + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java b/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java new file mode 100644 index 00000000000..a54e51d91c0 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest.request; + +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; + +public class GetCompactionJobRequest { + + private TInfo tinfo; + private TCredentials credentials; + private String groupName; + private String compactorAddress; + private String externalCompactionId; + + public GetCompactionJobRequest() {} + + public GetCompactionJobRequest(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId) { + this.tinfo = tinfo; + this.credentials = credentials; + this.groupName = groupName; + this.compactorAddress = compactorAddress; + this.externalCompactionId = externalCompactionId; + } + + public TInfo getTinfo() { + return tinfo; + } + + public void setTinfo(TInfo tinfo) { + this.tinfo = tinfo; + } + + public TCredentials getCredentials() { + return credentials; + } + + public void setCredentials(TCredentials credentials) { + this.credentials = credentials; + } + + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public String getCompactorAddress() { + return compactorAddress; + } + + public void setCompactorAddress(String compactorAddress) { + this.compactorAddress = compactorAddress; + } + + public String getExternalCompactionId() { + return externalCompactionId; + } + + public void setExternalCompactionId(String externalCompactionId) { + this.externalCompactionId = externalCompactionId; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java index de3725f2525..9cfb9b89870 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java @@ -103,7 +103,7 @@ public static final class SystemToken extends PasswordToken { */ public SystemToken() {} - private SystemToken(byte[] systemPassword) { + public SystemToken(byte[] systemPassword) { super(systemPassword); } diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml index 7779271cefb..2e7a9ceb9f1 100644 --- a/server/compactor/pom.xml +++ b/server/compactor/pom.xml @@ -30,6 +30,10 @@ accumulo-compactor Apache Accumulo Compactor + + com.fasterxml.jackson.core + jackson-databind + com.google.auto.service auto-service @@ -71,6 +75,22 @@ org.apache.zookeeper zookeeper + + org.eclipse.jetty + jetty-client + + + org.eclipse.jetty + jetty-http + + + org.eclipse.jetty + jetty-io + + + org.eclipse.jetty + jetty-util + org.slf4j slf4j-api diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 77d8651bec5..4ce6b42e99c 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -19,6 +19,7 @@ package org.apache.accumulo.compactor; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_ENTRIES_READ; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_ENTRIES_WRITTEN; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK; @@ -26,6 +27,10 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -45,6 +50,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -56,6 +62,7 @@ import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -65,6 +72,7 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -88,6 +96,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; @@ -115,18 +124,32 @@ import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.rest.ThriftMixIn; +import org.apache.accumulo.server.rest.request.GetCompactionJobRequest; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.tserver.log.LogSorter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; +import org.eclipse.jetty.client.util.BasicAuthentication; +import org.eclipse.jetty.client.util.SPNEGOAuthentication; +import org.eclipse.jetty.client.util.StringRequestContent; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; @@ -468,8 +491,10 @@ protected TNextCompactionJob getNextJob(Supplier uuid) throws RetriesExcee ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get()); LOG.trace("Attempting to get next job, eci = {}", eci); currentCompactionId.set(eci); - return coordinatorClient.getCompactionJob(TraceUtil.traceInfo(), - getContext().rpcCreds(), this.getResourceGroup(), + + JsonCoordinatorService jsonClient = new JsonCoordinatorService(getContext()); + return jsonClient.getCompactionJob(TraceUtil.traceInfo(), getContext().rpcCreds(), + this.getResourceGroup(), ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()), eci.toString()); } catch (Exception e) { @@ -1000,4 +1025,171 @@ public String getRunningCompactionId(TInfo tinfo, TCredentials credentials) } } + protected static class JsonCoordinatorService implements CompactionCoordinatorService.Iface { + private final ServerContext context; + private final ObjectMapper mapper; + private final URI ccBaseUri; + private final Duration timeout; + + private static final String GET_COMPACTION_JOB_ENDPOINT = "get-compaction-job"; + + protected JsonCoordinatorService(ServerContext context) throws TTransportException { + this.context = context; + var coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + try { + String scheme = context.getThriftServerType() == ThriftServerType.SSL ? "https" : "http"; + this.ccBaseUri = + new URL(scheme + "://" + coordinatorHost.orElseThrow().getHost() + ":8999/rest/cc/") + .toURI(); + } catch (MalformedURLException | URISyntaxException e) { + throw new RuntimeException(e); + } + mapper = new ObjectMapper(); + // Register the custom serializer/deserializer to handle thrift classes + mapper.addMixIn(TBase.class, ThriftMixIn.class); + + // TODO: We should add a property for this, for now just make the timeout 10 seconds longer + // than the coordinator job request timeout so the compactor will get back the empty job + // response under normal cases before timing out the http request + this.timeout = Duration + .ofMillis(context.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME)) + .plus(Duration.ofSeconds(10)); + } + + @Override + public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactor, String externalCompactionId) throws TException { + + var cjr = new GetCompactionJobRequest(tinfo, credentials, groupName, compactor, + externalCompactionId); + + var requestUri = ccBaseUri.resolve("./" + GET_COMPACTION_JOB_ENDPOINT); + LOG.info("requesturi {}", requestUri); + + // TODO: reuse client between requests + // For this prototype use the Jetty Http client as it was quick to set up + // For a real server we'd want to make sure we are re-using the client and set + // things like thread pooling so we could look at configuring the Jetty client + // or using something else like the Apache HTTP 5 client could be a better choice + HttpClient httpClient = null; + + try { + httpClient = newHttpClient(requestUri); + httpClient.start(); + + ContentResponse response = + httpClient.POST(requestUri).timeout(timeout.toMillis(), TimeUnit.MILLISECONDS) + .headers(headers -> headers.put(HttpHeader.CONTENT_TYPE, "application/json")) + .body(new StringRequestContent(mapper.writeValueAsString(cjr))).send(); + + // If not 200 we have an error, such as 401 + Preconditions.checkState(response.getStatus() == 200, "Response status code is %s", + response.getStatus()); + + return mapper.readValue(response.getContentAsString(), TNextCompactionJob.class); + } catch (Exception e) { + // TODO: This is wrapping errors for the http client in TException so that we can retry + // The current Retry logic only handles TException as it was meant for thrift so this + // was a quick way to re-use that logic for HTTP client issues (timeouts, etc) + // We would obviously need to clean this up if we plan to stick with REST for real + throw new TException(e); + } finally { + try { + // TODO: re use the client, don't re-create each time and start/stop + httpClient.stop(); + } catch (Exception e) { + LOG.debug(e.getMessage(), e); + } + } + } + + private HttpClient newHttpClient(URI requestUri) { + final HttpClient httpClient; + + if (context.getThriftServerType() == ThriftServerType.SSL) { + SslContextFactory.Client sslContextFactory = getSslClient(); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSslContextFactory(sslContextFactory); + httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector)); + } else if (context.getThriftServerType() == ThriftServerType.SASL) { + httpClient = new HttpClient(); + var principal = context.getConfiguration().get(Property.GENERAL_KERBEROS_PRINCIPAL); + SPNEGOAuthentication authentication = new SPNEGOAuthentication(requestUri); + authentication.setUserName(principal); + authentication.setUserKeyTabPath(java.nio.file.Path + .of(context.getConfiguration().getPath(Property.GENERAL_KERBEROS_KEYTAB))); + authentication.setServiceName("accumulo"); + httpClient.getAuthenticationStore().addAuthentication(authentication); + } else { + httpClient = new HttpClient(); + var creds = context.getCredentials(); + BasicAuthentication authentication = + new BasicAuthentication(requestUri, "accumulo", creds.getPrincipal(), + new String(((PasswordToken) creds.getToken()).getPassword(), UTF_8)); + httpClient.getAuthenticationStore().addAuthentication(authentication); + } + return httpClient; + } + + private SslContextFactory.Client getSslClient() { + SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(); + SslConnectionParams sslParams = context.getClientSslParams(); + sslContextFactory.setKeyStorePath(sslParams.getKeyStorePath()); + sslContextFactory.setKeyStorePassword(sslParams.getKeyStorePass()); + sslContextFactory.setKeyStoreType(sslParams.getKeyStoreType()); + sslContextFactory.setTrustStorePath(sslParams.getTrustStorePath()); + sslContextFactory.setTrustStorePassword(sslParams.getTrustStorePass()); + sslContextFactory.setTrustStoreType(sslParams.getTrustStoreType()); + + // TODO: Disable hostname checks for testing + sslContextFactory.setEndpointIdentificationAlgorithm(null); + sslContextFactory.setHostnameVerifier((hostname, session) -> true); + + return sslContextFactory; + } + + // The following methods were not implemented but could be moved to Json as well if we + // decide to go with Jetty and REST + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent extent, TCompactionStats stats) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate status, long timestamp) + throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials) + throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials) + throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + throw new UnsupportedOperationException(); + } + } + } diff --git a/server/manager/pom.xml b/server/manager/pom.xml index cb52ea5681f..ee921ca4f5a 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -31,6 +31,10 @@ Apache Accumulo Manager Server The manager server for Apache Accumulo for load balancing and other system-wide operations. + + com.fasterxml.jackson.core + jackson-databind + com.github.ben-manes.caffeine caffeine @@ -64,6 +68,18 @@ io.opentelemetry opentelemetry-context + + jakarta.inject + jakarta.inject-api + + + jakarta.servlet + jakarta.servlet-api + + + jakarta.ws.rs + jakarta.ws.rs-api + org.apache.accumulo accumulo-core @@ -100,6 +116,48 @@ org.checkerframework checker-qual + + org.eclipse.jetty + jetty-security + + + org.eclipse.jetty + jetty-server + + + org.eclipse.jetty + jetty-servlet + + + org.eclipse.jetty + jetty-util + + + org.glassfish.hk2 + hk2-api + + + + org.glassfish.jersey.containers + jersey-container-servlet + + + org.glassfish.jersey.containers + jersey-container-servlet-core + + + org.glassfish.jersey.core + jersey-common + + + org.glassfish.jersey.core + jersey-server + + + org.glassfish.jersey.media + jersey-media-json-jackson + org.slf4j slf4j-api diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 20e72ca3af0..3854b9c74e2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -28,6 +28,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; +import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -58,6 +59,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import jakarta.inject.Singleton; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.admin.CompactionConfig; @@ -122,6 +125,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; +import org.apache.accumulo.manager.http.EmbeddedRpcWebServer; import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; @@ -162,6 +166,13 @@ import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.hk2.api.Factory; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.jackson.JacksonFeature; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +180,7 @@ import com.google.common.collect.Comparators; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; @@ -217,6 +229,7 @@ public class Manager extends AbstractServer ServiceLock managerLock = null; private TServer clientService = null; + private EmbeddedRpcWebServer restClientService; protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; private final BalancerMetrics balancerMetrics = new BalancerMetrics(); @@ -332,6 +345,12 @@ synchronized void setManagerState(final ManagerState newState) { Manager.this.nextEvent.event("stopped event loop"); }, 100L, 1000L, MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); + final var restFuture = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { + // This frees the main thread and will cause the manager to exit + restClientService.stop(); + Manager.this.nextEvent.event("stopped event loop"); + }, 100L, 1000L, MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(restFuture); break; case HAVE_LOCK: if (isUpgrading()) { @@ -1134,6 +1153,30 @@ public void run() { clientService = sa.server; log.info("Started Manager client service at {}", sa.address); + int restPort = 8999; + try { + restClientService = new EmbeddedRpcWebServer(this, restPort); + restClientService.addServlet(getRestServlet(), "/rest/*"); + restClientService.start(); + + if (!restClientService.isRunning()) { + throw new RuntimeException("Unable to start rpc http server on port: " + restPort); + } + } catch (Exception e) { + throw new IllegalStateException("Unable to start embedded web server " + getHostname(), e); + } + + String advertiseHost = getHostname(); + if (advertiseHost.equals("0.0.0.0")) { + try { + advertiseHost = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + log.error("Unable to get hostname", e); + } + } + HostAndPort restHostAndPort = HostAndPort.fromParts(advertiseHost, restPort); + log.info("Started Manager rpc rest client service at {}", restHostAndPort); + // block until we can obtain the ZK lock for the manager ServiceLockData sld; try { @@ -1313,14 +1356,14 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - while (!clientService.isServing()) { + while (!clientService.isServing() || !restClientService.isRunning()) { sleepUninterruptibly(100, MILLISECONDS); } // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing()) { + while (clientService.isServing() && restClientService.isRunning()) { sleepUninterruptibly(500, MILLISECONDS); } log.info("Shutting down fate."); @@ -1866,4 +1909,34 @@ private Map> getFateRefs() { Preconditions.checkState(fateRefs != null, "Unexpected null fate references map"); return fateRefs; } + + public static class ManagerFactory extends AbstractBinder implements Factory { + + private final Manager manager; + + public ManagerFactory(Manager manager) { + this.manager = manager; + } + + @Override + public Manager provide() { + return manager; + } + + @Override + public void dispose(Manager instance) {} + + @Override + protected void configure() { + bindFactory(this).to(Manager.class).in(Singleton.class); + } + } + + private ServletHolder getRestServlet() { + final ResourceConfig rc = new ResourceConfig().packages("org.apache.accumulo.manager.http.rest") + .register(new ManagerFactory(this)) + .register(new LoggingFeature(java.util.logging.Logger.getLogger(this.getClass().getName()))) + .register(JacksonFeature.class); + return new ServletHolder(new ServletContainer(rc)); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 3de05a75785..56b8a8d03ba 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; @@ -50,6 +51,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -137,6 +139,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,6 +198,7 @@ public class CompactionCoordinator private final int jobQueueInitialSize; private volatile long coordinatorStartTime; + private final long maxJobRequestWaitTime; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference>> fateInstances, Manager manager) { @@ -212,6 +216,9 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, this.fateInstances = fateInstances; + this.maxJobRequestWaitTime = ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME); + completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); @@ -360,31 +367,43 @@ public long getNumRunningCompactions() { public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, String compactorAddress, String externalCompactionId) throws ThriftSecurityException { + // TODO, this is the only RPC currently converted to use Jetty + // when all are converted we can just remove the thrift RPC service + // for CompactionCoordinator + throw new UnsupportedOperationException("getCompactionJob not supported"); + } + + public void getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId, + AsyncMethodCallback resultHandler) throws TException { // do not expect users to call this directly, expect compactors to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } + + // Get the next job as a future as we need to wait until something is available CompactorGroupId groupId = CompactorGroupId.of(groupName); LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); - TExternalCompactionJob result = null; - - CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); - - while (metaJob != null) { - + // TODO: Use a thread pool and use thenAcceptAsync and exceptionallyAsync() + // Async send back to the compactor when a new job is ready + // Need the unused var for errorprone + var unused = jobQueues.getAsync(groupId).thenAccept(metaJob -> { + LOG.trace("Next metaJob is ready {}", metaJob.getJob()); Optional compactionConfig = getCompactionConfig(metaJob); - // this method may reread the metadata, do not use the metadata in metaJob for anything after + // this method may reread the metadata, do not use the metadata in metaJob for anything + // after // this method CompactionMetadata ecm = null; var kind = metaJob.getJob().getKind(); - // Only reserve user compactions when the config is present. When compactions are canceled the + // Only reserve user compactions when the config is present. When compactions are canceled + // the // config is deleted. var cid = ExternalCompactionId.from(externalCompactionId); if (kind == CompactionKind.SYSTEM @@ -392,34 +411,37 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials ecm = reserveCompaction(metaJob, compactorAddress, cid); } + final TExternalCompactionJob result; if (ecm != null) { result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); - // It is possible that by the time this added that the the compactor that made this request + // It is possible that by the time this added that the the compactor that made this + // request // is dead. In this cases the compaction is not actually running. RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), new RunningCompaction(result, compactorAddress, groupName)); TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, metaJob.getJob()); - break; } else { - LOG.debug( - "Unable to reserve compaction job for {}, pulling another off the queue for group {}", - metaJob.getTabletMetadata().getExtent(), groupName); - metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); + LOG.debug("Unable to reserve compaction job for {} {}, returning empty job to compactor {}", + groupName, metaJob.getTabletMetadata().getExtent(), compactorAddress); + result = new TExternalCompactionJob(); } - } - - if (metaJob == null) { - LOG.debug("No jobs found in group {} ", groupName); - } - - if (result == null) { - LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, - compactorAddress); - result = new TExternalCompactionJob(); - } - return new TNextCompactionJob(result, compactorCounts.get(groupName)); + var ecj = new TNextCompactionJob(result, compactorCounts.get(groupName)); + LOG.debug("Received next compaction job {}", ecj); + resultHandler.onComplete(ecj); + }).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> { + if (e instanceof TimeoutException) { + LOG.trace("Compaction job request with ecid {} timed out.", externalCompactionId); + resultHandler.onComplete( + new TNextCompactionJob(new TExternalCompactionJob(), compactorCounts.get(groupName))); + } else { + LOG.warn("Received exception processing compaction job {}", e.getMessage()); + LOG.debug(e.getMessage(), e); + resultHandler.onError(new RuntimeException(e)); + } + return null; + }); } @VisibleForTesting diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java new file mode 100644 index 00000000000..ceda6694e05 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.http; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Path; +import java.util.List; + +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; +import org.eclipse.jetty.security.AbstractLoginService; +import org.eclipse.jetty.security.ConfigurableSpnegoLoginService; +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.HashLoginService; +import org.eclipse.jetty.security.RolePrincipal; +import org.eclipse.jetty.security.UserPrincipal; +import org.eclipse.jetty.security.UserStore; +import org.eclipse.jetty.security.authentication.AuthorizationService; +import org.eclipse.jetty.security.authentication.BasicAuthenticator; +import org.eclipse.jetty.security.authentication.ConfigurableSpnegoAuthenticator; +import org.eclipse.jetty.server.AbstractConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.util.security.Credential; +import org.eclipse.jetty.util.security.Password; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +// TODO: I couldn't figure out why spotbugs is giving this error, it needs to be investigated more +@SuppressFBWarnings(value = "SE_BAD_FIELD", + justification = "This seems to be serialization related and possibly a false positive") +public class EmbeddedRpcWebServer { + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedRpcWebServer.class); + + private final Server server; + private final ServerConnector connector; + private final ServletContextHandler handler; + private final boolean ssl; + + public EmbeddedRpcWebServer(Manager manager, int port) { + server = new Server(); + final ServerContext context = manager.getContext(); + ssl = context.getThriftServerType() == ThriftServerType.SSL; + + ConstraintSecurityHandler securityHandler = null; + if (context.getThriftServerType() == ThriftServerType.SASL) { + // TODO: This is set up so it works with the miniaccumulo cluster test KRB + // We would need to make this a bit more configurable for a real system + // but is fine for the proof of concept + String realm = "EXAMPLE.com"; + HashLoginService authorizationService = new HashLoginService(realm); + UserStore userStore = new UserStore(); + userStore.addUser( + context.getConfiguration().get(Property.GENERAL_KERBEROS_PRINCIPAL).split("@")[0], + new Password(""), List.of("system").toArray(new String[] {})); + authorizationService.setUserStore(userStore); + ConfigurableSpnegoLoginService loginService = new ConfigurableSpnegoLoginService(realm, + AuthorizationService.from(authorizationService, "")); + loginService.addBean(authorizationService); + loginService.setKeyTabPath( + Path.of(context.getConfiguration().getPath(Property.GENERAL_KERBEROS_KEYTAB))); + loginService.setServiceName("accumulo"); + try { + loginService.setHostName(InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + server.addBean(loginService); + + securityHandler = new ConstraintSecurityHandler(); + securityHandler.addConstraintMapping(getConstraintMapping("system")); + ConfigurableSpnegoAuthenticator authenticator = new ConfigurableSpnegoAuthenticator(); + securityHandler.setAuthenticator(authenticator); + securityHandler.setLoginService(loginService); + } else if (!ssl) { + securityHandler = new ConstraintSecurityHandler(); + securityHandler.addConstraintMapping(getConstraintMapping("system")); + securityHandler.setAuthenticator(new BasicAuthenticator()); + securityHandler.setLoginService(new AbstractLoginService() { + @Override + public String getName() { + return "accumulo"; + } + + @Override + protected List loadRoleInfo(UserPrincipal user) { + // TODO: check credentials if can map to system or something else + // We need to verify that the principal here is authorized for whatever + // level access (system or user) so eventually UserPrincipal or + // this method needs to make checks to verify a user is authorized using + // context.getSecurityOperation().canPerformSystemActions() or + // other security methods + return List.of(new RolePrincipal("system")); + } + + @Override + protected UserPrincipal loadUserInfo(String username) { + + return new UserPrincipal(username, new Credential() { + @Override + public boolean check(Object credentials) { + try { + // Authenticate the user with the passed in serialized password + SystemToken passwordToken = new SystemToken(((String) credentials).getBytes(UTF_8)); + var creds = new SystemCredentials(context.getInstanceID(), username, passwordToken); + return context.getSecurityOperation().authenticateUser(context.rpcCreds(), + creds.toThrift(context.getInstanceID())); + } catch (ThriftSecurityException e) { + LOG.debug(e.getMessage()); + return false; + } + } + }); + } + }); + } + + connector = new ServerConnector(server, getConnectionFactories(context, ssl)); + connector.setHost(manager.getHostname()); + connector.setPort(port); + + handler = + new ServletContextHandler(ServletContextHandler.SESSIONS | ServletContextHandler.SECURITY); + if (securityHandler != null) { + handler.setSecurityHandler(securityHandler); + } + handler.getSessionHandler().getSessionCookieConfig().setHttpOnly(true); + handler.setContextPath("/"); + } + + private static AbstractConnectionFactory[] getConnectionFactories(ServerContext context, + boolean ssl) { + + if (ssl) { + HttpConfiguration httpConfig = new HttpConfiguration(); + SecureRequestCustomizer customizer = new SecureRequestCustomizer(); + customizer.setSniHostCheck(false); + httpConfig.addCustomizer(customizer); + final HttpConnectionFactory httpFactory = new HttpConnectionFactory(httpConfig); + + LOG.debug("Configuring Jetty to use TLS"); + final SslContextFactory.Server sslContextFactory = new SslContextFactory.Server(); + + SslConnectionParams sslParams = context.getServerSslParams(); + sslContextFactory.setKeyStorePath(sslParams.getKeyStorePath()); + sslContextFactory.setKeyStorePassword(sslParams.getKeyStorePass()); + sslContextFactory.setKeyStoreType(sslParams.getKeyStoreType()); + sslContextFactory.setTrustStorePath(sslParams.getTrustStorePath()); + sslContextFactory.setTrustStorePassword(sslParams.getTrustStorePass()); + sslContextFactory.setTrustStoreType(sslParams.getTrustStoreType()); + + final String includedCiphers = context.getConfiguration().get(Property.RPC_SSL_CIPHER_SUITES); + if (!Property.RPC_SSL_CIPHER_SUITES.getDefaultValue().equals(includedCiphers)) { + sslContextFactory.setIncludeCipherSuites(includedCiphers.split(",")); + } + + final String[] includeProtocols = sslParams.getServerProtocols(); + if (includeProtocols != null && includeProtocols.length > 0) { + sslContextFactory.setIncludeProtocols(includeProtocols); + } + + SslConnectionFactory sslFactory = + new SslConnectionFactory(sslContextFactory, httpFactory.getProtocol()); + return new AbstractConnectionFactory[] {sslFactory, httpFactory}; + } else { + LOG.debug("Not configuring Jetty to use TLS"); + return new AbstractConnectionFactory[] {new HttpConnectionFactory()}; + } + } + + public void addServlet(ServletHolder restServlet, String where) { + handler.addServlet(restServlet, where); + } + + public int getPort() { + return connector.getLocalPort(); + } + + public boolean isSsl() { + return ssl; + } + + public void start() { + try { + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } catch (Exception e) { + stop(); + throw new RuntimeException(e); + } + } + + public void stop() { + try { + server.stop(); + server.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public boolean isRunning() { + return server.isRunning(); + } + + private ConstraintMapping getConstraintMapping(String... roles) { + Constraint constraint = new Constraint(); + constraint.setAuthenticate(true); + constraint.setRoles(roles); + ConstraintMapping mapping = new ConstraintMapping(); + mapping.setPathSpec("/*"); + mapping.setConstraint(constraint); + return mapping; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java new file mode 100644 index 00000000000..1cc80832021 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.http.rest; + +import java.util.Objects; + +import jakarta.inject.Inject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.MediaType; + +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.server.rest.request.GetCompactionJobRequest; +import org.apache.thrift.TBase; +import org.apache.thrift.async.AsyncMethodCallback; + +@Path("/cc") +@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) +public class CompactionCoordinatorResource { + + @Inject + private Manager manager; + + @POST + @Path("get-compaction-job") + @Consumes(MediaType.APPLICATION_JSON) + public void getCompactionJob(@Suspended AsyncResponse response, GetCompactionJobRequest request) + throws Exception { + manager.getCompactionCoordinator().getCompactionJob(request.getTinfo(), + request.getCredentials(), request.getGroupName(), request.getCompactorAddress(), + request.getExternalCompactionId(), wrap(response)); + } + + // TODO: We can eventually create our own Async callback interface if we wanted, I just + // reused the Thrift version for now as we are already using Thrift objects anyways. + // AsyncMethodCallback is what async thrift uses + private static > AsyncMethodCallback wrap(AsyncResponse response) { + return new JerseyAsyncWrapper<>(response); + } + + private static class JerseyAsyncWrapper> implements AsyncMethodCallback { + private final AsyncResponse asyncResponse; + + private JerseyAsyncWrapper(AsyncResponse asyncResponse) { + this.asyncResponse = Objects.requireNonNull(asyncResponse); + } + + @Override + public void onComplete(T response) { + asyncResponse.resume(response); + } + + @Override + public void onError(Exception exception) { + asyncResponse.resume(exception); + } + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java new file mode 100644 index 00000000000..9fce862fbbe --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.http.rest; + +import jakarta.ws.rs.ext.ContextResolver; +import jakarta.ws.rs.ext.Provider; + +import org.apache.accumulo.server.rest.ThriftMixIn; +import org.apache.thrift.TBase; + +import com.fasterxml.jackson.databind.ObjectMapper; + +@Provider +public class ThriftJacksonProvider implements ContextResolver { + + final ObjectMapper defaultObjectMapper; + + public ThriftJacksonProvider() { + defaultObjectMapper = createDefaultMapper(); + } + + @Override + public ObjectMapper getContext(Class type) { + return defaultObjectMapper; + } + + // Register the custom thrift serializer/deserlizer + // for jackson + public static ObjectMapper createDefaultMapper() { + final ObjectMapper mapper = new ObjectMapper(); + mapper.addMixIn(TBase.class, ThriftMixIn.class); + return mapper; + + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 9a4237e619d..5040d134287 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -54,6 +55,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -97,6 +99,7 @@ import org.apache.accumulo.server.security.SecurityOperation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.thrift.async.AsyncMethodCallback; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; @@ -361,10 +364,22 @@ public void testGetCompactionJob() throws Exception { // Get the next job ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds, - GROUP_ID.toString(), "localhost:10241", eci.toString()); - assertEquals(3, nextJob.getCompactorCount()); - TExternalCompactionJob createdJob = nextJob.getJob(); + CompletableFuture nextJob = new CompletableFuture<>(); + + coordinator.getCompactionJob(new TInfo(), creds, GROUP_ID.toString(), "localhost:10241", + eci.toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + nextJob.complete(response); + } + + @Override + public void onError(Exception exception) { + nextJob.completeExceptionally(exception); + } + }); + assertEquals(3, nextJob.get().getCompactorCount()); + TExternalCompactionJob createdJob = nextJob.get().getJob(); assertEquals(eci.toString(), createdJob.getExternalCompactionId()); assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent())); @@ -384,7 +399,9 @@ public void testGetCompactionJobNoJobs() throws Exception { ServerContext context = EasyMock.createNiceMock(ServerContext.class); expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + cc.set(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME.getKey(), "1s"); + expect(context.getConfiguration()).andReturn(cc).anyTimes(); TCredentials creds = EasyMock.createNiceMock(TCredentials.class); @@ -398,10 +415,22 @@ public void testGetCompactionJobNoJobs() throws Exception { EasyMock.replay(context, creds, security, manager); var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); - TNextCompactionJob nextJob = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, - GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString()); - assertEquals(3, nextJob.getCompactorCount()); - assertNull(nextJob.getJob().getExternalCompactionId()); + CompletableFuture nextJob = new CompletableFuture<>(); + + coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, GROUP_ID.toString(), + "localhost:10240", UUID.randomUUID().toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + nextJob.complete(response); + } + + @Override + public void onError(Exception exception) { + nextJob.completeExceptionally(exception); + } + }); + assertEquals(3, nextJob.get().getCompactorCount()); + assertNull(nextJob.get().getJob().getExternalCompactionId()); EasyMock.verify(context, creds, security); } diff --git a/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java index dd187fa31bc..b134a967529 100644 --- a/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java +++ b/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java @@ -45,6 +45,7 @@ import org.apache.accumulo.server.security.handler.KerberosPermissionHandler; import org.apache.accumulo.test.functional.NativeMapIT; import org.apache.accumulo.test.util.CertUtils; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; @@ -90,6 +91,10 @@ public MiniAccumuloClusterImpl create(String testClassName, String testMethodNam cfg.setNativeLibPaths(NativeMapIT.nativeMapLocation().getAbsolutePath()); cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString()); + // Speed up testing by shortening the timeouts by setting a default to 10s + // TODO: Make this default configurable? + cfg.setProperty(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME, "10s"); + Configuration coreSite = new Configuration(false); // Setup SSL and credential providers if the properties request such @@ -148,6 +153,7 @@ protected void configureForSsl(MiniAccumuloConfigImpl cfg, File folder) { } File sslDir = new File(folder, "ssl"); + FileUtils.deleteQuietly(sslDir); assertTrue(sslDir.mkdirs() || sslDir.isDirectory()); File rootKeystoreFile = new File(sslDir, "root-" + cfg.getInstanceName() + ".jks"); File localKeystoreFile = new File(sslDir, "local-" + cfg.getInstanceName() + ".jks"); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java new file mode 100644 index 00000000000..36b844553e9 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; + +import java.security.PrivilegedExceptionAction; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExternalCompactionKerberosIT extends SharedMiniClusterBase { + + private static final Logger log = LoggerFactory.getLogger(ExternalCompactionKerberosIT.class); + + public static class ExternalCompactionKerberosConfig implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite, cfg.getClientProps()); + } + } + + @BeforeAll + public static void beforeTests() throws Exception { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, Boolean.TRUE.toString()); + startMiniClusterWithConfig(new ExternalCompactionKerberosConfig()); + } + + @AfterAll + public static void after() throws Exception { + stopMiniCluster(); + } + + // TODO: This test sets up Kerberos for Thrift but the Jetty service has not + // be set up yet to use Kerberos. The next step is to set up the Jetty service + // using SASL/JAAS for Kerberos support and then configure the Jetty http client + // with the correct auth credentials to see if it works. + @Test + public void testExternalCompaction() throws Exception { + var principal = getAdminUser().getPrincipal(); + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, + getAdminUser().getKeytab().getAbsolutePath()); + + String[] names = this.getUniqueNames(2); + ugi.doAs((PrivilegedExceptionAction) () -> { + try (AccumuloClient client = + getCluster().createAccumuloClient(principal, new KerberosToken())) { + + String table1 = names[0]; + createTable(client, table1, "cs1"); + + String table2 = names[1]; + createTable(client, table2, "cs2"); + + writeData(client, table1); + writeData(client, table2); + + compact(client, table1, 2, GROUP1, true); + verify(client, table1, 2); + + SortedSet splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + client.tableOperations().addSplits(table2, splits); + + compact(client, table2, 3, GROUP2, true); + verify(client, table2, 3); + + } + return null; + }); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java new file mode 100644 index 00000000000..809e6c2039a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; + +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExternalCompactionSslIT extends SharedMiniClusterBase { + + private static final Logger log = LoggerFactory.getLogger(ExternalCompactionSslIT.class); + + public static class ExternalCompactionSslConfig implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + Map site = cfg.getSiteConfig(); + site.put(Property.INSTANCE_RPC_SSL_CLIENT_AUTH.getKey(), "true"); + cfg.setSiteConfig(site); + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite, cfg.getClientProps()); + } + } + + @BeforeAll + public static void beforeTests() throws Exception { + System.setProperty(MiniClusterHarness.USE_SSL_FOR_IT_OPTION, Boolean.TRUE.toString()); + startMiniClusterWithConfig(new ExternalCompactionSslConfig()); + } + + @AfterAll + public static void after() throws Exception { + stopMiniCluster(); + } + + @Test + public void testExternalCompaction() throws Exception { + String[] names = this.getUniqueNames(2); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + String table1 = names[0]; + createTable(client, table1, "cs1"); + + String table2 = names[1]; + createTable(client, table2, "cs2"); + + writeData(client, table1); + writeData(client, table2); + + compact(client, table1, 2, GROUP1, true); + verify(client, table1, 2); + + SortedSet splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + client.tableOperations().addSplits(table2, splits); + + compact(client, table2, 3, GROUP2, true); + verify(client, table2, 3); + + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index c3e24401a2b..2235a4bae54 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -77,7 +78,6 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; -import com.beust.jcommander.internal.Maps; import com.google.common.net.HostAndPort; public class ExternalCompactionTestUtils { @@ -187,10 +187,14 @@ public static void verify(AccumuloClient client, String table1, int modulus, int } public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + configureMiniCluster(cfg, coreSite, new HashMap<>()); + } + + public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite, + Map clProps) { // ecomp writes from the TabletServer are not being written to the metadata // table, they are being queued up instead. - Map clProps = Maps.newHashMap(); clProps.put(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey(), "2s"); cfg.setClientProps(clProps); From 3546d6477d9002e8adc22cfd53e8b8258f9eb0b7 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Mon, 28 Oct 2024 14:28:26 -0400 Subject: [PATCH 2/2] code cleanup --- .../manager/http/rest/CompactionCoordinatorResource.java | 2 +- .../test/compaction/ExternalCompactionKerberosIT.java | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java index 1cc80832021..ef24e2b82d2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java @@ -35,7 +35,7 @@ import org.apache.thrift.async.AsyncMethodCallback; @Path("/cc") -@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) +@Produces({MediaType.APPLICATION_JSON}) public class CompactionCoordinatorResource { @Inject diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java index 36b844553e9..4aa101ef389 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java @@ -68,10 +68,6 @@ public static void after() throws Exception { stopMiniCluster(); } - // TODO: This test sets up Kerberos for Thrift but the Jetty service has not - // be set up yet to use Kerberos. The next step is to set up the Jetty service - // using SASL/JAAS for Kerberos support and then configure the Jetty http client - // with the correct auth credentials to see if it works. @Test public void testExternalCompaction() throws Exception { var principal = getAdminUser().getPrincipal();