Skip to content

Commit

Permalink
Make resumable sim driver handle task scheduling
Browse files Browse the repository at this point in the history
This allows to remember references to scheduled tasks and thus to fix the bug resulting from the taskid of an anchored task not to be saved in plannedDirectiveToTask because they were scheduled by the sim engine.
  • Loading branch information
adrienmaillard committed Dec 19, 2023
1 parent 8f3d9bb commit 8116244
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

public class ResumableSimulationDriver<Model> implements AutoCloseable {
Expand All @@ -52,6 +54,9 @@ public class ResumableSimulationDriver<Model> implements AutoCloseable {
//mapping each activity name to its task id (in String form) in the simulation engine
private final Map<ActivityDirectiveId, TaskId> plannedDirectiveToTask;

//subset of plannedDirectiveToTask to check for scheduling dependent tasks
private final Map<ActivityDirectiveId, TaskId> toCheckForDependencyScheduling;

//simulation results so far
private SimulationResults lastSimResults;
//cached simulation results cover the period [Duration.ZERO, lastSimResultsEnd]
Expand All @@ -71,6 +76,7 @@ public ResumableSimulationDriver(
){
this.missionModel = missionModel;
plannedDirectiveToTask = new HashMap<>();
toCheckForDependencyScheduling = new HashMap<>();
this.planDuration = planDuration;
countSimulationRestarts = 0;
this.canceledListener = canceledListener;
Expand All @@ -94,6 +100,7 @@ private void printTimeSpent(){
printTimeSpent();
durationSinceRestart = 0;
plannedDirectiveToTask.clear();
toCheckForDependencyScheduling.clear();
lastSimResults = null;
lastSimResultsEnd = Duration.ZERO;
long before = System.nanoTime();
Expand Down Expand Up @@ -273,13 +280,14 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
schedule).compute();
// Filter out activities that are before the plan start
resolved = StartOffsetReducer.filterOutNegativeStartOffset(resolved);

final var toSchedule = new HashSet<ActivityDirectiveId>();
toSchedule.add(null);
scheduleActivities(
toSchedule,
schedule,
resolved,
missionModel,
engine,
activityTopic
engine
);

var allTaskFinished = false;
Expand All @@ -303,6 +311,8 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
final var commit = engine.performJobs(batch.jobs(), cells, curTime, Duration.MAX_VALUE);
timeline.add(commit);

scheduleActivities(getSuccessorsToSchedule(engine), schedule, resolved, missionModel, engine);

// all tasks are complete : do not exit yet, there might be event triggered at the same time
if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask
.values()
Expand All @@ -328,93 +338,67 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
* @return its duration if the activity has been simulated and has finished simulating, an IllegalArgumentException otherwise
*/
public Optional<Duration> getActivityDuration(ActivityDirectiveId activityDirectiveId){
//potential cause of non presence: activity is outside plan bounds
if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty();
return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId));
}

private void scheduleActivities(
final Map<ActivityDirectiveId, ActivityDirective> schedule,
private Set<ActivityDirectiveId> getSuccessorsToSchedule(final SimulationEngine engine) {
final var toSchedule = new HashSet<ActivityDirectiveId>();
final var iterator = toCheckForDependencyScheduling.entrySet().iterator();
while(iterator.hasNext()){
final var taskToCheck = iterator.next();
if(engine.isTaskComplete(taskToCheck.getValue())){
toSchedule.add(taskToCheck.getKey());
iterator.remove();
}
}
return toSchedule;
}

public void scheduleActivities(
final Set<ActivityDirectiveId> toScheduleNow,
final Map<ActivityDirectiveId, ActivityDirective> completeSchedule,
final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved,
final MissionModel<Model> missionModel,
final SimulationEngine engine,
final Topic<ActivityDirectiveId> activityTopic
)
{
if(resolved.get(null) == null) { return; } // Nothing to simulate

for (final Pair<ActivityDirectiveId, Duration> directivePair : resolved.get(null)) {
final var directiveId = directivePair.getLeft();
final var startOffset = directivePair.getRight();
final var serializedDirective = schedule.get(directiveId).serializedActivity();

final TaskFactory<?> task;
try {
task = missionModel.getTaskFactory(serializedDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDirective.getTypeName(), ex.toString()));
final SimulationEngine engine){
for(final var predecessor: toScheduleNow) {
for (final var directivePair : resolved.get(predecessor)) {
final var offset = directivePair.getRight();
final var directiveIdToSchedule = directivePair.getLeft();
final var serializedDirective = completeSchedule.get(directiveIdToSchedule).serializedActivity();
final TaskFactory<?> task;
try {
task = missionModel.getTaskFactory(serializedDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDirective.getTypeName(), ex.toString()));
}
Duration computedStartTime = offset;
if (predecessor != null) {
computedStartTime = (curTime.isEqualTo(Duration.MIN_VALUE) ? Duration.ZERO : curTime).plus(offset);
}
final var taskId = engine.scheduleTask(
computedStartTime,
makeTaskFactory(directiveIdToSchedule, task, activityTopic));
plannedDirectiveToTask.put(directiveIdToSchedule, taskId);
if (resolved.containsKey(directiveIdToSchedule)) {
toCheckForDependencyScheduling.put(directiveIdToSchedule, taskId);
}
}

final var taskId = engine.scheduleTask(startOffset, makeTaskFactory(
directiveId,
task,
schedule,
resolved,
missionModel,
activityTopic
));
plannedDirectiveToTask.put(directiveId,taskId);
}
}

private static <Model, Output> TaskFactory<Unit> makeTaskFactory(
private static <Output> TaskFactory<Unit> makeTaskFactory(
final ActivityDirectiveId directiveId,
final TaskFactory<Output> task,
final Map<ActivityDirectiveId, ActivityDirective> schedule,
final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved,
final MissionModel<Model> missionModel,
final Topic<ActivityDirectiveId> activityTopic
)
{
// Emit the current activity (defined by directiveId)
return executor -> scheduler0 -> TaskStatus.calling((TaskFactory<Output>) (executor1 -> scheduler1 -> {
scheduler1.emit(directiveId, activityTopic);
return task.create(executor1).step(scheduler1);
}), scheduler2 -> {
// When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time
final List<Pair<ActivityDirectiveId, Duration>> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId);
// Iterate over the dependents
for (final var dependent : dependents) {
scheduler2.spawn(executor2 -> scheduler3 ->
// Delay until the dependent starts
TaskStatus.delayed(dependent.getRight(), scheduler4 -> {
final var dependentDirectiveId = dependent.getLeft();
final var serializedDependentDirective = schedule.get(dependentDirectiveId).serializedActivity();

// Initialize the Task for the dependent
final TaskFactory<?> dependantTask;
try {
dependantTask = missionModel.getTaskFactory(serializedDependentDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDependentDirective.getTypeName(), ex.toString()));
}

// Schedule the dependent
// When it finishes, it will schedule the activities depending on it to know their start time
scheduler4.spawn(makeTaskFactory(
dependentDirectiveId,
dependantTask,
schedule,
resolved,
missionModel,
activityTopic
));
return TaskStatus.completed(Unit.UNIT);
}));
}
return TaskStatus.completed(Unit.UNIT);
});
}), scheduler2 -> TaskStatus.completed(Unit.UNIT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ public Duration valueAt(Duration start, final EquationSolvingAlgorithms.History<
if(computedDuration.isPresent()) {
history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, start.plus(computedDuration.get())), new ActivityMetadata(actToSim));
} else{
logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity.");
logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity or activity outside plan bounds.");
history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, null), new ActivityMetadata(actToSim));
}
} catch (SimulationFacade.SimulationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,21 @@ public void activitiesAnchoredToOtherActivities() throws SchedulingInterruptedEx
assertEquals(1, driver.getCountSimulationRestarts());
}

@Test
@DisplayName("Reference to anchored activities are correctly maintained by the driver")
public void activitiesAnchoredToOtherActivitiesSimple() throws SchedulingInterruptedException {
final var activitiesToSimulate = new HashMap<ActivityDirectiveId, ActivityDirective>(2);
activitiesToSimulate.put(
new ActivityDirectiveId(0),
new ActivityDirective(oneMinute, serializedDelayDirective, null, true));
activitiesToSimulate.put(
new ActivityDirectiveId(1),
new ActivityDirective(oneMinute, serializedDelayDirective, new ActivityDirectiveId(0), false));
driver.simulateActivities(activitiesToSimulate);
final var durationOfAnchoredActivity = driver.getActivityDuration(new ActivityDirectiveId(1));
assertTrue(durationOfAnchoredActivity.isPresent());
}

@Test
@DisplayName("Decomposition and anchors do not interfere with each other")
public void decomposingActivitiesAndAnchors() throws SchedulingInterruptedException{
Expand Down

0 comments on commit 8116244

Please sign in to comment.