From 26109d33127305f2026475d1e6c770b09d27ebdd Mon Sep 17 00:00:00 2001
From: Satish Reddy M
Date: Wed, 20 Nov 2019 16:21:59 +0530
Subject: [PATCH] Code changes to add k8s auto discovery

---
 pom.xml                                        |  4 +-
 .../extensions/kafka/KafkaMonitor.java         | 47 +++++++++++++++++--
 .../extensions/kafka/KafkaMonitorTask.java     | 36 +++++++-------
 src/main/resources/conf/config.yml             | 37 ++++++++++-----
 .../metrics/DomainMetricsProcessorTest.java    | 15 +++---
 5 files changed, 94 insertions(+), 45 deletions(-)

diff --git a/pom.xml b/pom.xml
index e5a08dc..5f7c286 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
     <groupId>com.appdynamics.extensions</groupId>
     <artifactId>kafka-monitoring-extension</artifactId>
-    <version>2.0.3</version>
+    <version>2.0.4-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>kafka-monitoring-extension</name>
     <url>http://maven.apache.org</url>
@@ -18,7 +18,7 @@
         <dependency>
             <groupId>com.appdynamics</groupId>
             <artifactId>appd-exts-commons</artifactId>
-            <version>2.1.0</version>
+            <version>2.2.2-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>

diff --git a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java
index 407603d..f9b6079 100644
--- a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java
+++ b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java
@@ -13,9 +13,15 @@
 import com.appdynamics.extensions.kafka.utils.Constants;
 import com.appdynamics.extensions.kafka.utils.SslUtils;
 import com.appdynamics.extensions.util.AssertUtils;
+import com.singularity.ee.agent.systemagent.api.exception.TaskExecutionException;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.PatternLayout;
 import org.slf4j.LoggerFactory;

 import java.io.File;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -44,20 +50,51 @@ protected void doRun (TasksExecutionServiceProvider tasksExecutionServiceProvide
         List<Map<String, String>> kafkaServers = (List<Map<String, String>>) this.getContextConfiguration().getConfigYml().get(Constants.SERVERS);
         logger.info("The size of servers section is: "+kafkaServers);
+
+        Map kubernetesConfig = (Map) this.getContextConfiguration().getConfigYml().get("kubernetes");
+
+        Boolean kubernetesMode = Boolean.valueOf(kubernetesConfig.get("useKubernetes").toString());
+
         for (Map kafkaServer : kafkaServers) {
+
+            AssertUtils.assertNotNull(kafkaServer, "the server arguments are empty");
+            if (kafkaServer.size() > 1 && !kubernetesMode) {
+                AssertUtils.assertNotNull(kafkaServer.get("displayName"), "The displayName can not be null");
+                logger.info("Starting the Kafka Monitoring Task for server : " + kafkaServer.get("displayName"));
+            } else {
+                logger.info("Starting the Kafka Monitoring Task");
+            }
+
             KafkaMonitorTask task = new KafkaMonitorTask(tasksExecutionServiceProvider, this.getContextConfiguration(), kafkaServer);
-            AssertUtils.assertNotNull(kafkaServer.get(Constants.DISPLAY_NAME),
-                    "The displayName can not be null");
             tasksExecutionServiceProvider.submit(kafkaServer.get(Constants.DISPLAY_NAME), task);
         }
     }

-    protected int getTaskCount () {
-        List<Map<String, String>> servers = (List<Map<String, String>>) getContextConfiguration().
+    protected List<Map<String, String>> getServers() {
+        List<Map<String, String>> servers = (List<Map<String, String>>) getContextConfiguration().
                getConfigYml().get(Constants.SERVERS);
         AssertUtils.assertNotNull(servers, "The 'servers' section in config.yml is not initialized");
-        return servers.size();
+        return servers;
+    }
+
+
+    public static void main(String[] args) throws TaskExecutionException {
+
+        ConsoleAppender ca = new ConsoleAppender();
+        ca.setWriter(new OutputStreamWriter(System.out));
+        ca.setLayout(new PatternLayout("%-5p [%t]: %m%n"));
+        ca.setThreshold(Level.DEBUG);
+        org.apache.log4j.Logger.getRootLogger().addAppender(ca);
+
+        KafkaMonitor monitor = new KafkaMonitor();
+
+        final Map taskArgs = new HashMap<>();
+        taskArgs.put("config-file", "/Users/satishrm/AppDynamics/Code/extensions/kafka-monitoring-extension/src/main/resources/conf/config.yml");
+
+        monitor.execute(taskArgs, null);
+    }
 }
\ No newline at end of file

diff --git a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java
index 7a038ae..1d6b58d 100644
--- a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java
+++ b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java
@@ -18,12 +18,11 @@
 import com.appdynamics.extensions.AMonitorTaskRunnable;
 import com.appdynamics.extensions.MetricWriteHelper;
-import com.appdynamics.extensions.TaskInputArgs;
 import com.appdynamics.extensions.TasksExecutionServiceProvider;
 import com.appdynamics.extensions.conf.MonitorContextConfiguration;
-import com.appdynamics.extensions.crypto.CryptoUtil;
 import com.appdynamics.extensions.kafka.metrics.DomainMetricsProcessor;
 import com.appdynamics.extensions.kafka.utils.Constants;
+import com.appdynamics.extensions.util.CryptoUtils;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import org.slf4j.LoggerFactory;
@@ -40,23 +39,28 @@ public class KafkaMonitorTask implements AMonitorTaskRunnable {
     private MonitorContextConfiguration configuration;
     private Map kafkaServer;
     private MetricWriteHelper metricWriteHelper;
-    private String displayName;
     private JMXConnectionAdapter jmxAdapter;
     private JMXConnector jmxConnector;
+    private String displayName = "DEFAULT";

-    KafkaMonitorTask (TasksExecutionServiceProvider serviceProvider, MonitorContextConfiguration configuration,
-                      Map kafkaServer) {
+
+    KafkaMonitorTask(TasksExecutionServiceProvider serviceProvider, MonitorContextConfiguration configuration,
+                     Map kafkaServer) {
         this.configuration = configuration;
         this.kafkaServer = kafkaServer;
         this.metricWriteHelper = serviceProvider.getMetricWriteHelper();
-        this.displayName = (String) kafkaServer.get(Constants.DISPLAY_NAME);
+        String name = (String) kafkaServer.get(Constants.DISPLAY_NAME);
+
+        if (!Strings.isNullOrEmpty(name)) {
+            displayName = name;
+        }
     }

-    public void onTaskComplete () {
+    public void onTaskComplete() {
         logger.info("All tasks for server {} finished", this.kafkaServer.get(Constants.DISPLAY_NAME));
     }

-    public void run () {
+    public void run() {
         try {
             logger.info("Starting Kafka Monitoring task for Kafka server: {} ", this.kafkaServer.get(Constants.DISPLAY_NAME));
             populateAndPrintMetrics();
@@ -68,7 +72,7 @@ public void run () {
         }
     }

-    public void populateAndPrintMetrics () {
+    public void populateAndPrintMetrics() {
         try {
             BigDecimal connectionStatus = openJMXConnection();
             List<Map<String, ?>> mBeansListFromConfig = (List<Map<String, ?>>) configuration.getConfigYml()
@@ -95,7 +99,7 @@ public void populateAndPrintMetrics () {
         }
     }

-    private BigDecimal openJMXConnection () {
+    private BigDecimal openJMXConnection() {
         try {
             Map requestMap = buildRequestMap();
             jmxAdapter = JMXConnectionAdapter.create(requestMap);
@@ -123,7 +127,7 @@ private BigDecimal openJMXConnection () {
         return null;
     }

-    private Map buildRequestMap () {
+    private Map buildRequestMap() {
         Map requestMap = new HashMap<>();
         requestMap.put(Constants.HOST, this.kafkaServer.get(Constants.HOST));
         requestMap.put(Constants.PORT, this.kafkaServer.get(Constants.PORT));
@@ -134,7 +138,7 @@ private Map buildRequestMap () {
         return requestMap;
     }

-    private String getPassword () {
+    private String getPassword() {
         String password = this.kafkaServer.get(Constants.PASSWORD);
         Map configMap = configuration.getConfigYml();
         if (!Strings.isNullOrEmpty(password)) {
@@ -145,15 +149,15 @@ private String getPassword () {
             String encryptedPassword = this.kafkaServer.get(Constants.ENCRYPTED_PASSWORD);
             if (!Strings.isNullOrEmpty(encryptionKey) && !Strings.isNullOrEmpty(encryptedPassword)) {
                 java.util.Map cryptoMap = Maps.newHashMap();
-                cryptoMap.put(TaskInputArgs.ENCRYPTED_PASSWORD, encryptedPassword);
-                cryptoMap.put(TaskInputArgs.ENCRYPTION_KEY, encryptionKey);
-                return CryptoUtil.getPassword(cryptoMap);
+                cryptoMap.put(com.appdynamics.extensions.Constants.ENCRYPTED_PASSWORD, encryptedPassword);
+                cryptoMap.put(com.appdynamics.extensions.Constants.ENCRYPTION_KEY, encryptionKey);
+                return CryptoUtils.getPassword(cryptoMap);
             }
         }
         return "";
     }

-    private Map getConnectionParameters () {
+    private Map getConnectionParameters() {
         if (configuration.getConfigYml().containsKey(Constants.CONNECTION))
             return (Map) configuration.getConfigYml().get(Constants.CONNECTION);
         else

diff --git a/src/main/resources/conf/config.yml b/src/main/resources/conf/config.yml
index ebadaf2..3ac0dd2 100755
--- a/src/main/resources/conf/config.yml
+++ b/src/main/resources/conf/config.yml
@@ -1,22 +1,33 @@
 #If SIM is enabled, use this
-#metricPrefix: 'Custom Metrics|Kafka'
+metricPrefix: 'Custom Metrics|Kafka'
 #If SIM is not enabled, then use this
-metricPrefix: 'Server|Component:<Component-ID>|Custom Metrics|Kafka'
+#metricPrefix: 'Server|Component:<Component-ID>|Custom Metrics|Kafka'
 # To know your Component-ID, Please refer the link
 # https://community.appdynamics.com/t5/Knowledge-Base/How-to-troubleshoot-missing-custom-metrics-or-extensions-metrics/ta-p/28695

+kubernetes:
+  useKubernetes: "true"
+  namespace: "default"
+  containerImageNameToMatch: "kafka"
+  #If there is only 1 container port then extension uses that and no need to provide the containerPortName here.
+  #If there are multiple container ports then provide the containerPortName to filter out the port.
+  containerPortName:
+  podLabels:
+    - name: "run"
+      value: "my-kafka"
+
 # Add your Kafka Instances below
 servers:
-  - serviceUrl: 'service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi' #provide jmx service URL [OR] provide [host][port] pair
+  - serviceUrl: 'service:jmx:rmi:///jndi/rmi://${host}:${port}/jmxrmi' #provide jmx service URL [OR] provide [host][port] pair
     host: ''
     port: ''
     username: ''
     password: ''
     encryptedPassword: ''
-    displayName: 'Local Kafka Server'
-    useSsl: false # set to true if you're using SSL for this server
+    displayName:
+    useSsl: 'false' # set to true if you're using SSL for this server

 # Provide the encryption key for the encrypted password
 encryptionKey: ''
@@ -30,14 +41,14 @@
 # If you are using the connection section,
 # any change to the connection section below requires a machine agent restart for the changes to reflect
-connection:
-  socketTimeout: 3000
-  connectTimeout: 1000
-  sslProtocols: 'TLSv1.2'
-  sslCipherSuites: ''
-  sslTrustStorePath: '' #if [sslTrustStorePath]: '' empty, it defaults to /conf/cacerts.jks
-  sslTrustStorePassword: '' # [sslTrustStorePassword: ""] defaults to ''
-  sslTrustStoreEncryptedPassword: '' #provide encrypted Password if encryption is needed
+#connection:
+#  socketTimeout: 3000
+#  connectTimeout: 1000
+#  sslProtocols: 'TLSv1.2'
+#  sslCipherSuites: ''
+#  sslTrustStorePath: '' #if [sslTrustStorePath]: '' empty, it defaults to /conf/cacerts.jks
+#  sslTrustStorePassword: '' # [sslTrustStorePassword: ""] defaults to ''
+#  sslTrustStoreEncryptedPassword: '' #provide encrypted Password if encryption is needed

 # Each Kafka server needs 1 thread each, so please configure this according to the number of servers you are monitoring
 # [numberOfThreads] = Number_of_Kafka_Servers_Monitored

diff --git a/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java b/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java
index 7898a04..993d645 100644
--- a/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java
+++ b/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java
@@ -42,10 +42,9 @@ public void whenNonCompositeObjectsThenReturnMetrics() throws MalformedObjectNa
         MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration
                 ("Kafka Monitor",
-                "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class),
-                Mockito.mock(AMonitorJob.class));
+                "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class));

-        contextConfiguration.setConfigYml("src/test/resources/conf/config_for_non_composite_metrics.yml");
+        contextConfiguration.loadConfigYml("src/test/resources/conf/config_for_non_composite_metrics.yml");
         Map config = contextConfiguration.getConfigYml();
         List mBeans = (List) config.get("mbeans");
         Set objectInstances = Sets.newHashSet();
@@ -98,11 +97,10 @@ public void whenCompositeObjectsThenReturnMetrics() throws MalformedObjectNameEx
         ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(List.class);
         MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration
                 ("Kafka Monitor",
-                "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class),
-                Mockito.mock(AMonitorJob.class));
+                "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class));

-        contextConfiguration.setConfigYml("src/test/resources/conf/config_for_composite_metrics.yml");
+        contextConfiguration.loadConfigYml("src/test/resources/conf/config_for_composite_metrics.yml");
         Map config = contextConfiguration.getConfigYml();
         List<Map<String, ?>> mBeans = (List<Map<String, ?>>) config.get("mbeans");
         Set objectInstances = Sets.newHashSet();
@@ -151,10 +149,9 @@ public void whenCompositeAndNonCompositeObjectsThenReturnMetrics() throws IOExce
         ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(List.class);
         MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration
                 ("Kafka Monitor",
-                "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class),
-                Mockito.mock(AMonitorJob.class));
+                "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class));

-        contextConfiguration.setConfigYml("src/test/resources/conf/config_composite_and_non_composite_metrics.yml");
+        contextConfiguration.loadConfigYml("src/test/resources/conf/config_composite_and_non_composite_metrics.yml");
         Map config = contextConfiguration.getConfigYml();
         List<Map<String, ?>> mBeans = (List<Map<String, ?>>) config.get("mbeans");
         Set objectInstances = Sets.newHashSet();
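
How the new config.yml settings are meant to fit together (a sketch inferred only from the keys this patch introduces; the values shown are illustrative, not shipped defaults): with useKubernetes set to "true", Kafka pods in the configured namespace appear to be matched by containerImageNameToMatch and podLabels, and each discovered pod's address is then substituted into the ${host} and ${port} placeholders of serviceUrl in the servers section.

    # Illustrative sketch only; values are taken from the examples in this patch, not from extension docs.
    kubernetes:
      useKubernetes: "true"
      namespace: "default"
      containerImageNameToMatch: "kafka"
      containerPortName:            # needed only when a pod exposes more than one container port
      podLabels:
        - name: "run"
          value: "my-kafka"

    servers:
      - serviceUrl: 'service:jmx:rmi:///jndi/rmi://${host}:${port}/jmxrmi'
        username: ''
        password: ''
        displayName:
        useSsl: 'false'

With useKubernetes set to "false", the servers section is expected to behave as before, with an explicit host/port pair or a full serviceUrl per Kafka broker.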