diff --git a/samza-core/src/main/java/org/apache/samza/metrics/ApiType.java b/samza-core/src/main/java/org/apache/samza/metrics/ApiType.java new file mode 100644 index 0000000000..9d36b35bdd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/metrics/ApiType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +public enum ApiType { + SAMZA_LOW_LEVEL, SAMZA_HIGH_LEVEL, SAMZA_SQL, SAMZA_BEAM +} diff --git a/samza-core/src/main/java/org/apache/samza/metrics/DeploymentType.java b/samza-core/src/main/java/org/apache/samza/metrics/DeploymentType.java new file mode 100644 index 0000000000..ff100d5757 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/metrics/DeploymentType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +public enum DeploymentType { + YARN, STANDALONE +} diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java index 0b88fa044f..dfc852057c 100644 --- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java @@ -70,7 +70,10 @@ public static void writeMetadataFile(String jobName, String jobId, String contai MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(), Util.getTaskClassVersion(config), Util.getSamzaVersion(), - Util.getLocalHost().getHostName(), System.currentTimeMillis(), System.currentTimeMillis()); + Util.getLocalHost().getHostName(), System.currentTimeMillis(), System.currentTimeMillis(), Util.getDeploymentType(config), + Util.getApiType(config), Util.getContainerCount(config), Util.getContainerMemoryMb(config), Util.getNumCores(config), + Util.getThreadPoolSize(config), Util.getHostAffinityEnabled(config), Util.getSspGrouperFactory(config), + Util.getContainerRetryCount(config), Util.getContainerRetryWindowMs(config), Util.getMaxConcurrency(config), Util.getMaxJvmHeapMb()); class MetadataFileContents { public final String version; @@ -127,6 +130,14 @@ public static Optional> buildD String hostName = Util.getLocalHost().getHostName(); Optional diagnosticsReporterStreamName = metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName); + String deploymentType = Util.getDeploymentType(config); + String apiType = Util.getApiType(config); + int numContainers = Util.getContainerCount(config); + boolean hostAffinityEnabled = Util.getHostAffinityEnabled(config); + String sspGrouperFactory = Util.getSspGrouperFactory(config); + int containerRetryCount = Util.getContainerRetryCount(config); + long containerRetryWindowMs = Util.getContainerRetryWindowMs(config); + int maxConcurrency = Util.getMaxConcurrency(config); if (!diagnosticsReporterStreamName.isPresent()) { throw new ConfigException( @@ -151,7 +162,9 @@ public static Optional> buildD new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize, containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName, diagnosticsSystemStream, systemProducer, - Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled()); + Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), + deploymentType, apiType, numContainers, hostAffinityEnabled, sspGrouperFactory, containerRetryCount, + containerRetryWindowMs, maxConcurrency); diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter)); } diff --git a/samza-core/src/main/java/org/apache/samza/util/Util.java b/samza-core/src/main/java/org/apache/samza/util/Util.java index f233c32a3a..bf6b3fc007 100644 --- a/samza-core/src/main/java/org/apache/samza/util/Util.java +++ b/samza-core/src/main/java/org/apache/samza/util/Util.java @@ -29,15 +29,23 @@ import java.util.stream.Collectors; import com.google.common.collect.Lists; import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.metrics.ApiType; +import org.apache.samza.metrics.DeploymentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Util { private static final Logger LOG = LoggerFactory.getLogger(Util.class); + private static final String YARN_JOB_FACTORY_CLASS = "org.apache.samza.job.yarn.YarnJobFactory"; + private static final String BEAM_RUNNER_CLASS = "org.apache.beam.runners.samza.SamzaRunner"; + private static final String SQL_RUNNER_CLASS = "org.apache.samza.sql.runner.SamzaSqlApplication"; static final String FALLBACK_VERSION = "0.0.1"; @@ -123,4 +131,85 @@ private static InetAddress doGetLocalHost() throws UnknownHostException, SocketE } return localHost; } + + public static String getDeploymentType(Config config) { + JobConfig jobConfig = new JobConfig(config); + Optional streamJobFactoryClass = jobConfig.getStreamJobFactoryClass(); + if (streamJobFactoryClass.isPresent()) { + if (streamJobFactoryClass.get().equals(YARN_JOB_FACTORY_CLASS)) { + return DeploymentType.YARN.name(); + } else { + return DeploymentType.STANDALONE.name(); + } + } + return "NOT_DEFINED"; + } + + public static String getApiType(Config config) { + ApplicationConfig appConfig = new ApplicationConfig(config); + String appClass = appConfig.getAppClass(); + if (appClass == null || appClass.isEmpty()) { + return ApiType.SAMZA_LOW_LEVEL.name(); + } + if (appClass.equals(BEAM_RUNNER_CLASS)) { + return ApiType.SAMZA_BEAM.name(); + } + if (appClass.equals(SQL_RUNNER_CLASS)) { + return ApiType.SAMZA_SQL.name(); + } + if (appClass.getClass().isInstance(StreamApplication.class)) { + return ApiType.SAMZA_HIGH_LEVEL.name(); + } + return ApiType.SAMZA_LOW_LEVEL.name(); + } + + public static int getContainerCount(Config config) { + JobConfig jobConfig = new JobConfig(config); + return jobConfig.getContainerCount(); + } + + public static int getContainerMemoryMb(Config config) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + return clusterManagerConfig.getContainerMemoryMb(); + } + + public static int getNumCores(Config config) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + return clusterManagerConfig.getNumCores(); + } + + public static int getThreadPoolSize(Config config) { + JobConfig jobConfig = new JobConfig(config); + return jobConfig.getThreadPoolSize(); + } + + public static String getSspGrouperFactory(Config config) { + JobConfig jobConfig = new JobConfig(config); + return jobConfig.getSystemStreamPartitionGrouperFactory(); + } + + public static boolean getHostAffinityEnabled(Config config) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + return clusterManagerConfig.getHostAffinityEnabled(); + } + + public static int getContainerRetryCount(Config config) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + return clusterManagerConfig.getContainerRetryCount(); + } + + public static int getContainerRetryWindowMs(Config config) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + return clusterManagerConfig.getContainerRetryWindowMs(); + } + + public static int getMaxConcurrency(Config config) { + TaskConfig taskConfig = new TaskConfig(config); + return taskConfig.getMaxConcurrency(); + } + + public static int getMaxJvmHeapMb() { + Long maxJvmHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024); + return maxJvmHeapMb.intValue(); + } } \ No newline at end of file diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java index f77dab898a..b848f944e2 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java @@ -68,6 +68,15 @@ public class DiagnosticsManager { private final int containerThreadPoolSize; private final Map containerModels; private final boolean autosizingEnabled; + private final String deploymentType; + private final String apiType; + private final int numContainers; + private final boolean hostAffinityEnabled; + private final String sspGrouperFactory; + private final int containerRetryCount; + private final long containerRetryWindowMs; + private final int maxConcurrency; + private boolean jobParamsEmitted = false; private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data @@ -93,12 +102,23 @@ public DiagnosticsManager(String jobName, String hostname, SystemStream diagnosticSystemStream, SystemProducer systemProducer, - Duration terminationDuration, boolean autosizingEnabled) { + Duration terminationDuration, + boolean autosizingEnabled, + String deploymentType, + String apiType, + int numContainers, + boolean hostAffinityEnabled, + String sspGrouperFactory, + int containerRetryCount, + long containerRetryWindowMs, + int maxConcurrency) { this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize, containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer, terminationDuration, Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled); + new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, + deploymentType, apiType, numContainers, hostAffinityEnabled, sspGrouperFactory, containerRetryCount, + containerRetryWindowMs, maxConcurrency); } @VisibleForTesting @@ -118,7 +138,16 @@ public DiagnosticsManager(String jobName, SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration, - ScheduledExecutorService executorService, boolean autosizingEnabled) { + ScheduledExecutorService executorService, + boolean autosizingEnabled, + String deploymentType, + String apiType, + int numContainers, + boolean hostAffinityEnabled, + String sspGrouperFactory, + int containerRetryCount, + long containerRetryWindowMs, + int maxConcurrency) { this.jobName = jobName; this.jobId = jobId; this.containerModels = containerModels; @@ -140,6 +169,14 @@ public DiagnosticsManager(String jobName, this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters this.scheduler = executorService; this.autosizingEnabled = autosizingEnabled; + this.deploymentType = deploymentType; + this.apiType = apiType; + this.numContainers = numContainers; + this.hostAffinityEnabled = hostAffinityEnabled; + this.sspGrouperFactory = sspGrouperFactory; + this.containerRetryCount = containerRetryCount; + this.containerRetryWindowMs = containerRetryWindowMs; + this.maxConcurrency = maxConcurrency; resetTime = Instant.now(); this.systemProducer.register(getClass().getSimpleName()); @@ -195,9 +232,12 @@ private class DiagnosticsStreamPublisher implements Runnable { @Override public void run() { try { + Long maxJvmHeapMb = maxHeapSizeBytes / (1024 * 1024); DiagnosticsStreamMessage diagnosticsStreamMessage = new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId, - taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli()); + taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli(), deploymentType, + apiType, numContainers, containerMemoryMb, containerNumCores, containerThreadPoolSize, hostAffinityEnabled, + sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency, maxJvmHeapMb.intValue()); // Add job-related params to the message (if not already published) if (!jobParamsEmitted) { diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java index 15cce035ac..21a5e56c1b 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java @@ -65,12 +65,17 @@ public class DiagnosticsStreamMessage { private final Map> metricsMessage; public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId, - String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) { + String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp, + String deploymentType, String apiType, int numContainers, int containerMemoryMb, int numCores, int threadPoolSize, + boolean hostAffinityEnabled, String sspGrouperFactory, int containerRetryCount, + long containerRetryWindowMs, int maxConcurrency, int maxJvmHeapMb) { // Create the metricHeader metricsHeader = new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(), - taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp); + taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp, deploymentType, apiType, numContainers, + containerMemoryMb, numCores, threadPoolSize, hostAffinityEnabled, sspGrouperFactory, + containerRetryCount, containerRetryWindowMs, maxConcurrency, maxJvmHeapMb); this.metricsMessage = new HashMap<>(); } @@ -237,7 +242,13 @@ public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(Metrics metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(), metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(), metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(), - metricsSnapshot.getHeader().getResetTime()); + metricsSnapshot.getHeader().getResetTime(), metricsSnapshot.getHeader().getDeploymentType(), + metricsSnapshot.getHeader().getApiType(), metricsSnapshot.getHeader().getNumContainers(), + metricsSnapshot.getHeader().getContainerMemoryMb(), metricsSnapshot.getHeader().getContainerCpuCores(), + metricsSnapshot.getHeader().getContainerThreadPoolSize(), metricsSnapshot.getHeader().getHostAffinity(), + metricsSnapshot.getHeader().getSspGrouper(), metricsSnapshot.getHeader().getMaxContainerRetryCount(), + metricsSnapshot.getHeader().getContainerRetryWindowMs(), metricsSnapshot.getHeader().getTaskMaxConcurrency(), + metricsSnapshot.getHeader().getMaxJvmHeapMb()); Map> metricsMap = metricsSnapshot.getMetrics().getAsMap(); Map diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER); diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala index 2fef04eb34..e2fcc7d570 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala @@ -35,7 +35,19 @@ object MetricsHeader { map.get("samza-version").toString, map.get("host").toString, map.get("time").asInstanceOf[Number].longValue, - map.get("reset-time").asInstanceOf[Number].longValue) + map.get("reset-time").asInstanceOf[Number].longValue, + map.get("deployment-type").toString, + map.get("api-type").toString, + map.get("num-containers").asInstanceOf[Number].intValue(), + map.get("container-memory-mb").asInstanceOf[Number].intValue(), + map.get("container-cpu-cores").asInstanceOf[Number].intValue(), + map.get("container-thread-pool-size").asInstanceOf[Number].intValue(), + map.get("host-affinity").asInstanceOf[Boolean].booleanValue(), + map.get("ssp-grouper").toString, + map.get("max-container-retry-count").asInstanceOf[Number].intValue(), + map.get("container-retry-window-ms").asInstanceOf[Number].longValue(), + map.get("task-max-concurrency").asInstanceOf[Number].intValue(), + map.get("max-jvm-heap-mb").asInstanceOf[Number].intValue()) } } @@ -52,7 +64,19 @@ class MetricsHeader( @BeanProperty val samzaVersion: String, @BeanProperty val host: String, @BeanProperty val time: Long, - @BeanProperty val resetTime: Long) { + @BeanProperty val resetTime: Long, + @BeanProperty val deploymentType: String, + @BeanProperty val apiType: String, + @BeanProperty val numContainers: Int, + @BeanProperty val containerMemoryMb: Int, + @BeanProperty val containerCpuCores: Int, + @BeanProperty val containerThreadPoolSize: Int, + @BeanProperty val hostAffinity: Boolean, + @BeanProperty val sspGrouper: String, + @BeanProperty val maxContainerRetryCount: Int, + @BeanProperty val containerRetryWindowMs: Long, + @BeanProperty val taskMaxConcurrency: Int, + @BeanProperty val maxJvmHeapMb: Int) { def getAsMap: Map[String, Object] = { val map = new HashMap[String, Object] @@ -66,6 +90,18 @@ class MetricsHeader( map.put("host", host) map.put("time", time: java.lang.Long) map.put("reset-time", resetTime: java.lang.Long) + map.put("deployment-type", deploymentType) + map.put("api-type", apiType) + map.put("num-containers", numContainers: java.lang.Integer) + map.put("container-memory-mb", containerMemoryMb: java.lang.Integer) + map.put("container-cpu-cores", containerCpuCores: java.lang.Integer) + map.put("container-thread-pool-size", containerThreadPoolSize: java.lang.Integer) + map.put("host-affinity", hostAffinity: java.lang.Boolean) + map.put("ssp-grouper", sspGrouper) + map.put("max-container-retry-count", maxContainerRetryCount: java.lang.Integer) + map.put("container-retry-window-ms", containerRetryWindowMs: java.lang.Long) + map.put("task-max-concurrency", taskMaxConcurrency: java.lang.Integer) + map.put("max-jvm-heap-mb", maxJvmHeapMb: java.lang.Integer) map } } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 7a0d79fc39..f2dee389b4 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -57,6 +57,18 @@ class MetricsSnapshotReporter( host: String, serializer: Serializer[MetricsSnapshot] = null, blacklist: Option[String], + deploymentType: String, + apiType: String, + numContainers: Int, + containerMemoryMb: Int, + numCores: Int, + threadPoolSize: Int, + hostAffinityEnabled: Boolean, + sspGrouperFactory: String, + containerRetryCount: Int, + containerRetryWindowMs: Long, + maxConcurrency: Int, + maxJvmHeapMb: Int, clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging { val execEnvironmentContainerId = Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("") @@ -148,7 +160,10 @@ class MetricsSnapshotReporter( // publish to Kafka only if the metricsMsg carries any metrics if (!metricsMsg.isEmpty) { - val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime) + val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, + samzaVersion, host, clock(), resetTime, deploymentType, apiType, numContainers, containerMemoryMb, numCores, + threadPoolSize, hostAffinityEnabled, sspGrouperFactory, containerRetryCount, + containerRetryWindowMs, maxConcurrency, maxJvmHeapMb) val metrics = new Metrics(metricsMsg) debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format(source, out, header.getAsMap, metrics.getAsMap())) diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index 2f9a0baf59..fbeb0acefb 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -124,7 +124,20 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging Util.getTaskClassVersion(config), Util.getSamzaVersion, Util.getLocalHost.getHostName, - serde, blacklist) + serde, + blacklist, + Util.getDeploymentType(config), + Util.getApiType(config), + Util.getContainerCount(config), + Util.getContainerMemoryMb(config), + Util.getNumCores(config), + Util.getThreadPoolSize(config), + Util.getHostAffinityEnabled(config), + Util.getSspGrouperFactory(config), + Util.getContainerRetryCount(config), + Util.getContainerRetryWindowMs(config), + Util.getMaxConcurrency(config), + Util.getMaxJvmHeapMb) reporter.register(this.getClass.getSimpleName, registry) diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java index 6429a54246..099886c086 100644 --- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java +++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java @@ -59,6 +59,14 @@ public class TestDiagnosticsManager { private int numPersistentStores = 2; private int containerNumCores = 2; private boolean autosizingEnabled = false; + private String deploymentType = "test deployment type"; + private String apiType = "test api type"; + private int numContainers = 1; + private boolean hostAffinityEnabled = false; + private String sspGrouperFactory = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory"; + private int containerRetryCount = 8; + private long containerRetryWindowMs = 300000; + private int maxConcurrency = 1; private Map containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels(); private Collection exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList(); @@ -78,9 +86,10 @@ public void setup() { }); this.diagnosticsManager = - new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize, - "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, - mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled); + new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, + containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, + mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled, deploymentType, apiType, numContainers, + hostAffinityEnabled, sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency); exceptionEventList.forEach( diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent)); @@ -95,7 +104,8 @@ public void testDiagnosticsManagerStart() { new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, - autosizingEnabled); + autosizingEnabled, deploymentType, apiType, numContainers, + hostAffinityEnabled, sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency); diagnosticsManager.start(); @@ -114,7 +124,8 @@ public void testDiagnosticsManagerStop() throws InterruptedException { new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService, - autosizingEnabled); + autosizingEnabled, deploymentType, apiType, numContainers, + hostAffinityEnabled, sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency); diagnosticsManager.stop(); @@ -134,7 +145,8 @@ public void testDiagnosticsManagerForceStop() throws InterruptedException { new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService, - autosizingEnabled); + autosizingEnabled, deploymentType, apiType, numContainers, + hostAffinityEnabled, sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency); diagnosticsManager.stop(); @@ -252,7 +264,17 @@ private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelo Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion); Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname); Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName()); - + Assert.assertEquals(metricsSnapshot.getHeader().getDeploymentType(), deploymentType); + Assert.assertEquals(metricsSnapshot.getHeader().getApiType(), apiType); + Assert.assertEquals(metricsSnapshot.getHeader().getNumContainers(), numContainers); + Assert.assertEquals(metricsSnapshot.getHeader().getContainerMemoryMb(), containerMb); + Assert.assertEquals(metricsSnapshot.getHeader().getContainerCpuCores(), containerNumCores); + Assert.assertEquals(metricsSnapshot.getHeader().getContainerThreadPoolSize(), containerThreadPoolSize); + Assert.assertEquals(metricsSnapshot.getHeader().getHostAffinity(), hostAffinityEnabled); + Assert.assertEquals(metricsSnapshot.getHeader().getSspGrouper(), sspGrouperFactory); + Assert.assertEquals(metricsSnapshot.getHeader().getMaxContainerRetryCount(), containerRetryCount); + Assert.assertEquals(metricsSnapshot.getHeader().getContainerRetryWindowMs(), containerRetryWindowMs); + Assert.assertEquals(metricsSnapshot.getHeader().getTaskMaxConcurrency(), maxConcurrency); } private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope outgoingMessageEnvelope) { diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java index cd506b25e0..cd173df22a 100644 --- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java +++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java @@ -50,7 +50,10 @@ public class TestDiagnosticsStreamMessage { private DiagnosticsStreamMessage getDiagnosticsStreamMessage() { DiagnosticsStreamMessage diagnosticsStreamMessage = new DiagnosticsStreamMessage(jobName, jobId, containerName, executionEnvContainerId, taskClassVersion, - samzaVersion, hostname, timestamp, resetTimestamp); + samzaVersion, hostname, timestamp, resetTimestamp, "test deployment type", "test api type", + 1, 1024, 2, 1, false, + "org.apache.samza.container.grouper.stream.GroupByPartitionFactory", 8, + 300000, 1, 756); diagnosticsStreamMessage.addContainerMb(1024); diagnosticsStreamMessage.addContainerNumCores(2); diff --git a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java index 1f69a7e2c9..2a733826e3 100644 --- a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java +++ b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java @@ -178,7 +178,10 @@ public void testMetricsEmission() { private MetricsSnapshotReporter getMetricsSnapshotReporter(String blacklist) { return new MetricsSnapshotReporter(producer, SYSTEM_STREAM, REPORTING_INTERVAL, JOB_NAME, JOB_ID, CONTAINER_NAME, - TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, new Some<>(blacklist), getClock()); + TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, new Some<>(blacklist), "test deployment type", + "test api type", 1, 1024, 1, 1, + false, "org.apache.samza.container.grouper.stream.GroupByPartitionFactory", + 8, 300000, 1, 756, getClock()); } private AbstractFunction0 getClock() { diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java index b0b65e07e1..730cfb6618 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java @@ -38,7 +38,9 @@ public class TestMetricsSnapshotSerdeV2 { public void testSerde() { MetricsHeader metricsHeader = new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1, - 1); + 1, "test deployment type", "test api type", 1, 1024, 1, + 1, false, "org.apache.samza.container.grouper.stream.GroupByPartitionFactory", + 8, 300000, 1, 756); BoundedList boundedList = new BoundedList("exceptions"); DiagnosticsExceptionEvent diagnosticsExceptionEvent1 = diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala index 360e6fa193..09d9fda0ed 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala @@ -34,7 +34,11 @@ class TestMetricsSnapshotSerde { @Ignore @Test def testMetricsSerdeShouldSerializeAndDeserializeAMetric { - val header = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id", "test source", "version", "samzaversion", "host", 1L, 2L) + val header = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id", + "test source", "version", "samzaversion", "host", 1L, 2L, + "test deployment type", "test api type", 1, 1024, 1, + 1, false, "org.apache.samza.container.grouper.stream.GroupByPartitionFactory", + 8, 300000, 1, 756) val metricsMap = new HashMap[String, Object]() metricsMap.put("test2", "foo") val metricsGroupMap = new HashMap[String, Map[String, Object]]()