Skip to content

Commit

Permalink
Code changes to add k8s auto discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
Satish Reddy M committed Nov 20, 2019
1 parent e05175e commit 26109d3
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.appdynamics.extensions</groupId>
<artifactId>kafka-monitoring-extension</artifactId>
<version>2.0.3</version>
<version>2.0.4-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-monitoring-extension</name>
<url>http://maven.apache.org</url>
Expand All @@ -18,7 +18,7 @@
<dependency>
<groupId>com.appdynamics</groupId>
<artifactId>appd-exts-commons</artifactId>
<version>2.1.0</version>
<version>2.2.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
47 changes: 42 additions & 5 deletions src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
import com.appdynamics.extensions.kafka.utils.Constants;
import com.appdynamics.extensions.kafka.utils.SslUtils;
import com.appdynamics.extensions.util.AssertUtils;
import com.singularity.ee.agent.systemagent.api.exception.TaskExecutionException;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -44,20 +50,51 @@ protected void doRun (TasksExecutionServiceProvider tasksExecutionServiceProvide
List<Map<String, String>> kafkaServers = (List<Map<String, String>>)
this.getContextConfiguration().getConfigYml().get(Constants.SERVERS);
logger.info("The size of servers section is: "+kafkaServers);

Map<String, ?> kubernetesConfig = (Map<String, ?>) this.getContextConfiguration().getConfigYml().get("kubernetes");

Boolean kubernetesMode = Boolean.valueOf(kubernetesConfig.get("useKubernetes").toString());

for (Map<String, String> kafkaServer : kafkaServers) {

AssertUtils.assertNotNull(kafkaServer, "the server arguments are empty");
if (kafkaServer.size() > 1 && !kubernetesMode) {
AssertUtils.assertNotNull(kafkaServer.get("displayName"), "The displayName can not be null");
logger.info("Starting the Kafka Monitoring Task for server : " + kafkaServer.get("displayName"));
} else {
logger.info("Starting the Nginx Monitoring Task");
}

KafkaMonitorTask task = new KafkaMonitorTask(tasksExecutionServiceProvider,
this.getContextConfiguration(), kafkaServer);
AssertUtils.assertNotNull(kafkaServer.get(Constants.DISPLAY_NAME),
"The displayName can not be null");
tasksExecutionServiceProvider.submit(kafkaServer.get(Constants.DISPLAY_NAME), task);
}
}

protected int getTaskCount () {
List<Map<String, String>> servers = (List<Map<String, String>>) getContextConfiguration().
protected List<Map<String, ?>> getServers() {
List<Map<String, ?>> servers = (List<Map<String, ?>>) getContextConfiguration().
getConfigYml().get(Constants.SERVERS);
AssertUtils.assertNotNull(servers, "The 'servers' section in config.yml is not initialized");
return servers.size();
return servers;
}


/**
 * Local debug entry point for running the extension outside the machine agent.
 * Wires a DEBUG-level console appender onto the log4j root logger so extension
 * logging is visible on stdout, then executes the monitor once.
 *
 * @param args optional; args[0] may supply the path to config.yml. Falls back to
 *             the bundled sample config instead of a developer-specific absolute
 *             path, so the debug run works on any checkout.
 * @throws TaskExecutionException if the monitor execution fails
 */
public static void main(String[] args) throws TaskExecutionException {

    // Route all log output to stdout for interactive debugging.
    ConsoleAppender ca = new ConsoleAppender();
    ca.setWriter(new OutputStreamWriter(System.out));
    ca.setLayout(new PatternLayout("%-5p [%t]: %m%n"));
    ca.setThreshold(Level.DEBUG);
    org.apache.log4j.Logger.getRootLogger().addAppender(ca);

    KafkaMonitor monitor = new KafkaMonitor();

    // Previously hard-coded to /Users/satishrm/... which only worked on one
    // developer's machine; prefer a CLI-supplied path, else the repo-relative sample.
    String configFile = (args != null && args.length > 0)
            ? args[0]
            : "src/main/resources/conf/config.yml";

    final Map<String, String> taskArgs = new HashMap<>();
    taskArgs.put("config-file", configFile);

    monitor.execute(taskArgs, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@

import com.appdynamics.extensions.AMonitorTaskRunnable;
import com.appdynamics.extensions.MetricWriteHelper;
import com.appdynamics.extensions.TaskInputArgs;
import com.appdynamics.extensions.TasksExecutionServiceProvider;
import com.appdynamics.extensions.conf.MonitorContextConfiguration;
import com.appdynamics.extensions.crypto.CryptoUtil;
import com.appdynamics.extensions.kafka.metrics.DomainMetricsProcessor;
import com.appdynamics.extensions.kafka.utils.Constants;
import com.appdynamics.extensions.util.CryptoUtils;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.slf4j.LoggerFactory;
Expand All @@ -40,23 +39,28 @@ public class KafkaMonitorTask implements AMonitorTaskRunnable {
private MonitorContextConfiguration configuration;
private Map<String, String> kafkaServer;
private MetricWriteHelper metricWriteHelper;
private String displayName;
private JMXConnectionAdapter jmxAdapter;
private JMXConnector jmxConnector;
private String displayName = "DEFAULT";

KafkaMonitorTask (TasksExecutionServiceProvider serviceProvider, MonitorContextConfiguration configuration,
Map kafkaServer) {

KafkaMonitorTask(TasksExecutionServiceProvider serviceProvider, MonitorContextConfiguration configuration,
Map kafkaServer) {
this.configuration = configuration;
this.kafkaServer = kafkaServer;
this.metricWriteHelper = serviceProvider.getMetricWriteHelper();
this.displayName = (String) kafkaServer.get(Constants.DISPLAY_NAME);
String name = (String) kafkaServer.get(Constants.DISPLAY_NAME);

if (!Strings.isNullOrEmpty(name)) {
displayName = name;
}
}

public void onTaskComplete () {
public void onTaskComplete() {
logger.info("All tasks for server {} finished", this.kafkaServer.get(Constants.DISPLAY_NAME));
}

public void run () {
public void run() {
try {
logger.info("Starting Kafka Monitoring task for Kafka server: {} ", this.kafkaServer.get(Constants.DISPLAY_NAME));
populateAndPrintMetrics();
Expand All @@ -68,7 +72,7 @@ public void run () {
}
}

public void populateAndPrintMetrics () {
public void populateAndPrintMetrics() {
try {
BigDecimal connectionStatus = openJMXConnection();
List<Map<String, ?>> mBeansListFromConfig = (List<Map<String, ?>>) configuration.getConfigYml()
Expand All @@ -95,7 +99,7 @@ public void populateAndPrintMetrics () {
}
}

private BigDecimal openJMXConnection () {
private BigDecimal openJMXConnection() {
try {
Map<String, String> requestMap = buildRequestMap();
jmxAdapter = JMXConnectionAdapter.create(requestMap);
Expand Down Expand Up @@ -123,7 +127,7 @@ private BigDecimal openJMXConnection () {
return null;
}

private Map<String, String> buildRequestMap () {
private Map<String, String> buildRequestMap() {
Map<String, String> requestMap = new HashMap<>();
requestMap.put(Constants.HOST, this.kafkaServer.get(Constants.HOST));
requestMap.put(Constants.PORT, this.kafkaServer.get(Constants.PORT));
Expand All @@ -134,7 +138,7 @@ private Map<String, String> buildRequestMap () {
return requestMap;
}

private String getPassword () {
private String getPassword() {
String password = this.kafkaServer.get(Constants.PASSWORD);
Map<String, ?> configMap = configuration.getConfigYml();
if (!Strings.isNullOrEmpty(password)) {
Expand All @@ -145,15 +149,15 @@ private String getPassword () {
String encryptedPassword = this.kafkaServer.get(Constants.ENCRYPTED_PASSWORD);
if (!Strings.isNullOrEmpty(encryptionKey) && !Strings.isNullOrEmpty(encryptedPassword)) {
java.util.Map<String, String> cryptoMap = Maps.newHashMap();
cryptoMap.put(TaskInputArgs.ENCRYPTED_PASSWORD, encryptedPassword);
cryptoMap.put(TaskInputArgs.ENCRYPTION_KEY, encryptionKey);
return CryptoUtil.getPassword(cryptoMap);
cryptoMap.put(com.appdynamics.extensions.Constants.ENCRYPTED_PASSWORD, encryptedPassword);
cryptoMap.put(com.appdynamics.extensions.Constants.ENCRYPTION_KEY, encryptionKey);
return CryptoUtils.getPassword(cryptoMap);
}
}
return "";
}

private Map<String, ?> getConnectionParameters () {
private Map<String, ?> getConnectionParameters() {
if (configuration.getConfigYml().containsKey(Constants.CONNECTION))
return (Map<String, ?>) configuration.getConfigYml().get(Constants.CONNECTION);
else
Expand Down
37 changes: 24 additions & 13 deletions src/main/resources/conf/config.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
#If SIM is enabled, use this
#metricPrefix: 'Custom Metrics|Kafka'
metricPrefix: 'Custom Metrics|Kafka'

#If SIM is not enabled, then use this
metricPrefix: 'Server|Component:<Component-ID>|Custom Metrics|Kafka'
#metricPrefix: 'Server|Component:<Component-ID>|Custom Metrics|Kafka'

# To find your Component-ID, please refer to the link below:
# https://community.appdynamics.com/t5/Knowledge-Base/How-to-troubleshoot-missing-custom-metrics-or-extensions-metrics/ta-p/28695

kubernetes:
useKubernetes: "true"
namespace: "default"
containerImageNameToMatch: "kafka"
#If there is only one container port, the extension uses it automatically and containerPortName can be left empty.
#If there are multiple container ports, provide the containerPortName used to select the right port.
containerPortName:
podLabels:
- name: "run"
value: "my-kafka"

# Add your Kafka Instances below
servers:
- serviceUrl: 'service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi' #provide jmx service URL [OR] provide [host][port] pair
- serviceUrl: 'service:jmx:rmi:///jndi/rmi://${host}:${port}/jmxrmi' #provide jmx service URL [OR] provide [host][port] pair
host: ''
port: ''
username: ''
password: ''
encryptedPassword: ''
displayName: 'Local Kafka Server'
useSsl: false # set to true if you're using SSL for this server
displayName:
useSsl: 'false' # set to true if you're using SSL for this server

# Provide the encryption key for the encrypted password
encryptionKey: ''
Expand All @@ -30,14 +41,14 @@ encryptionKey: ''

# If you are using the connection section,
# any change to the connection section below requires a machine agent restart for the changes to reflect
connection:
socketTimeout: 3000
connectTimeout: 1000
sslProtocols: 'TLSv1.2'
sslCipherSuites: ''
sslTrustStorePath: '' #if [sslTrustStorePath]: '' empty, it defaults to <MachineAgent_Home>/conf/cacerts.jks
sslTrustStorePassword: '' # [sslTrustStorePassword: ""] defaults to ''
sslTrustStoreEncryptedPassword: '' #provide encrypted Password if encryption is needed
#connection:
# socketTimeout: 3000
# connectTimeout: 1000
# sslProtocols: 'TLSv1.2'
# sslCipherSuites: ''
# sslTrustStorePath: '' #if [sslTrustStorePath]: '' empty, it defaults to <MachineAgent_Home>/conf/cacerts.jks
# sslTrustStorePassword: '' # [sslTrustStorePassword: ""] defaults to ''
# sslTrustStoreEncryptedPassword: '' #provide encrypted Password if encryption is needed

# Each Kafka server needs 1 thread each, so please configure this according to the number of servers you are monitoring
# [numberOfThreads] = Number_of_Kafka_Servers_Monitored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ public void whenNonCompositeObjectsThenReturnMetrics() throws MalformedObjectNa

MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration
("Kafka Monitor",
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class),
Mockito.mock(AMonitorJob.class));
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class));

contextConfiguration.setConfigYml("src/test/resources/conf/config_for_non_composite_metrics.yml");
contextConfiguration.loadConfigYml("src/test/resources/conf/config_for_non_composite_metrics.yml");
Map config = contextConfiguration.getConfigYml();
List<Map> mBeans = (List<Map>) config.get("mbeans");
Set<ObjectInstance> objectInstances = Sets.newHashSet();
Expand Down Expand Up @@ -98,11 +97,10 @@ public void whenCompositeObjectsThenReturnMetrics() throws MalformedObjectNameEx
ArgumentCaptor<List> pathCaptor = ArgumentCaptor.forClass(List.class);
MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration
("Kafka Monitor",
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class),
Mockito.mock(AMonitorJob.class));
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class));


contextConfiguration.setConfigYml("src/test/resources/conf/config_for_composite_metrics.yml");
contextConfiguration.loadConfigYml("src/test/resources/conf/config_for_composite_metrics.yml");
Map config = contextConfiguration.getConfigYml();
List<Map<String, ?>> mBeans = (List<Map<String, ?>>) config.get("mbeans");
Set<ObjectInstance> objectInstances = Sets.newHashSet();
Expand Down Expand Up @@ -151,10 +149,9 @@ public void whenCompositeAndNonCompositeObjectsThenReturnMetrics() throws IOExce
ArgumentCaptor<List> pathCaptor = ArgumentCaptor.forClass(List.class);
MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration
("Kafka Monitor",
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class),
Mockito.mock(AMonitorJob.class));
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class));

contextConfiguration.setConfigYml("src/test/resources/conf/config_composite_and_non_composite_metrics.yml");
contextConfiguration.loadConfigYml("src/test/resources/conf/config_composite_and_non_composite_metrics.yml");
Map config = contextConfiguration.getConfigYml();
List<Map<String, ?>> mBeans = (List<Map<String, ?>>) config.get("mbeans");
Set<ObjectInstance> objectInstances = Sets.newHashSet();
Expand Down

0 comments on commit 26109d3

Please sign in to comment.