Skip to content

Commit

Permalink
(CDAP-14987) Reporting service status correctly in k8s
Browse files Browse the repository at this point in the history
- Refactor to unify MasterServiceManager code
- Use service discovery to report service health
- Handle enable and disable of services correctly
  • Loading branch information
chtyim committed Sep 7, 2019
1 parent 120ab1e commit 1a722cc
Show file tree
Hide file tree
Showing 34 changed files with 480 additions and 751 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2014 Cask Data, Inc.
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand All @@ -14,83 +14,92 @@
* the License.
*/

package io.cdap.cdap.common.twill;
package io.cdap.cdap.app.guice;

import com.google.common.collect.ImmutableList;
import io.cdap.cdap.proto.Containers;
import io.cdap.cdap.common.twill.MasterServiceManager;
import io.cdap.cdap.proto.SystemServiceLiveInfo;
import org.apache.twill.api.logging.LogEntry;

import java.util.Map;
import java.util.Set;

/**
* InMemory CDAP Service Management class.
* A {@link MasterServiceManager} that delegates all methods to another {@link MasterServiceManager}.
*/
public abstract class AbstractInMemoryMasterServiceManager implements MasterServiceManager {
class DelegatingMasterServiceManager implements MasterServiceManager {

private final MasterServiceManager delegate;

/**
 * Creates a manager that forwards calls to the given delegate.
 *
 * @param delegate the {@link MasterServiceManager} to forward to; must not be {@code null}
 * @throws NullPointerException if {@code delegate} is {@code null}
 */
DelegatingMasterServiceManager(MasterServiceManager delegate) {
  // Fail fast here instead of deferring the NullPointerException to the first delegated call.
  this.delegate = java.util.Objects.requireNonNull(delegate, "delegate");
}

/**
 * Returns the {@link MasterServiceManager} supplied at construction time.
 *
 * @return the wrapped delegate
 */
MasterServiceManager getDelegate() {
  return this.delegate;
}

@Override
public SystemServiceLiveInfo getLiveInfo() {
return new SystemServiceLiveInfo(ImmutableList.<Containers.ContainerInfo>of());
public boolean isServiceEnabled() {
return getDelegate().isServiceEnabled();
}

/**
 * Forwards to the delegate's {@code getDescription()}.
 *
 * @return the description reported by the delegate
 */
@Override
public String getDescription() {
  // Resolve the target through getDelegate() so the lookup path is the same as every other method.
  final MasterServiceManager target = getDelegate();
  return target.getDescription();
}

@Override
public int getInstances() {
return 1;
return getDelegate().getInstances();
}

@Override
public boolean setInstances(int instanceCount) {
return false;
return getDelegate().setInstances(instanceCount);
}

@Override
public int getMinInstances() {
return 1;
return getDelegate().getMinInstances();
}

@Override
public int getMaxInstances() {
return 1;
return getDelegate().getMaxInstances();
}

@Override
public boolean isLogAvailable() {
return true;
}

@Override
public boolean canCheckStatus() {
return true;
return getDelegate().isLogAvailable();
}

@Override
public boolean isServiceAvailable() {
return true;
return getDelegate().isServiceAvailable();
}

@Override
public boolean isServiceEnabled() {
return true;
public SystemServiceLiveInfo getLiveInfo() {
return getDelegate().getLiveInfo();
}

@Override
public void restartAllInstances() {
// No operation for in memory manager.
getDelegate().restartAllInstances();
}

@Override
public void restartInstances(int instanceId, int... moreInstanceIds) {
// No operation for in memory manager.
getDelegate().restartInstances(instanceId, moreInstanceIds);
}

@Override
public void updateServiceLogLevels(Map<String, LogEntry.Level> logLevels) {
// No operation for in memory manager.
getDelegate().updateServiceLogLevels(logLevels);
}

@Override
public void resetServiceLogLevels(Set<String> loggerNames) {
// No operation for in memory manager.
getDelegate().resetServiceLogLevels(loggerNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,11 @@
import io.cdap.cdap.explore.service.ExploreServiceManager;
import io.cdap.cdap.gateway.handlers.DatasetServiceStore;
import io.cdap.cdap.gateway.handlers.MonitorHandler;
import io.cdap.cdap.internal.app.runtime.batch.InMemoryTransactionServiceManager;
import io.cdap.cdap.internal.app.runtime.distributed.AppFabricServiceManager;
import io.cdap.cdap.internal.app.runtime.distributed.TransactionServiceManager;
import io.cdap.cdap.internal.app.runtime.monitor.NonHadoopAppFabricServiceManager;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.logging.run.InMemoryAppFabricServiceManager;
import io.cdap.cdap.logging.run.InMemoryDatasetExecutorServiceManager;
import io.cdap.cdap.logging.run.InMemoryExploreServiceManager;
import io.cdap.cdap.logging.run.InMemoryLogSaverServiceManager;
import io.cdap.cdap.logging.run.InMemoryMessagingServiceManager;
import io.cdap.cdap.logging.run.InMemoryMetadataServiceManager;
import io.cdap.cdap.logging.run.InMemoryMetricsProcessorServiceManager;
import io.cdap.cdap.logging.run.InMemoryMetricsServiceManager;
import io.cdap.cdap.logging.run.LogSaverStatusServiceManager;
import io.cdap.cdap.logging.run.LogSaverServiceManager;
import io.cdap.cdap.messaging.distributed.MessagingServiceManager;
import io.cdap.cdap.metrics.runtime.MetricsProcessorStatusServiceManager;
import io.cdap.cdap.metrics.runtime.MetricsServiceManager;
Expand All @@ -66,7 +58,7 @@
*/
public class MonitorHandlerModule extends AbstractModule {

public static final String SERVICE_STORE_DS_MODULES = "service.store.ds.modules";
private static final String SERVICE_STORE_DS_MODULES = "service.store.ds.modules";
private final boolean isHadoop;

public MonitorHandlerModule(boolean isHadoop) {
Expand Down Expand Up @@ -112,15 +104,24 @@ protected void configure() {
private void addNonHadoopBindings(Binder binder) {
MapBinder<String, MasterServiceManager> mapBinder = MapBinder.newMapBinder(binder, String.class,
MasterServiceManager.class);
mapBinder.addBinding(Constants.Service.LOGSAVER).to(InMemoryLogSaverServiceManager.class);
mapBinder.addBinding(Constants.Service.TRANSACTION).to(InMemoryTransactionServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR).to(InMemoryMetricsProcessorServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS).to(InMemoryMetricsServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP).to(InMemoryAppFabricServiceManager.class);
mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR).to(InMemoryDatasetExecutorServiceManager.class);
mapBinder.addBinding(Constants.Service.METADATA_SERVICE).to(InMemoryMetadataServiceManager.class);
mapBinder.addBinding(Constants.Service.EXPLORE_HTTP_USER_SERVICE).to(InMemoryExploreServiceManager.class);
mapBinder.addBinding(Constants.Service.MESSAGING_SERVICE).to(InMemoryMessagingServiceManager.class);
mapBinder.addBinding(Constants.Service.LOGSAVER)
.toProvider(new NonHadoopMasterServiceManagerProvider(LogSaverServiceManager.class));
mapBinder.addBinding(Constants.Service.TRANSACTION)
.toProvider(new NonHadoopMasterServiceManagerProvider(TransactionServiceManager.class));
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR)
.toProvider(new NonHadoopMasterServiceManagerProvider(MetricsProcessorStatusServiceManager.class));
mapBinder.addBinding(Constants.Service.METRICS)
.toProvider(new NonHadoopMasterServiceManagerProvider(MetricsServiceManager.class));
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP)
.toProvider(new NonHadoopMasterServiceManagerProvider(NonHadoopAppFabricServiceManager.class));
mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR)
.toProvider(new NonHadoopMasterServiceManagerProvider(DatasetExecutorServiceManager.class));
mapBinder.addBinding(Constants.Service.METADATA_SERVICE)
.toProvider(new NonHadoopMasterServiceManagerProvider(MetadataServiceManager.class));
mapBinder.addBinding(Constants.Service.EXPLORE_HTTP_USER_SERVICE)
.toProvider(new NonHadoopMasterServiceManagerProvider(ExploreServiceManager.class));
mapBinder.addBinding(Constants.Service.MESSAGING_SERVICE)
.toProvider(new NonHadoopMasterServiceManagerProvider(MessagingServiceManager.class));

// The ServiceStore uses a special non-TX KV Table.
bindDatasetModule(binder, new InMemoryKVTableDefinition.Module());
Expand All @@ -134,7 +135,7 @@ private void addNonHadoopBindings(Binder binder) {
private void addHadoopBindings(Binder binder) {
MapBinder<String, MasterServiceManager> mapBinder = MapBinder.newMapBinder(binder, String.class,
MasterServiceManager.class);
mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverStatusServiceManager.class);
mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverServiceManager.class);
mapBinder.addBinding(Constants.Service.TRANSACTION).to(TransactionServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR).to(MetricsProcessorStatusServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS).to(MetricsServiceManager.class);
Expand All @@ -153,7 +154,7 @@ private void addHadoopBindings(Binder binder) {
*/
private void bindDatasetModule(Binder binder, DatasetModule module) {
MapBinder<String, DatasetModule> mapBinder = MapBinder.newMapBinder(
binder(), String.class, DatasetModule.class, Names.named(SERVICE_STORE_DS_MODULES));
binder, String.class, DatasetModule.class, Names.named(SERVICE_STORE_DS_MODULES));

mapBinder.addBinding(module.getClass().getName()).toInstance(module);
}
Expand All @@ -178,4 +179,29 @@ public DatasetFramework get() {
return new InMemoryDatasetFramework(new DefaultDatasetDefinitionRegistryFactory(injector), datasetModules);
}
}

/**
 * A Guice {@link Provider} of {@link MasterServiceManager} for the non-hadoop environment.
 * Each call to {@link #get()} obtains an instance of the configured manager class from the
 * {@link Injector} and wraps it in a {@link DelegatingMasterServiceManager} that reports
 * exactly one instance.
 */
private static final class NonHadoopMasterServiceManagerProvider implements Provider<MasterServiceManager> {

  /** Concrete manager type that is looked up from the injector on every {@link #get()} call. */
  private final Class<? extends MasterServiceManager> serviceManagerClass;

  /** Populated through member injection ({@code @Inject}); it is not set by the constructor. */
  @Inject
  private Injector injector;

  /**
   * @param serviceManagerClass the {@link MasterServiceManager} implementation to instantiate and wrap
   */
  NonHadoopMasterServiceManagerProvider(Class<? extends MasterServiceManager> serviceManagerClass) {
    this.serviceManagerClass = serviceManagerClass;
  }

  /**
   * Builds the wrapped manager.
   *
   * @return a delegating manager whose {@code getInstances()} always returns {@code 1}; every other
   *         call is forwarded to the instance obtained from the injector
   */
  @Override
  public MasterServiceManager get() {
    MasterServiceManager actualManager = injector.getInstance(serviceManagerClass);
    return new DelegatingMasterServiceManager(actualManager) {
      @Override
      public int getInstances() {
        // Each master service currently runs as a single instance in the non-hadoop environment.
        return 1;
      }
    };
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@ public class MonitorHandler extends AbstractAppFabricHttpHandler {

private static final String STATUSOK = Constants.Monitor.STATUS_OK;
private static final String STATUSNOTOK = Constants.Monitor.STATUS_NOTOK;
private static final String NOTAPPLICABLE = "NA";

private final Map<String, MasterServiceManager> serviceManagementMap;
private final ServiceStore serviceStore;

@Inject
public MonitorHandler(Map<String, MasterServiceManager> serviceMap,
ServiceStore serviceStore) throws Exception {
public MonitorHandler(Map<String, MasterServiceManager> serviceMap, ServiceStore serviceStore) {
this.serviceManagementMap = serviceMap;
this.serviceStore = serviceStore;
}
Expand Down Expand Up @@ -160,7 +158,7 @@ public void getBootStatus(HttpRequest request, HttpResponder responder) {
Map<String, String> result = new HashMap<>();
for (String service : serviceManagementMap.keySet()) {
MasterServiceManager masterServiceManager = serviceManagementMap.get(service);
if (masterServiceManager.isServiceEnabled() && masterServiceManager.canCheckStatus()) {
if (masterServiceManager.isServiceEnabled()) {
String status = masterServiceManager.isServiceAvailable() ? STATUSOK : STATUSNOTOK;
result.put(service, status);
}
Expand All @@ -170,16 +168,15 @@ public void getBootStatus(HttpRequest request, HttpResponder responder) {

@Path("/system/services")
@GET
public void getServiceSpec(HttpRequest request, HttpResponder responder) throws Exception {
public void getServiceSpec(HttpRequest request, HttpResponder responder) {
List<SystemServiceMeta> response = new ArrayList<>();
SortedSet<String> services = new TreeSet<>(serviceManagementMap.keySet());
List<String> serviceList = new ArrayList<>(services);
for (String service : serviceList) {
MasterServiceManager serviceManager = serviceManagementMap.get(service);
if (serviceManager.isServiceEnabled()) {
String logs = serviceManager.isLogAvailable() ? Constants.Monitor.STATUS_OK : Constants.Monitor.STATUS_NOTOK;
String canCheck = serviceManager.canCheckStatus() ? (
serviceManager.isServiceAvailable() ? STATUSOK : STATUSNOTOK) : NOTAPPLICABLE;
String canCheck = serviceManager.isServiceAvailable() ? STATUSOK : STATUSNOTOK;
//TODO: Add metric name for Event Rate monitoring
response.add(new SystemServiceMeta(service, serviceManager.getDescription(), canCheck, logs,
serviceManager.getMinInstances(), serviceManager.getMaxInstances(),
Expand Down Expand Up @@ -277,7 +274,7 @@ public void resetServiceLogLevels(FullHttpRequest request, HttpResponder respond

try {
Set<String> loggerNames = parseBody(request, SET_STRING_TYPE);
masterServiceManager.resetServiceLogLevels(loggerNames == null ? Collections.<String>emptySet() : loggerNames);
masterServiceManager.resetServiceLogLevels(loggerNames == null ? Collections.emptySet() : loggerNames);
responder.sendStatus(HttpResponseStatus.OK);
} catch (IllegalStateException ise) {
throw new ServiceUnavailableException(String.format("Failed to reset log levels for service %s " +
Expand All @@ -287,7 +284,7 @@ public void resetServiceLogLevels(FullHttpRequest request, HttpResponder respond
}
}

private int getSystemServiceInstanceCount(String serviceName) throws Exception {
private int getSystemServiceInstanceCount(String serviceName) {
Integer count = serviceStore.getServiceInstance(serviceName);

//In standalone mode, this count will be null. And thus we just return the actual instance count.
Expand Down

This file was deleted.

Loading

0 comments on commit 1a722cc

Please sign in to comment.