-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-2561: Add job features to MetricsHeader #1402
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, Minor comments
|
||
package org.apache.samza.metrics; | ||
|
||
public enum ApiType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: SamzaAPI not a better name?
public static String getApiType(Config config) { | ||
ApplicationConfig appConfig = new ApplicationConfig(config); | ||
String appClass = appConfig.getAppClass(); | ||
if (appClass == null || appClass.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this ever true? for legacy task class applications?
import org.apache.samza.config.TaskConfig; | ||
import org.apache.samza.metrics.ApiType; | ||
import org.apache.samza.metrics.DeploymentType; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class Util { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to your change but JobInfoUtil is a better name for this Util class rather than just Util
@@ -59,6 +59,14 @@ | |||
private int numPersistentStores = 2; | |||
private int containerNumCores = 2; | |||
private boolean autosizingEnabled = false; | |||
private String deploymentType = "test deployment type"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider adding valid values for deployType and API type, to unit test the util functions you just added
@@ -252,7 +264,17 @@ private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelo | |||
Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion); | |||
Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname); | |||
Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName()); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: revert whitespace change
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.samza.metrics; | ||
|
||
public enum DeploymentType { | ||
YARN, STANDALONE | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single line enums like these can likely be added to the metrics-header class, since that seems like the only place theyre used no?
Same for API type above.
public static int getContainerCount(Config config) { | ||
JobConfig jobConfig = new JobConfig(config); | ||
return jobConfig.getContainerCount(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why even have it as a util?
Could the caller not cast the "config" object into JobConfig, TaskConfig and ClusterManagerConfig once and invoke the respective function, rather than do a cast in each separate util.
Same for getContainerMemoryMb, getNumCores, getThreadPoolSize, getSspGrouperFactory, getHostAffinityEnabled, getContainerRetryCount, getContainerRetryWindowMs, getMaxConcurrency.
Then none of these "util" methods need to be there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
public static int getMaxJvmHeapMb() { | ||
Long maxJvmHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024); | ||
return maxJvmHeapMb.intValue(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can DiagnosticsUtil.buildDiagnosticsManager also invoke this util method, instead of calling Runtime.getRuntime().maxMemory directly ?
Duration terminationDuration, boolean autosizingEnabled) { | ||
Duration terminationDuration, | ||
boolean autosizingEnabled, | ||
String deploymentType, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should deploymentType and api type be enums, since we defined an enum for them above?
metricsSnapshot.getHeader().getContainerThreadPoolSize(), metricsSnapshot.getHeader().getHostAffinity(), | ||
metricsSnapshot.getHeader().getSspGrouper(), metricsSnapshot.getHeader().getMaxContainerRetryCount(), | ||
metricsSnapshot.getHeader().getContainerRetryWindowMs(), metricsSnapshot.getHeader().getTaskMaxConcurrency(), | ||
metricsSnapshot.getHeader().getMaxJvmHeapMb()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a new separate constructor for DiagnosticsStreamMessage that takes a metricsHeader as input?
serde, | ||
blacklist, | ||
Util.getDeploymentType(config), | ||
Util.getApiType(config), | ||
Util.getContainerCount(config), | ||
Util.getContainerMemoryMb(config), | ||
Util.getNumCores(config), | ||
Util.getThreadPoolSize(config), | ||
Util.getHostAffinityEnabled(config), | ||
Util.getSspGrouperFactory(config), | ||
Util.getContainerRetryCount(config), | ||
Util.getContainerRetryWindowMs(config), | ||
Util.getMaxConcurrency(config), | ||
Util.getMaxJvmHeapMb) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please maintain the existing ordering of passing in the parameters, then the serde and then the blacklist.
Similar for MetricsHeader above, where time fields are passed in last.
@@ -59,6 +59,14 @@ | |||
private int numPersistentStores = 2; | |||
private int containerNumCores = 2; | |||
private boolean autosizingEnabled = false; | |||
private String deploymentType = "test deployment type"; | |||
private String apiType = "test api type"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wasnt this an enum ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took an initial pass, requires some simplification and cleanup.
Since a majority of the parameters are derived from config, in the interest of easy future extensibility, it'd be better to simply emit the entire config object once, from DiagnosticsManager at container-startup. |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class Util { | ||
private static final Logger LOG = LoggerFactory.getLogger(Util.class); | ||
private static final String YARN_JOB_FACTORY_CLASS = "org.apache.samza.job.yarn.YarnJobFactory"; | ||
private static final String BEAM_RUNNER_CLASS = "org.apache.beam.runners.samza.SamzaRunner"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this actually needed given
#1185 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment about #1185
Issues: Currently, the MetricsHeader object emitted by the SamzaContainer does not exclude basic job level information.
Changes: Added a few features of the job to be emitted by the MetricsHeader
API Changes: With this change, the MetricsHeader class will emit other properties of the job like number of containers used, number of cores used, etc.
Upgrade instructions: None
Usage instructions: None
Tests: Modified existing tests to work with this change.