Skip to content

Commit

Permalink
Merge pull request #1262 from NASA-AMMOS/fix-resumable-driver-anchors
Browse files Browse the repository at this point in the history
Handle task scheduling in ResumableSimulationDriver
  • Loading branch information
adrienmaillard authored Dec 20, 2023
2 parents 8f3d9bb + 6ebef0a commit 73b3ece
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

public class ResumableSimulationDriver<Model> implements AutoCloseable {
Expand All @@ -52,6 +54,9 @@ public class ResumableSimulationDriver<Model> implements AutoCloseable {
//mapping each activity name to its task id (in String form) in the simulation engine
private final Map<ActivityDirectiveId, TaskId> plannedDirectiveToTask;

//subset of plannedDirectiveToTask to check for scheduling dependent tasks
private final Map<ActivityDirectiveId, TaskId> toCheckForDependencyScheduling;

//simulation results so far
private SimulationResults lastSimResults;
//cached simulation results cover the period [Duration.ZERO, lastSimResultsEnd]
Expand All @@ -71,6 +76,7 @@ public ResumableSimulationDriver(
){
this.missionModel = missionModel;
plannedDirectiveToTask = new HashMap<>();
toCheckForDependencyScheduling = new HashMap<>();
this.planDuration = planDuration;
countSimulationRestarts = 0;
this.canceledListener = canceledListener;
Expand All @@ -94,6 +100,7 @@ private void printTimeSpent(){
printTimeSpent();
durationSinceRestart = 0;
plannedDirectiveToTask.clear();
toCheckForDependencyScheduling.clear();
lastSimResults = null;
lastSimResultsEnd = Duration.ZERO;
long before = System.nanoTime();
Expand Down Expand Up @@ -273,13 +280,14 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
schedule).compute();
// Filter out activities that are before the plan start
resolved = StartOffsetReducer.filterOutNegativeStartOffset(resolved);

final var toSchedule = new HashSet<ActivityDirectiveId>();
toSchedule.add(null);
scheduleActivities(
toSchedule,
schedule,
resolved,
missionModel,
engine,
activityTopic
engine
);

var allTaskFinished = false;
Expand All @@ -303,6 +311,8 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
final var commit = engine.performJobs(batch.jobs(), cells, curTime, Duration.MAX_VALUE);
timeline.add(commit);

scheduleActivities(getSuccessorsToSchedule(engine), schedule, resolved, missionModel, engine);

// all tasks are complete : do not exit yet, there might be event triggered at the same time
if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask
.values()
Expand All @@ -328,93 +338,65 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
* @return its duration if the activity has been simulated and has finished simulating, an IllegalArgumentException otherwise
*/
public Optional<Duration> getActivityDuration(ActivityDirectiveId activityDirectiveId){
//potential cause of non presence: (1) activity is outside plan bounds (2) activity has not been simulated yet
if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty();
return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId));
}

private Set<ActivityDirectiveId> getSuccessorsToSchedule(final SimulationEngine engine) {
final var toSchedule = new HashSet<ActivityDirectiveId>();
final var iterator = toCheckForDependencyScheduling.entrySet().iterator();
while(iterator.hasNext()){
final var taskToCheck = iterator.next();
if(engine.isTaskComplete(taskToCheck.getValue())){
toSchedule.add(taskToCheck.getKey());
iterator.remove();
}
}
return toSchedule;
}

private void scheduleActivities(
final Map<ActivityDirectiveId, ActivityDirective> schedule,
final Set<ActivityDirectiveId> toScheduleNow,
final Map<ActivityDirectiveId, ActivityDirective> completeSchedule,
final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved,
final MissionModel<Model> missionModel,
final SimulationEngine engine,
final Topic<ActivityDirectiveId> activityTopic
)
{
if(resolved.get(null) == null) { return; } // Nothing to simulate

for (final Pair<ActivityDirectiveId, Duration> directivePair : resolved.get(null)) {
final var directiveId = directivePair.getLeft();
final var startOffset = directivePair.getRight();
final var serializedDirective = schedule.get(directiveId).serializedActivity();

final TaskFactory<?> task;
try {
task = missionModel.getTaskFactory(serializedDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDirective.getTypeName(), ex.toString()));
final SimulationEngine engine){
for(final var predecessor: toScheduleNow) {
for (final var directivePair : resolved.get(predecessor)) {
final var offset = directivePair.getRight();
final var directiveIdToSchedule = directivePair.getLeft();
final var serializedDirective = completeSchedule.get(directiveIdToSchedule).serializedActivity();
final TaskFactory<?> task;
try {
task = missionModel.getTaskFactory(serializedDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDirective.getTypeName(), ex.toString()));
}
Duration computedStartTime = offset;
if (predecessor != null) {
computedStartTime = (curTime.isEqualTo(Duration.MIN_VALUE) ? Duration.ZERO : curTime).plus(offset);
}
final var taskId = engine.scheduleTask(
computedStartTime,
makeTaskFactory(directiveIdToSchedule, task, activityTopic));
plannedDirectiveToTask.put(directiveIdToSchedule, taskId);
if (resolved.containsKey(directiveIdToSchedule)) {
toCheckForDependencyScheduling.put(directiveIdToSchedule, taskId);
}
}

final var taskId = engine.scheduleTask(startOffset, makeTaskFactory(
directiveId,
task,
schedule,
resolved,
missionModel,
activityTopic
));
plannedDirectiveToTask.put(directiveId,taskId);
}
}

private static <Model, Output> TaskFactory<Unit> makeTaskFactory(
private static <Output> TaskFactory<Output> makeTaskFactory(
final ActivityDirectiveId directiveId,
final TaskFactory<Output> task,
final Map<ActivityDirectiveId, ActivityDirective> schedule,
final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved,
final MissionModel<Model> missionModel,
final Topic<ActivityDirectiveId> activityTopic
)
{
// Emit the current activity (defined by directiveId)
return executor -> scheduler0 -> TaskStatus.calling((TaskFactory<Output>) (executor1 -> scheduler1 -> {
scheduler1.emit(directiveId, activityTopic);
return task.create(executor1).step(scheduler1);
}), scheduler2 -> {
// When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time
final List<Pair<ActivityDirectiveId, Duration>> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId);
// Iterate over the dependents
for (final var dependent : dependents) {
scheduler2.spawn(executor2 -> scheduler3 ->
// Delay until the dependent starts
TaskStatus.delayed(dependent.getRight(), scheduler4 -> {
final var dependentDirectiveId = dependent.getLeft();
final var serializedDependentDirective = schedule.get(dependentDirectiveId).serializedActivity();

// Initialize the Task for the dependent
final TaskFactory<?> dependantTask;
try {
dependantTask = missionModel.getTaskFactory(serializedDependentDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDependentDirective.getTypeName(), ex.toString()));
}

// Schedule the dependent
// When it finishes, it will schedule the activities depending on it to know their start time
scheduler4.spawn(makeTaskFactory(
dependentDirectiveId,
dependantTask,
schedule,
resolved,
missionModel,
activityTopic
));
return TaskStatus.completed(Unit.UNIT);
}));
}
return TaskStatus.completed(Unit.UNIT);
});
final Topic<ActivityDirectiveId> activityTopic) {
return executor -> scheduler -> {
scheduler.emit(directiveId, activityTopic);
return task.create(executor).step(scheduler);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ public Duration valueAt(Duration start, final EquationSolvingAlgorithms.History<
if(computedDuration.isPresent()) {
history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, start.plus(computedDuration.get())), new ActivityMetadata(actToSim));
} else{
logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity.");
logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity or activity outside plan bounds.");
history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, null), new ActivityMetadata(actToSim));
}
} catch (SimulationFacade.SimulationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,21 @@ public void activitiesAnchoredToOtherActivities() throws SchedulingInterruptedEx
assertEquals(1, driver.getCountSimulationRestarts());
}

@Test
@DisplayName("Reference to anchored activities are correctly maintained by the driver")
public void activitiesAnchoredToOtherActivitiesSimple() throws SchedulingInterruptedException {
final var activitiesToSimulate = new HashMap<ActivityDirectiveId, ActivityDirective>(2);
activitiesToSimulate.put(
new ActivityDirectiveId(0),
new ActivityDirective(oneMinute, serializedDelayDirective, null, true));
activitiesToSimulate.put(
new ActivityDirectiveId(1),
new ActivityDirective(oneMinute, serializedDelayDirective, new ActivityDirectiveId(0), false));
driver.simulateActivities(activitiesToSimulate);
final var durationOfAnchoredActivity = driver.getActivityDuration(new ActivityDirectiveId(1));
assertTrue(durationOfAnchoredActivity.isPresent());
}

@Test
@DisplayName("Decomposition and anchors do not interfere with each other")
public void decomposingActivitiesAndAnchors() throws SchedulingInterruptedException{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2695,15 +2695,14 @@ void testRelativeActivityPlanPositiveStartOffsetStart() {
new ActivityDirectiveId(2L),
new ActivityDirective(
tenMinutes,
"GrowBanana",
"PickBanana",
Map.of(
"quantity", SerializedValue.of(1),
"growingDuration", SerializedValue.of(activityDuration.in(Duration.MICROSECONDS))),
"quantity", SerializedValue.of(1)),
new ActivityDirectiveId(1L),
true)),
List.of(new SchedulingGoal(new GoalId(0L), """
export default () => Goal.CoexistenceGoal({
forEach: ActivityExpression.ofType(ActivityTypes.GrowBanana),
forEach: ActivityExpression.ofType(ActivityTypes.PickBanana),
activityTemplate: ActivityTemplates.PeelBanana({peelDirection: "fromStem"}),
startsAt: TimingConstraint.singleton(WindowProperty.START).plus(Temporal.Duration.from({ minutes : 5}))
})
Expand All @@ -2724,20 +2723,20 @@ export default () => Goal.CoexistenceGoal({

final var planByActivityType = partitionByActivityType(results.updatedPlan());
final var peelBananas = planByActivityType.get("PeelBanana");
final var growBananas = planByActivityType.get("GrowBanana");
final var pickBananas = planByActivityType.get("PickBanana");
final var durationParamActivities = planByActivityType.get("DurationParameterActivity");

assertEquals(1, peelBananas.size());
assertEquals(1, growBananas.size());
assertEquals(1, pickBananas.size());
assertEquals(1, durationParamActivities.size());
final var peelBanana = peelBananas.iterator().next();
final var growBanana = growBananas.iterator().next();
final var pickBanana = pickBananas.iterator().next();
final var durationParamActivity = durationParamActivities.iterator().next();

assertEquals(Duration.ZERO, durationParamActivity.startOffset());

assertEquals(tenMinutes, growBanana.startOffset());
assertEquals(SerializedValue.of(1), growBanana.serializedActivity().getArguments().get("quantity"));
assertEquals(tenMinutes, pickBanana.startOffset());
assertEquals(SerializedValue.of(1), pickBanana.serializedActivity().getArguments().get("quantity"));

assertEquals(Duration.of(15, Duration.MINUTES), peelBanana.startOffset());
assertEquals(SerializedValue.of("fromStem"), peelBanana.serializedActivity().getArguments().get("peelDirection"));
Expand Down

0 comments on commit 73b3ece

Please sign in to comment.