-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-2561: Add job features to MetricsHeader #1402
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.samza.metrics; | ||
|
||
public enum ApiType { | ||
SAMZA_LOW_LEVEL, SAMZA_HIGH_LEVEL, SAMZA_SQL, SAMZA_BEAM | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.samza.metrics; | ||
|
||
public enum DeploymentType { | ||
YARN, STANDALONE | ||
} | ||
Comment on lines
+1
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Single line enums like these can likely be added to the metrics-header class, since that seems like the only place theyre used no? |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,15 +29,23 @@ | |
import java.util.stream.Collectors; | ||
import com.google.common.collect.Lists; | ||
import org.apache.samza.SamzaException; | ||
import org.apache.samza.application.StreamApplication; | ||
import org.apache.samza.config.ApplicationConfig; | ||
import org.apache.samza.config.ClusterManagerConfig; | ||
import org.apache.samza.config.Config; | ||
import org.apache.samza.config.JobConfig; | ||
import org.apache.samza.config.TaskConfig; | ||
import org.apache.samza.metrics.ApiType; | ||
import org.apache.samza.metrics.DeploymentType; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class Util { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not related to your change but JobInfoUtil is a better name for this Util class rather than just Util |
||
private static final Logger LOG = LoggerFactory.getLogger(Util.class); | ||
private static final String YARN_JOB_FACTORY_CLASS = "org.apache.samza.job.yarn.YarnJobFactory"; | ||
private static final String BEAM_RUNNER_CLASS = "org.apache.beam.runners.samza.SamzaRunner"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this actually needed given |
||
private static final String SQL_RUNNER_CLASS = "org.apache.samza.sql.runner.SamzaSqlApplication"; | ||
|
||
static final String FALLBACK_VERSION = "0.0.1"; | ||
|
||
|
@@ -123,4 +131,85 @@ private static InetAddress doGetLocalHost() throws UnknownHostException, SocketE | |
} | ||
return localHost; | ||
} | ||
|
||
public static String getDeploymentType(Config config) { | ||
JobConfig jobConfig = new JobConfig(config); | ||
Optional<String> streamJobFactoryClass = jobConfig.getStreamJobFactoryClass(); | ||
if (streamJobFactoryClass.isPresent()) { | ||
if (streamJobFactoryClass.get().equals(YARN_JOB_FACTORY_CLASS)) { | ||
return DeploymentType.YARN.name(); | ||
} else { | ||
return DeploymentType.STANDALONE.name(); | ||
} | ||
} | ||
return "NOT_DEFINED"; | ||
} | ||
|
||
public static String getApiType(Config config) { | ||
ApplicationConfig appConfig = new ApplicationConfig(config); | ||
String appClass = appConfig.getAppClass(); | ||
if (appClass == null || appClass.isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this ever true? for legacy task class applications? |
||
return ApiType.SAMZA_LOW_LEVEL.name(); | ||
} | ||
if (appClass.equals(BEAM_RUNNER_CLASS)) { | ||
return ApiType.SAMZA_BEAM.name(); | ||
} | ||
if (appClass.equals(SQL_RUNNER_CLASS)) { | ||
return ApiType.SAMZA_SQL.name(); | ||
} | ||
if (appClass.getClass().isInstance(StreamApplication.class)) { | ||
return ApiType.SAMZA_HIGH_LEVEL.name(); | ||
} | ||
return ApiType.SAMZA_LOW_LEVEL.name(); | ||
} | ||
|
||
public static int getContainerCount(Config config) { | ||
JobConfig jobConfig = new JobConfig(config); | ||
return jobConfig.getContainerCount(); | ||
} | ||
|
||
Comment on lines
+166
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why even have it as a util? Same for getContainerMemoryMb, getNumCores, getThreadPoolSize, getSspGrouperFactory, getHostAffinityEnabled, getContainerRetryCount, getContainerRetryWindowMs, getMaxConcurrency. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
public static int getContainerMemoryMb(Config config) { | ||
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); | ||
return clusterManagerConfig.getContainerMemoryMb(); | ||
} | ||
|
||
public static int getNumCores(Config config) { | ||
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); | ||
return clusterManagerConfig.getNumCores(); | ||
} | ||
|
||
public static int getThreadPoolSize(Config config) { | ||
JobConfig jobConfig = new JobConfig(config); | ||
return jobConfig.getThreadPoolSize(); | ||
} | ||
|
||
public static String getSspGrouperFactory(Config config) { | ||
JobConfig jobConfig = new JobConfig(config); | ||
return jobConfig.getSystemStreamPartitionGrouperFactory(); | ||
} | ||
|
||
public static boolean getHostAffinityEnabled(Config config) { | ||
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); | ||
return clusterManagerConfig.getHostAffinityEnabled(); | ||
} | ||
|
||
public static int getContainerRetryCount(Config config) { | ||
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); | ||
return clusterManagerConfig.getContainerRetryCount(); | ||
} | ||
|
||
public static int getContainerRetryWindowMs(Config config) { | ||
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); | ||
return clusterManagerConfig.getContainerRetryWindowMs(); | ||
} | ||
|
||
public static int getMaxConcurrency(Config config) { | ||
TaskConfig taskConfig = new TaskConfig(config); | ||
return taskConfig.getMaxConcurrency(); | ||
} | ||
|
||
public static int getMaxJvmHeapMb() { | ||
Long maxJvmHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024); | ||
return maxJvmHeapMb.intValue(); | ||
} | ||
Comment on lines
+211
to
+214
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can DiagnosticsUtil.buildDiagnosticsManager also invoke this util method, instead of calling Runtime.getRuntime().maxMemory directly ? |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,15 @@ public class DiagnosticsManager { | |
private final int containerThreadPoolSize; | ||
private final Map<String, ContainerModel> containerModels; | ||
private final boolean autosizingEnabled; | ||
private final String deploymentType; | ||
private final String apiType; | ||
private final int numContainers; | ||
private final boolean hostAffinityEnabled; | ||
private final String sspGrouperFactory; | ||
private final int containerRetryCount; | ||
private final long containerRetryWindowMs; | ||
private final int maxConcurrency; | ||
|
||
private boolean jobParamsEmitted = false; | ||
|
||
private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data | ||
|
@@ -93,12 +102,23 @@ public DiagnosticsManager(String jobName, | |
String hostname, | ||
SystemStream diagnosticSystemStream, | ||
SystemProducer systemProducer, | ||
Duration terminationDuration, boolean autosizingEnabled) { | ||
Duration terminationDuration, | ||
boolean autosizingEnabled, | ||
String deploymentType, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should deploymentType and api type be enums, since we defined an enum for them above? |
||
String apiType, | ||
int numContainers, | ||
boolean hostAffinityEnabled, | ||
String sspGrouperFactory, | ||
int containerRetryCount, | ||
long containerRetryWindowMs, | ||
int maxConcurrency) { | ||
|
||
this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize, | ||
containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer, | ||
terminationDuration, Executors.newSingleThreadScheduledExecutor( | ||
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled); | ||
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, | ||
deploymentType, apiType, numContainers, hostAffinityEnabled, sspGrouperFactory, containerRetryCount, | ||
containerRetryWindowMs, maxConcurrency); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -118,7 +138,16 @@ public DiagnosticsManager(String jobName, | |
SystemStream diagnosticSystemStream, | ||
SystemProducer systemProducer, | ||
Duration terminationDuration, | ||
ScheduledExecutorService executorService, boolean autosizingEnabled) { | ||
ScheduledExecutorService executorService, | ||
boolean autosizingEnabled, | ||
String deploymentType, | ||
String apiType, | ||
int numContainers, | ||
boolean hostAffinityEnabled, | ||
String sspGrouperFactory, | ||
int containerRetryCount, | ||
long containerRetryWindowMs, | ||
int maxConcurrency) { | ||
this.jobName = jobName; | ||
this.jobId = jobId; | ||
this.containerModels = containerModels; | ||
|
@@ -140,6 +169,14 @@ public DiagnosticsManager(String jobName, | |
this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters | ||
this.scheduler = executorService; | ||
this.autosizingEnabled = autosizingEnabled; | ||
this.deploymentType = deploymentType; | ||
this.apiType = apiType; | ||
this.numContainers = numContainers; | ||
this.hostAffinityEnabled = hostAffinityEnabled; | ||
this.sspGrouperFactory = sspGrouperFactory; | ||
this.containerRetryCount = containerRetryCount; | ||
this.containerRetryWindowMs = containerRetryWindowMs; | ||
this.maxConcurrency = maxConcurrency; | ||
|
||
resetTime = Instant.now(); | ||
this.systemProducer.register(getClass().getSimpleName()); | ||
|
@@ -195,9 +232,12 @@ private class DiagnosticsStreamPublisher implements Runnable { | |
@Override | ||
public void run() { | ||
try { | ||
Long maxJvmHeapMb = maxHeapSizeBytes / (1024 * 1024); | ||
DiagnosticsStreamMessage diagnosticsStreamMessage = | ||
new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId, | ||
taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli()); | ||
taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli(), deploymentType, | ||
apiType, numContainers, containerMemoryMb, containerNumCores, containerThreadPoolSize, hostAffinityEnabled, | ||
sspGrouperFactory, containerRetryCount, containerRetryWindowMs, maxConcurrency, maxJvmHeapMb.intValue()); | ||
|
||
// Add job-related params to the message (if not already published) | ||
if (!jobParamsEmitted) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,12 +65,17 @@ public class DiagnosticsStreamMessage { | |
private final Map<String, Map<String, Object>> metricsMessage; | ||
|
||
public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId, | ||
String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) { | ||
String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp, | ||
String deploymentType, String apiType, int numContainers, int containerMemoryMb, int numCores, int threadPoolSize, | ||
boolean hostAffinityEnabled, String sspGrouperFactory, int containerRetryCount, | ||
long containerRetryWindowMs, int maxConcurrency, int maxJvmHeapMb) { | ||
|
||
// Create the metricHeader | ||
metricsHeader = | ||
new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(), | ||
taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp); | ||
taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp, deploymentType, apiType, numContainers, | ||
containerMemoryMb, numCores, threadPoolSize, hostAffinityEnabled, sspGrouperFactory, | ||
containerRetryCount, containerRetryWindowMs, maxConcurrency, maxJvmHeapMb); | ||
|
||
this.metricsMessage = new HashMap<>(); | ||
} | ||
|
@@ -237,7 +242,13 @@ public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(Metrics | |
metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(), | ||
metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(), | ||
metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(), | ||
metricsSnapshot.getHeader().getResetTime()); | ||
metricsSnapshot.getHeader().getResetTime(), metricsSnapshot.getHeader().getDeploymentType(), | ||
metricsSnapshot.getHeader().getApiType(), metricsSnapshot.getHeader().getNumContainers(), | ||
metricsSnapshot.getHeader().getContainerMemoryMb(), metricsSnapshot.getHeader().getContainerCpuCores(), | ||
metricsSnapshot.getHeader().getContainerThreadPoolSize(), metricsSnapshot.getHeader().getHostAffinity(), | ||
metricsSnapshot.getHeader().getSspGrouper(), metricsSnapshot.getHeader().getMaxContainerRetryCount(), | ||
metricsSnapshot.getHeader().getContainerRetryWindowMs(), metricsSnapshot.getHeader().getTaskMaxConcurrency(), | ||
metricsSnapshot.getHeader().getMaxJvmHeapMb()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a new separate constructor for DiagnosticsStreamMessage that takes a metricsHeader as input? |
||
|
||
Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap(); | ||
Map<String, Object> diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: SamzaAPI not a better name?