Skip to content

Commit

Permalink
Fix getJobStatus to correctly serialize the state changes (#388)
Browse files Browse the repository at this point in the history
* Correctly serialize and deserialize the statusChanges field

* Use escaped JSON instead when returning from getJobStatus
  • Loading branch information
burmanm authored Sep 22, 2023
1 parent 6b965aa commit 40cef22
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import com.datastax.oss.driver.api.querybuilder.schema.OngoingPartitionKey;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.DataTypeCqlNameParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -94,6 +97,23 @@ public Map<String, String> getJobStatus(@RpcParam(name = "job_id") String jobId)
resultMap.put("error", jobWithId.getError().getLocalizedMessage());
}

List<Map<String, String>> statusChanges = new ArrayList<>();
for (Job.StatusChange statusChange : jobWithId.getStatusChanges()) {
Map<String, String> change = Maps.newHashMap();
change.put("status", statusChange.getStatus().name());
change.put("change_time", Long.valueOf(statusChange.getChangeTime()).toString());
change.put("message", statusChange.getMessage());
statusChanges.add(change);
}

ObjectMapper objectMapper = new ObjectMapper();
try {
String s = objectMapper.writeValueAsString(statusChanges);
resultMap.put("status_changes", s);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

return resultMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Job implements Serializable {
Expand Down Expand Up @@ -35,7 +40,7 @@ public enum JobStatus {
@JsonProperty(value = "error")
private String error;

public class StatusChange {
public static class StatusChange {
@JsonProperty(value = "status")
String status;

Expand All @@ -45,8 +50,12 @@ public class StatusChange {
@JsonProperty(value = "message")
String message;

public StatusChange(String type, String message) {
changeTime = System.currentTimeMillis();
@JsonCreator
public StatusChange(
@JsonProperty(value = "status") String type,
@JsonProperty(value = "change_time") String time,
@JsonProperty(value = "message") String message) {
changeTime = Long.parseLong(time);
status = type;
this.message = message;
}
Expand Down Expand Up @@ -75,14 +84,32 @@ public Job(
@JsonProperty(value = "submit_time") long submitTime,
@JsonProperty(value = "end_time") long finishedTime,
@JsonProperty(value = "error") String error,
@JsonProperty(value = "status_changes") List<StatusChange> changes) {
@JsonProperty(value = "status_changes") String changes) {
this.jobId = jobId;
this.jobType = jobType;
this.status = JobStatus.valueOf(status);
this.submitTime = submitTime;
this.finishedTime = finishedTime;
this.error = error;
this.statusChanges = changes;
this.statusChanges = changes(changes);
}

public List<StatusChange> changes(String s) {
ObjectMapper objectMapper = new ObjectMapper();
if (s.length() < 2) {
return new ArrayList<>();
}
try {
JsonNode parent = objectMapper.readTree(s);

List<StatusChange> changes =
objectMapper.readValue(parent.traverse(), new TypeReference<List<StatusChange>>() {});

return changes;

} catch (IOException e) {
throw new RuntimeException(e);
}
}

public String getJobId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -585,13 +587,28 @@ public void testJobStatus() throws Exception {

when(mockResultSet.one()).thenReturn(mockRow);

Map<String, String> jobDetailsRow = new HashMap<>();
Map<String, Object> jobDetailsRow = new HashMap<>();
jobDetailsRow.put("id", "0fe65b47-98c2-47d8-9c3c-5810c9988e10");
jobDetailsRow.put("type", "CLEANUP");
jobDetailsRow.put("status", "COMPLETED");
jobDetailsRow.put("submit_time", String.valueOf(System.currentTimeMillis()));
jobDetailsRow.put("end_time", String.valueOf(System.currentTimeMillis()));

List<Map<String, String>> statusChanges = new ArrayList<>();
Map<String, String> change = Maps.newHashMap();
change.put("status", "SUCCESS");
change.put("change_time", "1695183696663");
change.put("message", "No message");
statusChanges.add(change);

ObjectMapper objectMapper = new ObjectMapper();
try {
String s = objectMapper.writeValueAsString(statusChanges);
jobDetailsRow.put("status_changes", s);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

when(mockRow.getObject(0)).thenReturn(jobDetailsRow);

MockHttpResponse response =
Expand All @@ -609,6 +626,8 @@ public void testJobStatus() throws Exception {
assertEquals("0fe65b47-98c2-47d8-9c3c-5810c9988e10", jobDetails.getJobId());
assertEquals("COMPLETED", jobDetails.getStatus().toString());
assertEquals("CLEANUP", jobDetails.getJobType());
assertEquals(1, jobDetails.getStatusChanges().size());
assertEquals("SUCCESS", jobDetails.getStatusChanges().get(0).getStatus());
}

@Test
Expand Down

0 comments on commit 40cef22

Please sign in to comment.