Skip to content

Commit

Permalink
Rename PortableJobFields inside MetricsHeader (apache#14)
Browse files Browse the repository at this point in the history
* Make naming consistent for PortabeJobFields

* rename portable job fields for consistency
  • Loading branch information
Aman Singh authored Nov 22, 2023
1 parent 71caa39 commit 415637a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class MetricsHeader {
private static final String TIME = "time";
private static final String RESET_TIME = "reset-time";
private static final String METRICS_SCHEMA_VERSION = "metrics-schema-version";
private static final String PORTABLE_JOB_DATA = "portable-job-data";
private static final String PORTABLE_JOB_FIELDS = "portable-job-fields";
private final String jobName;
private final String jobId;
private final String containerName;
Expand All @@ -53,7 +53,7 @@ public class MetricsHeader {
private final long time;
private final long resetTime;
private final Optional<Short> schemaVersion;
private final Optional<PortableJobFields> portableJobData;
private final Optional<PortableJobFields> portableJobFields;

public MetricsHeader(String jobName, String jobId, String containerName, String execEnvironmentContainerId,
String source, String version, String samzaVersion, String host, long time, long resetTime) {
Expand All @@ -70,7 +70,7 @@ public MetricsHeader(String jobName, String jobId, String containerName, String

public MetricsHeader(String jobName, String jobId, String containerName, String execEnvironmentContainerId,
Optional<String> samzaEpochId, String source, String version, String samzaVersion, String host, long time,
long resetTime, Optional<Short> schemaVersion, Optional<PortableJobFields> portableJobData) {
long resetTime, Optional<Short> schemaVersion, Optional<PortableJobFields> portableJobFields) {
this.jobName = jobName;
this.jobId = jobId;
this.containerName = containerName;
Expand All @@ -83,7 +83,7 @@ public MetricsHeader(String jobName, String jobId, String containerName, String
this.time = time;
this.resetTime = resetTime;
this.schemaVersion = schemaVersion;
this.portableJobData = portableJobData;
this.portableJobFields = portableJobFields;
}

public Map<String, Object> getAsMap() {
Expand All @@ -100,7 +100,7 @@ public Map<String, Object> getAsMap() {
map.put(TIME, time);
map.put(RESET_TIME, resetTime);
this.schemaVersion.ifPresent(schemaVersion -> map.put(METRICS_SCHEMA_VERSION, schemaVersion));
this.portableJobData.ifPresent(portableJobFields -> map.put(PORTABLE_JOB_DATA, portableJobFields));
this.portableJobFields.ifPresent(portableJobFields -> map.put(PORTABLE_JOB_FIELDS, portableJobFields));
return map;
}

Expand Down Expand Up @@ -154,13 +154,13 @@ public long getResetTime() {

public Optional<Short> getSchemaVersion() { return schemaVersion; }

public Optional<PortableJobFields> getPortableJobData() { return portableJobData; }
public Optional<PortableJobFields> getPortableJobFields() { return portableJobFields; }

public static MetricsHeader fromMap(Map<String, Object> map) {

Optional<PortableJobFields> portableJobFields = Optional.empty();
if (map.containsKey(PORTABLE_JOB_DATA)) {
portableJobFields = (Optional<PortableJobFields>) map.get(PORTABLE_JOB_DATA);
if (map.containsKey(PORTABLE_JOB_FIELDS)) {
portableJobFields = (Optional<PortableJobFields>) map.get(PORTABLE_JOB_FIELDS);
}

Optional<Short> schemaVersion = Optional.empty();
Expand Down Expand Up @@ -194,13 +194,14 @@ public boolean equals(Object o) {
execEnvironmentContainerId, that.execEnvironmentContainerId) && Objects.equals(samzaEpochId,
that.samzaEpochId) && Objects.equals(source, that.source) && Objects.equals(version, that.version)
&& Objects.equals(samzaVersion, that.samzaVersion) && Objects.equals(host, that.host)
&& Objects.equals(portableJobData, that.portableJobData);
&& Objects.equals(schemaVersion, that.schemaVersion)
&& Objects.equals(portableJobFields, that.portableJobFields);
}

@Override
public int hashCode() {
return Objects.hash(jobName, jobId, containerName, execEnvironmentContainerId, samzaEpochId, source,
version, samzaVersion, host, time, resetTime, portableJobData);
version, samzaVersion, host, time, resetTime, schemaVersion, portableJobFields);
}

@Override
Expand All @@ -209,7 +210,7 @@ public String toString() {
+ containerName + '\'' + ", execEnvironmentContainerId='" + execEnvironmentContainerId + '\''
+ ", samzaEpochId=" + samzaEpochId + ", source='" + source + '\'' + ", version='" + version + '\''
+ ", samzaVersion='" + samzaVersion + '\'' + ", host='" + host + '\'' + ", time=" + time + ", resetTime="
+ resetTime + ", portableJobData=" + portableJobData + '}';
+ resetTime + ", schemaVersion=" + schemaVersion + ", portableJobFields=" + portableJobFields + '}';
}

public static class PortableJobFields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testGetAsMap() {
expected.put("time", TIME);
expected.put("reset-time", RESET_TIME);
expected.put("metrics-schema-version", schemaVersion);
expected.put("portable-job-data", portableJobFields);
expected.put("portable-job-fields", portableJobFields);
assertEquals(expected, metricsHeader.getAsMap());

// test with empty samza epoch id
Expand All @@ -82,7 +82,7 @@ public void testGetAsMap() {
SAMZA_VERSION, HOST, TIME, RESET_TIME, Optional.empty(), Optional.empty());

expected.remove("metrics-schema-version");
expected.remove("portable-job-data");
expected.remove("portable-job-fields");
assertEquals(expected, metricsHeader.getAsMap());
}

Expand All @@ -106,7 +106,7 @@ public void testFromMap() {
map.put("time", TIME);
map.put("reset-time", RESET_TIME);
map.put("metrics-schema-version", schemaVersion);
map.put("portable-job-data", portableJobFields);
map.put("portable-job-fields", portableJobFields);

MetricsHeader expected =
new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.of(SAMZA_EPOCH_ID),
Expand All @@ -125,7 +125,7 @@ public void testFromMap() {

// test with missing portable job data
map.remove("metrics-schema-version");
map.remove("portable-job-data");
map.remove("portable-job-fields");
expected =
new MetricsHeader(JOB_NAME, JOB_ID, CONTAINER_NAME, EXEC_ENV_CONTAINER_ID, Optional.empty(), SOURCE, VERSION,
SAMZA_VERSION, HOST, TIME, RESET_TIME, Optional.empty(), Optional.empty());
Expand Down

0 comments on commit 415637a

Please sign in to comment.