diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6e2fed76afa..b26b1405f0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1155,7 +1155,11 @@ public enum Property { @Experimental COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL( "compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION, - "The interval at which to check for dead compactors.", "2.1.0"); + "The interval at which to check for dead compactors.", "2.1.0"), + COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME( + "compaction.coordinator.wait.time.job.request.max", "2m", PropertyType.TIMEDURATION, + "The maximum amount of time the coordinator will wait for a requested job from the job queue.", + "4.0.0"); private final String key; private final String defaultValue; diff --git a/pom.xml b/pom.xml index dedaefcc6d0..1dde7404e49 100644 --- a/pom.xml +++ b/pom.xml @@ -925,6 +925,8 @@ org.glassfish.jersey.ext:jersey-bean-validation:jar:* org.glassfish.jersey.inject:jersey-hk2:jar:* org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:jar:* + + org.glassfish.jersey.containers:jersey-container-servlet:jar:* org.powermock:powermock-api-easymock:jar:* com.github.spotbugs:spotbugs-annotations:jar:* diff --git a/server/base/pom.xml b/server/base/pom.xml index c20b3e6edce..27bffec6853 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -35,6 +35,14 @@ com.beust jcommander + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + com.github.ben-manes.caffeine caffeine diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java new file mode 100644 index 00000000000..a1ecea607c2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftDeserializer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.server.rest.ThriftSerializer.ENCODED; +import static org.apache.accumulo.server.rest.ThriftSerializer.TYPE; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TJSONProtocol; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Jackson deserializer for thrift objects that delegates the deserialization of thrift to + * {@link TDeserializer}. It handles previously encoded serialized objects from + * {@link ThriftSerializer} + */ +public class ThriftDeserializer> extends JsonDeserializer { + @Override + public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode tree = p.readValueAsTree(); + + try { + var thriftClassName = tree.get(TYPE).asText(); + var encoded = tree.get(ENCODED).asText(); + + Constructor constructor = getThriftClass(thriftClassName).getDeclaredConstructor(); + T obj = constructor.newInstance(); + deserialize(obj, encoded); + + return obj; + } catch (ReflectiveOperationException e) { + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + private Class getThriftClass(String className) throws ClassNotFoundException { + var clazz = Class.forName(className, false, ThriftDeserializer.class.getClassLoader()); + // Note: This check is important to prevent potential security issues + // We don't want to allow arbitrary classes to be loaded + if (!TBase.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("Class " + clazz + " is not assignable to TBase"); + } + return (Class) clazz; + } + + // TODO: It doesn't seem like TDeserializer is thread safe, is there a way + // to prevent creating a new deserializer for every object? + private static > void deserialize(T obj, String json) throws IOException { + try { + final TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); + deserializer.deserialize(obj, json, UTF_8.name()); + } catch (TException e) { + throw new IOException(e); + } + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java new file mode 100644 index 00000000000..ce2233cc6e7 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftMixIn.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@JsonSerialize(using = ThriftSerializer.class) +@JsonDeserialize(using = ThriftDeserializer.class) +public abstract class ThriftMixIn { + +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java new file mode 100644 index 00000000000..3534091775d --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/ThriftSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest; + +import java.io.IOException; + +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +/** + * Jackson serializer for thrift objects that delegates the serialization of thrift to + * {@link TSerializer} and also includes the serialized type for the deserializer to use + */ +public class ThriftSerializer> extends JsonSerializer { + + static final String TYPE = "type"; + static final String ENCODED = "encoded"; + + @Override + public void serialize(T value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + gen.writeStartObject(); + gen.writeObjectField(TYPE, value.getClass()); + gen.writeStringField(ENCODED, serialize(value)); + gen.writeEndObject(); + } + + // TODO: It doesn't seem like TSerializer is thread safe, is there a way + // to prevent creating a new serializer for every object? + private static > String serialize(T obj) throws IOException { + try { + final TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(obj); + } catch (TException e) { + throw new IOException(e); + } + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java b/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java new file mode 100644 index 00000000000..a54e51d91c0 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rest/request/GetCompactionJobRequest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rest.request; + +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; + +public class GetCompactionJobRequest { + + private TInfo tinfo; + private TCredentials credentials; + private String groupName; + private String compactorAddress; + private String externalCompactionId; + + public GetCompactionJobRequest() {} + + public GetCompactionJobRequest(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId) { + this.tinfo = tinfo; + this.credentials = credentials; + this.groupName = groupName; + this.compactorAddress = compactorAddress; + this.externalCompactionId = externalCompactionId; + } + + public TInfo getTinfo() { + return tinfo; + } + + public void setTinfo(TInfo tinfo) { + this.tinfo = tinfo; + } + + public TCredentials getCredentials() { + return credentials; + } + + public void setCredentials(TCredentials credentials) { + this.credentials = credentials; + } + + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public String getCompactorAddress() { + return compactorAddress; + } + + public void setCompactorAddress(String compactorAddress) { + this.compactorAddress = compactorAddress; + } + + public String getExternalCompactionId() { + return externalCompactionId; + } + + public void setExternalCompactionId(String externalCompactionId) { + this.externalCompactionId = externalCompactionId; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java index de3725f2525..9cfb9b89870 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java @@ -103,7 +103,7 @@ public static final class SystemToken extends PasswordToken { */ public SystemToken() {} - private SystemToken(byte[] systemPassword) { + public SystemToken(byte[] systemPassword) { super(systemPassword); } diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml index 7779271cefb..2e7a9ceb9f1 100644 --- a/server/compactor/pom.xml +++ b/server/compactor/pom.xml @@ -30,6 +30,10 @@ accumulo-compactor Apache Accumulo Compactor + + com.fasterxml.jackson.core + jackson-databind + com.google.auto.service auto-service @@ -71,6 +75,22 @@ org.apache.zookeeper zookeeper + + org.eclipse.jetty + jetty-client + + + org.eclipse.jetty + jetty-http + + + org.eclipse.jetty + jetty-io + + + org.eclipse.jetty + jetty-util + org.slf4j slf4j-api diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 463937e9ba3..8fb26ac0688 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -19,6 +19,7 @@ package org.apache.accumulo.compactor; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_ENTRIES_READ; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_ENTRIES_WRITTEN; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK; @@ -26,6 +27,10 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -46,6 +51,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -57,6 +63,7 @@ import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -66,6 +73,7 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -89,6 +97,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; @@ -116,18 +125,32 @@ import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.rest.ThriftMixIn; +import org.apache.accumulo.server.rest.request.GetCompactionJobRequest; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.tserver.log.LogSorter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; +import org.eclipse.jetty.client.util.BasicAuthentication; +import org.eclipse.jetty.client.util.SPNEGOAuthentication; +import org.eclipse.jetty.client.util.StringRequestContent; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; @@ -470,8 +493,10 @@ protected TNextCompactionJob getNextJob(Supplier uuid) throws RetriesExcee ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get()); LOG.trace("Attempting to get next job, eci = {}", eci); currentCompactionId.set(eci); - return coordinatorClient.getCompactionJob(TraceUtil.traceInfo(), - getContext().rpcCreds(), this.getResourceGroup(), + + JsonCoordinatorService jsonClient = new JsonCoordinatorService(getContext()); + return jsonClient.getCompactionJob(TraceUtil.traceInfo(), getContext().rpcCreds(), + this.getResourceGroup(), ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()), eci.toString()); } catch (Exception e) { @@ -1006,4 +1031,171 @@ public String getRunningCompactionId(TInfo tinfo, TCredentials credentials) } } + protected static class JsonCoordinatorService implements CompactionCoordinatorService.Iface { + private final ServerContext context; + private final ObjectMapper mapper; + private final URI ccBaseUri; + private final Duration timeout; + + private static final String GET_COMPACTION_JOB_ENDPOINT = "get-compaction-job"; + + protected JsonCoordinatorService(ServerContext context) throws TTransportException { + this.context = context; + var coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + try { + String scheme = context.getThriftServerType() == ThriftServerType.SSL ? "https" : "http"; + this.ccBaseUri = + new URL(scheme + "://" + coordinatorHost.orElseThrow().getHost() + ":8999/rest/cc/") + .toURI(); + } catch (MalformedURLException | URISyntaxException e) { + throw new RuntimeException(e); + } + mapper = new ObjectMapper(); + // Register the custom serializer/deserializer to handle thrift classes + mapper.addMixIn(TBase.class, ThriftMixIn.class); + + // TODO: We should add a property for this, for now just make the timeout 10 seconds longer + // than the coordinator job request timeout so the compactor will get back the empty job + // response under normal cases before timing out the http request + this.timeout = Duration + .ofMillis(context.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME)) + .plus(Duration.ofSeconds(10)); + } + + @Override + public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactor, String externalCompactionId) throws TException { + + var cjr = new GetCompactionJobRequest(tinfo, credentials, groupName, compactor, + externalCompactionId); + + var requestUri = ccBaseUri.resolve("./" + GET_COMPACTION_JOB_ENDPOINT); + LOG.info("requesturi {}", requestUri); + + // TODO: reuse client between requests + // For this prototype use the Jetty Http client as it was quick to set up + // For a real server we'd want to make sure we are re-using the client and set + // things like thread pooling so we could look at configuring the Jetty client + // or using something else like the Apache HTTP 5 client could be a better choice + HttpClient httpClient = null; + + try { + httpClient = newHttpClient(requestUri); + httpClient.start(); + + ContentResponse response = + httpClient.POST(requestUri).timeout(timeout.toMillis(), TimeUnit.MILLISECONDS) + .headers(headers -> headers.put(HttpHeader.CONTENT_TYPE, "application/json")) + .body(new StringRequestContent(mapper.writeValueAsString(cjr))).send(); + + // If not 200 we have an error, such as 401 + Preconditions.checkState(response.getStatus() == 200, "Response status code is %s", + response.getStatus()); + + return mapper.readValue(response.getContentAsString(), TNextCompactionJob.class); + } catch (Exception e) { + // TODO: This is wrapping errors for the http client in TException so that we can retry + // The current Retry logic only handles TException as it was meant for thrift so this + // was a quick way to re-use that logic for HTTP client issues (timeouts, etc) + // We would obviously need to clean this up if we plan to stick with REST for real + throw new TException(e); + } finally { + try { + // TODO: re use the client, don't re-create each time and start/stop + httpClient.stop(); + } catch (Exception e) { + LOG.debug(e.getMessage(), e); + } + } + } + + private HttpClient newHttpClient(URI requestUri) { + final HttpClient httpClient; + + if (context.getThriftServerType() == ThriftServerType.SSL) { + SslContextFactory.Client sslContextFactory = getSslClient(); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSslContextFactory(sslContextFactory); + httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector)); + } else if (context.getThriftServerType() == ThriftServerType.SASL) { + httpClient = new HttpClient(); + var principal = context.getConfiguration().get(Property.GENERAL_KERBEROS_PRINCIPAL); + SPNEGOAuthentication authentication = new SPNEGOAuthentication(requestUri); + authentication.setUserName(principal); + authentication.setUserKeyTabPath(java.nio.file.Path + .of(context.getConfiguration().getPath(Property.GENERAL_KERBEROS_KEYTAB))); + authentication.setServiceName("accumulo"); + httpClient.getAuthenticationStore().addAuthentication(authentication); + } else { + httpClient = new HttpClient(); + var creds = context.getCredentials(); + BasicAuthentication authentication = + new BasicAuthentication(requestUri, "accumulo", creds.getPrincipal(), + new String(((PasswordToken) creds.getToken()).getPassword(), UTF_8)); + httpClient.getAuthenticationStore().addAuthentication(authentication); + } + return httpClient; + } + + private SslContextFactory.Client getSslClient() { + SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(); + SslConnectionParams sslParams = context.getClientSslParams(); + sslContextFactory.setKeyStorePath(sslParams.getKeyStorePath()); + sslContextFactory.setKeyStorePassword(sslParams.getKeyStorePass()); + sslContextFactory.setKeyStoreType(sslParams.getKeyStoreType()); + sslContextFactory.setTrustStorePath(sslParams.getTrustStorePath()); + sslContextFactory.setTrustStorePassword(sslParams.getTrustStorePass()); + sslContextFactory.setTrustStoreType(sslParams.getTrustStoreType()); + + // TODO: Disable hostname checks for testing + sslContextFactory.setEndpointIdentificationAlgorithm(null); + sslContextFactory.setHostnameVerifier((hostname, session) -> true); + + return sslContextFactory; + } + + // The following methods were not implemented but could be moved to Json as well if we + // decide to go with Jetty and REST + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent extent, TCompactionStats stats) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate status, long timestamp) + throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials) + throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials) + throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + throw new UnsupportedOperationException(); + } + } + } diff --git a/server/manager/pom.xml b/server/manager/pom.xml index cb52ea5681f..ee921ca4f5a 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -31,6 +31,10 @@ Apache Accumulo Manager Server The manager server for Apache Accumulo for load balancing and other system-wide operations. + + com.fasterxml.jackson.core + jackson-databind + com.github.ben-manes.caffeine caffeine @@ -64,6 +68,18 @@ io.opentelemetry opentelemetry-context + + jakarta.inject + jakarta.inject-api + + + jakarta.servlet + jakarta.servlet-api + + + jakarta.ws.rs + jakarta.ws.rs-api + org.apache.accumulo accumulo-core @@ -100,6 +116,48 @@ org.checkerframework checker-qual + + org.eclipse.jetty + jetty-security + + + org.eclipse.jetty + jetty-server + + + org.eclipse.jetty + jetty-servlet + + + org.eclipse.jetty + jetty-util + + + org.glassfish.hk2 + hk2-api + + + + org.glassfish.jersey.containers + jersey-container-servlet + + + org.glassfish.jersey.containers + jersey-container-servlet-core + + + org.glassfish.jersey.core + jersey-common + + + org.glassfish.jersey.core + jersey-server + + + org.glassfish.jersey.media + jersey-media-json-jackson + org.slf4j slf4j-api diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0be61f19bc3..11157df7882 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -28,6 +28,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; +import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -58,6 +59,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import jakarta.inject.Singleton; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.admin.CompactionConfig; @@ -122,6 +125,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; +import org.apache.accumulo.manager.http.EmbeddedRpcWebServer; import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; @@ -162,6 +166,13 @@ import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.hk2.api.Factory; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.jackson.JacksonFeature; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +180,7 @@ import com.google.common.collect.Comparators; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; @@ -217,6 +229,7 @@ public class Manager extends AbstractServer ServiceLock managerLock = null; private TServer clientService = null; + private EmbeddedRpcWebServer restClientService; protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; private final BalancerMetrics balancerMetrics = new BalancerMetrics(); @@ -332,6 +345,12 @@ synchronized void setManagerState(final ManagerState newState) { Manager.this.nextEvent.event("stopped event loop"); }, 100L, 1000L, MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); + final var restFuture = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { + // This frees the main thread and will cause the manager to exit + restClientService.stop(); + Manager.this.nextEvent.event("stopped event loop"); + }, 100L, 1000L, MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(restFuture); break; case HAVE_LOCK: if (isUpgrading()) { @@ -1134,6 +1153,30 @@ public void run() { clientService = sa.server; log.info("Started Manager client service at {}", sa.address); + int restPort = 8999; + try { + restClientService = new EmbeddedRpcWebServer(this, restPort); + restClientService.addServlet(getRestServlet(), "/rest/*"); + restClientService.start(); + + if (!restClientService.isRunning()) { + throw new RuntimeException("Unable to start rpc http server on port: " + restPort); + } + } catch (Exception e) { + throw new IllegalStateException("Unable to start embedded web server " + getHostname(), e); + } + + String advertiseHost = getHostname(); + if (advertiseHost.equals("0.0.0.0")) { + try { + advertiseHost = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + log.error("Unable to get hostname", e); + } + } + HostAndPort restHostAndPort = HostAndPort.fromParts(advertiseHost, restPort); + log.info("Started Manager rpc rest client service at {}", restHostAndPort); + // block until we can obtain the ZK lock for the manager ServiceLockData sld; try { @@ -1313,14 +1356,14 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - while (!clientService.isServing()) { + while (!clientService.isServing() || !restClientService.isRunning()) { sleepUninterruptibly(100, MILLISECONDS); } // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing()) { + while (clientService.isServing() && restClientService.isRunning()) { sleepUninterruptibly(500, MILLISECONDS); } log.info("Shutting down fate."); @@ -1866,4 +1909,34 @@ private Map> getFateRefs() { Preconditions.checkState(fateRefs != null, "Unexpected null fate references map"); return fateRefs; } + + public static class ManagerFactory extends AbstractBinder implements Factory { + + private final Manager manager; + + public ManagerFactory(Manager manager) { + this.manager = manager; + } + + @Override + public Manager provide() { + return manager; + } + + @Override + public void dispose(Manager instance) {} + + @Override + protected void configure() { + bindFactory(this).to(Manager.class).in(Singleton.class); + } + } + + private ServletHolder getRestServlet() { + final ResourceConfig rc = new ResourceConfig().packages("org.apache.accumulo.manager.http.rest") + .register(new ManagerFactory(this)) + .register(new LoggingFeature(java.util.logging.Logger.getLogger(this.getClass().getName()))) + .register(JacksonFeature.class); + return new ServletHolder(new ServletContainer(rc)); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 7a0b275994b..51fae7f3ade 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; @@ -50,6 +51,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -137,6 +139,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,6 +198,7 @@ public class CompactionCoordinator private final int jobQueueInitialSize; private volatile long coordinatorStartTime; + private final long maxJobRequestWaitTime; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference>> fateInstances, Manager manager) { @@ -212,6 +216,9 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, this.fateInstances = fateInstances; + this.maxJobRequestWaitTime = ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME); + completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); @@ -360,31 +367,43 @@ public long getNumRunningCompactions() { public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, String compactorAddress, String externalCompactionId) throws ThriftSecurityException { + // TODO, this is the only RPC currently converted to use Jetty + // when all are converted we can just remove the thrift RPC service + // for CompactionCoordinator + throw new UnsupportedOperationException("getCompactionJob not supported"); + } + + public void getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId, + AsyncMethodCallback resultHandler) throws TException { // do not expect users to call this directly, expect compactors to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } + + // Get the next job as a future as we need to wait until something is available CompactorGroupId groupId = CompactorGroupId.of(groupName); LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); - TExternalCompactionJob result = null; - - CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); - - while (metaJob != null) { - + // TODO: Use a thread pool and use thenAcceptAsync and exceptionallyAsync() + // Async send back to the compactor when a new job is ready + // Need the unused var for errorprone + var unused = jobQueues.getAsync(groupId).thenAccept(metaJob -> { + LOG.trace("Next metaJob is ready {}", metaJob.getJob()); Optional compactionConfig = getCompactionConfig(metaJob); - // this method may reread the metadata, do not use the metadata in metaJob for anything after + // this method may reread the metadata, do not use the metadata in metaJob for anything + // after // this method CompactionMetadata ecm = null; var kind = metaJob.getJob().getKind(); - // Only reserve user compactions when the config is present. When compactions are canceled the + // Only reserve user compactions when the config is present. When compactions are canceled + // the // config is deleted. var cid = ExternalCompactionId.from(externalCompactionId); if (kind == CompactionKind.SYSTEM @@ -392,34 +411,37 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials ecm = reserveCompaction(metaJob, compactorAddress, cid); } + final TExternalCompactionJob result; if (ecm != null) { result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); - // It is possible that by the time this added that the the compactor that made this request + // It is possible that by the time this added that the the compactor that made this + // request // is dead. In this cases the compaction is not actually running. RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), new RunningCompaction(result, compactorAddress, groupName)); TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, metaJob.getJob()); - break; } else { - LOG.debug( - "Unable to reserve compaction job for {}, pulling another off the queue for group {}", - metaJob.getTabletMetadata().getExtent(), groupName); - metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); + LOG.debug("Unable to reserve compaction job for {} {}, returning empty job to compactor {}", + groupName, metaJob.getTabletMetadata().getExtent(), compactorAddress); + result = new TExternalCompactionJob(); } - } - - if (metaJob == null) { - LOG.trace("No jobs found in group {} ", groupName); - } - - if (result == null) { - LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, - compactorAddress); - result = new TExternalCompactionJob(); - } - return new TNextCompactionJob(result, compactorCounts.get(groupName)); + var ecj = new TNextCompactionJob(result, compactorCounts.get(groupName)); + LOG.trace("Received next compaction job {}", ecj); + resultHandler.onComplete(ecj); + }).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> { + if (e instanceof TimeoutException) { + LOG.trace("Compaction job request with ecid {} timed out.", externalCompactionId); + resultHandler.onComplete( + new TNextCompactionJob(new TExternalCompactionJob(), compactorCounts.get(groupName))); + } else { + LOG.warn("Received exception processing compaction job {}", e.getMessage()); + LOG.debug(e.getMessage(), e); + resultHandler.onError(new RuntimeException(e)); + } + return null; + }); } @VisibleForTesting diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java new file mode 100644 index 00000000000..ceda6694e05 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/EmbeddedRpcWebServer.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.http; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Path; +import java.util.List; + +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; +import org.eclipse.jetty.security.AbstractLoginService; +import org.eclipse.jetty.security.ConfigurableSpnegoLoginService; +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.HashLoginService; +import org.eclipse.jetty.security.RolePrincipal; +import org.eclipse.jetty.security.UserPrincipal; +import org.eclipse.jetty.security.UserStore; +import org.eclipse.jetty.security.authentication.AuthorizationService; +import org.eclipse.jetty.security.authentication.BasicAuthenticator; +import org.eclipse.jetty.security.authentication.ConfigurableSpnegoAuthenticator; +import org.eclipse.jetty.server.AbstractConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.util.security.Credential; +import org.eclipse.jetty.util.security.Password; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +// TODO: I couldn't figure out why spotbugs is giving this error, it needs to be investigated more +@SuppressFBWarnings(value = "SE_BAD_FIELD", + justification = "This seems to be serialization related and possibly a false positive") +public class EmbeddedRpcWebServer { + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedRpcWebServer.class); + + private final Server server; + private final ServerConnector connector; + private final ServletContextHandler handler; + private final boolean ssl; + + public EmbeddedRpcWebServer(Manager manager, int port) { + server = new Server(); + final ServerContext context = manager.getContext(); + ssl = context.getThriftServerType() == ThriftServerType.SSL; + + ConstraintSecurityHandler securityHandler = null; + if (context.getThriftServerType() == ThriftServerType.SASL) { + // TODO: This is set up so it works with the miniaccumulo cluster test KRB + // We would need to make this a bit more configurable for a real system + // but is fine for the proof of concept + String realm = "EXAMPLE.com"; + HashLoginService authorizationService = new HashLoginService(realm); + UserStore userStore = new UserStore(); + userStore.addUser( + context.getConfiguration().get(Property.GENERAL_KERBEROS_PRINCIPAL).split("@")[0], + new Password(""), List.of("system").toArray(new String[] {})); + authorizationService.setUserStore(userStore); + ConfigurableSpnegoLoginService loginService = new ConfigurableSpnegoLoginService(realm, + AuthorizationService.from(authorizationService, "")); + loginService.addBean(authorizationService); + loginService.setKeyTabPath( + Path.of(context.getConfiguration().getPath(Property.GENERAL_KERBEROS_KEYTAB))); + loginService.setServiceName("accumulo"); + try { + loginService.setHostName(InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + server.addBean(loginService); + + securityHandler = new ConstraintSecurityHandler(); + securityHandler.addConstraintMapping(getConstraintMapping("system")); + ConfigurableSpnegoAuthenticator authenticator = new ConfigurableSpnegoAuthenticator(); + securityHandler.setAuthenticator(authenticator); + securityHandler.setLoginService(loginService); + } else if (!ssl) { + securityHandler = new ConstraintSecurityHandler(); + securityHandler.addConstraintMapping(getConstraintMapping("system")); + securityHandler.setAuthenticator(new BasicAuthenticator()); + securityHandler.setLoginService(new AbstractLoginService() { + @Override + public String getName() { + return "accumulo"; + } + + @Override + protected List loadRoleInfo(UserPrincipal user) { + // TODO: check credentials if can map to system or something else + // We need to verify that the principal here is authorized for whatever + // level access (system or user) so eventually UserPrincipal or + // this method needs to make checks to verify a user is authorized using + // context.getSecurityOperation().canPerformSystemActions() or + // other security methods + return List.of(new RolePrincipal("system")); + } + + @Override + protected UserPrincipal loadUserInfo(String username) { + + return new UserPrincipal(username, new Credential() { + @Override + public boolean check(Object credentials) { + try { + // Authenticate the user with the passed in serialized password + SystemToken passwordToken = new SystemToken(((String) credentials).getBytes(UTF_8)); + var creds = new SystemCredentials(context.getInstanceID(), username, passwordToken); + return context.getSecurityOperation().authenticateUser(context.rpcCreds(), + creds.toThrift(context.getInstanceID())); + } catch (ThriftSecurityException e) { + LOG.debug(e.getMessage()); + return false; + } + } + }); + } + }); + } + + connector = new ServerConnector(server, getConnectionFactories(context, ssl)); + connector.setHost(manager.getHostname()); + connector.setPort(port); + + handler = + new ServletContextHandler(ServletContextHandler.SESSIONS | ServletContextHandler.SECURITY); + if (securityHandler != null) { + handler.setSecurityHandler(securityHandler); + } + handler.getSessionHandler().getSessionCookieConfig().setHttpOnly(true); + handler.setContextPath("/"); + } + + private static AbstractConnectionFactory[] getConnectionFactories(ServerContext context, + boolean ssl) { + + if (ssl) { + HttpConfiguration httpConfig = new HttpConfiguration(); + SecureRequestCustomizer customizer = new SecureRequestCustomizer(); + customizer.setSniHostCheck(false); + httpConfig.addCustomizer(customizer); + final HttpConnectionFactory httpFactory = new HttpConnectionFactory(httpConfig); + + LOG.debug("Configuring Jetty to use TLS"); + final SslContextFactory.Server sslContextFactory = new SslContextFactory.Server(); + + SslConnectionParams sslParams = context.getServerSslParams(); + sslContextFactory.setKeyStorePath(sslParams.getKeyStorePath()); + sslContextFactory.setKeyStorePassword(sslParams.getKeyStorePass()); + sslContextFactory.setKeyStoreType(sslParams.getKeyStoreType()); + sslContextFactory.setTrustStorePath(sslParams.getTrustStorePath()); + sslContextFactory.setTrustStorePassword(sslParams.getTrustStorePass()); + sslContextFactory.setTrustStoreType(sslParams.getTrustStoreType()); + + final String includedCiphers = context.getConfiguration().get(Property.RPC_SSL_CIPHER_SUITES); + if (!Property.RPC_SSL_CIPHER_SUITES.getDefaultValue().equals(includedCiphers)) { + sslContextFactory.setIncludeCipherSuites(includedCiphers.split(",")); + } + + final String[] includeProtocols = sslParams.getServerProtocols(); + if (includeProtocols != null && includeProtocols.length > 0) { + sslContextFactory.setIncludeProtocols(includeProtocols); + } + + SslConnectionFactory sslFactory = + new SslConnectionFactory(sslContextFactory, httpFactory.getProtocol()); + return new AbstractConnectionFactory[] {sslFactory, httpFactory}; + } else { + LOG.debug("Not configuring Jetty to use TLS"); + return new AbstractConnectionFactory[] {new HttpConnectionFactory()}; + } + } + + public void addServlet(ServletHolder restServlet, String where) { + handler.addServlet(restServlet, where); + } + + public int getPort() { + return connector.getLocalPort(); + } + + public boolean isSsl() { + return ssl; + } + + public void start() { + try { + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } catch (Exception e) { + stop(); + throw new RuntimeException(e); + } + } + + public void stop() { + try { + server.stop(); + server.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public boolean isRunning() { + return server.isRunning(); + } + + private ConstraintMapping getConstraintMapping(String... roles) { + Constraint constraint = new Constraint(); + constraint.setAuthenticate(true); + constraint.setRoles(roles); + ConstraintMapping mapping = new ConstraintMapping(); + mapping.setPathSpec("/*"); + mapping.setConstraint(constraint); + return mapping; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java new file mode 100644 index 00000000000..ef24e2b82d2 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/CompactionCoordinatorResource.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.http.rest; + +import java.util.Objects; + +import jakarta.inject.Inject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.MediaType; + +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.server.rest.request.GetCompactionJobRequest; +import org.apache.thrift.TBase; +import org.apache.thrift.async.AsyncMethodCallback; + +@Path("/cc") +@Produces({MediaType.APPLICATION_JSON}) +public class CompactionCoordinatorResource { + + @Inject + private Manager manager; + + @POST + @Path("get-compaction-job") + @Consumes(MediaType.APPLICATION_JSON) + public void getCompactionJob(@Suspended AsyncResponse response, GetCompactionJobRequest request) + throws Exception { + manager.getCompactionCoordinator().getCompactionJob(request.getTinfo(), + request.getCredentials(), request.getGroupName(), request.getCompactorAddress(), + request.getExternalCompactionId(), wrap(response)); + } + + // TODO: We can eventually create our own Async callback interface if we wanted, I just + // reused the Thrift version for now as we are already using Thrift objects anyways. + // AsyncMethodCallback is what async thrift uses + private static > AsyncMethodCallback wrap(AsyncResponse response) { + return new JerseyAsyncWrapper<>(response); + } + + private static class JerseyAsyncWrapper> implements AsyncMethodCallback { + private final AsyncResponse asyncResponse; + + private JerseyAsyncWrapper(AsyncResponse asyncResponse) { + this.asyncResponse = Objects.requireNonNull(asyncResponse); + } + + @Override + public void onComplete(T response) { + asyncResponse.resume(response); + } + + @Override + public void onError(Exception exception) { + asyncResponse.resume(exception); + } + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java new file mode 100644 index 00000000000..9fce862fbbe --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/http/rest/ThriftJacksonProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.http.rest; + +import jakarta.ws.rs.ext.ContextResolver; +import jakarta.ws.rs.ext.Provider; + +import org.apache.accumulo.server.rest.ThriftMixIn; +import org.apache.thrift.TBase; + +import com.fasterxml.jackson.databind.ObjectMapper; + +@Provider +public class ThriftJacksonProvider implements ContextResolver { + + final ObjectMapper defaultObjectMapper; + + public ThriftJacksonProvider() { + defaultObjectMapper = createDefaultMapper(); + } + + @Override + public ObjectMapper getContext(Class type) { + return defaultObjectMapper; + } + + // Register the custom thrift serializer/deserlizer + // for jackson + public static ObjectMapper createDefaultMapper() { + final ObjectMapper mapper = new ObjectMapper(); + mapper.addMixIn(TBase.class, ThriftMixIn.class); + return mapper; + + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 931a0b6e7aa..b75fe04844a 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -54,6 +55,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -97,6 +99,7 @@ import org.apache.accumulo.server.security.SecurityOperation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.thrift.async.AsyncMethodCallback; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; @@ -359,10 +362,22 @@ public void testGetCompactionJob() throws Exception { // Get the next job ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds, - GROUP_ID.toString(), "localhost:10241", eci.toString()); - assertEquals(3, nextJob.getCompactorCount()); - TExternalCompactionJob createdJob = nextJob.getJob(); + CompletableFuture nextJob = new CompletableFuture<>(); + + coordinator.getCompactionJob(new TInfo(), creds, GROUP_ID.toString(), "localhost:10241", + eci.toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + nextJob.complete(response); + } + + @Override + public void onError(Exception exception) { + nextJob.completeExceptionally(exception); + } + }); + assertEquals(3, nextJob.get().getCompactorCount()); + TExternalCompactionJob createdJob = nextJob.get().getJob(); assertEquals(eci.toString(), createdJob.getExternalCompactionId()); assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent())); @@ -382,7 +397,9 @@ public void testGetCompactionJobNoJobs() throws Exception { ServerContext context = EasyMock.createNiceMock(ServerContext.class); expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + cc.set(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME.getKey(), "1s"); + expect(context.getConfiguration()).andReturn(cc).anyTimes(); TCredentials creds = EasyMock.createNiceMock(TCredentials.class); @@ -396,10 +413,22 @@ public void testGetCompactionJobNoJobs() throws Exception { EasyMock.replay(context, creds, security, manager); var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); - TNextCompactionJob nextJob = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, - GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString()); - assertEquals(3, nextJob.getCompactorCount()); - assertNull(nextJob.getJob().getExternalCompactionId()); + CompletableFuture nextJob = new CompletableFuture<>(); + + coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, GROUP_ID.toString(), + "localhost:10240", UUID.randomUUID().toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + nextJob.complete(response); + } + + @Override + public void onError(Exception exception) { + nextJob.completeExceptionally(exception); + } + }); + assertEquals(3, nextJob.get().getCompactorCount()); + assertNull(nextJob.get().getJob().getExternalCompactionId()); EasyMock.verify(context, creds, security); } diff --git a/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java index dd187fa31bc..b134a967529 100644 --- a/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java +++ b/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java @@ -45,6 +45,7 @@ import org.apache.accumulo.server.security.handler.KerberosPermissionHandler; import org.apache.accumulo.test.functional.NativeMapIT; import org.apache.accumulo.test.util.CertUtils; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; @@ -90,6 +91,10 @@ public MiniAccumuloClusterImpl create(String testClassName, String testMethodNam cfg.setNativeLibPaths(NativeMapIT.nativeMapLocation().getAbsolutePath()); cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString()); + // Speed up testing by shortening the timeouts by setting a default to 10s + // TODO: Make this default configurable? + cfg.setProperty(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME, "10s"); + Configuration coreSite = new Configuration(false); // Setup SSL and credential providers if the properties request such @@ -148,6 +153,7 @@ protected void configureForSsl(MiniAccumuloConfigImpl cfg, File folder) { } File sslDir = new File(folder, "ssl"); + FileUtils.deleteQuietly(sslDir); assertTrue(sslDir.mkdirs() || sslDir.isDirectory()); File rootKeystoreFile = new File(sslDir, "root-" + cfg.getInstanceName() + ".jks"); File localKeystoreFile = new File(sslDir, "local-" + cfg.getInstanceName() + ".jks"); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java new file mode 100644 index 00000000000..4aa101ef389 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionKerberosIT.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; + +import java.security.PrivilegedExceptionAction; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExternalCompactionKerberosIT extends SharedMiniClusterBase { + + private static final Logger log = LoggerFactory.getLogger(ExternalCompactionKerberosIT.class); + + public static class ExternalCompactionKerberosConfig implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite, cfg.getClientProps()); + } + } + + @BeforeAll + public static void beforeTests() throws Exception { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, Boolean.TRUE.toString()); + startMiniClusterWithConfig(new ExternalCompactionKerberosConfig()); + } + + @AfterAll + public static void after() throws Exception { + stopMiniCluster(); + } + + @Test + public void testExternalCompaction() throws Exception { + var principal = getAdminUser().getPrincipal(); + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, + getAdminUser().getKeytab().getAbsolutePath()); + + String[] names = this.getUniqueNames(2); + ugi.doAs((PrivilegedExceptionAction) () -> { + try (AccumuloClient client = + getCluster().createAccumuloClient(principal, new KerberosToken())) { + + String table1 = names[0]; + createTable(client, table1, "cs1"); + + String table2 = names[1]; + createTable(client, table2, "cs2"); + + writeData(client, table1); + writeData(client, table2); + + compact(client, table1, 2, GROUP1, true); + verify(client, table1, 2); + + SortedSet splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + client.tableOperations().addSplits(table2, splits); + + compact(client, table2, 3, GROUP2, true); + verify(client, table2, 3); + + } + return null; + }); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java new file mode 100644 index 00000000000..809e6c2039a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionSslIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; + +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExternalCompactionSslIT extends SharedMiniClusterBase { + + private static final Logger log = LoggerFactory.getLogger(ExternalCompactionSslIT.class); + + public static class ExternalCompactionSslConfig implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + Map site = cfg.getSiteConfig(); + site.put(Property.INSTANCE_RPC_SSL_CLIENT_AUTH.getKey(), "true"); + cfg.setSiteConfig(site); + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite, cfg.getClientProps()); + } + } + + @BeforeAll + public static void beforeTests() throws Exception { + System.setProperty(MiniClusterHarness.USE_SSL_FOR_IT_OPTION, Boolean.TRUE.toString()); + startMiniClusterWithConfig(new ExternalCompactionSslConfig()); + } + + @AfterAll + public static void after() throws Exception { + stopMiniCluster(); + } + + @Test + public void testExternalCompaction() throws Exception { + String[] names = this.getUniqueNames(2); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + String table1 = names[0]; + createTable(client, table1, "cs1"); + + String table2 = names[1]; + createTable(client, table2, "cs2"); + + writeData(client, table1); + writeData(client, table2); + + compact(client, table1, 2, GROUP1, true); + verify(client, table1, 2); + + SortedSet splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + client.tableOperations().addSplits(table2, splits); + + compact(client, table2, 3, GROUP2, true); + verify(client, table2, 3); + + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index c3e24401a2b..2235a4bae54 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -77,7 +78,6 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; -import com.beust.jcommander.internal.Maps; import com.google.common.net.HostAndPort; public class ExternalCompactionTestUtils { @@ -187,10 +187,14 @@ public static void verify(AccumuloClient client, String table1, int modulus, int } public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + configureMiniCluster(cfg, coreSite, new HashMap<>()); + } + + public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite, + Map clProps) { // ecomp writes from the TabletServer are not being written to the metadata // table, they are being queued up instead. - Map clProps = Maps.newHashMap(); clProps.put(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey(), "2s"); cfg.setClientProps(clProps);