Skip to content

Commit

Permalink
Merge pull request #1251 from NASA-AMMOS/feat/cancel-scheduling
Browse files Browse the repository at this point in the history
Cancel a Scheduling Run
  • Loading branch information
Mythicaeda authored Dec 7, 2023
2 parents 2dff316 + f1f82b1 commit a2cc355
Show file tree
Hide file tree
Showing 42 changed files with 691 additions and 264 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Down migration: remove the cancel-notification trigger and the function it runs,
-- then record that this migration has been rolled back.
drop trigger notify_scheduling_workers_cancel on scheduling_request;
drop function notify_scheduling_workers_cancel();
call migrations.mark_migration_rolled_back('12');
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Trigger function: sends the updated row's specification_id as the payload
-- of a notification on the 'scheduling_cancel' channel.
create function notify_scheduling_workers_cancel()
returns trigger
security definer
language plpgsql as $$
begin
-- concatenating with '' turns specification_id into the text payload pg_notify expects
perform pg_notify('scheduling_cancel', '' || new.specification_id);
-- this function is used by an AFTER trigger, so no row needs to be returned
return null;
end
$$;

-- Notify scheduling workers when a request is flagged as canceled, unless the request
-- had already finished (status 'success' or 'failed') before the update.
-- The two status checks must be joined with AND: joined with OR the condition holds
-- for every status, because no status equals both 'success' and 'failed'.
create trigger notify_scheduling_workers_cancel
after update of canceled on scheduling_request
for each row
when ((old.status != 'success' and old.status != 'failed') and new.canceled)
execute function notify_scheduling_workers_cancel();

call migrations.mark_migration_applied('12');
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.nasa.jpl.aerie.e2e.types.ExternalDataset.ProfileInput.ProfileSegmentInput;
import gov.nasa.jpl.aerie.e2e.types.Plan;
import gov.nasa.jpl.aerie.e2e.types.ProfileSegment;
import gov.nasa.jpl.aerie.e2e.types.SchedulingRequest.SchedulingStatus;
import gov.nasa.jpl.aerie.e2e.types.ValueSchema;
import gov.nasa.jpl.aerie.e2e.utils.GatewayRequests;
import gov.nasa.jpl.aerie.e2e.utils.HasuraRequests;
Expand Down Expand Up @@ -712,4 +713,86 @@ void useExternalDatasetInGoal() throws IOException {
assertEquals(Json.createObjectBuilder().add("biteSize", 1).build(), activity.arguments());
}
}

/**
 * Tests for canceling a scheduling run.
 * Each test gets a fresh Foo mission model, a plan, a scheduling specification and a recurrence goal.
 */
@Nested
class CancelingScheduling {
  private int fooId;
  private int fooPlan;
  private int fooSchedulingSpecId;
  private int fooGoalId;

  @BeforeEach
  void beforeEach() throws IOException, InterruptedException {
    // Insert the Mission Model
    // Long Foo plans take long enough to be canceled without risking a race condition like with Banananation
    try (final var gateway = new GatewayRequests(playwright)) {
      // Labelled for the scheduling tests so these fixtures are not confused with the
      // identically-built Foo fixtures of the simulation tests
      fooId = hasura.createMissionModel(
          gateway.uploadFooJar(),
          "Foo (e2e tests)",
          "aerie_e2e_tests",
          "Scheduling Tests");
    }
    // Insert the Plan
    fooPlan = hasura.createPlan(
        fooId,
        "Foo Plan - Scheduling Tests",
        "720:00:00",
        planStartTimestamp);

    // Insert Scheduling Spec
    fooSchedulingSpecId = hasura.insertSchedulingSpecification(
        fooPlan,
        hasura.getPlanRevision(fooPlan),
        planStartTimestamp,
        "2023-01-31T00:00:00+00:00",
        JsonValue.EMPTY_JSON_OBJECT,
        false);

    // Add Goal
    fooGoalId = hasura.insertSchedulingGoal(
        "Foo Recurrence Test Goal",
        fooId,
        """
        export default function recurrenceGoalExample() {
        return Goal.ActivityRecurrenceGoal({
        activityTemplate: ActivityTemplates.bar(),
        interval: Temporal.Duration.from({ hours: 2 }),
        });
        }""");
    hasura.createSchedulingSpecGoal(fooGoalId, fooSchedulingSpecId, 0);
  }

  @AfterEach
  void afterEach() throws IOException {
    // Remove Goal
    hasura.deleteSchedulingGoal(fooGoalId);

    // Remove Model and Plan
    hasura.deletePlan(fooPlan);
    hasura.deleteMissionModel(fooId);
  }

  /**
   * Canceling a scheduling run causes the "Reason" field to report that the run was canceled
   */
  @Test
  void cancelingSchedulingUpdatesRequestReason() throws IOException {
    final var results = hasura.cancelingScheduling(fooSchedulingSpecId);
    // Assert that the run was incomplete
    assertEquals(SchedulingStatus.incomplete, results.status());

    // Assert reason
    assertTrue(results.reason().isPresent());
    final var reason = results.reason().get();
    assertEquals("SCHEDULING_CANCELED", reason.type());
    assertEquals("Scheduling run was canceled", reason.message());
    assertEquals("", reason.trace());

    // Assert the data in the reason: a "location" entry plus a "message" derived from it
    final var reasonData = reason.data();
    assertEquals(2, reasonData.size());
    assertTrue(reasonData.containsKey("location"));
    assertEquals(
        "Scheduling was interrupted while " + reasonData.getString("location"),
        reasonData.getString("message"));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ void simulationMicrosecondResolution() throws IOException {
assertDoesNotThrow(() -> hasura.awaitSimulation(planId));
}



@Nested
class TemporalSubsetSimulation {
private int firstHalfActivityId;
Expand Down Expand Up @@ -259,7 +257,7 @@ void beforeEach() throws IOException, InterruptedException {
// Long Foo plans take long enough to be canceled without risking a race condition like with Banananation
try (final var gateway = new GatewayRequests(playwright)) {
fooId = hasura.createMissionModel(
gateway.uploadJarFile("../examples/foo-missionmodel/build/libs/foo-missionmodel.jar"),
gateway.uploadFooJar(),
"Foo (e2e tests)",
"aerie_e2e_tests",
"Simulation Tests");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gov.nasa.jpl.aerie.e2e.types;

import javax.json.JsonObject;
import java.util.Optional;

/**
 * A scheduling request as returned by the {@code scheduling_request} GraphQL queries and mutations.
 *
 * @param analysisId value of the {@code analysis_id} field
 * @param specificationId value of the {@code specification_id} field
 * @param specificationRevision value of the {@code specification_revision} field
 * @param status value of the {@code status} field
 * @param canceled value of the {@code canceled} field
 * @param reason the parsed {@code reason} field, or empty when it is null or absent
 */
public record SchedulingRequest(
    int analysisId,
    int specificationId,
    int specificationRevision,
    SchedulingStatus status,
    boolean canceled,
    Optional<SchedulingReason> reason
) {
  /** Status values a scheduling request can report. Constant names match the JSON strings exactly. */
  public enum SchedulingStatus { pending, incomplete, failed, success }

  /**
   * The structured reason attached to a scheduling request.
   *
   * @param type value of the {@code type} field
   * @param message value of the {@code message} field
   * @param trace value of the {@code trace} field
   * @param data value of the {@code data} field
   */
  public record SchedulingReason(
      String type,
      String message,
      String trace,
      JsonObject data
  )
  {
    /**
     * Build a reason from its JSON representation.
     * @param json an object with string fields "type", "message", "trace" and an object field "data"
     */
    public static SchedulingReason fromJSON(JsonObject json) {
      return new SchedulingReason(
          json.getString("type"),
          json.getString("message"),
          json.getString("trace"),
          json.getJsonObject("data")
      );
    }
  }

  /**
   * Build a scheduling request from its JSON representation.
   * @param json an object with the fields "analysis_id", "specification_id", "specification_revision",
   *     "status", "canceled" and, optionally, "reason"
   */
  public static SchedulingRequest fromJSON(JsonObject json) {
    return new SchedulingRequest(
        json.getInt("analysis_id"),
        json.getInt("specification_id"),
        json.getInt("specification_revision"),
        SchedulingStatus.valueOf(json.getString("status")),
        json.getBoolean("canceled"),
        parseReason(json)
    );
  }

  /** Read the optional "reason" field, treating both a missing key and a JSON null as "no reason". */
  private static Optional<SchedulingReason> parseReason(JsonObject json) {
    // JsonObject#isNull throws NullPointerException when the key has no mapping,
    // so the key's presence has to be checked first
    if (!json.containsKey("reason") || json.isNull("reason")) {
      return Optional.empty();
    }
    return Optional.of(SchedulingReason.fromJSON(json.getJsonObject("reason")));
  }
}
24 changes: 24 additions & 0 deletions e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ mutation AssignTemplateToSimulation($simulation_id: Int!, $simulation_template_i
simulation_template_id
}
}"""),
// Sets `canceled` to true on every scheduling request whose analysis_id equals $analysis_id
// and returns the updated rows.
CANCEL_SCHEDULING("""
mutation cancelScheduling($analysis_id: Int!) {
update_scheduling_request(where: {analysis_id: {_eq: $analysis_id}}, _set: {canceled: true}) {
returning {
analysis_id
specification_id
specification_revision
canceled
reason
status
}
}
}"""),
CANCEL_SIMULATION("""
mutation cancelSimulation($id: Int!) {
update_simulation_dataset_by_pk(pk_columns: {id: $id}, _set: {canceled: true}) {
Expand Down Expand Up @@ -354,6 +367,17 @@ query GetSchedulingDslTypeScript($missionModelId: Int!, $planId: Int) {
}
}
}"""),
// Fetches a single scheduling request by its primary key (specification id + specification revision).
GET_SCHEDULING_REQUEST("""
query GetSchedulingRequest($specificationId: Int!, $specificationRev: Int!) {
scheduling_request_by_pk(specification_id: $specificationId, specification_revision: $specificationRev) {
specification_id
specification_revision
analysis_id
canceled
reason
status
}
}"""),
GET_SIMULATION_DATASET("""
query GetSimulationDataset($id: Int!) {
simulationDataset: simulation_dataset_by_pk(id: $id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public int uploadJarFile() throws IOException {
return uploadJarFile("../examples/banananation/build/libs/banananation.jar");
}

/**
 * Upload the Foo mission model JAR from its build output location.
 * @return whatever {@code uploadJarFile} returns for the Foo JAR
 * @throws IOException if {@code uploadJarFile} fails
 */
public int uploadFooJar() throws IOException {
  final String fooJarPath = "../examples/foo-missionmodel/build/libs/foo-missionmodel.jar";
  return uploadJarFile(fooJarPath);
}

/**
* Uploads the JAR found at searchPath
* @param jarPath is relative to the e2e-tests directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.microsoft.playwright.options.RequestOptions;
import gov.nasa.jpl.aerie.e2e.types.*;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;

import javax.json.Json;
import javax.json.JsonArray;
Expand All @@ -21,6 +20,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Hasura API request functions
*/
Expand All @@ -33,7 +35,7 @@ public class HasuraRequests implements AutoCloseable {
public HasuraRequests(Playwright playwright) {
request = playwright.request().newContext(
new APIRequest.NewContextOptions()
.setBaseURL(BaseURL.HASURA.url));
.setBaseURL(BaseURL.HASURA.url).setTimeout(0));
}

@Override
Expand Down Expand Up @@ -289,7 +291,7 @@ public SimulationResponse awaitSimulation(int planId, int timeout) throws IOExce
case "complete" -> {
return response;
}
default -> throw new IOException("Simulation returned bad status " + response.status() + " with reason " +response.reason());
default -> fail("Simulation returned bad status " + response.status() + " with reason " +response.reason());
}
}
throw new TimeoutError("Simulation timed out after " + timeout + " seconds");
Expand Down Expand Up @@ -327,8 +329,8 @@ public SimulationDataset cancelingSimulation(int planId, int timeout) throws IOE
}
return cancelSimulation(response.simDatasetId(), timeout-i);
}
case "complete" -> Assertions.fail("Simulation completed before it could be canceled");
default -> throw new IOException("Simulation returned bad status " + response.status() + " with reason " +response.reason());
case "complete" -> fail("Simulation completed before it could be canceled");
default -> fail("Simulation returned bad status " + response.status() + " with reason " +response.reason());
}
}
throw new TimeoutError("Simulation timed out after " + timeout + " seconds");
Expand Down Expand Up @@ -383,6 +385,37 @@ private SchedulingResponse schedule(int schedulingSpecId) throws IOException {
return SchedulingResponse.fromJSON(data);
}

/**
 * Cancel the scheduling request with the given analysis id, then wait for the cancellation to be reported.
 *
 * Asserts that exactly one request was updated by the cancel mutation, then polls that request
 * once per second until it is both flagged as canceled and carries a reason.
 *
 * @param analysisId the analysis id of the scheduling request to cancel
 * @param timeout the maximum number of one-second polls to make
 * @return the first polled scheduling request that is canceled and has a reason
 * @throws IOException if one of the underlying requests fails
 * @throws TimeoutError if no canceled request with a reason is seen within {@code timeout} polls
 */
private SchedulingRequest cancelSchedulingRun(int analysisId, int timeout) throws IOException {
  final var variables = Json.createObjectBuilder().add("analysis_id", analysisId).build();
  final var canceledRequests = makeRequest(GQL.CANCEL_SCHEDULING, variables)
      .getJsonObject("update_scheduling_request")
      .getJsonArray("returning");
  // Assert that we only canceled one task
  assertEquals(1, canceledRequests.size());

  final var canceledRequest = canceledRequests.getJsonObject(0);
  final int specId = canceledRequest.getInt("specification_id");
  final int specRev = canceledRequest.getInt("specification_revision");

  for (int i = 0; i < timeout; ++i) {
    try {
      Thread.sleep(1000); // 1s
    } catch (InterruptedException ex) {
      // Restore the interrupt flag so code further up the stack can still observe the interruption
      Thread.currentThread().interrupt();
      throw new RuntimeException(ex);
    }
    final var response = getSchedulingRequest(specId, specRev);
    // If reason is present, that means that the scheduler has posted
    // and we are not just seeing the side effects of `GQL.CANCEL_SCHEDULING`
    if (response.canceled() && response.reason().isPresent()) {
      return response;
    }
  }
  throw new TimeoutError("Canceling scheduling timed out after " + timeout + " seconds");
}

/**
 * Fetch the scheduling request identified by a specification id and specification revision.
 *
 * @param specificationId sent as the {@code specificationId} query variable
 * @param specificationRevision sent as the {@code specificationRev} query variable
 * @return the request parsed from the {@code scheduling_request_by_pk} field of the response
 * @throws IOException if the underlying request fails
 */
private SchedulingRequest getSchedulingRequest(int specificationId, int specificationRevision) throws IOException {
  final var queryArgs = Json.createObjectBuilder();
  queryArgs.add("specificationId", specificationId);
  queryArgs.add("specificationRev", specificationRevision);

  final var requestJson = makeRequest(GQL.GET_SCHEDULING_REQUEST, queryArgs.build())
      .getJsonObject("scheduling_request_by_pk");
  return SchedulingRequest.fromJSON(requestJson);
}


/**
* Run scheduling on the specified scheduling specification with a timeout of 30 seconds
*/
Expand All @@ -408,12 +441,47 @@ public SchedulingResponse awaitScheduling(int schedulingSpecId, int timeout) thr
case "complete" -> {
return response;
}
default -> throw new IOException("Scheduling returned bad status " + response.status() + " with reason " +response.reason());
default -> fail("Scheduling returned bad status " + response.status() + " with reason " +response.reason());
}
}
throw new TimeoutError("Scheduling timed out after " + timeout + " seconds");
}

/**
 * Start a scheduling run and cancel it, using a timeout of 30 seconds.
 * @param schedulingSpecId the scheduling specification to use
 * @return the result of {@code cancelingScheduling(schedulingSpecId, 30)}
 * @throws IOException if the two-argument overload throws it
 */
public SchedulingRequest cancelingScheduling(int schedulingSpecId) throws IOException {
  final int defaultTimeoutSeconds = 30;
  return cancelingScheduling(schedulingSpecId, defaultTimeoutSeconds);
}

/**
 * Start a scheduling run and cancel it once it is reported as "incomplete", with a set timeout.
 *
 * Calls {@code schedule} up to {@code timeout} times. While the reported status is "pending" it
 * waits one second and tries again; on "incomplete" it cancels the run, passing on the part of
 * the timeout that has not been used yet. Any other status fails the test.
 *
 * @param schedulingSpecId the scheduling specification to use
 * @param timeout the length of the timeout, in seconds
 * @return the scheduling request returned by the cancellation
 * @throws IOException if one of the underlying requests fails
 * @throws TimeoutError if the run is still "pending" after {@code timeout} attempts
 */
public SchedulingRequest cancelingScheduling(int schedulingSpecId, int timeout) throws IOException {
  for (int i = 0; i < timeout; ++i) {
    final var response = schedule(schedulingSpecId);
    switch (response.status()) {
      case "pending" -> {
        try {
          Thread.sleep(1000); // 1s
        } catch (InterruptedException e) {
          // Restore the interrupt flag so code further up the stack can still observe the interruption
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
      case "incomplete" -> {
        return cancelSchedulingRun(response.analysisId(), timeout - i);
      }
      case "complete" -> fail("Scheduling completed before it could be canceled");
      default -> fail("Scheduling returned bad status " + response.status() + " with reason " + response.reason());
    }
  }
  throw new TimeoutError("Scheduling timed out after " + timeout + " seconds");
}

/**
 * Convenience overload: delegates to the four-argument {@code insertSchedulingGoal},
 * passing an empty string as the final argument.
 * NOTE(review): the final argument is presumably the goal's description; confirm against the four-argument overload.
 */
public int insertSchedulingGoal(String name, int modelId, String definition) throws IOException {
return insertSchedulingGoal(name, modelId, definition, "");
}
Expand Down
Loading

0 comments on commit a2cc355

Please sign in to comment.