diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java index b51713e771..2cf23d0897 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java @@ -26,9 +26,11 @@ import java.time.Instant; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; public class ResumableSimulationDriver<Model> implements AutoCloseable { @@ -52,6 +54,9 @@ public class ResumableSimulationDriver<Model> implements AutoCloseable { //mapping each activity name to its task id (in String form) in the simulation engine private final Map<ActivityDirectiveId, TaskId> plannedDirectiveToTask; + //subset of plannedDirectiveToTask to check for scheduling dependent tasks + private final Map<ActivityDirectiveId, TaskId> toCheckForDependencyScheduling; + //simulation results so far private SimulationResults lastSimResults; //cached simulation results cover the period [Duration.ZERO, lastSimResultsEnd] @@ -71,6 +76,7 @@ public ResumableSimulationDriver( ){ this.missionModel = missionModel; plannedDirectiveToTask = new HashMap<>(); + toCheckForDependencyScheduling = new HashMap<>(); this.planDuration = planDuration; countSimulationRestarts = 0; this.canceledListener = canceledListener; @@ -94,6 +100,7 @@ private void printTimeSpent(){ printTimeSpent(); durationSinceRestart = 0; plannedDirectiveToTask.clear(); + toCheckForDependencyScheduling.clear(); lastSimResults = null; lastSimResultsEnd = Duration.ZERO; long before = System.nanoTime(); @@ -273,13 +280,14 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective> schedule).compute(); // Filter out activities that are before the plan start resolved = StartOffsetReducer.filterOutNegativeStartOffset(resolved); - + final var toSchedule = new HashSet<ActivityDirectiveId>(); + toSchedule.add(null); scheduleActivities( + toSchedule, schedule, resolved, missionModel, - engine, - activityTopic + engine ); var allTaskFinished = false; @@ -303,6 +311,8 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective> final var commit = engine.performJobs(batch.jobs(), cells, curTime, Duration.MAX_VALUE); timeline.add(commit); + scheduleActivities(getSuccessorsToSchedule(engine), schedule, resolved, missionModel, engine); + // all tasks are complete : do not exit yet, there might be event triggered at the same time if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask .values() @@ -328,93 +338,65 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective> * @return its duration if the activity has been simulated and has finished simulating, an IllegalArgumentException otherwise */ public Optional<Duration> getActivityDuration(ActivityDirectiveId activityDirectiveId){ + //potential cause of non presence: (1) activity is outside plan bounds (2) activity has not been simulated yet + if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty(); return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId)); } + private Set<ActivityDirectiveId> getSuccessorsToSchedule(final SimulationEngine engine) { + final var toSchedule = new HashSet<ActivityDirectiveId>(); + final var iterator = toCheckForDependencyScheduling.entrySet().iterator(); + while(iterator.hasNext()){ + final var taskToCheck = iterator.next(); + if(engine.isTaskComplete(taskToCheck.getValue())){ + toSchedule.add(taskToCheck.getKey()); + iterator.remove(); + } + } + return toSchedule; + } + private void scheduleActivities( - final Map<ActivityDirectiveId, ActivityDirective> schedule, + final Set<ActivityDirectiveId> toScheduleNow, + final Map<ActivityDirectiveId, ActivityDirective> completeSchedule, final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved, final MissionModel<Model> missionModel, - final SimulationEngine engine, - final Topic<ActivityDirectiveId> activityTopic - ) - { - if(resolved.get(null) == null) { return; } // Nothing to simulate - - for (final Pair<ActivityDirectiveId, Duration> directivePair : resolved.get(null)) { - final var directiveId = directivePair.getLeft(); - final var startOffset = directivePair.getRight(); - final var serializedDirective = schedule.get(directiveId).serializedActivity(); - - final TaskFactory<?> task; - try { - task = missionModel.getTaskFactory(serializedDirective); - } catch (final InstantiationException ex) { - // All activity instantiations are assumed to be validated by this point - throw new Error("Unexpected state: activity instantiation %s failed with: %s" - .formatted(serializedDirective.getTypeName(), ex.toString())); + final SimulationEngine engine){ + for(final var predecessor: toScheduleNow) { + for (final var directivePair : resolved.get(predecessor)) { + final var offset = directivePair.getRight(); + final var directiveIdToSchedule = directivePair.getLeft(); + final var serializedDirective = completeSchedule.get(directiveIdToSchedule).serializedActivity(); + final TaskFactory<?> task; + try { + task = missionModel.getTaskFactory(serializedDirective); + } catch (final InstantiationException ex) { + // All activity instantiations are assumed to be validated by this point + throw new Error("Unexpected state: activity instantiation %s failed with: %s" + .formatted(serializedDirective.getTypeName(), ex.toString())); + } + Duration computedStartTime = offset; + if (predecessor != null) { + computedStartTime = (curTime.isEqualTo(Duration.MIN_VALUE) ? Duration.ZERO : curTime).plus(offset); + } + final var taskId = engine.scheduleTask( + computedStartTime, + makeTaskFactory(directiveIdToSchedule, task, activityTopic)); + plannedDirectiveToTask.put(directiveIdToSchedule, taskId); + if (resolved.containsKey(directiveIdToSchedule)) { + toCheckForDependencyScheduling.put(directiveIdToSchedule, taskId); + } } - - final var taskId = engine.scheduleTask(startOffset, makeTaskFactory( - directiveId, - task, - schedule, - resolved, - missionModel, - activityTopic - )); - plannedDirectiveToTask.put(directiveId,taskId); } } - private static <Model, Output> TaskFactory<Unit> makeTaskFactory( + private static <Output> TaskFactory<Output> makeTaskFactory( final ActivityDirectiveId directiveId, final TaskFactory<Output> task, - final Map<ActivityDirectiveId, ActivityDirective> schedule, - final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved, - final MissionModel<Model> missionModel, - final Topic<ActivityDirectiveId> activityTopic - ) - { - // Emit the current activity (defined by directiveId) - return executor -> scheduler0 -> TaskStatus.calling((TaskFactory<Output>) (executor1 -> scheduler1 -> { - scheduler1.emit(directiveId, activityTopic); - return task.create(executor1).step(scheduler1); - }), scheduler2 -> { - // When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time - final List<Pair<ActivityDirectiveId, Duration>> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId); - // Iterate over the dependents - for (final var dependent : dependents) { - scheduler2.spawn(executor2 -> scheduler3 -> - // Delay until the dependent starts - TaskStatus.delayed(dependent.getRight(), scheduler4 -> { - final var dependentDirectiveId = dependent.getLeft(); - final var serializedDependentDirective = schedule.get(dependentDirectiveId).serializedActivity(); - - // Initialize the Task for the dependent - final TaskFactory<?> dependantTask; - try { - dependantTask = missionModel.getTaskFactory(serializedDependentDirective); - } catch (final InstantiationException ex) { - // All activity instantiations are assumed to be validated by this point - throw new Error("Unexpected state: activity instantiation %s failed with: %s" - .formatted(serializedDependentDirective.getTypeName(), ex.toString())); - } - - // Schedule the dependent - // When it finishes, it will schedule the activities depending on it to know their start time - scheduler4.spawn(makeTaskFactory( - dependentDirectiveId, - dependantTask, - schedule, - resolved, - missionModel, - activityTopic - )); - return TaskStatus.completed(Unit.UNIT); - })); - } - return TaskStatus.completed(Unit.UNIT); - }); + final Topic<ActivityDirectiveId> activityTopic) { + return executor -> scheduler -> { + scheduler.emit(directiveId, activityTopic); + return task.create(executor).step(scheduler); + }; } } diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java index f5a81bdfd8..9fe3b6f2d4 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java @@ -1028,7 +1028,7 @@ public Duration valueAt(Duration start, final EquationSolvingAlgorithms.History< if(computedDuration.isPresent()) { history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, start.plus(computedDuration.get())), new ActivityMetadata(actToSim)); } else{ - logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity."); + logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity or activity outside plan bounds."); history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, null), new ActivityMetadata(actToSim)); } } catch (SimulationFacade.SimulationException e) { diff --git a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java index efc0353705..24f668ef9f 100644 --- a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java +++ b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java @@ -339,6 +339,21 @@ public void activitiesAnchoredToOtherActivities() throws SchedulingInterruptedEx assertEquals(1, driver.getCountSimulationRestarts()); } + @Test + @DisplayName("Reference to anchored activities are correctly maintained by the driver") + public void activitiesAnchoredToOtherActivitiesSimple() throws SchedulingInterruptedException { + final var activitiesToSimulate = new HashMap<ActivityDirectiveId, ActivityDirective>(2); + activitiesToSimulate.put( + new ActivityDirectiveId(0), + new ActivityDirective(oneMinute, serializedDelayDirective, null, true)); + activitiesToSimulate.put( + new ActivityDirectiveId(1), + new ActivityDirective(oneMinute, serializedDelayDirective, new ActivityDirectiveId(0), false)); + driver.simulateActivities(activitiesToSimulate); + final var durationOfAnchoredActivity = driver.getActivityDuration(new ActivityDirectiveId(1)); + assertTrue(durationOfAnchoredActivity.isPresent()); + } + @Test @DisplayName("Decomposition and anchors do not interfere with each other") public void decomposingActivitiesAndAnchors() throws SchedulingInterruptedException{ diff --git a/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java b/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java index b8691dbd29..fd5731ff8f 100644 --- a/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java +++ b/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java @@ -2695,15 +2695,14 @@ void testRelativeActivityPlanPositiveStartOffsetStart() { new ActivityDirectiveId(2L), new ActivityDirective( tenMinutes, - "GrowBanana", + "PickBanana", Map.of( - "quantity", SerializedValue.of(1), - "growingDuration", SerializedValue.of(activityDuration.in(Duration.MICROSECONDS))), + "quantity", SerializedValue.of(1)), new ActivityDirectiveId(1L), true)), List.of(new SchedulingGoal(new GoalId(0L), """ export default () => Goal.CoexistenceGoal({ - forEach: ActivityExpression.ofType(ActivityTypes.GrowBanana), + forEach: ActivityExpression.ofType(ActivityTypes.PickBanana), activityTemplate: ActivityTemplates.PeelBanana({peelDirection: "fromStem"}), startsAt: TimingConstraint.singleton(WindowProperty.START).plus(Temporal.Duration.from({ minutes : 5})) }) @@ -2724,20 +2723,20 @@ export default () => Goal.CoexistenceGoal({ final var planByActivityType = partitionByActivityType(results.updatedPlan()); final var peelBananas = planByActivityType.get("PeelBanana"); - final var growBananas = planByActivityType.get("GrowBanana"); + final var pickBananas = planByActivityType.get("PickBanana"); final var durationParamActivities = planByActivityType.get("DurationParameterActivity"); assertEquals(1, peelBananas.size()); - assertEquals(1, growBananas.size()); + assertEquals(1, pickBananas.size()); assertEquals(1, durationParamActivities.size()); final var peelBanana = peelBananas.iterator().next(); - final var growBanana = growBananas.iterator().next(); + final var pickBanana = pickBananas.iterator().next(); final var durationParamActivity = durationParamActivities.iterator().next(); assertEquals(Duration.ZERO, durationParamActivity.startOffset()); - assertEquals(tenMinutes, growBanana.startOffset()); - assertEquals(SerializedValue.of(1), growBanana.serializedActivity().getArguments().get("quantity")); + assertEquals(tenMinutes, pickBanana.startOffset()); + assertEquals(SerializedValue.of(1), pickBanana.serializedActivity().getArguments().get("quantity")); assertEquals(Duration.of(15, Duration.MINUTES), peelBanana.startOffset()); assertEquals(SerializedValue.of("fromStem"), peelBanana.serializedActivity().getArguments().get("peelDirection"));