Skip to content

Commit

Permalink
Merge pull request cdapio#11584 from cdapio/feature/CDAP-15715-https
Browse files Browse the repository at this point in the history
(CDAP-15715) Enable SSL for all internal communications
  • Loading branch information
chtyim authored Sep 6, 2019
2 parents 9bb76d0 + a280384 commit 9a61ff2
Show file tree
Hide file tree
Showing 97 changed files with 1,191 additions and 762 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.cdap.cdap.common.namespace.NamespaceQueryAdmin;
import io.cdap.cdap.common.namespace.NoLookupNamespacePathLocator;
import io.cdap.cdap.common.namespace.guice.NamespaceQueryAdminModule;
import io.cdap.cdap.common.security.KeyStores;
import io.cdap.cdap.data.runtime.ConstantTransactionSystemClient;
import io.cdap.cdap.data.runtime.DataFabricModules;
import io.cdap.cdap.data.runtime.DataSetServiceModules;
Expand Down Expand Up @@ -70,7 +71,6 @@
import io.cdap.cdap.security.impersonation.NoOpOwnerAdmin;
import io.cdap.cdap.security.impersonation.OwnerAdmin;
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.tools.KeyStores;
import io.cdap.cdap.spi.metadata.MetadataStorage;
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
import org.apache.hadoop.conf.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.http.CommonNettyHttpServiceBuilder;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.ServiceLoggingContext;
import io.cdap.cdap.common.metrics.MetricsReporterHook;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.gateway.handlers.preview.PreviewHttpHandler;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.http.NettyHttpService;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,11 +54,12 @@ public class PreviewHttpServer extends AbstractIdleService {
private Cancellable cancelHttpService;

@Inject
PreviewHttpServer(CConfiguration cConf, DiscoveryService discoveryService, PreviewHttpHandler previewHttpHandler,
PreviewHttpServer(CConfiguration cConf, SConfiguration sConf,
DiscoveryService discoveryService, PreviewHttpHandler previewHttpHandler,
MetricsCollectionService metricsCollectionService,
PreviewManager previewManager) {
this.discoveryService = discoveryService;
this.httpService = new CommonNettyHttpServiceBuilder(cConf, Constants.Service.PREVIEW_HTTP)
NettyHttpService.Builder builder = new CommonNettyHttpServiceBuilder(cConf, Constants.Service.PREVIEW_HTTP)
.setHost(cConf.get(Constants.Preview.ADDRESS))
.setPort(cConf.getInt(Constants.Preview.PORT))
.setHttpHandlers(previewHttpHandler)
Expand All @@ -65,8 +68,13 @@ public class PreviewHttpServer extends AbstractIdleService {
.setBossThreadPoolSize(cConf.getInt(Constants.Preview.BOSS_THREADS))
.setWorkerThreadPoolSize(cConf.getInt(Constants.Preview.WORKER_THREADS))
.setHandlerHooks(Collections.singletonList(
new MetricsReporterHook(metricsCollectionService, Constants.Service.PREVIEW_HTTP)))
.build();
new MetricsReporterHook(metricsCollectionService, Constants.Service.PREVIEW_HTTP)));

if (cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED)) {
new HttpsEnabler().configureKeyStore(cConf, sConf).enable(builder);
}

this.httpService = builder.build();
this.previewManager = previewManager;
}

Expand All @@ -83,8 +91,9 @@ protected void startUp() throws Exception {
}

httpService.start();

cancelHttpService = discoveryService.register(
ResolvingDiscoverable.of(new Discoverable(Constants.Service.PREVIEW_HTTP, httpService.getBindAddress())));
ResolvingDiscoverable.of(URIScheme.createDiscoverable(Constants.Service.PREVIEW_HTTP, httpService)));
LOG.info("Preview HTTP server started on {}", httpService.getBindAddress());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
import io.cdap.cdap.api.ServiceDiscoverer;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.discovery.RandomEndpointStrategy;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramId;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -79,15 +78,9 @@ private URL createURL(@Nullable Discoverable discoverable, String namespaceId, S
if (discoverable == null) {
return null;
}
InetSocketAddress address = discoverable.getSocketAddress();
String scheme = Arrays.equals(Constants.Security.SSL_URI_SCHEME.getBytes(), discoverable.getPayload()) ?
Constants.Security.SSL_URI_SCHEME : Constants.Security.URI_SCHEME;

String path = String.format("%s%s:%d%s/namespaces/%s/apps/%s/services/%s/methods/", scheme,
address.getHostName(), address.getPort(),
Constants.Gateway.API_VERSION_3, namespaceId, applicationId, serviceId);
try {
return new URL(path);
return URIScheme.createURI(discoverable, "%s/namespaces/%s/apps/%s/services/%s/methods/",
Constants.Gateway.API_VERSION_3_TOKEN, namespaceId, applicationId, serviceId).toURL();
} catch (MalformedURLException e) {
LOG.error("Got exception while creating serviceURL", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.IOModule;
import io.cdap.cdap.common.guice.LocalLocationModule;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class DefaultPreviewManager extends AbstractIdleService implements Previe

private final CConfiguration cConf;
private final Configuration hConf;
private final SConfiguration sConf;
private final DiscoveryService discoveryService;
private final DatasetFramework datasetFramework;
private final PreferencesService preferencesService;
Expand All @@ -123,7 +125,8 @@ public class DefaultPreviewManager extends AbstractIdleService implements Previe
private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;

@Inject
DefaultPreviewManager(final CConfiguration cConf, Configuration hConf, DiscoveryService discoveryService,
DefaultPreviewManager(CConfiguration cConf, Configuration hConf,
SConfiguration sConf, DiscoveryService discoveryService,
@Named(DataSetsModules.BASE_DATASET_FRAMEWORK) DatasetFramework datasetFramework,
PreferencesService preferencesService, SecureStore secureStore,
TransactionSystemClient transactionSystemClient, ArtifactRepository artifactRepository,
Expand All @@ -132,6 +135,7 @@ public class DefaultPreviewManager extends AbstractIdleService implements Previe
ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
this.cConf = cConf;
this.hConf = hConf;
this.sConf = sConf;
this.datasetFramework = datasetFramework;
this.discoveryService = discoveryService;
this.preferencesService = preferencesService;
Expand Down Expand Up @@ -274,8 +278,10 @@ Injector createPreviewInjector(ProgramId programId) throws IOException {
previewHConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
previewDir.resolve("fs").toUri().toString());

SConfiguration previewSConf = SConfiguration.copy(sConf);

return Guice.createInjector(
new ConfigModule(previewCConf, previewHConf),
new ConfigModule(previewCConf, previewHConf, previewSConf),
new IOModule(),
new AuthenticationContextModules().getMasterModule(),
new PreviewSecureStoreModule(secureStore),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,22 +376,25 @@ public void addSystemArtifacts() throws Exception {
}
}

ExecutorService executorService =
Executors.newFixedThreadPool(remainingArtifacts.size(),
Threads.createDaemonThreadFactory("system-artifact-loader-%d"));
try {
// loop until there is no change
boolean artifactsAdded = true;
while (!remainingArtifacts.isEmpty() && artifactsAdded) {
artifactsAdded = loadSystemArtifacts(executorService, systemArtifacts, remainingArtifacts, parentToChildren,
childToParents);
if (!remainingArtifacts.isEmpty()) {
ExecutorService executorService =
Executors.newFixedThreadPool(remainingArtifacts.size(),
Threads.createDaemonThreadFactory("system-artifact-loader-%d"));
try {
// loop until there is no change
boolean artifactsAdded = true;
while (!remainingArtifacts.isEmpty() && artifactsAdded) {
artifactsAdded = loadSystemArtifacts(executorService, systemArtifacts, remainingArtifacts, parentToChildren,
childToParents);
}
} finally {
executorService.shutdownNow();
}
} finally {
executorService.shutdownNow();
}

if (!remainingArtifacts.isEmpty()) {
LOG.warn("Unable to add system artifacts {} due to cyclic dependencies", Joiner.on(",").join(remainingArtifacts));
if (!remainingArtifacts.isEmpty()) {
LOG.warn("Unable to add system artifacts {} due to cyclic dependencies",
Joiner.on(",").join(remainingArtifacts));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import io.cdap.cdap.common.io.Locations;
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.security.KeyStores;
import io.cdap.cdap.common.ssh.DefaultSSHSession;
import io.cdap.cdap.common.ssh.SSHConfig;
import io.cdap.cdap.common.utils.DirUtils;
import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.runtime.spi.ssh.SSHSession;
import io.cdap.cdap.security.tools.KeyStores;
import joptsimple.OptionSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.cdap.cdap.common.io.Locations;
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.security.KeyStores;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
Expand Down Expand Up @@ -66,7 +67,6 @@
import io.cdap.cdap.runtime.spi.provisioner.Node;
import io.cdap.cdap.runtime.spi.ssh.SSHKeyPair;
import io.cdap.cdap.runtime.spi.ssh.SSHSession;
import io.cdap.cdap.security.tools.KeyStores;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.common.http.HttpRequestConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.io.CharStreams;
import com.google.common.net.HttpHeaders;
import io.cdap.cdap.common.ServiceUnavailableException;
import io.cdap.cdap.security.tools.HttpsEnabler;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.common.http.HttpRequestConfig;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import io.cdap.cdap.common.http.CommonNettyHttpServiceBuilder;
import io.cdap.cdap.common.logging.LogSamplers;
import io.cdap.cdap.common.logging.Loggers;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.common.security.KeyStores;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.utils.Networks;
import io.cdap.cdap.messaging.MessagingService;
import io.cdap.cdap.messaging.context.MultiThreadMessagingContext;
import io.cdap.cdap.security.tools.HttpsEnabler;
import io.cdap.cdap.security.tools.KeyStores;
import io.cdap.http.AbstractHttpHandler;
import io.cdap.http.HttpResponder;
import io.cdap.http.NettyHttpService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,32 @@
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.http.CommonNettyHttpServiceBuilder;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.ServiceLoggingContext;
import io.cdap.cdap.common.metrics.MetricsReporterHook;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.provision.ProvisioningService;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.security.tools.HttpsEnabler;
import io.cdap.cdap.security.tools.KeyStores;
import io.cdap.http.HandlerHook;
import io.cdap.http.HttpHandler;
import io.cdap.http.NettyHttpService;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -156,15 +153,10 @@ protected void startUp() throws Exception {
.setBossThreadPoolSize(cConf.getInt(Constants.AppFabric.BOSS_THREADS,
Constants.AppFabric.DEFAULT_BOSS_THREADS))
.setWorkerThreadPoolSize(cConf.getInt(Constants.AppFabric.WORKER_THREADS,
Constants.AppFabric.DEFAULT_WORKER_THREADS));
Constants.AppFabric.DEFAULT_WORKER_THREADS))
.setPort(cConf.getInt(Constants.AppFabric.SERVER_PORT));
if (sslEnabled) {
httpServiceBuilder.setPort(cConf.getInt(Constants.AppFabric.SERVER_SSL_PORT));

String password = KeyStores.generateRandomPassword();
KeyStore ks = KeyStores.generatedCertKeyStore(sConf, password);
new HttpsEnabler().setKeyStore(ks, password::toCharArray).enable(httpServiceBuilder);
} else {
httpServiceBuilder.setPort(cConf.getInt(Constants.AppFabric.SERVER_PORT));
new HttpsEnabler().configureKeyStore(cConf, sConf).enable(httpServiceBuilder);
}

cancelHttpService = startHttpService(httpServiceBuilder.build());
Expand All @@ -182,7 +174,7 @@ protected void shutDown() throws Exception {
provisioningService.stopAndWait();
}

private Cancellable startHttpService(final NettyHttpService httpService) throws Exception {
private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
httpService.start();

String announceAddress = cConf.get(Constants.Service.MASTER_SERVICES_ANNOUNCE_ADDRESS,
Expand All @@ -194,34 +186,31 @@ private Cancellable startHttpService(final NettyHttpService httpService) throws
LOG.info("AppFabric HTTP Service announced at {}", socketAddress);

// Tag the discoverable's payload to mark it as supporting ssl.
byte[] sslPayload = sslEnabled ? Constants.Security.SSL_URI_SCHEME.getBytes() : Bytes.EMPTY_BYTE_ARRAY;
URIScheme uriScheme = sslEnabled ? URIScheme.HTTPS : URIScheme.HTTP;
// TODO accept a list of services, and start them here
// When it is running, register it with service discovery

final List<Cancellable> cancellables = new ArrayList<>();
for (final String serviceName : servicesNames) {
cancellables.add(discoveryService.register(ResolvingDiscoverable.of(
new Discoverable(serviceName, socketAddress, sslPayload))));
cancellables.add(discoveryService.register(
ResolvingDiscoverable.of(uriScheme.createDiscoverable(serviceName, socketAddress))));
}

return new Cancellable() {
@Override
public void cancel() {
LOG.debug("Stopping AppFabric HTTP service.");
for (Cancellable cancellable : cancellables) {
if (cancellable != null) {
cancellable.cancel();
}
}

try {
httpService.stop();
} catch (Exception e) {
LOG.warn("Exception raised when stopping AppFabric HTTP service", e);
return () -> {
LOG.debug("Stopping AppFabric HTTP service.");
for (Cancellable cancellable : cancellables) {
if (cancellable != null) {
cancellable.cancel();
}
}

LOG.info("AppFabric HTTP service stopped.");
try {
httpService.stop();
} catch (Exception e) {
LOG.warn("Exception raised when stopping AppFabric HTTP service", e);
}

LOG.info("AppFabric HTTP service stopped.");
};
}
}
Loading

0 comments on commit 9a61ff2

Please sign in to comment.