From 1a722cc47be930e28ea188e97a8740dba318f9c7 Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Fri, 6 Sep 2019 00:11:53 -0700 Subject: [PATCH] (CDAP-14987) Reporting service status correctly in k8s - Refactor to unify MasterServiceManager code - Use service discovery to report service health - Handle enable and disable of services correctly --- .../guice/DelegatingMasterServiceManager.java | 59 +++-- .../cdap/app/guice/MonitorHandlerModule.java | 70 ++++-- .../cdap/gateway/handlers/MonitorHandler.java | 15 +- .../InMemoryTransactionServiceManager.java | 50 ---- .../distributed/AppFabricServiceManager.java | 52 +--- .../TransactionServiceManager.java | 42 ++-- .../NonHadoopAppFabricServiceManager.java | 18 +- .../internal/guice/AppFabricTestModule.java | 4 + .../io/cdap/cdap/common/conf/Constants.java | 8 +- ...stractDistributedMasterServiceManager.java | 229 ------------------ .../twill/AbstractMasterServiceManager.java | 205 ++++++++++++++++ .../common/twill/MasterServiceManager.java | 54 +++-- .../common/twill}/NoopTwillController.java | 42 +++- .../cdap/common/twill}/NoopTwillPreparer.java | 2 +- .../common/twill}/NoopTwillRunnerService.java | 7 +- .../DatasetExecutorServiceManager.java | 14 +- .../dataset/MetadataServiceManager.java | 20 +- .../service/ExploreServiceManager.java | 19 +- .../k8s/KubeMasterEnvironment.java | 4 + .../environment/MockMasterEnvironment.java | 1 + .../cdap/proto/SystemServiceLiveInfo.java | 2 +- .../java/io/cdap/cdap/StandaloneMain.java | 13 +- .../distributed/MessagingServiceManager.java | 15 +- .../main/java/io/cdap/cdap/test/TestBase.java | 5 + ...InMemoryDatasetExecutorServiceManager.java | 31 --- .../run/InMemoryExploreServiceManager.java | 45 ---- .../run/InMemoryLogSaverServiceManager.java | 36 --- .../run/InMemoryMessagingServiceManager.java | 31 --- .../run/InMemoryMetadataServiceManager.java | 31 --- ...nMemoryMetricsProcessorServiceManager.java | 36 --- .../run/InMemoryMetricsServiceManager.java | 31 --- ...nager.java => LogSaverServiceManager.java} | 13 +- .../MetricsProcessorStatusServiceManager.java | 14 +- .../runtime/MetricsServiceManager.java | 13 +- 34 files changed, 480 insertions(+), 751 deletions(-) rename cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractInMemoryMasterServiceManager.java => cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DelegatingMasterServiceManager.java (56%) delete mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/InMemoryTransactionServiceManager.java rename cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryAppFabricServiceManager.java => cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/NonHadoopAppFabricServiceManager.java (53%) delete mode 100644 cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractDistributedMasterServiceManager.java create mode 100644 cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterServiceManager.java rename {cdap-master/src/test/java/io/cdap/cdap/master/environment => cdap-common/src/main/java/io/cdap/cdap/common/twill}/NoopTwillController.java (66%) rename {cdap-master/src/test/java/io/cdap/cdap/master/environment => cdap-common/src/main/java/io/cdap/cdap/common/twill}/NoopTwillPreparer.java (99%) rename {cdap-master/src/test/java/io/cdap/cdap/master/environment => cdap-common/src/main/java/io/cdap/cdap/common/twill}/NoopTwillRunnerService.java (89%) delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryDatasetExecutorServiceManager.java delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryExploreServiceManager.java delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryLogSaverServiceManager.java delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMessagingServiceManager.java delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetadataServiceManager.java delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsProcessorServiceManager.java delete mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsServiceManager.java rename cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/{LogSaverStatusServiceManager.java => LogSaverServiceManager.java} (67%) diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractInMemoryMasterServiceManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DelegatingMasterServiceManager.java similarity index 56% rename from cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractInMemoryMasterServiceManager.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DelegatingMasterServiceManager.java index bd74a24a1a4e..d4ec97b4daf0 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractInMemoryMasterServiceManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/DelegatingMasterServiceManager.java @@ -1,5 +1,5 @@ /* - * Copyright © 2014 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -14,10 +14,9 @@ * the License. */ -package io.cdap.cdap.common.twill; +package io.cdap.cdap.app.guice; -import com.google.common.collect.ImmutableList; -import io.cdap.cdap.proto.Containers; +import io.cdap.cdap.common.twill.MasterServiceManager; import io.cdap.cdap.proto.SystemServiceLiveInfo; import org.apache.twill.api.logging.LogEntry; @@ -25,72 +24,82 @@ import java.util.Set; /** - * InMemory CDAP Service Management class. + * A {@link MasterServiceManager} that delegates all methods to another {@link MasterServiceManager}. */ -public abstract class AbstractInMemoryMasterServiceManager implements MasterServiceManager { +class DelegatingMasterServiceManager implements MasterServiceManager { + + private final MasterServiceManager delegate; + + DelegatingMasterServiceManager(MasterServiceManager delegate) { + this.delegate = delegate; + } + + MasterServiceManager getDelegate() { + return delegate; + } @Override - public SystemServiceLiveInfo getLiveInfo() { - return new SystemServiceLiveInfo(ImmutableList.of()); + public boolean isServiceEnabled() { + return getDelegate().isServiceEnabled(); + } + + @Override + public String getDescription() { + return getDelegate().getDescription(); } @Override public int getInstances() { - return 1; + return getDelegate().getInstances(); } @Override public boolean setInstances(int instanceCount) { - return false; + return getDelegate().setInstances(instanceCount); } @Override public int getMinInstances() { - return 1; + return getDelegate().getMinInstances(); } @Override public int getMaxInstances() { - return 1; + return getDelegate().getMaxInstances(); } @Override public boolean isLogAvailable() { - return true; - } - - @Override - public boolean canCheckStatus() { - return true; + return getDelegate().isLogAvailable(); } @Override public boolean isServiceAvailable() { - return true; + return getDelegate().isServiceAvailable(); } @Override - public boolean isServiceEnabled() { - return true; + public SystemServiceLiveInfo getLiveInfo() { + return getDelegate().getLiveInfo(); } @Override public void restartAllInstances() { - // No operation for in memory manager. + getDelegate().restartAllInstances(); } @Override public void restartInstances(int instanceId, int... moreInstanceIds) { - // No operation for in memory manager. + getDelegate().restartInstances(instanceId, moreInstanceIds); } @Override public void updateServiceLogLevels(Map logLevels) { - // No operation for in memory manager. + getDelegate().updateServiceLogLevels(logLevels); } @Override public void resetServiceLogLevels(Set loggerNames) { - // No operation for in memory manager. + getDelegate().resetServiceLogLevels(loggerNames); } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java index 8cd4af248f50..0af452648d00 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java @@ -41,19 +41,11 @@ import io.cdap.cdap.explore.service.ExploreServiceManager; import io.cdap.cdap.gateway.handlers.DatasetServiceStore; import io.cdap.cdap.gateway.handlers.MonitorHandler; -import io.cdap.cdap.internal.app.runtime.batch.InMemoryTransactionServiceManager; import io.cdap.cdap.internal.app.runtime.distributed.AppFabricServiceManager; import io.cdap.cdap.internal.app.runtime.distributed.TransactionServiceManager; +import io.cdap.cdap.internal.app.runtime.monitor.NonHadoopAppFabricServiceManager; import io.cdap.cdap.internal.app.services.AppFabricServer; -import io.cdap.cdap.logging.run.InMemoryAppFabricServiceManager; -import io.cdap.cdap.logging.run.InMemoryDatasetExecutorServiceManager; -import io.cdap.cdap.logging.run.InMemoryExploreServiceManager; -import io.cdap.cdap.logging.run.InMemoryLogSaverServiceManager; -import io.cdap.cdap.logging.run.InMemoryMessagingServiceManager; -import io.cdap.cdap.logging.run.InMemoryMetadataServiceManager; -import io.cdap.cdap.logging.run.InMemoryMetricsProcessorServiceManager; -import io.cdap.cdap.logging.run.InMemoryMetricsServiceManager; -import io.cdap.cdap.logging.run.LogSaverStatusServiceManager; +import io.cdap.cdap.logging.run.LogSaverServiceManager; import io.cdap.cdap.messaging.distributed.MessagingServiceManager; import io.cdap.cdap.metrics.runtime.MetricsProcessorStatusServiceManager; import io.cdap.cdap.metrics.runtime.MetricsServiceManager; @@ -66,7 +58,7 @@ */ public class MonitorHandlerModule extends AbstractModule { - public static final String SERVICE_STORE_DS_MODULES = "service.store.ds.modules"; + private static final String SERVICE_STORE_DS_MODULES = "service.store.ds.modules"; private final boolean isHadoop; public MonitorHandlerModule(boolean isHadoop) { @@ -112,15 +104,24 @@ protected void configure() { private void addNonHadoopBindings(Binder binder) { MapBinder mapBinder = MapBinder.newMapBinder(binder, String.class, MasterServiceManager.class); - mapBinder.addBinding(Constants.Service.LOGSAVER).to(InMemoryLogSaverServiceManager.class); - mapBinder.addBinding(Constants.Service.TRANSACTION).to(InMemoryTransactionServiceManager.class); - mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR).to(InMemoryMetricsProcessorServiceManager.class); - mapBinder.addBinding(Constants.Service.METRICS).to(InMemoryMetricsServiceManager.class); - mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP).to(InMemoryAppFabricServiceManager.class); - mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR).to(InMemoryDatasetExecutorServiceManager.class); - mapBinder.addBinding(Constants.Service.METADATA_SERVICE).to(InMemoryMetadataServiceManager.class); - mapBinder.addBinding(Constants.Service.EXPLORE_HTTP_USER_SERVICE).to(InMemoryExploreServiceManager.class); - mapBinder.addBinding(Constants.Service.MESSAGING_SERVICE).to(InMemoryMessagingServiceManager.class); + mapBinder.addBinding(Constants.Service.LOGSAVER) + .toProvider(new NonHadoopMasterServiceManagerProvider(LogSaverServiceManager.class)); + mapBinder.addBinding(Constants.Service.TRANSACTION) + .toProvider(new NonHadoopMasterServiceManagerProvider(TransactionServiceManager.class)); + mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR) + .toProvider(new NonHadoopMasterServiceManagerProvider(MetricsProcessorStatusServiceManager.class)); + mapBinder.addBinding(Constants.Service.METRICS) + .toProvider(new NonHadoopMasterServiceManagerProvider(MetricsServiceManager.class)); + mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP) + .toProvider(new NonHadoopMasterServiceManagerProvider(NonHadoopAppFabricServiceManager.class)); + mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR) + .toProvider(new NonHadoopMasterServiceManagerProvider(DatasetExecutorServiceManager.class)); + mapBinder.addBinding(Constants.Service.METADATA_SERVICE) + .toProvider(new NonHadoopMasterServiceManagerProvider(MetadataServiceManager.class)); + mapBinder.addBinding(Constants.Service.EXPLORE_HTTP_USER_SERVICE) + .toProvider(new NonHadoopMasterServiceManagerProvider(ExploreServiceManager.class)); + mapBinder.addBinding(Constants.Service.MESSAGING_SERVICE) + .toProvider(new NonHadoopMasterServiceManagerProvider(MessagingServiceManager.class)); // The ServiceStore uses a special non-TX KV Table. bindDatasetModule(binder, new InMemoryKVTableDefinition.Module()); @@ -134,7 +135,7 @@ private void addNonHadoopBindings(Binder binder) { private void addHadoopBindings(Binder binder) { MapBinder mapBinder = MapBinder.newMapBinder(binder, String.class, MasterServiceManager.class); - mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverStatusServiceManager.class); + mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverServiceManager.class); mapBinder.addBinding(Constants.Service.TRANSACTION).to(TransactionServiceManager.class); mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR).to(MetricsProcessorStatusServiceManager.class); mapBinder.addBinding(Constants.Service.METRICS).to(MetricsServiceManager.class); @@ -153,7 +154,7 @@ private void addHadoopBindings(Binder binder) { */ private void bindDatasetModule(Binder binder, DatasetModule module) { MapBinder mapBinder = MapBinder.newMapBinder( - binder(), String.class, DatasetModule.class, Names.named(SERVICE_STORE_DS_MODULES)); + binder, String.class, DatasetModule.class, Names.named(SERVICE_STORE_DS_MODULES)); mapBinder.addBinding(module.getClass().getName()).toInstance(module); } @@ -178,4 +179,29 @@ public DatasetFramework get() { return new InMemoryDatasetFramework(new DefaultDatasetDefinitionRegistryFactory(injector), datasetModules); } } + + /** + * Provides for {@link MasterServiceManager} used in non-hadoop environment. + */ + private static final class NonHadoopMasterServiceManagerProvider implements Provider { + + private final Class serviceManagerClass; + @Inject + private Injector injector; + + NonHadoopMasterServiceManagerProvider(Class serviceManagerClass) { + this.serviceManagerClass = serviceManagerClass; + } + + @Override + public MasterServiceManager get() { + return new DelegatingMasterServiceManager(injector.getInstance(serviceManagerClass)) { + @Override + public int getInstances() { + // For now it is always just one instance for each master service in non-hadoop environment. + return 1; + } + }; + } + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/MonitorHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/MonitorHandler.java index f4af2b064ba8..2be1776648b7 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/MonitorHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/MonitorHandler.java @@ -63,14 +63,12 @@ public class MonitorHandler extends AbstractAppFabricHttpHandler { private static final String STATUSOK = Constants.Monitor.STATUS_OK; private static final String STATUSNOTOK = Constants.Monitor.STATUS_NOTOK; - private static final String NOTAPPLICABLE = "NA"; private final Map serviceManagementMap; private final ServiceStore serviceStore; @Inject - public MonitorHandler(Map serviceMap, - ServiceStore serviceStore) throws Exception { + public MonitorHandler(Map serviceMap, ServiceStore serviceStore) { this.serviceManagementMap = serviceMap; this.serviceStore = serviceStore; } @@ -160,7 +158,7 @@ public void getBootStatus(HttpRequest request, HttpResponder responder) { Map result = new HashMap<>(); for (String service : serviceManagementMap.keySet()) { MasterServiceManager masterServiceManager = serviceManagementMap.get(service); - if (masterServiceManager.isServiceEnabled() && masterServiceManager.canCheckStatus()) { + if (masterServiceManager.isServiceEnabled()) { String status = masterServiceManager.isServiceAvailable() ? STATUSOK : STATUSNOTOK; result.put(service, status); } @@ -170,7 +168,7 @@ public void getBootStatus(HttpRequest request, HttpResponder responder) { @Path("/system/services") @GET - public void getServiceSpec(HttpRequest request, HttpResponder responder) throws Exception { + public void getServiceSpec(HttpRequest request, HttpResponder responder) { List response = new ArrayList<>(); SortedSet services = new TreeSet<>(serviceManagementMap.keySet()); List serviceList = new ArrayList<>(services); @@ -178,8 +176,7 @@ public void getServiceSpec(HttpRequest request, HttpResponder responder) throws MasterServiceManager serviceManager = serviceManagementMap.get(service); if (serviceManager.isServiceEnabled()) { String logs = serviceManager.isLogAvailable() ? Constants.Monitor.STATUS_OK : Constants.Monitor.STATUS_NOTOK; - String canCheck = serviceManager.canCheckStatus() ? ( - serviceManager.isServiceAvailable() ? STATUSOK : STATUSNOTOK) : NOTAPPLICABLE; + String canCheck = serviceManager.isServiceAvailable() ? STATUSOK : STATUSNOTOK; //TODO: Add metric name for Event Rate monitoring response.add(new SystemServiceMeta(service, serviceManager.getDescription(), canCheck, logs, serviceManager.getMinInstances(), serviceManager.getMaxInstances(), @@ -277,7 +274,7 @@ public void resetServiceLogLevels(FullHttpRequest request, HttpResponder respond try { Set loggerNames = parseBody(request, SET_STRING_TYPE); - masterServiceManager.resetServiceLogLevels(loggerNames == null ? Collections.emptySet() : loggerNames); + masterServiceManager.resetServiceLogLevels(loggerNames == null ? Collections.emptySet() : loggerNames); responder.sendStatus(HttpResponseStatus.OK); } catch (IllegalStateException ise) { throw new ServiceUnavailableException(String.format("Failed to reset log levels for service %s " + @@ -287,7 +284,7 @@ public void resetServiceLogLevels(FullHttpRequest request, HttpResponder respond } } - private int getSystemServiceInstanceCount(String serviceName) throws Exception { + private int getSystemServiceInstanceCount(String serviceName) { Integer count = serviceStore.getServiceInstance(serviceName); //In standalone mode, this count will be null. And thus we just return the actual instance count. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/InMemoryTransactionServiceManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/InMemoryTransactionServiceManager.java deleted file mode 100644 index a426250c7b0d..000000000000 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/InMemoryTransactionServiceManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.internal.app.runtime.batch; - -import com.google.inject.Inject; -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; -import org.apache.tephra.TransactionSystemClient; - -/** - * - */ -public class InMemoryTransactionServiceManager extends AbstractInMemoryMasterServiceManager { - private TransactionSystemClient txClient; - - @Override - public boolean isLogAvailable() { - return false; - } - - @Inject - public InMemoryTransactionServiceManager(TransactionSystemClient txClient) { - this.txClient = txClient; - } - - @Override - public boolean isServiceAvailable() { - return txClient.status().equals(Constants.Monitor.STATUS_OK); - } - - @Override - public String getDescription() { - return Constants.Transaction.SERVICE_DESCRIPTION; - } - -} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/AppFabricServiceManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/AppFabricServiceManager.java index 15b2de8ea87d..1d00cdf6df04 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/AppFabricServiceManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/AppFabricServiceManager.java @@ -53,10 +53,10 @@ public class AppFabricServiceManager implements MasterServiceManager { private final Map oldLogLevels; @Inject - public AppFabricServiceManager(@Named(Constants.Service.MASTER_SERVICES_BIND_ADDRESS) InetAddress hostname) { + AppFabricServiceManager(@Named(Constants.Service.MASTER_SERVICES_BIND_ADDRESS) InetAddress hostname) { this.hostname = hostname; // this is to remember old log levels, will be used during reset - this.oldLogLevels = Collections.synchronizedMap(new HashMap()); + this.oldLogLevels = Collections.synchronizedMap(new HashMap<>()); } @Inject(optional = true) @@ -72,11 +72,6 @@ public String getDescription() { return Constants.AppFabric.SERVICE_DESCRIPTION; } - @Override - public int getMaxInstances() { - return 1; - } - @Override public SystemServiceLiveInfo getLiveInfo() { SystemServiceLiveInfo.Builder builder = SystemServiceLiveInfo.builder(); @@ -100,51 +95,11 @@ public SystemServiceLiveInfo getLiveInfo() { return builder.build(); } - @Override - public int getInstances() { - return 1; - } - - @Override - public boolean setInstances(int instanceCount) { - return false; - } - - @Override - public int getMinInstances() { - return 1; - } - - @Override - public boolean isLogAvailable() { - return true; - } - - @Override - public boolean canCheckStatus() { - return true; - } - @Override public boolean isServiceAvailable() { return true; } - @Override - public boolean isServiceEnabled() { - return true; - } - - @Override - public void restartAllInstances() { - // no-op - } - - @Override - public void restartInstances(int instanceId, int... moreInstanceIds) { - // no-op - } - /** * Update log levels for app fabric service. * Note that changing the log levels of app fabric service will also change log levels for app.fabric server and @@ -178,6 +133,9 @@ public void updateServiceLogLevels(Map logLevels) { @Override public void resetServiceLogLevels(Set loggerNames) { LoggerContext context = getLoggerContext(); + if (context == null) { + return; + } Iterator> entryIterator = oldLogLevels.entrySet().iterator(); while (entryIterator.hasNext()) { Map.Entry entry = entryIterator.next(); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/TransactionServiceManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/TransactionServiceManager.java index 2616865475f9..dd61aaace79d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/TransactionServiceManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/TransactionServiceManager.java @@ -19,54 +19,48 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.discovery.RandomEndpointStrategy; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; import org.apache.tephra.TransactionSystemClient; -import org.apache.twill.api.TwillRunnerService; -import org.apache.twill.discovery.Discoverable; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; - /** * Transaction Service Management in Distributed Mode. */ -public class TransactionServiceManager extends AbstractDistributedMasterServiceManager { +public class TransactionServiceManager extends AbstractMasterServiceManager { + private static final Logger LOG = LoggerFactory.getLogger(TransactionServiceManager.class); + private final TransactionSystemClient txClient; - private final DiscoveryServiceClient discoveryServiceClient; - private final boolean isTxEnabled; @Inject - public TransactionServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - TransactionSystemClient txClient, DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.TRANSACTION, twillRunnerService, discoveryServiceClient); + TransactionServiceManager(CConfiguration cConf, TwillRunner twillRunner, + TransactionSystemClient txClient, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.TRANSACTION, twillRunner); this.txClient = txClient; - this.isTxEnabled = cConf.getBoolean(Constants.Transaction.TX_ENABLED); - this.discoveryServiceClient = discoveryServiceClient; + } + + @Override + public boolean isServiceEnabled() { + return getCConf().getBoolean(Constants.Transaction.TX_ENABLED); } @Override public int getMaxInstances() { - return cConf.getInt(Constants.Transaction.Container.MAX_INSTANCES); + return getCConf().getInt(Constants.Transaction.Container.MAX_INSTANCES); } @Override public boolean isServiceAvailable() { + if (!isServiceEnabled()) { + return false; + } try { - Discoverable discoverable = new RandomEndpointStrategy(() -> discoveryServiceClient.discover(serviceName)) - .pick(discoveryTimeout, TimeUnit.SECONDS); - if (discoverable == null && isTxEnabled) { - return false; - } - return txClient.status().equals(Constants.Monitor.STATUS_OK); - } catch (IllegalArgumentException e) { - return false; } catch (Exception e) { - LOG.warn("Unable to ping {} : Reason {} ", serviceName, e.getMessage()); + LOG.warn("Unable to ping {}", Constants.Service.TRANSACTION, e); return false; } } diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryAppFabricServiceManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/NonHadoopAppFabricServiceManager.java similarity index 53% rename from cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryAppFabricServiceManager.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/NonHadoopAppFabricServiceManager.java index 4310a702a177..8dbb565a6d93 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryAppFabricServiceManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/monitor/NonHadoopAppFabricServiceManager.java @@ -1,5 +1,5 @@ /* - * Copyright © 2014 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -14,15 +14,25 @@ * the License. */ -package io.cdap.cdap.logging.run; +package io.cdap.cdap.internal.app.runtime.monitor; +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; +import org.apache.twill.discovery.DiscoveryServiceClient; /** * Service for managing app fabric service. */ -public class InMemoryAppFabricServiceManager extends AbstractInMemoryMasterServiceManager { +public class NonHadoopAppFabricServiceManager extends AbstractMasterServiceManager { + + @Inject + NonHadoopAppFabricServiceManager(CConfiguration cConf, DiscoveryServiceClient discoveryClient, + TwillRunner twillRunner) { + super(cConf, discoveryClient, Constants.Service.APP_FABRIC_HTTP, twillRunner); + } @Override public String getDescription() { diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/guice/AppFabricTestModule.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/guice/AppFabricTestModule.java index 0e3de85057fb..8f0f446c4801 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/guice/AppFabricTestModule.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/guice/AppFabricTestModule.java @@ -28,6 +28,7 @@ import io.cdap.cdap.common.guice.IOModule; import io.cdap.cdap.common.guice.InMemoryDiscoveryModule; import io.cdap.cdap.common.guice.NonCustomLocationUnitTestModule; +import io.cdap.cdap.common.twill.NoopTwillRunnerService; import io.cdap.cdap.config.guice.ConfigStoreModule; import io.cdap.cdap.data.runtime.DataFabricModules; import io.cdap.cdap.data.runtime.DataSetServiceModules; @@ -46,6 +47,7 @@ import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule; import io.cdap.cdap.security.guice.SecureStoreServerModule; import org.apache.hadoop.conf.Configuration; +import org.apache.twill.api.TwillRunner; import java.io.File; import javax.annotation.Nullable; @@ -105,5 +107,7 @@ protected void configure() { install(new MetadataReaderWriterModules().getInMemoryModules()); install(new MessagingServerRuntimeModule().getInMemoryModules()); install(new MockProvisionerModule()); + // Needed by MonitorHandlerModuler + bind(TwillRunner.class).to(NoopTwillRunnerService.class); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 1046e0f130e3..94b81831a856 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -19,14 +19,10 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.BindingAnnotation; import io.cdap.cdap.proto.id.NamespaceId; -import org.apache.twill.discovery.Discoverable; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Map; import static java.lang.annotation.RetentionPolicy.RUNTIME; @@ -100,13 +96,13 @@ public static final class Startup { */ public static final class Master { public static final String EXTENSIONS_DIR = "master.environment.extensions.dir"; + public static final String MAX_INSTANCES = "master.service.max.instances"; } /** * Global Service names. */ public static final class Service { - public static final String ACL = "acl"; public static final String APP_FABRIC_HTTP = "appfabric"; public static final String TRANSACTION = "transaction"; public static final String TRANSACTION_HTTP = "transaction.http"; @@ -1311,6 +1307,8 @@ public static final class Audit { * Constants for the messaging system */ public static final class MessagingSystem { + public static final String SERVICE_DESCRIPTION = "Service for providing messaging system."; + public static final String LOCAL_DATA_DIR = "messaging.local.data.dir"; public static final String LOCAL_DATA_CLEANUP_FREQUENCY = "messaging.local.data.cleanup.frequency.secs"; diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractDistributedMasterServiceManager.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractDistributedMasterServiceManager.java deleted file mode 100644 index 304c1a94cba2..000000000000 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractDistributedMasterServiceManager.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright © 2014-2017 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.common.twill; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.discovery.URIScheme; -import io.cdap.cdap.proto.Containers; -import io.cdap.cdap.proto.SystemServiceLiveInfo; -import io.netty.handler.codec.http.HttpResponseStatus; -import org.apache.twill.api.ResourceReport; -import org.apache.twill.api.TwillController; -import org.apache.twill.api.TwillRunResources; -import org.apache.twill.api.TwillRunnerService; -import org.apache.twill.api.logging.LogEntry; -import org.apache.twill.discovery.Discoverable; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.HttpURLConnection; -import java.net.SocketTimeoutException; -import java.net.URL; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * Abstract class that can be extended by individual CDAP Services to implement their management methods. - */ -public abstract class AbstractDistributedMasterServiceManager implements MasterServiceManager { - private static final Logger LOG = LoggerFactory.getLogger(AbstractDistributedMasterServiceManager.class); - private static final long SERVICE_PING_RESPONSE_TIMEOUT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS); - protected final long discoveryTimeout; - - protected CConfiguration cConf; - protected TwillRunnerService twillRunnerService; - protected String serviceName; - protected DiscoveryServiceClient discoveryServiceClient; - - public AbstractDistributedMasterServiceManager(CConfiguration cConf, String serviceName, - TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - this.cConf = cConf; - this.serviceName = serviceName; - this.twillRunnerService = twillRunnerService; - this.discoveryTimeout = cConf.getLong(Constants.Monitor.DISCOVERY_TIMEOUT_SECONDS); - this.discoveryServiceClient = discoveryServiceClient; - } - - @Override - public SystemServiceLiveInfo getLiveInfo() { - SystemServiceLiveInfo.Builder builder = SystemServiceLiveInfo.builder(); - - Iterable twillControllerList = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - if (twillControllerList == null) { - return builder.build(); - } - - for (TwillController twillController : twillControllerList) { - if (twillController.getResourceReport() == null) { - continue; - } - - ResourceReport resourceReport = twillController.getResourceReport(); - Collection runResources = resourceReport.getResources().get(serviceName); - for (TwillRunResources resources : runResources) { - Containers.ContainerInfo containerInfo = new Containers.ContainerInfo( - Containers.ContainerType.SYSTEM_SERVICE, - serviceName, - resources.getInstanceId(), - resources.getContainerId(), - resources.getHost(), - resources.getMemoryMB(), - resources.getVirtualCores(), - resources.getDebugPort()); - builder.addContainer(containerInfo); - } - } - return builder.build(); - } - - @Override - public int getInstances() { - Iterable twillControllerList = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - int instances = 0; - if (twillControllerList != null) { - for (TwillController twillController : twillControllerList) { - if (twillController.getResourceReport() != null) { - instances = twillController.getResourceReport().getRunnableResources(serviceName).size(); - } - } - } - return instances; - } - - @Override - public boolean isServiceEnabled() { - // By default all the services are enabled. extending classes can override if the behavior should be different. - return true; - } - - @Override - public boolean setInstances(int instanceCount) { - Preconditions.checkArgument(instanceCount > 0); - try { - Iterable twillControllerList = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - if (twillControllerList != null) { - for (TwillController twillController : twillControllerList) { - twillController.changeInstances(serviceName, instanceCount).get(); - } - } - return true; - } catch (Throwable t) { - LOG.error("Could not change service instance of {} : {}", serviceName, t.getMessage(), t); - return false; - } - } - - @Override - public int getMinInstances() { - return 1; - } - - @Override - public boolean canCheckStatus() { - return true; - } - - @Override - public boolean isLogAvailable() { - return true; - } - - @Override - public boolean isServiceAvailable() { - URL url = null; - try { - Iterable discoverables = this.discoveryServiceClient.discover(getDiscoverableName()); - for (Discoverable discoverable : discoverables) { - url = URIScheme.createURI(discoverable, "/ping").toURL(); - //Ping the discovered service to check its status. - if (checkGetStatus(url).equals(HttpResponseStatus.OK)) { - return true; - } - } - return false; - } catch (IllegalArgumentException e) { - return false; - } catch (Exception e) { - LOG.warn("Unable to ping {} at {} : Reason : {}", serviceName, url, e.getMessage()); - return false; - } - } - - protected String getDiscoverableName() { - return serviceName; - } - - private HttpResponseStatus checkGetStatus(URL url) throws Exception { - HttpURLConnection httpConn = null; - try { - httpConn = (HttpURLConnection) url.openConnection(); - httpConn.setConnectTimeout((int) SERVICE_PING_RESPONSE_TIMEOUT); - httpConn.setReadTimeout((int) SERVICE_PING_RESPONSE_TIMEOUT); - return (HttpResponseStatus.valueOf(httpConn.getResponseCode())); - } catch (SocketTimeoutException e) { - return HttpResponseStatus.NOT_FOUND; - } finally { - if (httpConn != null) { - httpConn.disconnect(); - } - } - } - - @Override - public void restartAllInstances() { - Iterable twillControllers = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - for (TwillController twillController : twillControllers) { - // Call restart instances - Futures.getUnchecked(twillController.restartAllInstances(serviceName)); - } - } - - @Override - public void restartInstances(int instanceId, int... moreInstanceIds) { - Iterable twillControllers = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - for (TwillController twillController : twillControllers) { - // Call restart instances - Futures.getUnchecked(twillController.restartInstances(serviceName, instanceId, moreInstanceIds)); - } - } - - @Override - public void updateServiceLogLevels(Map logLevels) { - Iterable twillControllers = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - for (TwillController twillController : twillControllers) { - // Call update log levels - Futures.getUnchecked(twillController.updateLogLevels(serviceName, logLevels)); - } - } - - @Override - public void resetServiceLogLevels(Set loggerNames) { - Iterable twillControllers = twillRunnerService.lookup(Constants.Service.MASTER_SERVICES); - for (TwillController twillController : twillControllers) { - // Call reset log levels - Futures.getUnchecked(twillController.resetRunnableLogLevels(serviceName, - loggerNames.toArray(new String[loggerNames.size()]))); - } - } -} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterServiceManager.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterServiceManager.java new file mode 100644 index 000000000000..72a1b1a51990 --- /dev/null +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterServiceManager.java @@ -0,0 +1,205 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.common.twill; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.discovery.RandomEndpointStrategy; +import io.cdap.cdap.common.discovery.URIScheme; +import io.cdap.cdap.common.logging.LogSamplers; +import io.cdap.cdap.common.logging.Loggers; +import io.cdap.cdap.proto.Containers; +import io.cdap.cdap.proto.SystemServiceLiveInfo; +import io.cdap.common.http.HttpRequest; +import io.cdap.common.http.HttpRequestConfig; +import io.cdap.common.http.HttpRequests; +import org.apache.twill.api.ResourceReport; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillRunResources; +import org.apache.twill.api.TwillRunner; +import org.apache.twill.api.logging.LogEntry; +import org.apache.twill.discovery.Discoverable; +import org.apache.twill.discovery.DiscoveryServiceClient; +import org.apache.twill.discovery.ServiceDiscovered; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +/** + * An abstract base class to provide common implementation for the {@link MasterServiceManager}. + */ +public abstract class AbstractMasterServiceManager implements MasterServiceManager { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractMasterServiceManager.class); + private static final Logger OUTAGE_LOG = Loggers.sampling(LOG, LogSamplers.limitRate(30000)); + + private final CConfiguration cConf; + private final String serviceName; + private final DiscoveryServiceClient discoveryClient; + private final HttpRequestConfig httpRequestConfig; + private final TwillRunner twillRunner; + private final int serviceTimeoutSeconds; + + protected AbstractMasterServiceManager(CConfiguration cConf, DiscoveryServiceClient discoveryClient, + String serviceName, TwillRunner twillRunner) { + this.cConf = cConf; + this.discoveryClient = discoveryClient; + this.serviceName = serviceName; + this.serviceTimeoutSeconds = cConf.getInt(Constants.Monitor.DISCOVERY_TIMEOUT_SECONDS); + + int requestTimeout = serviceTimeoutSeconds * 1000; + this.httpRequestConfig = new HttpRequestConfig(requestTimeout, requestTimeout, false); + this.twillRunner = twillRunner; + } + + /** + * Returns the {@link CConfiguration}. + */ + protected final CConfiguration getCConf() { + return cConf; + } + + /** + * Returns the name of the twill runnable. + */ + protected String getTwillRunnableName() { + return serviceName; + } + + @Override + public boolean isServiceAvailable() { + // Try to ping the endpoint. If any one of them is available, we treat the service as available. + ServiceDiscovered serviceDiscovered = discoveryClient.discover(serviceName); + // Block to wait for some endpoint to be available. This is just to compensate initialization. + // Calls after the first discovery will return immediately. + new RandomEndpointStrategy(() -> serviceDiscovered).pick(serviceTimeoutSeconds, TimeUnit.SECONDS); + return StreamSupport.stream(serviceDiscovered.spliterator(), false).anyMatch(this::isEndpointAlive); + } + + @Override + public SystemServiceLiveInfo getLiveInfo() { + SystemServiceLiveInfo.Builder builder = SystemServiceLiveInfo.builder(); + + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + if (twillController.getResourceReport() == null) { + continue; + } + + ResourceReport resourceReport = twillController.getResourceReport(); + Collection runResources = resourceReport.getResources().getOrDefault(getTwillRunnableName(), + Collections.emptyList()); + for (TwillRunResources resources : runResources) { + Containers.ContainerInfo containerInfo = new Containers.ContainerInfo( + Containers.ContainerType.SYSTEM_SERVICE, + getTwillRunnableName(), + resources.getInstanceId(), + resources.getContainerId(), + resources.getHost(), + resources.getMemoryMB(), + resources.getVirtualCores(), + resources.getDebugPort()); + builder.addContainer(containerInfo); + } + } + return builder.build(); + } + + @Override + public int getInstances() { + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + ResourceReport resourceReport = twillController.getResourceReport(); + if (resourceReport != null) { + int instances = resourceReport.getRunnableResources(getTwillRunnableName()).size(); + if (instances > 0) { + return instances; + } + } + } + return 0; + } + + @Override + public boolean setInstances(int instanceCount) { + Preconditions.checkArgument(instanceCount > 0); + try { + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + twillController.changeInstances(getTwillRunnableName(), instanceCount).get(); + } + return true; + } catch (Throwable t) { + LOG.error("Could not change service instance of {} : {}", getTwillRunnableName(), t.getMessage(), t); + return false; + } + } + + @Override + public void restartAllInstances() { + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + // Call restart instances + Futures.getUnchecked(twillController.restartAllInstances(getTwillRunnableName())); + } + } + + @Override + public void restartInstances(int instanceId, int... moreInstanceIds) { + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + // Call restart instances + Futures.getUnchecked(twillController.restartInstances(getTwillRunnableName(), instanceId, moreInstanceIds)); + } + } + + @Override + public void updateServiceLogLevels(Map logLevels) { + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + // Call update log levels + Futures.getUnchecked(twillController.updateLogLevels(getTwillRunnableName(), logLevels)); + } + } + + @Override + public void resetServiceLogLevels(Set loggerNames) { + for (TwillController twillController : twillRunner.lookup(Constants.Service.MASTER_SERVICES)) { + // Call reset log levels + Futures.getUnchecked(twillController.resetRunnableLogLevels(getTwillRunnableName(), + loggerNames.toArray(new String[0]))); + } + } + + private boolean isEndpointAlive(Discoverable discoverable) { + try { + URL url = URIScheme.createURI(discoverable, "/ping").toURL(); + int responseCode = HttpRequests.execute(HttpRequest.get(url).build(), httpRequestConfig).getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + return true; + } + } catch (IOException e) { + OUTAGE_LOG.warn("Failed to ping endpoint from discoverable {} for service {}", discoverable, serviceName, e); + } + return false; + } +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/MasterServiceManager.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/MasterServiceManager.java index 60bafbd67318..3959a5022b12 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/MasterServiceManager.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/MasterServiceManager.java @@ -31,7 +31,10 @@ public interface MasterServiceManager { /** * @return true if the configured to be available, false otherwise. */ - boolean isServiceEnabled(); + default boolean isServiceEnabled() { + // By default all the services are enabled. extending classes can override if the behavior should be different. + return true; + } /** * @return service description. @@ -43,7 +46,9 @@ public interface MasterServiceManager { * * @return the number of instances of the CDAP Service instances alive. */ - int getInstances(); + default int getInstances() { + return 1; + } /** * Set the number of instances of the CDAP service. @@ -51,35 +56,36 @@ public interface MasterServiceManager { * @param instanceCount number of instances (should be greater than 0) * @return was the operation successful */ - boolean setInstances(int instanceCount); + default boolean setInstances(int instanceCount) { + throw new UnsupportedOperationException("Setting number of instances is not supported."); + } /** * Get the minimum instance count for the service. * * @return the required minimum number of instances of the CDAP Service. */ - int getMinInstances(); + default int getMinInstances() { + return isServiceEnabled() ? 1 : 0; + } /** * Get the maximum instance count for the service. * * @return the allowed maximum number of instances of the CDAP Service. */ - int getMaxInstances(); + default int getMaxInstances() { + return 1; + } /** * Logging availability. * * @return true if logs are available. */ - boolean isLogAvailable(); - - /** - * Possible to check the status of the service. - * - * @return true if the status of the service can be checked. - */ - boolean canCheckStatus(); + default boolean isLogAvailable() { + return true; + } /** * Service's availability. @@ -88,14 +94,18 @@ public interface MasterServiceManager { */ boolean isServiceAvailable(); + /** + * Returns information about this system service runtime information. The information may not be available in + * all runtime environment. + */ SystemServiceLiveInfo getLiveInfo(); - //TODO: Add method to get the metrics name to get event rate on UI - /** * Restart all instances of this service. */ - void restartAllInstances(); + default void restartAllInstances() { + throw new UnsupportedOperationException("Restart all instances is not supported."); + } /** * Restart some instances of this service. @@ -103,14 +113,18 @@ public interface MasterServiceManager { * @param instanceId the instance id to be restarted. * @param moreInstanceIds optional additional instance ids to be restarted. */ - void restartInstances(int instanceId, int... moreInstanceIds); + default void restartInstances(int instanceId, int... moreInstanceIds) { + throw new UnsupportedOperationException("Restart instances is not supported."); + } /** * Update log levels for this service * * @param logLevels The {@link Map} contains the requested logger name and log level. */ - void updateServiceLogLevels(Map logLevels); + default void updateServiceLogLevels(Map logLevels) { + throw new UnsupportedOperationException("Update service log levels is not supported."); + } /** * Reset the log levels of the service. @@ -118,5 +132,7 @@ public interface MasterServiceManager { * * @param loggerNames The set of logger names to be reset, if empty, all loggers will be reset. */ - void resetServiceLogLevels(Set loggerNames); + default void resetServiceLogLevels(Set loggerNames) { + throw new UnsupportedOperationException("Reset service log levels is not supported."); + } } diff --git a/cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillController.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java similarity index 66% rename from cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillController.java rename to cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java index d5da33c73283..efffbdee3802 100644 --- a/cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillController.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java @@ -14,7 +14,7 @@ * the License. */ -package io.cdap.cdap.master.environment; +package io.cdap.cdap.common.twill; import org.apache.twill.api.Command; import org.apache.twill.api.ResourceReport; @@ -41,7 +41,7 @@ */ final class NoopTwillController extends AbstractExecutionServiceController implements TwillController { - protected NoopTwillController() { + NoopTwillController() { super(RunIds.generate()); } @@ -70,14 +70,16 @@ public boolean contains(Discoverable discoverable) { @Override public Iterator iterator() { - return Collections.emptyList().iterator(); + return Collections.emptyIterator(); } }; } @Override public Future changeInstances(String runnable, int newCount) { - return CompletableFuture.completedFuture(newCount); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Change instances is not supported")); + return future; } @Nullable @@ -88,43 +90,59 @@ public ResourceReport getResourceReport() { @Override public Future restartAllInstances(String runnable) { - return CompletableFuture.completedFuture(runnable); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Restart all instances is not supported")); + return future; } @Override public Future> restartInstances(Map> runnableToInstanceIds) { - return CompletableFuture.completedFuture(runnableToInstanceIds.keySet()); + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Restart instances is not supported")); + return future; } @Override public Future restartInstances(String runnable, int instanceId, int... moreInstanceIds) { - return CompletableFuture.completedFuture(runnable); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Restart instances is not supported")); + return future; } @Override public Future restartInstances(String runnable, Set instanceIds) { - return CompletableFuture.completedFuture(runnable); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Restart instances is not supported")); + return future; } @Override public Future> updateLogLevels(Map logLevels) { - return CompletableFuture.completedFuture(logLevels); + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Update log levels is not supported")); + return future; } @Override public Future> updateLogLevels(String runnableName, Map logLevelsForRunnable) { - return CompletableFuture.completedFuture(logLevelsForRunnable); + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Update log levels is not supported")); + return future; } @Override public Future resetLogLevels(String... loggerNames) { - return CompletableFuture.completedFuture(loggerNames); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Reset log levels is not supported")); + return future; } @Override public Future resetRunnableLogLevels(String runnableName, String... loggerNames) { - return CompletableFuture.completedFuture(loggerNames); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Reset runnable log levels is not supported")); + return future; } @Override diff --git a/cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillPreparer.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillPreparer.java similarity index 99% rename from cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillPreparer.java rename to cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillPreparer.java index ee648257dee3..d60c1fa3765a 100644 --- a/cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillPreparer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillPreparer.java @@ -14,7 +14,7 @@ * the License. */ -package io.cdap.cdap.master.environment; +package io.cdap.cdap.common.twill; import org.apache.twill.api.ClassAcceptor; import org.apache.twill.api.SecureStore; diff --git a/cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillRunnerService.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillRunnerService.java similarity index 89% rename from cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillRunnerService.java rename to cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillRunnerService.java index 922c81d3c350..62d1a81bea6c 100644 --- a/cdap-master/src/test/java/io/cdap/cdap/master/environment/NoopTwillRunnerService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillRunnerService.java @@ -14,7 +14,7 @@ * the License. */ -package io.cdap.cdap.master.environment; +package io.cdap.cdap.common.twill; import org.apache.twill.api.ResourceSpecification; import org.apache.twill.api.RunId; @@ -32,10 +32,9 @@ import javax.annotation.Nullable; /** - * A no-op implementation of {@link TwillRunnerService}. It is used for satisfying dependency injection. - * In k8s, as of now, we don't use twill for launching containers. + * A no-op implementation of {@link TwillRunnerService}. */ -final class NoopTwillRunnerService implements TwillRunnerService { +public final class NoopTwillRunnerService implements TwillRunnerService { @Override public void start() { diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/DatasetExecutorServiceManager.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/DatasetExecutorServiceManager.java index aa972ed27ee3..92bc32e35dc1 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/DatasetExecutorServiceManager.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/DatasetExecutorServiceManager.java @@ -19,25 +19,23 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; -import org.apache.twill.api.TwillRunnerService; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** * CDAP Dataset Service management in distributed mode. */ -public class DatasetExecutorServiceManager extends AbstractDistributedMasterServiceManager { +public class DatasetExecutorServiceManager extends AbstractMasterServiceManager { @Inject - public DatasetExecutorServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.DATASET_EXECUTOR, twillRunnerService, discoveryServiceClient); - this.discoveryServiceClient = discoveryServiceClient; + DatasetExecutorServiceManager(CConfiguration cConf, TwillRunner twillRunner, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.DATASET_EXECUTOR, twillRunner); } @Override public int getMaxInstances() { - return cConf.getInt(Constants.Dataset.Executor.MAX_INSTANCES); + return getCConf().getInt(Constants.Dataset.Executor.MAX_INSTANCES); } @Override diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/MetadataServiceManager.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/MetadataServiceManager.java index b317658d56ab..a68bae51c0ff 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/MetadataServiceManager.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/datafabric/dataset/MetadataServiceManager.java @@ -19,7 +19,8 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import org.apache.twill.api.TwillRunnerService; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** @@ -27,12 +28,16 @@ * container as the dataset executor service, this is same as DatasetExecutorServiceManager, except * we use a different discoverable name and description. */ -public class MetadataServiceManager extends DatasetExecutorServiceManager { +public class MetadataServiceManager extends AbstractMasterServiceManager { @Inject - public MetadataServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, twillRunnerService, discoveryServiceClient); + MetadataServiceManager(CConfiguration cConf, TwillRunner twillRunner, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.METADATA_SERVICE, twillRunner); + } + + @Override + public int getMaxInstances() { + return getCConf().getInt(Constants.Dataset.Executor.MAX_INSTANCES); } @Override @@ -41,7 +46,8 @@ public String getDescription() { } @Override - protected String getDiscoverableName() { - return Constants.Service.METADATA_SERVICE; + protected String getTwillRunnableName() { + // Metadata service runs inside the dataset executor YARN container, hence the dataset executor runnable name. + return Constants.Service.DATASET_EXECUTOR; } } diff --git a/cdap-explore-client/src/main/java/io/cdap/cdap/explore/service/ExploreServiceManager.java b/cdap-explore-client/src/main/java/io/cdap/cdap/explore/service/ExploreServiceManager.java index 9d6b5e0f117e..1c0f1d0b54dc 100644 --- a/cdap-explore-client/src/main/java/io/cdap/cdap/explore/service/ExploreServiceManager.java +++ b/cdap-explore-client/src/main/java/io/cdap/cdap/explore/service/ExploreServiceManager.java @@ -19,30 +19,23 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; -import org.apache.twill.api.TwillRunnerService; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** * Service manager for explore service in distributed mode. */ -public class ExploreServiceManager extends AbstractDistributedMasterServiceManager { +public class ExploreServiceManager extends AbstractMasterServiceManager { @Inject - public ExploreServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.EXPLORE_HTTP_USER_SERVICE, twillRunnerService, discoveryServiceClient); - this.discoveryServiceClient = discoveryServiceClient; + ExploreServiceManager(CConfiguration cConf, TwillRunner twillRunner, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.EXPLORE_HTTP_USER_SERVICE, twillRunner); } @Override public boolean isServiceEnabled() { - return cConf.getBoolean(Constants.Explore.EXPLORE_ENABLED); - } - - @Override - public int getMaxInstances() { - return 1; // max explore service container instances is 1 (non-configurable) + return getCConf().getBoolean(Constants.Explore.EXPLORE_ENABLED); } @Override diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java index d90e27fdb947..975e2b227ca7 100644 --- a/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java +++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java @@ -67,6 +67,7 @@ public class KubeMasterEnvironment implements MasterEnvironment { // needed to propagate to deployments created via the KubeTwillRunnerService private static final Set CONFIG_NAMES = ImmutableSet.of("cdap-conf", "hadoop-conf", "cdap-security"); + private static final String MASTER_MAX_INSTANCES = "master.service.max.instances"; private static final String NAMESPACE_KEY = "master.environment.k8s.namespace"; private static final String INSTANCE_LABEL = "master.environment.k8s.instance.label"; // Label for the container name @@ -97,6 +98,9 @@ public void initialize(MasterEnvironmentContext context) throws IOException, Api LOG.info("Initializing Kubernetes environment"); Map conf = context.getConfigurations(); + // We don't support scaling from inside pod. Scaling should be done via CDAP operator. + // Currently we don't support more than one instance per system service, hence set it to "1". + conf.put(MASTER_MAX_INSTANCES, "1"); // Load the pod labels from the configured path. It should be setup by the CDAP operator PodInfo podInfo = getPodInfo(conf); diff --git a/cdap-master/src/test/java/io/cdap/cdap/master/environment/MockMasterEnvironment.java b/cdap-master/src/test/java/io/cdap/cdap/master/environment/MockMasterEnvironment.java index 178d3a3a728e..9095cee91ba6 100644 --- a/cdap-master/src/test/java/io/cdap/cdap/master/environment/MockMasterEnvironment.java +++ b/cdap-master/src/test/java/io/cdap/cdap/master/environment/MockMasterEnvironment.java @@ -17,6 +17,7 @@ package io.cdap.cdap.master.environment; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.twill.NoopTwillRunnerService; import io.cdap.cdap.master.spi.environment.MasterEnvironment; import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext; import org.apache.twill.api.TwillRunnerService; diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/SystemServiceLiveInfo.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/SystemServiceLiveInfo.java index bcbfd9e2760d..31d3fccc36a1 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/SystemServiceLiveInfo.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/SystemServiceLiveInfo.java @@ -28,7 +28,7 @@ public class SystemServiceLiveInfo { private final List containers; - public SystemServiceLiveInfo(List containers) { + private SystemServiceLiveInfo(List containers) { this.containers = Collections.unmodifiableList(new ArrayList<>(containers)); } diff --git a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java index 4966e275ebbe..004a34efcfb3 100644 --- a/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java +++ b/cdap-standalone/src/main/java/io/cdap/cdap/StandaloneMain.java @@ -21,6 +21,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Service; +import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; @@ -46,6 +47,7 @@ import io.cdap.cdap.common.io.URLConnections; import io.cdap.cdap.common.logging.common.UncaughtExceptionHandler; import io.cdap.cdap.common.startup.ConfigurationLogger; +import io.cdap.cdap.common.twill.NoopTwillRunnerService; import io.cdap.cdap.common.utils.DirUtils; import io.cdap.cdap.common.utils.OSDetector; import io.cdap.cdap.data.runtime.DataFabricModules; @@ -95,6 +97,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.tephra.inmemory.InMemoryTransactionService; +import org.apache.twill.api.TwillRunner; import org.apache.twill.api.TwillRunnerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -469,6 +472,7 @@ static StandaloneMain create(CConfiguration cConf, Configuration hConf) { } private static List createPersistentModules(CConfiguration cConf, Configuration hConf) { + cConf.setInt(Constants.Master.MAX_INSTANCES, 1); cConf.setIfUnset(Constants.CFG_DATA_LEVELDB_DIR, Constants.DEFAULT_DATA_LEVELDB_DIR); cConf.set(Constants.CFG_DATA_INMEMORY_PERSISTENCE, Constants.InMemoryPersistenceType.LEVELDB.name()); @@ -517,7 +521,14 @@ private static List createPersistentModules(CConfiguration cConf, Config new MessagingServerRuntimeModule().getStandaloneModules(), new AppFabricServiceRuntimeModule().getStandaloneModules(), new MonitorHandlerModule(false), - new OperationalStatsModule() + new OperationalStatsModule(), + new AbstractModule() { + @Override + protected void configure() { + // Needed by MonitorHandlerModuler + bind(TwillRunner.class).to(NoopTwillRunnerService.class); + } + } ); } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/MessagingServiceManager.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/MessagingServiceManager.java index 4484edeaa879..748d0e678440 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/MessagingServiceManager.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/MessagingServiceManager.java @@ -19,29 +19,28 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; import io.cdap.cdap.common.twill.MasterServiceManager; -import org.apache.twill.api.TwillRunnerService; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** * A {@link MasterServiceManager} used in distributed mode. */ -public class MessagingServiceManager extends AbstractDistributedMasterServiceManager { +public class MessagingServiceManager extends AbstractMasterServiceManager { @Inject - MessagingServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.MESSAGING_SERVICE, twillRunnerService, discoveryServiceClient); + MessagingServiceManager(CConfiguration cConf, TwillRunner twillRunner, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.MESSAGING_SERVICE, twillRunner); } @Override public String getDescription() { - return Constants.Service.MESSAGING_SERVICE; + return Constants.MessagingSystem.SERVICE_DESCRIPTION; } @Override public int getMaxInstances() { - return cConf.getInt(Constants.MessagingSystem.MAX_INSTANCES); + return getCConf().getInt(Constants.MessagingSystem.MAX_INSTANCES); } } diff --git a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java index 00fc6f5ccc9f..8626db912124 100644 --- a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java +++ b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java @@ -64,6 +64,7 @@ import io.cdap.cdap.common.http.DefaultHttpRequestConfig; import io.cdap.cdap.common.namespace.NamespaceAdmin; import io.cdap.cdap.common.test.TestRunner; +import io.cdap.cdap.common.twill.NoopTwillRunnerService; import io.cdap.cdap.common.utils.OSDetector; import io.cdap.cdap.data.runtime.DataFabricModules; import io.cdap.cdap.data.runtime.DataSetServiceModules; @@ -132,6 +133,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; import org.junit.After; import org.junit.AfterClass; @@ -275,6 +277,9 @@ protected void configure() { .build(ArtifactManagerFactory.class)); bind(TemporaryFolder.class).toInstance(TMP_FOLDER); bind(AuthorizationHandler.class).in(Scopes.SINGLETON); + + // Needed by MonitorHandlerModuler + bind(TwillRunner.class).to(NoopTwillRunnerService.class); } } ); diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryDatasetExecutorServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryDatasetExecutorServiceManager.java deleted file mode 100644 index 93a3d1528a7f..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryDatasetExecutorServiceManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * In memory dataset executor service manager. - */ -public class InMemoryDatasetExecutorServiceManager extends AbstractInMemoryMasterServiceManager { - - @Override - public String getDescription() { - return Constants.Dataset.Executor.SERVICE_DESCRIPTION; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryExploreServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryExploreServiceManager.java deleted file mode 100644 index 0fcfe615a0ae..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryExploreServiceManager.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import com.google.inject.Inject; -import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * In memory explore service manager. - */ -public class InMemoryExploreServiceManager extends AbstractInMemoryMasterServiceManager { - - private final CConfiguration cConf; - - @Inject - public InMemoryExploreServiceManager(CConfiguration cConf) { - this.cConf = cConf; - } - - @Override - public boolean isServiceEnabled() { - return cConf.getBoolean(Constants.Explore.EXPLORE_ENABLED); - } - - @Override - public String getDescription() { - return Constants.Explore.SERVICE_DESCRIPTION; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryLogSaverServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryLogSaverServiceManager.java deleted file mode 100644 index f55119515870..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryLogSaverServiceManager.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * InMemory LogSaver Service Management class. - */ -public class InMemoryLogSaverServiceManager extends AbstractInMemoryMasterServiceManager { - - @Override - public boolean isLogAvailable() { - return false; - } - - @Override - public String getDescription() { - return Constants.LogSaver.SERVICE_DESCRIPTION; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMessagingServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMessagingServiceManager.java deleted file mode 100644 index 62749170807b..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMessagingServiceManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright © 2016 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * In memory manager for messaging service. - */ -public class InMemoryMessagingServiceManager extends AbstractInMemoryMasterServiceManager { - - @Override - public String getDescription() { - return Constants.Service.MESSAGING_SERVICE; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetadataServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetadataServiceManager.java deleted file mode 100644 index 871453301eb8..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetadataServiceManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * In memory dataset executor service manager. - */ -public class InMemoryMetadataServiceManager extends AbstractInMemoryMasterServiceManager { - - @Override - public String getDescription() { - return Constants.Metadata.SERVICE_DESCRIPTION; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsProcessorServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsProcessorServiceManager.java deleted file mode 100644 index 6a8f5aec6bd8..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsProcessorServiceManager.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * InMemory MetricsProcessor Service Management class. - */ -public class InMemoryMetricsProcessorServiceManager extends AbstractInMemoryMasterServiceManager { - - @Override - public String getDescription() { - return Constants.MetricsProcessor.SERVICE_DESCRIPTION; - } - - @Override - public boolean isLogAvailable() { - return false; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsServiceManager.java deleted file mode 100644 index 01ead9b73b37..000000000000 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/InMemoryMetricsServiceManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright © 2014 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.logging.run; - -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractInMemoryMasterServiceManager; - -/** - * In memory metrics service manager. - */ -public class InMemoryMetricsServiceManager extends AbstractInMemoryMasterServiceManager { - - @Override - public String getDescription() { - return Constants.Metrics.SERVICE_DESCRIPTION; - } -} diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/LogSaverStatusServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/LogSaverServiceManager.java similarity index 67% rename from cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/LogSaverStatusServiceManager.java rename to cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/LogSaverServiceManager.java index f936a5f4f0c9..da28f932cd74 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/LogSaverStatusServiceManager.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/run/LogSaverServiceManager.java @@ -19,24 +19,23 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; -import org.apache.twill.api.TwillRunnerService; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** * CDAP Log Saver Service Management in Distributed Mode. */ -public class LogSaverStatusServiceManager extends AbstractDistributedMasterServiceManager { +public class LogSaverServiceManager extends AbstractMasterServiceManager { @Inject - public LogSaverStatusServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.LOGSAVER, twillRunnerService, discoveryServiceClient); + LogSaverServiceManager(CConfiguration cConf, TwillRunner twillRunner, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.LOGSAVER, twillRunner); } @Override public int getMaxInstances() { - return cConf.getInt(Constants.LogSaver.MAX_INSTANCES); + return getCConf().getInt(Constants.LogSaver.MAX_INSTANCES); } @Override diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsProcessorStatusServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsProcessorStatusServiceManager.java index d9f2265e0935..def5c88b0021 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsProcessorStatusServiceManager.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsProcessorStatusServiceManager.java @@ -19,24 +19,24 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; -import org.apache.twill.api.TwillRunnerService; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** * CDAP Metrics Processor Service Management in Distributed Mode. */ -public class MetricsProcessorStatusServiceManager extends AbstractDistributedMasterServiceManager { +public class MetricsProcessorStatusServiceManager extends AbstractMasterServiceManager { @Inject - public MetricsProcessorStatusServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.METRICS_PROCESSOR, twillRunnerService, discoveryServiceClient); + MetricsProcessorStatusServiceManager(CConfiguration cConf, TwillRunner twillRunner, + DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.METRICS_PROCESSOR, twillRunner); } @Override public int getMaxInstances() { - return cConf.getInt(Constants.MetricsProcessor.MAX_INSTANCES); + return getCConf().getInt(Constants.MetricsProcessor.MAX_INSTANCES); } @Override diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsServiceManager.java b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsServiceManager.java index 5c59bf5c13b7..bb5bf2a56a01 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsServiceManager.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/runtime/MetricsServiceManager.java @@ -19,24 +19,23 @@ import com.google.inject.Inject; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.twill.AbstractDistributedMasterServiceManager; -import org.apache.twill.api.TwillRunnerService; +import io.cdap.cdap.common.twill.AbstractMasterServiceManager; +import org.apache.twill.api.TwillRunner; import org.apache.twill.discovery.DiscoveryServiceClient; /** * CDAP Metrics Service Management in Distributed Mode. */ -public class MetricsServiceManager extends AbstractDistributedMasterServiceManager { +public class MetricsServiceManager extends AbstractMasterServiceManager { @Inject - public MetricsServiceManager(CConfiguration cConf, TwillRunnerService twillRunnerService, - DiscoveryServiceClient discoveryServiceClient) { - super(cConf, Constants.Service.METRICS, twillRunnerService, discoveryServiceClient); + MetricsServiceManager(CConfiguration cConf, TwillRunner twillRunner, DiscoveryServiceClient discoveryClient) { + super(cConf, discoveryClient, Constants.Service.METRICS, twillRunner); } @Override public int getMaxInstances() { - return cConf.getInt(Constants.Metrics.MAX_INSTANCES); + return getCConf().getInt(Constants.Metrics.MAX_INSTANCES); } @Override