From 5f5fb6101b0cc2c2a80f03574b461d96f7dde6bc Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Tue, 5 Sep 2023 15:58:44 -0700 Subject: [PATCH 01/26] Count the number of subtasks remaining before a call() can complete --- .../driver/engine/SimulationEngine.java | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index efbddb54f2..1d715c1b92 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -25,6 +25,7 @@ import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; import gov.nasa.jpl.aerie.merlin.protocol.types.TaskStatus; import gov.nasa.jpl.aerie.merlin.protocol.types.ValueSchema; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; @@ -65,6 +66,8 @@ public final class SimulationEngine implements AutoCloseable { private final Map conditions = new HashMap<>(); /** The profiling state for each tracked resource. */ private final Map> resources = new HashMap<>(); + /** The number of subtasks remaining to complete before a task may resume. */ + private final Map subtasks = new HashMap<>(); /** The task that spawned a given task (if any). */ private final Map taskParent = new HashMap<>(); @@ -80,7 +83,7 @@ public TaskId scheduleTask(final Duration startTime, final TaskFactory< if (startTime.isNegative()) throw new IllegalArgumentException("Cannot schedule a task before the start time of the simulation"); final var task = TaskId.generate(); - this.tasks.put(task, new ExecutionState.InProgress<>(startTime, state.create(this.executor))); + this.tasks.put(task, new ExecutionState.InProgress<>(startTime, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); return task; } @@ -200,7 +203,7 @@ private void stepEffectModel( final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, task, frame); + final var scheduler = new EngineScheduler(currentTime, task, progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. @@ -212,6 +215,11 @@ private void stepEffectModel( this.tasks.put(task, progress.completedAt(currentTime, children)); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime)); + progress.caller().ifPresent($ -> { + if (this.subtasks.get($).decrementAndGet() == 0) { + this.subtasks.remove($); + } + }); } else if (status instanceof TaskStatus.Delayed s) { if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); @@ -219,9 +227,10 @@ private void stepEffectModel( this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { final var target = TaskId.generate(); - SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(currentTime, s.child().create(this.executor))); + SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(currentTime, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.taskParent.put(target, task); SimulationEngine.this.taskChildren.computeIfAbsent(task, $ -> new HashSet<>()).add(target); + SimulationEngine.this.subtasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); this.tasks.put(task, progress.continueWith(s.continuation())); @@ -681,11 +690,18 @@ private static Optional min(final Optional a, final Optional private final class EngineScheduler implements Scheduler { private final Duration currentTime; private final TaskId activeTask; + private final Optional caller; private final TaskFrame frame; - public EngineScheduler(final Duration currentTime, final TaskId activeTask, final TaskFrame frame) { + public EngineScheduler( + final Duration currentTime, + final TaskId activeTask, + final Optional caller, + final TaskFrame frame) + { this.currentTime = Objects.requireNonNull(currentTime); this.activeTask = Objects.requireNonNull(activeTask); + this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); } @@ -712,9 +728,10 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { final var task = TaskId.generate(); - SimulationEngine.this.tasks.put(task, new ExecutionState.InProgress<>(this.currentTime, state.create(SimulationEngine.this.executor))); + SimulationEngine.this.tasks.put(task, new ExecutionState.InProgress<>(this.currentTime, this.caller, state.create(SimulationEngine.this.executor))); SimulationEngine.this.taskParent.put(task, this.activeTask); SimulationEngine.this.taskChildren.computeIfAbsent(this.activeTask, $ -> new HashSet<>()).add(task); + this.caller.ifPresent($ -> SimulationEngine.this.subtasks.get($).increment()); this.frame.signal(JobId.forTask(task)); } } @@ -753,7 +770,7 @@ static ConditionJobId forCondition(final ConditionId condition) { /** The lifecycle stages every task passes through. */ private sealed interface ExecutionState { /** The task is in its primary operational phase. */ - record InProgress(Duration startOffset, Task state) + record InProgress(Duration startOffset, Optional caller, Task state) implements ExecutionState { public AwaitingChildren completedAt( @@ -763,7 +780,7 @@ public AwaitingChildren completedAt( } public InProgress continueWith(final Task newState) { - return new InProgress<>(this.startOffset, newState); + return new InProgress<>(this.startOffset, this.caller, newState); } } From 23e2d1e78ad1f44201613a8511eaa6939f3de5c9 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Tue, 5 Sep 2023 16:24:01 -0700 Subject: [PATCH 02/26] Resume calling tasks by counting subtasks --- .../nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 1d715c1b92..8e046011d9 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -218,6 +218,7 @@ private void stepEffectModel( progress.caller().ifPresent($ -> { if (this.subtasks.get($).decrementAndGet() == 0) { this.subtasks.remove($); + this.scheduledJobs.schedule(JobId.forTask($), SubInstant.Tasks.at(currentTime)); } }); } else if (status instanceof TaskStatus.Delayed s) { @@ -234,7 +235,6 @@ private void stepEffectModel( frame.signal(JobId.forTask(target)); this.tasks.put(task, progress.continueWith(s.continuation())); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forTask(target))); } else if (status instanceof TaskStatus.AwaitingCondition s) { final var condition = ConditionId.generate(); this.conditions.put(condition, s.condition()); From 3e3abe87e2f05961be3c7cfb2fd097f76af66931 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 04:40:36 -0700 Subject: [PATCH 03/26] Organize tasks into a tree of spans --- .../driver/engine/SimulationEngine.java | 65 ++++++++++++++++--- .../aerie/merlin/driver/engine/SpanId.java | 10 +++ 2 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 8e046011d9..8572dbfff7 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -68,6 +68,19 @@ public final class SimulationEngine implements AutoCloseable { private final Map> resources = new HashMap<>(); /** The number of subtasks remaining to complete before a task may resume. */ private final Map subtasks = new HashMap<>(); + /** The set of all spans of work contributed to by modeled tasks. */ + private final Map spans = new HashMap<>(); + /** A count of the remaining live tasks (and other spans) under each span. */ + private final Map spanTasks = new HashMap<>(); + + /** The span of time over which a subtree of tasks has acted. */ + public record Span(Optional parent, Duration startOffset, Optional endOffset) { + /** Close out a span, marking it as inactive past the given time. */ + public Span close(final Duration endOffset) { + if (this.endOffset.isPresent()) throw new Error("Attempt to close an already-closed span"); + return new Span(this.parent, this.startOffset, Optional.of(endOffset)); + } + } /** The task that spawned a given task (if any). */ private final Map taskParent = new HashMap<>(); @@ -82,8 +95,12 @@ public final class SimulationEngine implements AutoCloseable { public TaskId scheduleTask(final Duration startTime, final TaskFactory state) { if (startTime.isNegative()) throw new IllegalArgumentException("Cannot schedule a task before the start time of the simulation"); + final var span = SpanId.generate(); + this.spans.put(span, new Span(Optional.empty(), startTime, Optional.empty())); + final var task = TaskId.generate(); - this.tasks.put(task, new ExecutionState.InProgress<>(startTime, Optional.empty(), state.create(this.executor))); + this.spanTasks.put(span, new MutableInt(1)); + this.tasks.put(task, new ExecutionState.InProgress<>(span, startTime, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); return task; } @@ -203,7 +220,7 @@ private void stepEffectModel( final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, task, progress.caller(), frame); + final var scheduler = new EngineScheduler(currentTime, task, progress.span(), progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. @@ -213,6 +230,21 @@ private void stepEffectModel( if (status instanceof TaskStatus.Completed) { final var children = new LinkedList<>(this.taskChildren.getOrDefault(task, Collections.emptySet())); + // Propagate completion up the span hierarchy. + // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. + var span = progress.span(); + while (true) { + if (this.spanTasks.get(span).decrementAndGet() > 0) break; + this.spanTasks.remove(span); + + this.spans.compute(span, (_id, $) -> $.close(currentTime)); + + final var span$ = this.spans.get(span).parent; + if (span$.isEmpty()) break; + + span = span$.get(); + } + this.tasks.put(task, progress.completedAt(currentTime, children)); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime)); progress.caller().ifPresent($ -> { @@ -227,8 +259,14 @@ private void stepEffectModel( this.tasks.put(task, progress.continueWith(s.continuation())); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { + // TODO: Not *every* task should get a new span. Allow the model to decide where spans go. + final var targetSpan = SpanId.generate(); + SimulationEngine.this.spans.put(targetSpan, new Span(Optional.of(progress.span()), currentTime, Optional.empty())); + SimulationEngine.this.spanTasks.get(progress.span()).increment(); + final var target = TaskId.generate(); - SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(currentTime, Optional.of(task), s.child().create(this.executor))); + SimulationEngine.this.spanTasks.put(targetSpan, new MutableInt(1)); + SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(targetSpan, currentTime, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.taskParent.put(target, task); SimulationEngine.this.taskChildren.computeIfAbsent(task, $ -> new HashSet<>()).add(target); SimulationEngine.this.subtasks.put(task, new MutableInt(1)); @@ -690,17 +728,20 @@ private static Optional min(final Optional a, final Optional private final class EngineScheduler implements Scheduler { private final Duration currentTime; private final TaskId activeTask; + private final SpanId span; private final Optional caller; private final TaskFrame frame; public EngineScheduler( final Duration currentTime, final TaskId activeTask, + final SpanId span, final Optional caller, final TaskFrame frame) { this.currentTime = Objects.requireNonNull(currentTime); this.activeTask = Objects.requireNonNull(activeTask); + this.span = Objects.requireNonNull(span); this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); } @@ -727,8 +768,14 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { + // TODO: Not *every* task should get a new span. Allow the model to decide where spans go. + final var taskSpan = SpanId.generate(); + SimulationEngine.this.spans.put(taskSpan, new Span(Optional.of(this.span), this.currentTime, Optional.empty())); + SimulationEngine.this.spanTasks.get(this.span).increment(); + final var task = TaskId.generate(); - SimulationEngine.this.tasks.put(task, new ExecutionState.InProgress<>(this.currentTime, this.caller, state.create(SimulationEngine.this.executor))); + SimulationEngine.this.spanTasks.put(taskSpan, new MutableInt(1)); + SimulationEngine.this.tasks.put(task, new ExecutionState.InProgress<>(taskSpan, this.currentTime, this.caller, state.create(SimulationEngine.this.executor))); SimulationEngine.this.taskParent.put(task, this.activeTask); SimulationEngine.this.taskChildren.computeIfAbsent(this.activeTask, $ -> new HashSet<>()).add(task); this.caller.ifPresent($ -> SimulationEngine.this.subtasks.get($).increment()); @@ -770,34 +817,36 @@ static ConditionJobId forCondition(final ConditionId condition) { /** The lifecycle stages every task passes through. */ private sealed interface ExecutionState { /** The task is in its primary operational phase. */ - record InProgress(Duration startOffset, Optional caller, Task state) + record InProgress(SpanId span, Duration startOffset, Optional caller, Task state) implements ExecutionState { public AwaitingChildren completedAt( final Duration endOffset, final LinkedList remainingChildren) { - return new AwaitingChildren<>(this.startOffset, endOffset, remainingChildren); + return new AwaitingChildren<>(this.span, this.startOffset, endOffset, remainingChildren); } public InProgress continueWith(final Task newState) { - return new InProgress<>(this.startOffset, this.caller, newState); + return new InProgress<>(this.span, this.startOffset, this.caller, newState); } } /** The task has completed its primary operation, but has unfinished children. */ record AwaitingChildren( + SpanId span, Duration startOffset, Duration endOffset, LinkedList remainingChildren ) implements ExecutionState { public Terminated joinedAt(final Duration joinOffset) { - return new Terminated<>(this.startOffset, this.endOffset, joinOffset); + return new Terminated<>(this.span, this.startOffset, this.endOffset, joinOffset); } } /** The task and all its delegated children have completed. */ record Terminated( + SpanId span, Duration startOffset, Duration endOffset, Duration joinOffset diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java new file mode 100644 index 0000000000..96f5795b0c --- /dev/null +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java @@ -0,0 +1,10 @@ +package gov.nasa.jpl.aerie.merlin.driver.engine; + +import java.util.UUID; + +/** A typed wrapper for span IDs. */ +public record SpanId(String id) { + public static SpanId generate() { + return new SpanId(UUID.randomUUID().toString()); + } +} From cc1a25e48cd5b3c861251fb6b373f09e7fbd59d7 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 05:45:29 -0700 Subject: [PATCH 04/26] Replace String with TaskId in several places --- .../driver/engine/SimulationEngine.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 8572dbfff7..530766bb64 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -379,16 +379,16 @@ public boolean isTaskComplete(final TaskId task) { } private record TaskInfo( - Map taskToPlannedDirective, - Map input, - Map output + Map taskToPlannedDirective, + Map input, + Map output ) { public TaskInfo() { this(new HashMap<>(), new HashMap<>(), new HashMap<>()); } public boolean isActivity(final TaskId id) { - return this.input.containsKey(id.id()); + return this.input.containsKey(id); } public record Trait(Iterable> topics, Topic activityTopic) implements EffectTrait> { @@ -412,7 +412,7 @@ public Consumer atom(final Event ev) { return taskInfo -> { // Identify activities. ev.extract(this.activityTopic) - .ifPresent(directiveId -> taskInfo.taskToPlannedDirective.put(ev.provenance().id(), directiveId)); + .ifPresent(directiveId -> taskInfo.taskToPlannedDirective.put(ev.provenance(), directiveId)); for (final var topic : this.topics) { // Identify activity inputs. @@ -432,7 +432,7 @@ void extractInput(final SerializableTopic topic, final Event ev, final TaskIn final var activityType = topic.name().substring("ActivityType.Input.".length()); taskInfo.input.put( - ev.provenance().id(), + ev.provenance(), new SerializedActivity(activityType, topic.outputType().serialize(input).asMap().orElseThrow())); }); } @@ -443,7 +443,7 @@ void extractOutput(final SerializableTopic topic, final Event ev, final TaskI ev.extract(topic.topic()).ifPresent(output -> { taskInfo.output.put( - ev.provenance().id(), + ev.provenance(), topic.outputType().serialize(output)); }); } @@ -507,7 +507,7 @@ public static SimulationResults computeResults( // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. - final var taskToSimulatedActivityId = new HashMap(taskInfo.taskToPlannedDirective.size()); + final var taskToSimulatedActivityId = new HashMap(taskInfo.taskToPlannedDirective.size()); final var usedSimulatedActivityIds = new HashSet<>(); for (final var entry : taskInfo.taskToPlannedDirective.entrySet()) { taskToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); @@ -516,10 +516,10 @@ public static SimulationResults computeResults( long counter = 1L; for (final var task : engine.tasks.keySet()) { if (!taskInfo.isActivity(task)) continue; - if (taskToSimulatedActivityId.containsKey(task.id())) continue; + if (taskToSimulatedActivityId.containsKey(task)) continue; while (usedSimulatedActivityIds.contains(counter)) counter++; - taskToSimulatedActivityId.put(task.id(), new SimulatedActivityId(counter++)); + taskToSimulatedActivityId.put(task, new SimulatedActivityId(counter++)); } // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). @@ -533,7 +533,7 @@ public static SimulationResults computeResults( } if (parent != null) { - activityParents.put(taskToSimulatedActivityId.get(task.id()), taskToSimulatedActivityId.get(parent.id())); + activityParents.put(taskToSimulatedActivityId.get(task), taskToSimulatedActivityId.get(parent)); } }); @@ -547,12 +547,12 @@ public static SimulationResults computeResults( engine.tasks.forEach((task, state) -> { if (!taskInfo.isActivity(task)) return; - final var activityId = taskToSimulatedActivityId.get(task.id()); - final var directiveId = taskInfo.taskToPlannedDirective.get(task.id()); // will be null for non-directives + final var activityId = taskToSimulatedActivityId.get(task); + final var directiveId = taskInfo.taskToPlannedDirective.get(task); // will be null for non-directives if (state instanceof ExecutionState.Terminated e) { - final var inputAttributes = taskInfo.input().get(task.id()); - final var outputAttributes = taskInfo.output().get(task.id()); + final var inputAttributes = taskInfo.input().get(task); + final var outputAttributes = taskInfo.output().get(task); simulatedActivities.put(activityId, new SimulatedActivity( inputAttributes.getTypeName(), @@ -565,7 +565,7 @@ public static SimulationResults computeResults( outputAttributes )); } else if (state instanceof ExecutionState.InProgress e){ - final var inputAttributes = taskInfo.input().get(task.id()); + final var inputAttributes = taskInfo.input().get(task); unfinishedActivities.put(activityId, new UnfinishedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), @@ -575,7 +575,7 @@ public static SimulationResults computeResults( (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId) )); } else if (state instanceof ExecutionState.AwaitingChildren e){ - final var inputAttributes = taskInfo.input().get(task.id()); + final var inputAttributes = taskInfo.input().get(task); unfinishedActivities.put(activityId, new UnfinishedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), From 83a25912a0dfea40c458d7391510c14f9b0afeb6 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 05:50:47 -0700 Subject: [PATCH 05/26] Associate events to spans, not to tasks --- .../driver/engine/SimulationEngine.java | 50 +++++++------------ .../aerie/merlin/driver/timeline/Event.java | 8 +-- .../merlin/driver/engine/TaskFrameTest.java | 2 +- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 530766bb64..37f613b533 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -379,15 +379,15 @@ public boolean isTaskComplete(final TaskId task) { } private record TaskInfo( - Map taskToPlannedDirective, - Map input, - Map output + Map taskToPlannedDirective, + Map input, + Map output ) { public TaskInfo() { this(new HashMap<>(), new HashMap<>(), new HashMap<>()); } - public boolean isActivity(final TaskId id) { + public boolean isActivity(final SpanId id) { return this.input.containsKey(id); } @@ -507,14 +507,14 @@ public static SimulationResults computeResults( // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. - final var taskToSimulatedActivityId = new HashMap(taskInfo.taskToPlannedDirective.size()); + final var taskToSimulatedActivityId = new HashMap(taskInfo.taskToPlannedDirective.size()); final var usedSimulatedActivityIds = new HashSet<>(); for (final var entry : taskInfo.taskToPlannedDirective.entrySet()) { taskToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); usedSimulatedActivityIds.add(entry.getValue().id()); } long counter = 1L; - for (final var task : engine.tasks.keySet()) { + for (final var task : engine.spans.keySet()) { if (!taskInfo.isActivity(task)) continue; if (taskToSimulatedActivityId.containsKey(task)) continue; @@ -524,16 +524,16 @@ public static SimulationResults computeResults( // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). final var activityParents = new HashMap(); - engine.tasks.forEach((task, state) -> { + engine.spans.forEach((task, state) -> { if (!taskInfo.isActivity(task)) return; - var parent = engine.taskParent.get(task); - while (parent != null && !taskInfo.isActivity(parent)) { - parent = engine.taskParent.get(parent); + var parent = state.parent(); + while (parent.isPresent() && !taskInfo.isActivity(parent.get())) { + parent = engine.spans.get(parent.get()).parent(); } - if (parent != null) { - activityParents.put(taskToSimulatedActivityId.get(task), taskToSimulatedActivityId.get(parent)); + if (parent.isPresent()) { + activityParents.put(taskToSimulatedActivityId.get(task), taskToSimulatedActivityId.get(parent.get())); } }); @@ -544,48 +544,36 @@ public static SimulationResults computeResults( final var simulatedActivities = new HashMap(); final var unfinishedActivities = new HashMap(); - engine.tasks.forEach((task, state) -> { + engine.spans.forEach((task, state) -> { if (!taskInfo.isActivity(task)) return; final var activityId = taskToSimulatedActivityId.get(task); final var directiveId = taskInfo.taskToPlannedDirective.get(task); // will be null for non-directives - if (state instanceof ExecutionState.Terminated e) { + if (state.endOffset().isPresent()) { final var inputAttributes = taskInfo.input().get(task); final var outputAttributes = taskInfo.output().get(task); simulatedActivities.put(activityId, new SimulatedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), - startTime.plus(e.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), - e.joinOffset().minus(e.startOffset()), + startTime.plus(state.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), + state.endOffset().get().minus(state.startOffset()), activityParents.get(activityId), activityChildren.getOrDefault(activityId, Collections.emptyList()), (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId), outputAttributes )); - } else if (state instanceof ExecutionState.InProgress e){ - final var inputAttributes = taskInfo.input().get(task); - unfinishedActivities.put(activityId, new UnfinishedActivity( - inputAttributes.getTypeName(), - inputAttributes.getArguments(), - startTime.plus(e.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), - activityParents.get(activityId), - activityChildren.getOrDefault(activityId, Collections.emptyList()), - (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId) - )); - } else if (state instanceof ExecutionState.AwaitingChildren e){ + } else { final var inputAttributes = taskInfo.input().get(task); unfinishedActivities.put(activityId, new UnfinishedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), - startTime.plus(e.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), + startTime.plus(state.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), activityParents.get(activityId), activityChildren.getOrDefault(activityId, Collections.emptyList()), (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId) )); - } else { - throw new Error("Unexpected subtype of %s: %s".formatted(ExecutionState.class, state.getClass())); } }); @@ -761,7 +749,7 @@ public State get(final CellId token) { @Override public void emit(final EventType event, final Topic topic) { // Append this event to the timeline. - this.frame.emit(Event.create(topic, event, this.activeTask)); + this.frame.emit(Event.create(topic, event, this.span)); SimulationEngine.this.invalidateTopic(topic, this.currentTime); } diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java index e9f334a7da..7fd8840319 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java @@ -1,6 +1,6 @@ package gov.nasa.jpl.aerie.merlin.driver.timeline; -import gov.nasa.jpl.aerie.merlin.driver.engine.TaskId; +import gov.nasa.jpl.aerie.merlin.driver.engine.SpanId; import gov.nasa.jpl.aerie.merlin.protocol.driver.Topic; import java.util.Objects; @@ -16,7 +16,7 @@ private Event(final Event.GenericEvent inner) { } public static - Event create(final Topic topic, final EventType event, final TaskId provenance) { + Event create(final Topic topic, final EventType event, final SpanId provenance) { return new Event(new Event.GenericEvent<>(topic, event, provenance)); } @@ -34,7 +34,7 @@ public Topic topic() { return this.inner.topic(); } - public TaskId provenance() { + public SpanId provenance() { return this.inner.provenance(); } @@ -43,7 +43,7 @@ public String toString() { return "<@%s, %s>".formatted(System.identityHashCode(this.inner.topic), this.inner.event); } - private record GenericEvent(Topic topic, EventType event, TaskId provenance) { + private record GenericEvent(Topic topic, EventType event, SpanId provenance) { private GenericEvent { Objects.requireNonNull(topic); Objects.requireNonNull(event); diff --git a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java index 44e8cbed5e..1b435dfbce 100644 --- a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java +++ b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java @@ -28,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public final class TaskFrameTest { - private static final TaskId ORIGIN = TaskId.generate(); + private static final SpanId ORIGIN = SpanId.generate(); // This regression test identified a bug in the LiveCells-chain-avoidance optimization in TaskFrame. @Test From 967c36d3ae0ead93786eb2410d9face117f07696 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 05:56:17 -0700 Subject: [PATCH 06/26] Rename several variables from `task`-based to `span`-based --- .../driver/engine/SimulationEngine.java | 93 ++++++++++--------- 1 file changed, 49 insertions(+), 44 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 37f613b533..1f7e7cf0da 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -378,12 +378,12 @@ public boolean isTaskComplete(final TaskId task) { return (this.tasks.get(task) instanceof ExecutionState.Terminated); } - private record TaskInfo( - Map taskToPlannedDirective, + private record SpanInfo( + Map spanToPlannedDirective, Map input, Map output ) { - public TaskInfo() { + public SpanInfo() { this(new HashMap<>(), new HashMap<>(), new HashMap<>()); } @@ -391,58 +391,63 @@ public boolean isActivity(final SpanId id) { return this.input.containsKey(id); } - public record Trait(Iterable> topics, Topic activityTopic) implements EffectTrait> { + public record Trait(Iterable> topics, Topic activityTopic) implements EffectTrait> { @Override - public Consumer empty() { - return taskInfo -> {}; + public Consumer empty() { + return spanInfo -> {}; } @Override - public Consumer sequentially(final Consumer prefix, final Consumer suffix) { - return taskInfo -> { prefix.accept(taskInfo); suffix.accept(taskInfo); }; + public Consumer sequentially(final Consumer prefix, final Consumer suffix) { + return spanInfo -> { prefix.accept(spanInfo); suffix.accept(spanInfo); }; } @Override - public Consumer concurrently(final Consumer left, final Consumer right) { - // SAFETY: For each task, `left` commutes with `right`, because no task runs concurrently with itself. - return taskInfo -> { left.accept(taskInfo); right.accept(taskInfo); }; + public Consumer concurrently(final Consumer left, final Consumer right) { + // SAFETY: `left` and `right` should commute. HOWEVER, if a span happens to directly contain two activities + // -- that is, two activities both contribute events under the same span's provenance -- then this + // does not actually commute. + // Arguably, this is a model-specific analysis anyway, since we're looking for specific events + // and inferring model structure from them, and at this time we're only working with models + // for which every activity has a span to itself. + return spanInfo -> { left.accept(spanInfo); right.accept(spanInfo); }; } - public Consumer atom(final Event ev) { - return taskInfo -> { + public Consumer atom(final Event ev) { + return spanInfo -> { // Identify activities. ev.extract(this.activityTopic) - .ifPresent(directiveId -> taskInfo.taskToPlannedDirective.put(ev.provenance(), directiveId)); + .ifPresent(directiveId -> spanInfo.spanToPlannedDirective.put(ev.provenance(), directiveId)); for (final var topic : this.topics) { // Identify activity inputs. - extractInput(topic, ev, taskInfo); + extractInput(topic, ev, spanInfo); // Identify activity outputs. - extractOutput(topic, ev, taskInfo); + extractOutput(topic, ev, spanInfo); } }; } private static - void extractInput(final SerializableTopic topic, final Event ev, final TaskInfo taskInfo) { + void extractInput(final SerializableTopic topic, final Event ev, final SpanInfo spanInfo) { if (!topic.name().startsWith("ActivityType.Input.")) return; ev.extract(topic.topic()).ifPresent(input -> { final var activityType = topic.name().substring("ActivityType.Input.".length()); - taskInfo.input.put( + spanInfo.input.put( ev.provenance(), new SerializedActivity(activityType, topic.outputType().serialize(input).asMap().orElseThrow())); }); } private static - void extractOutput(final SerializableTopic topic, final Event ev, final TaskInfo taskInfo) { + void extractOutput(final SerializableTopic topic, final Event ev, final SpanInfo spanInfo) { if (!topic.name().startsWith("ActivityType.Output.")) return; ev.extract(topic.topic()).ifPresent(output -> { - taskInfo.output.put( + spanInfo.output.put( ev.provenance(), topic.outputType().serialize(output)); }); @@ -465,14 +470,14 @@ public static SimulationResults computeResults( final TemporalEventSource timeline, final Iterable> serializableTopics ) { - // Collect per-task information from the event graph. - final var taskInfo = new TaskInfo(); + // Collect per-span information from the event graph. + final var spanInfo = new SpanInfo(); for (final var point : timeline) { if (!(point instanceof TemporalEventSource.TimePoint.Commit p)) continue; - final var trait = new TaskInfo.Trait(serializableTopics, activityTopic); - p.events().evaluate(trait, trait::atom).accept(taskInfo); + final var trait = new SpanInfo.Trait(serializableTopics, activityTopic); + p.events().evaluate(trait, trait::atom).accept(spanInfo); } // Extract profiles for every resource. @@ -507,52 +512,52 @@ public static SimulationResults computeResults( // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. - final var taskToSimulatedActivityId = new HashMap(taskInfo.taskToPlannedDirective.size()); + final var spanToSimulatedActivityId = new HashMap(spanInfo.spanToPlannedDirective.size()); final var usedSimulatedActivityIds = new HashSet<>(); - for (final var entry : taskInfo.taskToPlannedDirective.entrySet()) { - taskToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); + for (final var entry : spanInfo.spanToPlannedDirective.entrySet()) { + spanToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); usedSimulatedActivityIds.add(entry.getValue().id()); } long counter = 1L; - for (final var task : engine.spans.keySet()) { - if (!taskInfo.isActivity(task)) continue; - if (taskToSimulatedActivityId.containsKey(task)) continue; + for (final var span : engine.spans.keySet()) { + if (!spanInfo.isActivity(span)) continue; + if (spanToSimulatedActivityId.containsKey(span)) continue; while (usedSimulatedActivityIds.contains(counter)) counter++; - taskToSimulatedActivityId.put(task, new SimulatedActivityId(counter++)); + spanToSimulatedActivityId.put(span, new SimulatedActivityId(counter++)); } // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). final var activityParents = new HashMap(); - engine.spans.forEach((task, state) -> { - if (!taskInfo.isActivity(task)) return; + engine.spans.forEach((span, state) -> { + if (!spanInfo.isActivity(span)) return; var parent = state.parent(); - while (parent.isPresent() && !taskInfo.isActivity(parent.get())) { + while (parent.isPresent() && !spanInfo.isActivity(parent.get())) { parent = engine.spans.get(parent.get()).parent(); } if (parent.isPresent()) { - activityParents.put(taskToSimulatedActivityId.get(task), taskToSimulatedActivityId.get(parent.get())); + activityParents.put(spanToSimulatedActivityId.get(span), spanToSimulatedActivityId.get(parent.get())); } }); final var activityChildren = new HashMap>(); - activityParents.forEach((task, parent) -> { - activityChildren.computeIfAbsent(parent, $ -> new LinkedList<>()).add(task); + activityParents.forEach((activity, parent) -> { + activityChildren.computeIfAbsent(parent, $ -> new LinkedList<>()).add(activity); }); final var simulatedActivities = new HashMap(); final var unfinishedActivities = new HashMap(); - engine.spans.forEach((task, state) -> { - if (!taskInfo.isActivity(task)) return; + engine.spans.forEach((span, state) -> { + if (!spanInfo.isActivity(span)) return; - final var activityId = taskToSimulatedActivityId.get(task); - final var directiveId = taskInfo.taskToPlannedDirective.get(task); // will be null for non-directives + final var activityId = spanToSimulatedActivityId.get(span); + final var directiveId = spanInfo.spanToPlannedDirective.get(span); // will be null for non-directives if (state.endOffset().isPresent()) { - final var inputAttributes = taskInfo.input().get(task); - final var outputAttributes = taskInfo.output().get(task); + final var inputAttributes = spanInfo.input().get(span); + final var outputAttributes = spanInfo.output().get(span); simulatedActivities.put(activityId, new SimulatedActivity( inputAttributes.getTypeName(), @@ -565,7 +570,7 @@ public static SimulationResults computeResults( outputAttributes )); } else { - final var inputAttributes = taskInfo.input().get(task); + final var inputAttributes = spanInfo.input().get(span); unfinishedActivities.put(activityId, new UnfinishedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), From 76d43de7db0b24e1204c501c14ac3921dc9a06cf Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 06:05:45 -0700 Subject: [PATCH 07/26] Change `isTaskComplete` and `getTaskDuration` to use Span information --- .../aerie/merlin/driver/SimulationDriver.java | 4 ++-- .../driver/engine/SimulationEngine.java | 23 ++++++++++--------- .../simulation/ResumableSimulationDriver.java | 12 +++++----- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java index 9608639f2d..fc906e3568 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java @@ -165,11 +165,11 @@ void simulateTask(final MissionModel missionModel, final TaskFactory duration() { + return this.endOffset.map($ -> $.minus(this.startOffset)); + } } /** The task that spawned a given task (if any). */ @@ -92,7 +96,7 @@ public Span close(final Duration endOffset) { private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); /** Schedule a new task to be performed at the given time. */ - public TaskId scheduleTask(final Duration startTime, final TaskFactory state) { + public SpanId scheduleTask(final Duration startTime, final TaskFactory state) { if (startTime.isNegative()) throw new IllegalArgumentException("Cannot schedule a task before the start time of the simulation"); final var span = SpanId.generate(); @@ -102,7 +106,8 @@ public TaskId scheduleTask(final Duration startTime, final TaskFactory< this.spanTasks.put(span, new MutableInt(1)); this.tasks.put(task, new ExecutionState.InProgress<>(span, startTime, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); - return task; + + return span; } /** Register a resource whose profile should be accumulated over time. */ @@ -373,9 +378,9 @@ public void close() { this.executor.shutdownNow(); } - /** Determine if a given task has fully completed. */ - public boolean isTaskComplete(final TaskId task) { - return (this.tasks.get(task) instanceof ExecutionState.Terminated); + /** Determine if a given span has fully completed. */ + public boolean isSpanComplete(final SpanId span) { + return this.spans.get(span).endOffset().isPresent(); } private record SpanInfo( @@ -625,12 +630,8 @@ public static SimulationResults computeResults( serializedTimeline); } - public Optional getTaskDuration(TaskId taskId){ - final var state = tasks.get(taskId); - if (state instanceof ExecutionState.Terminated e) { - return Optional.of(e.joinOffset().minus(e.startOffset())); - } - return Optional.empty(); + public Span getSpan(SpanId spanId) { + return this.spans.get(spanId); } diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java index 2cf23d0897..a8305913ec 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java @@ -8,7 +8,7 @@ import gov.nasa.jpl.aerie.merlin.driver.StartOffsetReducer; import gov.nasa.jpl.aerie.merlin.driver.engine.JobSchedule; import gov.nasa.jpl.aerie.merlin.driver.engine.SimulationEngine; -import gov.nasa.jpl.aerie.merlin.driver.engine.TaskId; +import gov.nasa.jpl.aerie.merlin.driver.engine.SpanId; import gov.nasa.jpl.aerie.merlin.driver.timeline.LiveCells; import gov.nasa.jpl.aerie.merlin.driver.timeline.TemporalEventSource; import gov.nasa.jpl.aerie.merlin.protocol.driver.Topic; @@ -52,10 +52,10 @@ public class ResumableSimulationDriver implements AutoCloseable { private final Topic activityTopic = new Topic<>(); //mapping each activity name to its task id (in String form) in the simulation engine - private final Map plannedDirectiveToTask; + private final Map plannedDirectiveToTask; //subset of plannedDirectiveToTask to check for scheduling dependent tasks - private final Map toCheckForDependencyScheduling; + private final Map toCheckForDependencyScheduling; //simulation results so far private SimulationResults lastSimResults; @@ -317,7 +317,7 @@ private void simulateSchedule(final Map if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask .values() .stream() - .allMatch(engine::isTaskComplete)) { + .allMatch(engine::isSpanComplete)) { allTaskFinished = true; } @@ -340,7 +340,7 @@ private void simulateSchedule(final Map public Optional getActivityDuration(ActivityDirectiveId activityDirectiveId){ //potential cause of non presence: (1) activity is outside plan bounds (2) activity has not been simulated yet if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty(); - return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId)); + return engine.getSpan(plannedDirectiveToTask.get(activityDirectiveId)).duration(); } private Set getSuccessorsToSchedule(final SimulationEngine engine) { @@ -348,7 +348,7 @@ private Set getSuccessorsToSchedule(final SimulationEngine final var iterator = toCheckForDependencyScheduling.entrySet().iterator(); while(iterator.hasNext()){ final var taskToCheck = iterator.next(); - if(engine.isTaskComplete(taskToCheck.getValue())){ + if(engine.isSpanComplete(taskToCheck.getValue())){ toSchedule.add(taskToCheck.getKey()); iterator.remove(); } From 78e5795902c8f52b7f79a7e8f7aa66248bb78627 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 06:19:36 -0700 Subject: [PATCH 08/26] Remove obsolete task lifecycle stages --- .../driver/engine/SimulationEngine.java | 125 +++--------------- 1 file changed, 15 insertions(+), 110 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 21c98dcb7e..f2f43461bd 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -86,12 +86,6 @@ public Optional duration() { } } - /** The task that spawned a given task (if any). */ - private final Map taskParent = new HashMap<>(); - /** The set of children for each task (if any). */ - @DerivedFrom("taskParent") - private final Map> taskChildren = new HashMap<>(); - /** A thread pool that modeled tasks can use to keep track of their state between steps. */ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); @@ -104,7 +98,7 @@ public SpanId scheduleTask(final Duration startTime, final TaskFactory< final var task = TaskId.generate(); this.spanTasks.put(span, new MutableInt(1)); - this.tasks.put(task, new ExecutionState.InProgress<>(span, startTime, Optional.empty(), state.create(this.executor))); + this.tasks.put(task, new ExecutionState<>(span, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); return span; @@ -193,39 +187,22 @@ public void performJob( /** Perform the next step of a modeled task. */ public void stepTask(final TaskId task, final TaskFrame frame, final Duration currentTime) { - // The handler for each individual task stage is responsible - // for putting an updated lifecycle back into the task set. - var lifecycle = this.tasks.remove(task); - - stepTaskHelper(task, frame, currentTime, lifecycle); - } + // The handler for the next status of the task is responsible + // for putting an updated state back into the task set. + var state = this.tasks.remove(task); - private void stepTaskHelper( - final TaskId task, - final TaskFrame frame, - final Duration currentTime, - final ExecutionState lifecycle) - { - // Extract the current modeling state. - if (lifecycle instanceof ExecutionState.InProgress e) { - stepEffectModel(task, e, frame, currentTime); - } else if (lifecycle instanceof ExecutionState.AwaitingChildren e) { - stepWaitingTask(task, e, frame, currentTime); - } else { - // TODO: Log this issue to somewhere more general than stderr. - System.err.println("Task %s is ready but in unexpected execution state %s".formatted(task, lifecycle)); - } + stepEffectModel(task, state, frame, currentTime); } /** Make progress in a task by stepping its associated effect model forward. */ private void stepEffectModel( final TaskId task, - final ExecutionState.InProgress progress, + final ExecutionState progress, final TaskFrame frame, final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, task, progress.span(), progress.caller(), frame); + final var scheduler = new EngineScheduler(currentTime, progress.span(), progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. @@ -233,8 +210,6 @@ private void stepEffectModel( // Based on the task's return status, update its execution state and schedule its resumption. if (status instanceof TaskStatus.Completed) { - final var children = new LinkedList<>(this.taskChildren.getOrDefault(task, Collections.emptySet())); - // Propagate completion up the span hierarchy. // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. var span = progress.span(); @@ -250,8 +225,7 @@ private void stepEffectModel( span = span$.get(); } - this.tasks.put(task, progress.completedAt(currentTime, children)); - this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime)); + // Notify any blocked caller of our completion. progress.caller().ifPresent($ -> { if (this.subtasks.get($).decrementAndGet() == 0) { this.subtasks.remove($); @@ -271,9 +245,7 @@ private void stepEffectModel( final var target = TaskId.generate(); SimulationEngine.this.spanTasks.put(targetSpan, new MutableInt(1)); - SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(targetSpan, currentTime, Optional.of(task), s.child().create(this.executor))); - SimulationEngine.this.taskParent.put(target, task); - SimulationEngine.this.taskChildren.computeIfAbsent(task, $ -> new HashSet<>()).add(target); + SimulationEngine.this.tasks.put(target, new ExecutionState<>(targetSpan, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.subtasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); @@ -290,34 +262,6 @@ private void stepEffectModel( } } - /** Make progress in a task by checking if all of the tasks it's waiting on have completed. */ - private void stepWaitingTask( - final TaskId task, - final ExecutionState.AwaitingChildren awaiting, - final TaskFrame frame, - final Duration currentTime - ) { - // TERMINATION: We break when there are no remaining children, - // and we always remove one if we don't break for other reasons. - while (true) { - if (awaiting.remainingChildren().isEmpty()) { - this.tasks.put(task, awaiting.joinedAt(currentTime)); - frame.signal(JobId.forSignal(SignalId.forTask(task))); - break; - } - - final var nextChild = awaiting.remainingChildren().getFirst(); - if (!(this.tasks.get(nextChild) instanceof ExecutionState.Terminated)) { - this.tasks.put(task, awaiting); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forTask(nextChild))); - break; - } - - // This child is complete, so skip checking it next time; move to the next one. - awaiting.remainingChildren().removeFirst(); - } - } - /** Cause any tasks waiting on the given signal to be resumed concurrently with other jobs in the current frame. */ public void stepSignalledTasks(final SignalId signal, final TaskFrame frame) { final var tasks = this.waitingTasks.invalidateTopic(signal); @@ -370,9 +314,7 @@ public void updateResource( @Override public void close() { for (final var task : this.tasks.values()) { - if (task instanceof ExecutionState.InProgress r) { - r.state.release(); - } + task.state().release(); } this.executor.shutdownNow(); @@ -721,20 +663,17 @@ private static Optional min(final Optional a, final Optional /** A handle for processing requests and effects from a modeled task. */ private final class EngineScheduler implements Scheduler { private final Duration currentTime; - private final TaskId activeTask; private final SpanId span; private final Optional caller; private final TaskFrame frame; public EngineScheduler( final Duration currentTime, - final TaskId activeTask, final SpanId span, final Optional caller, final TaskFrame frame) { this.currentTime = Objects.requireNonNull(currentTime); - this.activeTask = Objects.requireNonNull(activeTask); this.span = Objects.requireNonNull(span); this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); @@ -769,9 +708,7 @@ public void spawn(final TaskFactory state) { final var task = TaskId.generate(); SimulationEngine.this.spanTasks.put(taskSpan, new MutableInt(1)); - SimulationEngine.this.tasks.put(task, new ExecutionState.InProgress<>(taskSpan, this.currentTime, this.caller, state.create(SimulationEngine.this.executor))); - SimulationEngine.this.taskParent.put(task, this.activeTask); - SimulationEngine.this.taskChildren.computeIfAbsent(this.activeTask, $ -> new HashSet<>()).add(task); + SimulationEngine.this.tasks.put(task, new ExecutionState<>(taskSpan, this.caller, state.create(SimulationEngine.this.executor))); this.caller.ifPresent($ -> SimulationEngine.this.subtasks.get($).increment()); this.frame.signal(JobId.forTask(task)); } @@ -808,42 +745,10 @@ static ConditionJobId forCondition(final ConditionId condition) { } } - /** The lifecycle stages every task passes through. */ - private sealed interface ExecutionState { - /** The task is in its primary operational phase. */ - record InProgress(SpanId span, Duration startOffset, Optional caller, Task state) - implements ExecutionState - { - public AwaitingChildren completedAt( - final Duration endOffset, - final LinkedList remainingChildren) { - return new AwaitingChildren<>(this.span, this.startOffset, endOffset, remainingChildren); - } - - public InProgress continueWith(final Task newState) { - return new InProgress<>(this.span, this.startOffset, this.caller, newState); - } + /** The state of an executing task. */ + private record ExecutionState(SpanId span, Optional caller, Task state) { + public ExecutionState continueWith(final Task newState) { + return new ExecutionState<>(this.span, this.caller, newState); } - - /** The task has completed its primary operation, but has unfinished children. */ - record AwaitingChildren( - SpanId span, - Duration startOffset, - Duration endOffset, - LinkedList remainingChildren - ) implements ExecutionState - { - public Terminated joinedAt(final Duration joinOffset) { - return new Terminated<>(this.span, this.startOffset, this.endOffset, joinOffset); - } - } - - /** The task and all its delegated children have completed. */ - record Terminated( - SpanId span, - Duration startOffset, - Duration endOffset, - Duration joinOffset - ) implements ExecutionState {} } } From ae3295e2b3209ede22554aa6c11b7e3a3bf758fd Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 06:32:51 -0700 Subject: [PATCH 09/26] Remove obsolete TaskSignalId --- .../aerie/merlin/driver/engine/SignalId.java | 18 ------------- .../driver/engine/SimulationEngine.java | 27 +++++++------------ 2 files changed, 10 insertions(+), 35 deletions(-) delete mode 100644 merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java deleted file mode 100644 index da6057ea63..0000000000 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java +++ /dev/null @@ -1,18 +0,0 @@ -package gov.nasa.jpl.aerie.merlin.driver.engine; - -/** A typed wrapper for signal IDs. */ -public sealed interface SignalId { - /** A signal controlled by a task. */ - record TaskSignalId(TaskId id) implements SignalId {} - - /** A signal controlled by a condition. */ - record ConditionSignalId(ConditionId id) implements SignalId {} - - static TaskSignalId forTask(final TaskId task) { - return new TaskSignalId(task); - } - - static ConditionSignalId forCondition(final ConditionId condition) { - return new ConditionSignalId(condition); - } -} diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index f2f43461bd..db33d21454 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -53,8 +53,8 @@ public final class SimulationEngine implements AutoCloseable { /** The set of all jobs waiting for time to pass. */ private final JobSchedule scheduledJobs = new JobSchedule<>(); - /** The set of all jobs waiting on a given signal. */ - private final Subscriptions waitingTasks = new Subscriptions<>(); + /** The set of all jobs waiting on a condition. */ + private final Map waitingTasks = new HashMap<>(); /** The set of conditions depending on a given set of topics. */ private final Subscriptions, ConditionId> waitingConditions = new Subscriptions<>(); /** The set of queries depending on a given set of topics. */ @@ -124,7 +124,7 @@ public void invalidateTopic(final Topic topic, final Duration invalidationTim for (final var condition : conditions) { // If we were going to signal tasks on this condition, well, don't do that. // Schedule the condition to be rechecked ASAP. - this.scheduledJobs.unschedule(JobId.forSignal(SignalId.forCondition(condition))); + this.scheduledJobs.unschedule(JobId.forSignal(condition)); this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(invalidationTime)); } } @@ -138,8 +138,7 @@ public JobSchedule.Batch extractNextJobs(final Duration maximumTime) { // that the condition depends on, in which case we might accidentally schedule an update for a condition // that no longer exists. for (final var job : batch.jobs()) { - if (!(job instanceof JobId.SignalJobId j)) continue; - if (!(j.id() instanceof SignalId.ConditionSignalId s)) continue; + if (!(job instanceof JobId.SignalJobId s)) continue; this.conditions.remove(s.id()); this.waitingConditions.unsubscribeQuery(s.id()); @@ -175,7 +174,7 @@ public void performJob( if (job instanceof JobId.TaskJobId j) { this.stepTask(j.id(), frame, currentTime); } else if (job instanceof JobId.SignalJobId j) { - this.stepSignalledTasks(j.id(), frame); + this.stepTask(this.waitingTasks.remove(j.id()), frame, currentTime); } else if (job instanceof JobId.ConditionJobId j) { this.updateCondition(j.id(), frame, currentTime, maximumTime); } else if (job instanceof JobId.ResourceJobId j) { @@ -256,18 +255,12 @@ private void stepEffectModel( this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); this.tasks.put(task, progress.continueWith(s.continuation())); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forCondition(condition))); + this.waitingTasks.put(condition, task); } else { throw new IllegalArgumentException("Unknown subclass of %s: %s".formatted(TaskStatus.class, status)); } } - /** Cause any tasks waiting on the given signal to be resumed concurrently with other jobs in the current frame. */ - public void stepSignalledTasks(final SignalId signal, final TaskFrame frame) { - final var tasks = this.waitingTasks.invalidateTopic(signal); - for (final var task : tasks) frame.signal(JobId.forTask(task)); - } - /** Determine when a condition is next true, and schedule a signal to be raised at that time. */ public void updateCondition( final ConditionId condition, @@ -285,7 +278,7 @@ public void updateCondition( final var expiry = querier.expiry.map(currentTime::plus); if (prediction.isPresent() && (expiry.isEmpty() || prediction.get().shorterThan(expiry.get()))) { - this.scheduledJobs.schedule(JobId.forSignal(SignalId.forCondition(condition)), SubInstant.Tasks.at(prediction.get())); + this.scheduledJobs.schedule(JobId.forSignal(condition), SubInstant.Tasks.at(prediction.get())); } else { // Try checking again later -- where "later" is in some non-zero amount of time! final var nextCheckTime = Duration.max(expiry.orElse(horizonTime), currentTime.plus(Duration.EPSILON)); @@ -719,8 +712,8 @@ public sealed interface JobId { /** A job to step a task. */ record TaskJobId(TaskId id) implements JobId {} - /** A job to step all tasks waiting on a signal. */ - record SignalJobId(SignalId id) implements JobId {} + /** A job to resume a task blocked on a condition. */ + record SignalJobId(ConditionId id) implements JobId {} /** A job to query a resource. */ record ResourceJobId(ResourceId id) implements JobId {} @@ -732,7 +725,7 @@ static TaskJobId forTask(final TaskId task) { return new TaskJobId(task); } - static SignalJobId forSignal(final SignalId signal) { + static SignalJobId forSignal(final ConditionId signal) { return new SignalJobId(signal); } From 26ea90a2f5cf862f05e2664207d809dc516dddd1 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 06:42:59 -0700 Subject: [PATCH 10/26] Reorganize fields and declarations --- .../driver/engine/SimulationEngine.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index db33d21454..8f22a0a530 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -55,6 +55,8 @@ public final class SimulationEngine implements AutoCloseable { private final JobSchedule scheduledJobs = new JobSchedule<>(); /** The set of all jobs waiting on a condition. */ private final Map waitingTasks = new HashMap<>(); + /** The set of all tasks blocked on some number of subtasks. */ + private final Map subtasks = new HashMap<>(); /** The set of conditions depending on a given set of topics. */ private final Subscriptions, ConditionId> waitingConditions = new Subscriptions<>(); /** The set of queries depending on a given set of topics. */ @@ -66,26 +68,12 @@ public final class SimulationEngine implements AutoCloseable { private final Map conditions = new HashMap<>(); /** The profiling state for each tracked resource. */ private final Map> resources = new HashMap<>(); - /** The number of subtasks remaining to complete before a task may resume. */ - private final Map subtasks = new HashMap<>(); + /** The set of all spans of work contributed to by modeled tasks. */ private final Map spans = new HashMap<>(); /** A count of the remaining live tasks (and other spans) under each span. */ private final Map spanTasks = new HashMap<>(); - /** The span of time over which a subtree of tasks has acted. */ - public record Span(Optional parent, Duration startOffset, Optional endOffset) { - /** Close out a span, marking it as inactive past the given time. */ - public Span close(final Duration endOffset) { - if (this.endOffset.isPresent()) throw new Error("Attempt to close an already-closed span"); - return new Span(this.parent, this.startOffset, Optional.of(endOffset)); - } - - public Optional duration() { - return this.endOffset.map($ -> $.minus(this.startOffset)); - } - } - /** A thread pool that modeled tasks can use to keep track of their state between steps. */ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); @@ -744,4 +732,17 @@ public ExecutionState continueWith(final Task newState) { return new ExecutionState<>(this.span, this.caller, newState); } } + + /** The span of time over which a subtree of tasks has acted. */ + public record Span(Optional parent, Duration startOffset, Optional endOffset) { + /** Close out a span, marking it as inactive past the given time. */ + public Span close(final Duration endOffset) { + if (this.endOffset.isPresent()) throw new Error("Attempt to close an already-closed span"); + return new Span(this.parent, this.startOffset, Optional.of(endOffset)); + } + + public Optional duration() { + return this.endOffset.map($ -> $.minus(this.startOffset)); + } + } } From 7d10097012d334277e61dd53f8f874a7187daff5 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 06:45:00 -0700 Subject: [PATCH 11/26] Remove `isSpanComplete` --- .../nasa/jpl/aerie/merlin/driver/SimulationDriver.java | 2 +- .../jpl/aerie/merlin/driver/engine/SimulationEngine.java | 9 ++++----- .../scheduler/simulation/ResumableSimulationDriver.java | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java index fc906e3568..0859608108 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java @@ -169,7 +169,7 @@ void simulateTask(final MissionModel missionModel, final TaskFactory spanToPlannedDirective, Map input, @@ -744,5 +739,9 @@ public Span close(final Duration endOffset) { public Optional duration() { return this.endOffset.map($ -> $.minus(this.startOffset)); } + + public boolean isComplete() { + return this.endOffset.isPresent(); + } } } diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java index a8305913ec..a214abb44a 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java @@ -317,7 +317,7 @@ private void simulateSchedule(final Map if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask .values() .stream() - .allMatch(engine::isSpanComplete)) { + .allMatch($ -> engine.getSpan($).isComplete())) { allTaskFinished = true; } @@ -348,7 +348,7 @@ private Set getSuccessorsToSchedule(final SimulationEngine final var iterator = toCheckForDependencyScheduling.entrySet().iterator(); while(iterator.hasNext()){ final var taskToCheck = iterator.next(); - if(engine.isSpanComplete(taskToCheck.getValue())){ + if(engine.getSpan(taskToCheck.getValue()).isComplete()){ toSchedule.add(taskToCheck.getKey()); iterator.remove(); } From 37724846a39260d11c57fab7bdf9b7a016c162a1 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 06:46:24 -0700 Subject: [PATCH 12/26] Rename `subtasks` to `blockedTasks` --- .../aerie/merlin/driver/engine/SimulationEngine.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 549c430960..c3b4585899 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -56,7 +56,7 @@ public final class SimulationEngine implements AutoCloseable { /** The set of all jobs waiting on a condition. */ private final Map waitingTasks = new HashMap<>(); /** The set of all tasks blocked on some number of subtasks. */ - private final Map subtasks = new HashMap<>(); + private final Map blockedTasks = new HashMap<>(); /** The set of conditions depending on a given set of topics. */ private final Subscriptions, ConditionId> waitingConditions = new Subscriptions<>(); /** The set of queries depending on a given set of topics. */ @@ -214,8 +214,8 @@ private void stepEffectModel( // Notify any blocked caller of our completion. progress.caller().ifPresent($ -> { - if (this.subtasks.get($).decrementAndGet() == 0) { - this.subtasks.remove($); + if (this.blockedTasks.get($).decrementAndGet() == 0) { + this.blockedTasks.remove($); this.scheduledJobs.schedule(JobId.forTask($), SubInstant.Tasks.at(currentTime)); } }); @@ -233,7 +233,7 @@ private void stepEffectModel( final var target = TaskId.generate(); SimulationEngine.this.spanTasks.put(targetSpan, new MutableInt(1)); SimulationEngine.this.tasks.put(target, new ExecutionState<>(targetSpan, Optional.of(task), s.child().create(this.executor))); - SimulationEngine.this.subtasks.put(task, new MutableInt(1)); + SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); this.tasks.put(task, progress.continueWith(s.continuation())); @@ -685,7 +685,7 @@ public void spawn(final TaskFactory state) { final var task = TaskId.generate(); SimulationEngine.this.spanTasks.put(taskSpan, new MutableInt(1)); SimulationEngine.this.tasks.put(task, new ExecutionState<>(taskSpan, this.caller, state.create(SimulationEngine.this.executor))); - this.caller.ifPresent($ -> SimulationEngine.this.subtasks.get($).increment()); + this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); } } From 4e134063b49f160a54bbc6770ecd3554f33a7c6b Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 07:56:13 -0700 Subject: [PATCH 13/26] Add an ability for tasks to descend into new spans --- .../driver/engine/SimulationEngine.java | 58 ++++++++++++++----- .../merlin/framework/ThreadedTaskTest.java | 10 ++++ .../merlin/protocol/driver/Scheduler.java | 4 ++ 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index c3b4585899..b951a2fa52 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -86,7 +86,7 @@ public SpanId scheduleTask(final Duration startTime, final TaskFactory< final var task = TaskId.generate(); this.spanTasks.put(span, new MutableInt(1)); - this.tasks.put(task, new ExecutionState<>(span, Optional.empty(), state.create(this.executor))); + this.tasks.put(task, new ExecutionState<>(span, 0, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); return span; @@ -189,7 +189,7 @@ private void stepEffectModel( final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, progress.span(), progress.caller(), frame); + final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. @@ -222,7 +222,7 @@ private void stepEffectModel( } else if (status instanceof TaskStatus.Delayed s) { if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); - this.tasks.put(task, progress.continueWith(s.continuation())); + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { // TODO: Not *every* task should get a new span. Allow the model to decide where spans go. @@ -232,17 +232,17 @@ private void stepEffectModel( final var target = TaskId.generate(); SimulationEngine.this.spanTasks.put(targetSpan, new MutableInt(1)); - SimulationEngine.this.tasks.put(target, new ExecutionState<>(targetSpan, Optional.of(task), s.child().create(this.executor))); + SimulationEngine.this.tasks.put(target, new ExecutionState<>(targetSpan, 0, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); - this.tasks.put(task, progress.continueWith(s.continuation())); + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); } else if (status instanceof TaskStatus.AwaitingCondition s) { final var condition = ConditionId.generate(); this.conditions.put(condition, s.condition()); this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); - this.tasks.put(task, progress.continueWith(s.continuation())); + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); this.waitingTasks.put(condition, task); } else { throw new IllegalArgumentException("Unknown subclass of %s: %s".formatted(TaskStatus.class, status)); @@ -639,17 +639,20 @@ private static Optional min(final Optional a, final Optional /** A handle for processing requests and effects from a modeled task. */ private final class EngineScheduler implements Scheduler { private final Duration currentTime; - private final SpanId span; + private int shadowedSpans; + private SpanId span; private final Optional caller; private final TaskFrame frame; public EngineScheduler( final Duration currentTime, + final int shadowedSpans, final SpanId span, final Optional caller, final TaskFrame frame) { this.currentTime = Objects.requireNonNull(currentTime); + this.shadowedSpans = shadowedSpans; this.span = Objects.requireNonNull(span); this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); @@ -680,14 +683,43 @@ public void spawn(final TaskFactory state) { // TODO: Not *every* task should get a new span. Allow the model to decide where spans go. final var taskSpan = SpanId.generate(); SimulationEngine.this.spans.put(taskSpan, new Span(Optional.of(this.span), this.currentTime, Optional.empty())); - SimulationEngine.this.spanTasks.get(this.span).increment(); + SimulationEngine.this.spanTasks.put(taskSpan, new MutableInt(1)); final var task = TaskId.generate(); - SimulationEngine.this.spanTasks.put(taskSpan, new MutableInt(1)); - SimulationEngine.this.tasks.put(task, new ExecutionState<>(taskSpan, this.caller, state.create(SimulationEngine.this.executor))); + SimulationEngine.this.spanTasks.get(this.span).increment(); + SimulationEngine.this.tasks.put(task, new ExecutionState<>(taskSpan, 0, this.caller, state.create(SimulationEngine.this.executor))); this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); } + + @Override + public void pushSpan() { + final var parentSpan = this.span; + this.shadowedSpans += 1; + this.span = SpanId.generate(); + + SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty())); + SimulationEngine.this.spanTasks.put(this.span, new MutableInt(1)); + + SimulationEngine.this.spanTasks.get(parentSpan).increment(); + } + + @Override + public void popSpan() { + // TODO: Do we want to throw an error instead? + if (this.shadowedSpans == 0) return; + + if (SimulationEngine.this.spanTasks.get(this.span).decrementAndGet() == 0) { + SimulationEngine.this.spanTasks.remove(this.span); + SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime)); + } + // NOTE: We don't need to propagate completion any further, because the next shadowed span + // has by definition not been completed: this task may still contribute to it, and this task + // has not terminated. + + this.shadowedSpans -= 1; + this.span = SimulationEngine.this.spans.get(this.span).parent().orElseThrow(); + } } /** A representation of a job processable by the {@link SimulationEngine}. */ @@ -722,9 +754,9 @@ static ConditionJobId forCondition(final ConditionId condition) { } /** The state of an executing task. */ - private record ExecutionState(SpanId span, Optional caller, Task state) { - public ExecutionState continueWith(final Task newState) { - return new ExecutionState<>(this.span, this.caller, newState); + private record ExecutionState(SpanId span, int shadowedSpans, Optional caller, Task state) { + public ExecutionState continueWith(final SpanId span, final int shadowedSpans, final Task newState) { + return new ExecutionState<>(span, shadowedSpans, this.caller, newState); } } diff --git a/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java b/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java index ac3942dcd0..b961a41be8 100644 --- a/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java +++ b/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java @@ -31,6 +31,16 @@ public void emit(final Event event, final Topic topic) { public void spawn(final TaskFactory task) { throw new UnsupportedOperationException(); } + + @Override + public void pushSpan() { + throw new UnsupportedOperationException(); + } + + @Override + public void popSpan() { + throw new UnsupportedOperationException(); + } }; final var pool = Executors.newCachedThreadPool(); diff --git a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java index 035658c136..c788a508d5 100644 --- a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java +++ b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java @@ -8,4 +8,8 @@ public interface Scheduler { void emit(Event event, Topic topic); void spawn(TaskFactory task); + + void pushSpan(); + + void popSpan(); } From 2df4088f473e594de188296e5bc5ba37f15fd502 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 08:20:27 -0700 Subject: [PATCH 14/26] Wrap every directive instance in the scope of a new span --- .../generator/MissionModelGenerator.java | 5 ++++- .../nasa/jpl/aerie/merlin/framework/Context.java | 2 ++ .../merlin/framework/InitializationContext.java | 10 ++++++++++ .../jpl/aerie/merlin/framework/ModelActions.java | 16 ++++++++++++++++ .../jpl/aerie/merlin/framework/QueryContext.java | 10 ++++++++++ .../framework/ReplayingReactionContext.java | 14 ++++++++++++++ .../framework/ThreadedReactionContext.java | 10 ++++++++++ 7 files changed, 66 insertions(+), 1 deletion(-) diff --git a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java index be17603c33..08af87e754 100644 --- a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java +++ b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java @@ -808,12 +808,14 @@ public Optional generateActivityMapper(final MissionModelRecord missio .map(effectModel -> CodeBlock .builder() .add( - "return $T.$L(() -> {$>\n$L$<});\n", + "return $T.$L(() -> $T.$L(() -> {$>\n$L$<}));\n", ModelActions.class, switch (effectModel.executor()) { case Threaded -> "threaded"; case Replaying -> "replaying"; }, + ModelActions.class, + "scoped", effectModel.returnType() .map(returnType -> CodeBlock .builder() @@ -835,6 +837,7 @@ public Optional generateActivityMapper(final MissionModelRecord missio .add( "return executor -> scheduler -> {$>\n$L$<};\n", CodeBlock.builder() + .addStatement("scheduler.pushSpan()") .addStatement("scheduler.emit($L, this.$L)", "activity", "inputTopic") .addStatement("scheduler.emit($T.UNIT, this.$L)", Unit.class, "outputTopic") .addStatement("return $T.completed($T.UNIT)", TaskStatus.class, Unit.class) diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java index 6738237a37..65a457d1b4 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java @@ -31,6 +31,8 @@ enum ContextType { Initializing, Reacting, Querying } void spawn(TaskFactory task); void call(TaskFactory task); + void pushSpan(); + void popSpan(); void delay(Duration duration); void waitUntil(Condition condition); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java index 9615830a69..5529a6dd6d 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java @@ -61,6 +61,16 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot yield during initialization"); } + @Override + public void pushSpan() { + // Do nothing. + } + + @Override + public void popSpan() { + // Do nothing. + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield during initialization"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java index 1c9019f328..bdbbabd3db 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java @@ -36,6 +36,22 @@ public static TaskFactory replaying(final Runnable task) { }); } + public static T scoped(final Supplier block) { + context.get().pushSpan(); + try { + return block.get(); + } finally { + context.get().popSpan(); + } + } + + public static void scoped(final Runnable block) { + scoped(() -> { + block.run(); + return Unit.UNIT; + }); + } + public static void emit(final T event, final Topic topic) { context.get().emit(event, topic); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java index 2425a9ab77..e2c0bf169f 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java @@ -52,6 +52,16 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot schedule tasks in a query-only context"); } + @Override + public void pushSpan() { + // Do nothing. + } + + @Override + public void popSpan() { + // Do nothing. + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield in a query-only context"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java index e9998ff256..e6e8d5bbfb 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java @@ -78,6 +78,20 @@ public void call(final TaskFactory task) { }); } + @Override + public void pushSpan() { + this.memory.doOnce(() -> { + this.scheduler.pushSpan(); + }); + } + + @Override + public void popSpan() { + this.memory.doOnce(() -> { + this.scheduler.popSpan(); + }); + } + @Override public void delay(final Duration duration) { this.memory.doOnce(() -> { diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java index 3edd568d9e..0a4984bda0 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java @@ -63,6 +63,16 @@ public void call(final TaskFactory task) { this.scheduler = this.handle.call(task); } + @Override + public void pushSpan() { + this.scheduler.pushSpan(); + } + + @Override + public void popSpan() { + this.scheduler.popSpan(); + } + @Override public void delay(final Duration duration) { this.scheduler = null; // Relinquish the current scheduler before yielding, in case an exception is thrown. From 65b9774d08bebe46cd1b4c4390f02a4ac9e61435 Mon Sep 17 00:00:00 2001 From: Jonathan Castello Date: Wed, 6 Sep 2023 08:24:53 -0700 Subject: [PATCH 15/26] Remove spans around regular spawns and calls --- .../merlin/driver/engine/SimulationEngine.java | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index b951a2fa52..0999200466 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -225,14 +225,9 @@ private void stepEffectModel( this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { - // TODO: Not *every* task should get a new span. Allow the model to decide where spans go. - final var targetSpan = SpanId.generate(); - SimulationEngine.this.spans.put(targetSpan, new Span(Optional.of(progress.span()), currentTime, Optional.empty())); - SimulationEngine.this.spanTasks.get(progress.span()).increment(); - final var target = TaskId.generate(); - SimulationEngine.this.spanTasks.put(targetSpan, new MutableInt(1)); - SimulationEngine.this.tasks.put(target, new ExecutionState<>(targetSpan, 0, Optional.of(task), s.child().create(this.executor))); + SimulationEngine.this.spanTasks.put(scheduler.span, new MutableInt(1)); + SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); @@ -680,14 +675,9 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { - // TODO: Not *every* task should get a new span. Allow the model to decide where spans go. - final var taskSpan = SpanId.generate(); - SimulationEngine.this.spans.put(taskSpan, new Span(Optional.of(this.span), this.currentTime, Optional.empty())); - SimulationEngine.this.spanTasks.put(taskSpan, new MutableInt(1)); - final var task = TaskId.generate(); SimulationEngine.this.spanTasks.get(this.span).increment(); - SimulationEngine.this.tasks.put(task, new ExecutionState<>(taskSpan, 0, this.caller, state.create(SimulationEngine.this.executor))); + SimulationEngine.this.tasks.put(task, new ExecutionState<>(this.span, 0, this.caller, state.create(SimulationEngine.this.executor))); this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); } From 009127eb28a595acd7d4ee60a46440e8ce78da0d Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 21 Mar 2024 09:50:17 -0700 Subject: [PATCH 16/26] Fix span counting behavior --- .../driver/engine/SimulationEngine.java | 72 +++++++++++-------- 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 0999200466..9ba8279743 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -226,7 +226,7 @@ private void stepEffectModel( this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { final var target = TaskId.generate(); - SimulationEngine.this.spanTasks.put(scheduler.span, new MutableInt(1)); + SimulationEngine.this.spanTasks.get(scheduler.span).increment(); SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); @@ -309,6 +309,14 @@ public boolean isActivity(final SpanId id) { return this.input.containsKey(id); } + public boolean isDirective(SpanId id) { + return this.spanToPlannedDirective.containsKey(id); + } + + public ActivityDirectiveId getDirective(SpanId id) { + return this.spanToPlannedDirective.get(id); + } + public record Trait(Iterable> topics, Topic activityTopic) implements EffectTrait> { @Override public Consumer empty() { @@ -428,50 +436,54 @@ public static SimulationResults computeResults( } } - - // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. - final var spanToSimulatedActivityId = new HashMap(spanInfo.spanToPlannedDirective.size()); - final var usedSimulatedActivityIds = new HashSet<>(); - for (final var entry : spanInfo.spanToPlannedDirective.entrySet()) { - spanToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); - usedSimulatedActivityIds.add(entry.getValue().id()); - } - long counter = 1L; - for (final var span : engine.spans.keySet()) { - if (!spanInfo.isActivity(span)) continue; - if (spanToSimulatedActivityId.containsKey(span)) continue; - - while (usedSimulatedActivityIds.contains(counter)) counter++; - spanToSimulatedActivityId.put(span, new SimulatedActivityId(counter++)); - } - // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). - final var activityParents = new HashMap(); + final var activityParents = new HashMap(); + final var activityDirectiveIds = new HashMap(); engine.spans.forEach((span, state) -> { if (!spanInfo.isActivity(span)) return; var parent = state.parent(); - while (parent.isPresent() && !spanInfo.isActivity(parent.get())) { + while (parent.isPresent() && !spanInfo.isActivity(parent.get()) && !spanInfo.isDirective(parent.get())) { parent = engine.spans.get(parent.get()).parent(); } if (parent.isPresent()) { - activityParents.put(spanToSimulatedActivityId.get(span), spanToSimulatedActivityId.get(parent.get())); + if (spanInfo.isActivity(parent.get())) { + activityParents.put(span, parent.get()); + } else if (spanInfo.isDirective(parent.get())) { + activityDirectiveIds.put(span, spanInfo.getDirective(parent.get())); + } } }); - final var activityChildren = new HashMap>(); + final var activityChildren = new HashMap>(); activityParents.forEach((activity, parent) -> { activityChildren.computeIfAbsent(parent, $ -> new LinkedList<>()).add(activity); }); + // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. + final var spanToSimulatedActivityId = new HashMap(activityDirectiveIds.size()); + final var usedSimulatedActivityIds = new HashSet<>(); + for (final var entry : activityDirectiveIds.entrySet()) { + spanToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); + usedSimulatedActivityIds.add(entry.getValue().id()); + } + long counter = 1L; + for (final var span : engine.spans.keySet()) { + if (!spanInfo.isActivity(span)) continue; + if (spanToSimulatedActivityId.containsKey(span)) continue; + + while (usedSimulatedActivityIds.contains(counter)) counter++; + spanToSimulatedActivityId.put(span, new SimulatedActivityId(counter++)); + } + final var simulatedActivities = new HashMap(); final var unfinishedActivities = new HashMap(); engine.spans.forEach((span, state) -> { if (!spanInfo.isActivity(span)) return; final var activityId = spanToSimulatedActivityId.get(span); - final var directiveId = spanInfo.spanToPlannedDirective.get(span); // will be null for non-directives + final var directiveId = activityDirectiveIds.get(span); if (state.endOffset().isPresent()) { final var inputAttributes = spanInfo.input().get(span); @@ -482,9 +494,9 @@ public static SimulationResults computeResults( inputAttributes.getArguments(), startTime.plus(state.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), state.endOffset().get().minus(state.startOffset()), - activityParents.get(activityId), - activityChildren.getOrDefault(activityId, Collections.emptyList()), - (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId), + spanToSimulatedActivityId.get(activityParents.get(span)), + activityChildren.getOrDefault(span, Collections.emptyList()).stream().map(spanToSimulatedActivityId::get).toList(), + (activityParents.containsKey(span)) ? Optional.empty() : Optional.of(directiveId), outputAttributes )); } else { @@ -493,9 +505,9 @@ public static SimulationResults computeResults( inputAttributes.getTypeName(), inputAttributes.getArguments(), startTime.plus(state.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), - activityParents.get(activityId), - activityChildren.getOrDefault(activityId, Collections.emptyList()), - (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId) + spanToSimulatedActivityId.get(activityParents.get(span)), + activityChildren.getOrDefault(span, Collections.emptyList()).stream().map(spanToSimulatedActivityId::get).toList(), + (activityParents.containsKey(span)) ? Optional.empty() : Optional.of(directiveId) )); } }); @@ -690,8 +702,6 @@ public void pushSpan() { SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty())); SimulationEngine.this.spanTasks.put(this.span, new MutableInt(1)); - - SimulationEngine.this.spanTasks.get(parentSpan).increment(); } @Override From ef675db0ddc856ba4d185668a52ca7b2ba2e09d9 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 21 Mar 2024 09:51:18 -0700 Subject: [PATCH 17/26] Rename spanTasks to spanContributors --- .../driver/engine/SimulationEngine.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 9ba8279743..528baa5fd0 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -71,8 +71,8 @@ public final class SimulationEngine implements AutoCloseable { /** The set of all spans of work contributed to by modeled tasks. */ private final Map spans = new HashMap<>(); - /** A count of the remaining live tasks (and other spans) under each span. */ - private final Map spanTasks = new HashMap<>(); + /** A count of the direct contributors to each span, including child spans and tasks. */ + private final Map spanContributors = new HashMap<>(); /** A thread pool that modeled tasks can use to keep track of their state between steps. */ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); @@ -85,7 +85,7 @@ public SpanId scheduleTask(final Duration startTime, final TaskFactory< this.spans.put(span, new Span(Optional.empty(), startTime, Optional.empty())); final var task = TaskId.generate(); - this.spanTasks.put(span, new MutableInt(1)); + this.spanContributors.put(span, new MutableInt(1)); this.tasks.put(task, new ExecutionState<>(span, 0, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); @@ -201,8 +201,8 @@ private void stepEffectModel( // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. var span = progress.span(); while (true) { - if (this.spanTasks.get(span).decrementAndGet() > 0) break; - this.spanTasks.remove(span); + if (this.spanContributors.get(span).decrementAndGet() > 0) break; + this.spanContributors.remove(span); this.spans.compute(span, (_id, $) -> $.close(currentTime)); @@ -226,7 +226,7 @@ private void stepEffectModel( this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { final var target = TaskId.generate(); - SimulationEngine.this.spanTasks.get(scheduler.span).increment(); + SimulationEngine.this.spanContributors.get(scheduler.span).increment(); SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); @@ -688,7 +688,7 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { final var task = TaskId.generate(); - SimulationEngine.this.spanTasks.get(this.span).increment(); + SimulationEngine.this.spanContributors.get(this.span).increment(); SimulationEngine.this.tasks.put(task, new ExecutionState<>(this.span, 0, this.caller, state.create(SimulationEngine.this.executor))); this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); @@ -701,7 +701,7 @@ public void pushSpan() { this.span = SpanId.generate(); SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty())); - SimulationEngine.this.spanTasks.put(this.span, new MutableInt(1)); + SimulationEngine.this.spanContributors.put(this.span, new MutableInt(1)); } @Override @@ -709,8 +709,8 @@ public void popSpan() { // TODO: Do we want to throw an error instead? if (this.shadowedSpans == 0) return; - if (SimulationEngine.this.spanTasks.get(this.span).decrementAndGet() == 0) { - SimulationEngine.this.spanTasks.remove(this.span); + if (SimulationEngine.this.spanContributors.get(this.span).decrementAndGet() == 0) { + SimulationEngine.this.spanContributors.remove(this.span); SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime)); } // NOTE: We don't need to propagate completion any further, because the next shadowed span From 0ee175a77d07f3d3eb2af5c64787da9f10a4582b Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 21 Mar 2024 09:51:41 -0700 Subject: [PATCH 18/26] Update implementations of makeTaskFactory to push and pop spans --- .../java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java | 2 ++ .../gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java | 2 ++ .../aerie/scheduler/simulation/ResumableSimulationDriver.java | 1 + .../jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java | 2 ++ 4 files changed, 7 insertions(+) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java index 0859608108..bf11a8bac4 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java @@ -233,9 +233,11 @@ private static TaskFactory makeTaskFactory( { // Emit the current activity (defined by directiveId) return executor -> scheduler0 -> TaskStatus.calling((TaskFactory) (executor1 -> scheduler1 -> { + scheduler1.pushSpan(); scheduler1.emit(directiveId, activityTopic); return task.create(executor1).step(scheduler1); }), scheduler2 -> { + scheduler2.popSpan(); // When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time final List> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId); // Iterate over the dependents diff --git a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java index fac9709efb..748256a98b 100644 --- a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java +++ b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java @@ -1083,6 +1083,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> $ -> { + $.pushSpan(); $.emit(this, delayedActivityDirectiveInputTopic); return TaskStatus.delayed(oneMinute, $$ -> { $$.emit(Unit.UNIT, delayedActivityDirectiveOutputTopic); @@ -1108,6 +1109,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> scheduler -> { + scheduler.pushSpan(); scheduler.emit(this, decomposingActivityDirectiveInputTopic); return TaskStatus.delayed( Duration.ZERO, diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java index a214abb44a..5c6112af55 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java @@ -395,6 +395,7 @@ private static TaskFactory makeTaskFactory( final TaskFactory task, final Topic activityTopic) { return executor -> scheduler -> { + scheduler.pushSpan(); scheduler.emit(directiveId, activityTopic); return task.create(executor).step(scheduler); }; diff --git a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java index 24f668ef9f..c419caedd3 100644 --- a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java +++ b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java @@ -666,6 +666,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> $ -> { + $.pushSpan(); $.emit(this, delayedActivityDirectiveInputTopic); return TaskStatus.delayed(oneMinute, $$ -> { $$.emit(Unit.UNIT, delayedActivityDirectiveOutputTopic); @@ -691,6 +692,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> scheduler -> { + scheduler.pushSpan(); scheduler.emit(this, decomposingActivityDirectiveInputTopic); return TaskStatus.delayed( Duration.ZERO, From 6a51f16a1d9a16536aac04db71b1ebe3cae899d0 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 21 Mar 2024 12:25:38 -0700 Subject: [PATCH 19/26] Use scheduler.span instead of progress.span The span that a task contributes to can, and will change across a step. --- .../nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 528baa5fd0..6e5c503f60 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -199,7 +199,7 @@ private void stepEffectModel( if (status instanceof TaskStatus.Completed) { // Propagate completion up the span hierarchy. // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. - var span = progress.span(); + var span = scheduler.span; while (true) { if (this.spanContributors.get(span).decrementAndGet() > 0) break; this.spanContributors.remove(span); From 9a212877458fb1de8339ba8a0dbaabafdb69ecdd Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Tue, 26 Mar 2024 20:13:07 -0700 Subject: [PATCH 20/26] Use set-based implementation as an oracle --- .../driver/engine/SimulationEngine.java | 194 ++++++++++++++++-- 1 file changed, 177 insertions(+), 17 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 6e5c503f60..29cf8664e5 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -36,6 +36,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -72,7 +74,8 @@ public final class SimulationEngine implements AutoCloseable { /** The set of all spans of work contributed to by modeled tasks. */ private final Map spans = new HashMap<>(); /** A count of the direct contributors to each span, including child spans and tasks. */ - private final Map spanContributors = new HashMap<>(); + private final Map spanContributorCount = new HashMap<>(); + private final SpanContributors spanContributors = new SpanContributors(); /** A thread pool that modeled tasks can use to keep track of their state between steps. */ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); @@ -85,10 +88,14 @@ public SpanId scheduleTask(final Duration startTime, final TaskFactory< this.spans.put(span, new Span(Optional.empty(), startTime, Optional.empty())); final var task = TaskId.generate(); - this.spanContributors.put(span, new MutableInt(1)); + this.spanContributors.newSpan(span); + this.spanContributors.addContributor(span, task); + this.spanContributorCount.put(span, new MutableInt(1)); this.tasks.put(task, new ExecutionState<>(span, 0, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); + this.spanContributors.checkIntegrity(spanContributorCount); + return span; } @@ -189,7 +196,7 @@ private void stepEffectModel( final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), progress.caller(), frame); + final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), task, progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. @@ -199,19 +206,45 @@ private void stepEffectModel( if (status instanceof TaskStatus.Completed) { // Propagate completion up the span hierarchy. // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. - var span = scheduler.span; - while (true) { - if (this.spanContributors.get(span).decrementAndGet() > 0) break; - this.spanContributors.remove(span); + { + var span = scheduler.span; + while (true) { + if (this.spanContributorCount.get(span).decrementAndGet() > 0) break; + this.spanContributorCount.remove(span); + + this.spans.compute(span, (_id, $) -> $.close(currentTime)); - this.spans.compute(span, (_id, $) -> $.close(currentTime)); + final var span$ = this.spans.get(span).parent; + if (span$.isEmpty()) break; - final var span$ = this.spans.get(span).parent; - if (span$.isEmpty()) break; + span = span$.get(); + } + } - span = span$.get(); + { + var span = scheduler.span; + this.spanContributors.removeContributor(span, task); + + if (!this.spanContributors.spanHasContributors(span)) { + this.spanContributors.removeSpan(span); + + if (this.spans.get(span).parent.isPresent()) { + var parentSpan = this.spans.get(span).parent.get(); + while (true) { + this.spanContributors.removeContributor(parentSpan, span); + if (this.spanContributors.spanHasContributors(parentSpan)) break; + this.spanContributors.removeSpan(parentSpan); + span = parentSpan; + final var parentSpan$ = this.spans.get(span).parent; + if (parentSpan$.isEmpty()) break; + parentSpan = parentSpan$.get(); + } + } + } } + this.spanContributors.checkIntegrity(this.spanContributorCount); + // Notify any blocked caller of our completion. progress.caller().ifPresent($ -> { if (this.blockedTasks.get($).decrementAndGet() == 0) { @@ -226,12 +259,15 @@ private void stepEffectModel( this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { final var target = TaskId.generate(); - SimulationEngine.this.spanContributors.get(scheduler.span).increment(); + SimulationEngine.this.spanContributorCount.get(scheduler.span).increment(); + SimulationEngine.this.spanContributors.addContributor(scheduler.span, target); SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + + SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } else if (status instanceof TaskStatus.AwaitingCondition s) { final var condition = ConditionId.generate(); this.conditions.put(condition, s.condition()); @@ -648,6 +684,7 @@ private final class EngineScheduler implements Scheduler { private final Duration currentTime; private int shadowedSpans; private SpanId span; + private final TaskId taskId; private final Optional caller; private final TaskFrame frame; @@ -655,12 +692,14 @@ public EngineScheduler( final Duration currentTime, final int shadowedSpans, final SpanId span, + final TaskId taskId, final Optional caller, final TaskFrame frame) { this.currentTime = Objects.requireNonNull(currentTime); this.shadowedSpans = shadowedSpans; this.span = Objects.requireNonNull(span); + this.taskId = Objects.requireNonNull(taskId); this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); } @@ -688,10 +727,13 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { final var task = TaskId.generate(); - SimulationEngine.this.spanContributors.get(this.span).increment(); + SimulationEngine.this.spanContributors.addContributor(this.span, task); + SimulationEngine.this.spanContributorCount.get(this.span).increment(); SimulationEngine.this.tasks.put(task, new ExecutionState<>(this.span, 0, this.caller, state.create(SimulationEngine.this.executor))); this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); + + SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } @Override @@ -701,24 +743,44 @@ public void pushSpan() { this.span = SpanId.generate(); SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty())); - SimulationEngine.this.spanContributors.put(this.span, new MutableInt(1)); + SimulationEngine.this.spanContributors.removeContributor(parentSpan, this.taskId); + SimulationEngine.this.spanContributors.addContributor(parentSpan, this.span); + SimulationEngine.this.spanContributors.newSpan(span); + SimulationEngine.this.spanContributors.addContributor(span, this.taskId); + SimulationEngine.this.spanContributorCount.put(this.span, new MutableInt(1)); + + SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } @Override public void popSpan() { + SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); + // TODO: Do we want to throw an error instead? if (this.shadowedSpans == 0) return; + final SpanId parentSpan = SimulationEngine.this.spans.get(this.span).parent().orElseThrow(); - if (SimulationEngine.this.spanContributors.get(this.span).decrementAndGet() == 0) { - SimulationEngine.this.spanContributors.remove(this.span); + SimulationEngine.this.spanContributors.removeContributor(this.span, this.taskId); + + if (SimulationEngine.this.spanContributorCount.get(this.span).decrementAndGet() == 0) { + SimulationEngine.this.spanContributorCount.remove(this.span); SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime)); } + + if (!SimulationEngine.this.spanContributors.spanHasContributors(this.span)) { + SimulationEngine.this.spanContributors.removeContributor(parentSpan, this.span); + SimulationEngine.this.spanContributors.removeSpan(this.span); + } + // NOTE: We don't need to propagate completion any further, because the next shadowed span // has by definition not been completed: this task may still contribute to it, and this task // has not terminated. this.shadowedSpans -= 1; - this.span = SimulationEngine.this.spans.get(this.span).parent().orElseThrow(); + this.span = parentSpan; + SimulationEngine.this.spanContributors.addContributor(this.span, this.taskId); + + SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } } @@ -776,4 +838,102 @@ public boolean isComplete() { return this.endOffset.isPresent(); } } + + sealed interface TaskOrSpanId { + record Task(TaskId id) implements TaskOrSpanId {} + record Span(SpanId id) implements TaskOrSpanId {} + + static TaskOrSpanId of(SpanId spanId) { + return new Span(spanId); + } + + static TaskOrSpanId of(TaskId taskId) { + return new Task(taskId); + } + } + + public static class SpanContributors { + private final Map> spanContributors; + private final List operations = new ArrayList<>(); + + public SpanContributors() { + this.spanContributors = new LinkedHashMap<>(); + } + + public void newSpan(SpanId span) { + if (this.spanContributors.containsKey(span)) throw new IllegalStateException("New span clashes with existing span"); + this.spanContributors.put(span, new LinkedHashSet<>()); + operations.add("newSpan " + span); + } + + public void removeContributor(SpanId span, TaskId taskId) { + removeContributor(span, TaskOrSpanId.of(taskId)); + } + + public void removeContributor(SpanId span, SpanId spanId) { + removeContributor(span, TaskOrSpanId.of(spanId)); + } + + private void removeContributor(SpanId span, TaskOrSpanId contributor) { + if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); + if (!this.spanContributors.get(span).contains(contributor)) throw new IllegalArgumentException("Span did not have this contributor"); + this.spanContributors.get(span).remove(contributor); + operations.add("removeContributor " + span + " " + contributor); + } + + public boolean spanHasContributors(SpanId span) { + if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); + return !spanContributors.get(span).isEmpty(); + } + + public void removeSpan(SpanId span) { + if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); + if (!this.spanContributors.get(span).isEmpty()) throw new IllegalStateException("Cannot remove span that still has contributors"); + + this.spanContributors.remove(span); + operations.add("removeSpan " + span); + } + + public void checkIntegrity(final Map spanContributorsCount) { + Set contributors = new LinkedHashSet<>(); + for (final var entry : this.spanContributors.entrySet()) { + if (entry.getValue().isEmpty()) throw new IllegalStateException("Empty contributor not removed"); + for (final var contributor : entry.getValue()) { + if (contributors.contains(contributor)) throw new IllegalStateException("Contributor appears twice"); + contributors.add(contributor); + if (!(contributor instanceof TaskOrSpanId.Span c)) continue; + if (!this.spanContributors.containsKey(c.id())) { + throw new IllegalStateException("Orphaned contributor"); + } + } + } + + for (final var entry : spanContributorsCount.entrySet()) { + if (!this.spanContributors.containsKey(entry.getKey())) throw new IllegalArgumentException("Mismatch!"); + if (!entry.getValue().getValue().equals(this.spanContributors.get(entry.getKey()).size())) { + throw new IllegalArgumentException("Mismatch! " + entry.getValue().getValue() + " != " + this.spanContributors.get(entry.getKey()).size()); + } + } + + for (final var key : this.spanContributors.keySet()) { + if (!spanContributorsCount.containsKey(key)) throw new IllegalArgumentException("Mismatch!"); + } + + operations.clear(); + } + + public void addContributor(SpanId span, TaskId contributor) { + addContributor(span, TaskOrSpanId.of(contributor)); + } + + public void addContributor(SpanId span, SpanId contributor) { + addContributor(span, TaskOrSpanId.of(contributor)); + } + + private void addContributor(SpanId span, TaskOrSpanId contributor) { + if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); + this.spanContributors.get(span).add(contributor); + operations.add("addContributor " + span + " " + contributor); + } + } } From 256f36d355b76b1c7ea9c15f5a0a17a569d9619a Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Tue, 26 Mar 2024 20:30:43 -0700 Subject: [PATCH 21/26] Increment parent count if child hasn't closed --- .../nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 29cf8664e5..06cad067f0 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -765,6 +765,10 @@ public void popSpan() { if (SimulationEngine.this.spanContributorCount.get(this.span).decrementAndGet() == 0) { SimulationEngine.this.spanContributorCount.remove(this.span); SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime)); + // Parent span contributor count remains constant, because this.span is removed, and this task is added + } else { + // Parent span contributor count increases by one, because this task is added without removing this.span + SimulationEngine.this.spanContributorCount.get(parentSpan).increment(); } if (!SimulationEngine.this.spanContributors.spanHasContributors(this.span)) { From 74c7e7f4194a9294dcfed045f01acaa2b2fa6773 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Tue, 26 Mar 2024 20:18:55 -0700 Subject: [PATCH 22/26] Remove set-based implementation --- .../driver/engine/SimulationEngine.java | 174 +----------------- 1 file changed, 9 insertions(+), 165 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 06cad067f0..a436dda8f5 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -75,7 +75,6 @@ public final class SimulationEngine implements AutoCloseable { private final Map spans = new HashMap<>(); /** A count of the direct contributors to each span, including child spans and tasks. */ private final Map spanContributorCount = new HashMap<>(); - private final SpanContributors spanContributors = new SpanContributors(); /** A thread pool that modeled tasks can use to keep track of their state between steps. */ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); @@ -88,14 +87,10 @@ public SpanId scheduleTask(final Duration startTime, final TaskFactory< this.spans.put(span, new Span(Optional.empty(), startTime, Optional.empty())); final var task = TaskId.generate(); - this.spanContributors.newSpan(span); - this.spanContributors.addContributor(span, task); this.spanContributorCount.put(span, new MutableInt(1)); this.tasks.put(task, new ExecutionState<>(span, 0, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); - this.spanContributors.checkIntegrity(spanContributorCount); - return span; } @@ -196,7 +191,7 @@ private void stepEffectModel( final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), task, progress.caller(), frame); + final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. @@ -206,45 +201,19 @@ private void stepEffectModel( if (status instanceof TaskStatus.Completed) { // Propagate completion up the span hierarchy. // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. - { - var span = scheduler.span; - while (true) { - if (this.spanContributorCount.get(span).decrementAndGet() > 0) break; - this.spanContributorCount.remove(span); - - this.spans.compute(span, (_id, $) -> $.close(currentTime)); + var span = scheduler.span; + while (true) { + if (this.spanContributorCount.get(span).decrementAndGet() > 0) break; + this.spanContributorCount.remove(span); - final var span$ = this.spans.get(span).parent; - if (span$.isEmpty()) break; + this.spans.compute(span, (_id, $) -> $.close(currentTime)); - span = span$.get(); - } - } + final var span$ = this.spans.get(span).parent; + if (span$.isEmpty()) break; - { - var span = scheduler.span; - this.spanContributors.removeContributor(span, task); - - if (!this.spanContributors.spanHasContributors(span)) { - this.spanContributors.removeSpan(span); - - if (this.spans.get(span).parent.isPresent()) { - var parentSpan = this.spans.get(span).parent.get(); - while (true) { - this.spanContributors.removeContributor(parentSpan, span); - if (this.spanContributors.spanHasContributors(parentSpan)) break; - this.spanContributors.removeSpan(parentSpan); - span = parentSpan; - final var parentSpan$ = this.spans.get(span).parent; - if (parentSpan$.isEmpty()) break; - parentSpan = parentSpan$.get(); - } - } - } + span = span$.get(); } - this.spanContributors.checkIntegrity(this.spanContributorCount); - // Notify any blocked caller of our completion. progress.caller().ifPresent($ -> { if (this.blockedTasks.get($).decrementAndGet() == 0) { @@ -260,14 +229,11 @@ private void stepEffectModel( } else if (status instanceof TaskStatus.CallingTask s) { final var target = TaskId.generate(); SimulationEngine.this.spanContributorCount.get(scheduler.span).increment(); - SimulationEngine.this.spanContributors.addContributor(scheduler.span, target); SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); frame.signal(JobId.forTask(target)); this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); - - SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } else if (status instanceof TaskStatus.AwaitingCondition s) { final var condition = ConditionId.generate(); this.conditions.put(condition, s.condition()); @@ -684,7 +650,6 @@ private final class EngineScheduler implements Scheduler { private final Duration currentTime; private int shadowedSpans; private SpanId span; - private final TaskId taskId; private final Optional caller; private final TaskFrame frame; @@ -692,14 +657,12 @@ public EngineScheduler( final Duration currentTime, final int shadowedSpans, final SpanId span, - final TaskId taskId, final Optional caller, final TaskFrame frame) { this.currentTime = Objects.requireNonNull(currentTime); this.shadowedSpans = shadowedSpans; this.span = Objects.requireNonNull(span); - this.taskId = Objects.requireNonNull(taskId); this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); } @@ -727,13 +690,10 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { final var task = TaskId.generate(); - SimulationEngine.this.spanContributors.addContributor(this.span, task); SimulationEngine.this.spanContributorCount.get(this.span).increment(); SimulationEngine.this.tasks.put(task, new ExecutionState<>(this.span, 0, this.caller, state.create(SimulationEngine.this.executor))); this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); - - SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } @Override @@ -743,25 +703,15 @@ public void pushSpan() { this.span = SpanId.generate(); SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty())); - SimulationEngine.this.spanContributors.removeContributor(parentSpan, this.taskId); - SimulationEngine.this.spanContributors.addContributor(parentSpan, this.span); - SimulationEngine.this.spanContributors.newSpan(span); - SimulationEngine.this.spanContributors.addContributor(span, this.taskId); SimulationEngine.this.spanContributorCount.put(this.span, new MutableInt(1)); - - SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } @Override public void popSpan() { - SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); - // TODO: Do we want to throw an error instead? if (this.shadowedSpans == 0) return; final SpanId parentSpan = SimulationEngine.this.spans.get(this.span).parent().orElseThrow(); - SimulationEngine.this.spanContributors.removeContributor(this.span, this.taskId); - if (SimulationEngine.this.spanContributorCount.get(this.span).decrementAndGet() == 0) { SimulationEngine.this.spanContributorCount.remove(this.span); SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime)); @@ -771,20 +721,12 @@ public void popSpan() { SimulationEngine.this.spanContributorCount.get(parentSpan).increment(); } - if (!SimulationEngine.this.spanContributors.spanHasContributors(this.span)) { - SimulationEngine.this.spanContributors.removeContributor(parentSpan, this.span); - SimulationEngine.this.spanContributors.removeSpan(this.span); - } - // NOTE: We don't need to propagate completion any further, because the next shadowed span // has by definition not been completed: this task may still contribute to it, and this task // has not terminated. this.shadowedSpans -= 1; this.span = parentSpan; - SimulationEngine.this.spanContributors.addContributor(this.span, this.taskId); - - SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount); } } @@ -842,102 +784,4 @@ public boolean isComplete() { return this.endOffset.isPresent(); } } - - sealed interface TaskOrSpanId { - record Task(TaskId id) implements TaskOrSpanId {} - record Span(SpanId id) implements TaskOrSpanId {} - - static TaskOrSpanId of(SpanId spanId) { - return new Span(spanId); - } - - static TaskOrSpanId of(TaskId taskId) { - return new Task(taskId); - } - } - - public static class SpanContributors { - private final Map> spanContributors; - private final List operations = new ArrayList<>(); - - public SpanContributors() { - this.spanContributors = new LinkedHashMap<>(); - } - - public void newSpan(SpanId span) { - if (this.spanContributors.containsKey(span)) throw new IllegalStateException("New span clashes with existing span"); - this.spanContributors.put(span, new LinkedHashSet<>()); - operations.add("newSpan " + span); - } - - public void removeContributor(SpanId span, TaskId taskId) { - removeContributor(span, TaskOrSpanId.of(taskId)); - } - - public void removeContributor(SpanId span, SpanId spanId) { - removeContributor(span, TaskOrSpanId.of(spanId)); - } - - private void removeContributor(SpanId span, TaskOrSpanId contributor) { - if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); - if (!this.spanContributors.get(span).contains(contributor)) throw new IllegalArgumentException("Span did not have this contributor"); - this.spanContributors.get(span).remove(contributor); - operations.add("removeContributor " + span + " " + contributor); - } - - public boolean spanHasContributors(SpanId span) { - if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); - return !spanContributors.get(span).isEmpty(); - } - - public void removeSpan(SpanId span) { - if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); - if (!this.spanContributors.get(span).isEmpty()) throw new IllegalStateException("Cannot remove span that still has contributors"); - - this.spanContributors.remove(span); - operations.add("removeSpan " + span); - } - - public void checkIntegrity(final Map spanContributorsCount) { - Set contributors = new LinkedHashSet<>(); - for (final var entry : this.spanContributors.entrySet()) { - if (entry.getValue().isEmpty()) throw new IllegalStateException("Empty contributor not removed"); - for (final var contributor : entry.getValue()) { - if (contributors.contains(contributor)) throw new IllegalStateException("Contributor appears twice"); - contributors.add(contributor); - if (!(contributor instanceof TaskOrSpanId.Span c)) continue; - if (!this.spanContributors.containsKey(c.id())) { - throw new IllegalStateException("Orphaned contributor"); - } - } - } - - for (final var entry : spanContributorsCount.entrySet()) { - if (!this.spanContributors.containsKey(entry.getKey())) throw new IllegalArgumentException("Mismatch!"); - if (!entry.getValue().getValue().equals(this.spanContributors.get(entry.getKey()).size())) { - throw new IllegalArgumentException("Mismatch! " + entry.getValue().getValue() + " != " + this.spanContributors.get(entry.getKey()).size()); - } - } - - for (final var key : this.spanContributors.keySet()) { - if (!spanContributorsCount.containsKey(key)) throw new IllegalArgumentException("Mismatch!"); - } - - operations.clear(); - } - - public void addContributor(SpanId span, TaskId contributor) { - addContributor(span, TaskOrSpanId.of(contributor)); - } - - public void addContributor(SpanId span, SpanId contributor) { - addContributor(span, TaskOrSpanId.of(contributor)); - } - - private void addContributor(SpanId span, TaskOrSpanId contributor) { - if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span"); - this.spanContributors.get(span).add(contributor); - operations.add("addContributor " + span + " " + contributor); - } - } } From 34eb4e24dfc7e4b76f2b0c62a551aa14f850bbe4 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 28 Mar 2024 09:48:56 -0700 Subject: [PATCH 23/26] Use switch for TaskStatus matching --- .../driver/engine/SimulationEngine.java | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index a436dda8f5..d46eacfca0 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -198,52 +198,55 @@ private void stepEffectModel( // TODO: Report which cells this activity read from at this point in time. This is useful insight for any user. // Based on the task's return status, update its execution state and schedule its resumption. - if (status instanceof TaskStatus.Completed) { - // Propagate completion up the span hierarchy. - // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. - var span = scheduler.span; - while (true) { - if (this.spanContributorCount.get(span).decrementAndGet() > 0) break; - this.spanContributorCount.remove(span); + switch (status) { + case TaskStatus.Completed s -> { + // Propagate completion up the span hierarchy. + // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. + var span = scheduler.span; + while (true) { + if (this.spanContributorCount.get(span).decrementAndGet() > 0) break; + this.spanContributorCount.remove(span); - this.spans.compute(span, (_id, $) -> $.close(currentTime)); + this.spans.compute(span, (_id, $) -> $.close(currentTime)); - final var span$ = this.spans.get(span).parent; - if (span$.isEmpty()) break; + final var span$ = this.spans.get(span).parent; + if (span$.isEmpty()) break; - span = span$.get(); - } + span = span$.get(); + } - // Notify any blocked caller of our completion. - progress.caller().ifPresent($ -> { - if (this.blockedTasks.get($).decrementAndGet() == 0) { - this.blockedTasks.remove($); - this.scheduledJobs.schedule(JobId.forTask($), SubInstant.Tasks.at(currentTime)); + // Notify any blocked caller of our completion. + progress.caller().ifPresent($ -> { + if (this.blockedTasks.get($).decrementAndGet() == 0) { + this.blockedTasks.remove($); + this.scheduledJobs.schedule(JobId.forTask($), SubInstant.Tasks.at(currentTime)); + } + }); } - }); - } else if (status instanceof TaskStatus.Delayed s) { - if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); - - this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); - this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); - } else if (status instanceof TaskStatus.CallingTask s) { - final var target = TaskId.generate(); - SimulationEngine.this.spanContributorCount.get(scheduler.span).increment(); - SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); - SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); - frame.signal(JobId.forTask(target)); - - this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); - } else if (status instanceof TaskStatus.AwaitingCondition s) { - final var condition = ConditionId.generate(); - this.conditions.put(condition, s.condition()); - this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); - - this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); - this.waitingTasks.put(condition, task); - } else { - throw new IllegalArgumentException("Unknown subclass of %s: %s".formatted(TaskStatus.class, status)); - } + case TaskStatus.Delayed s -> { + if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); + + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); + } + case TaskStatus.CallingTask s -> { + final var target = TaskId.generate(); + SimulationEngine.this.spanContributorCount.get(scheduler.span).increment(); + SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); + SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); + frame.signal(JobId.forTask(target)); + + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + } + case TaskStatus.AwaitingCondition s -> { + final var condition = ConditionId.generate(); + this.conditions.put(condition, s.condition()); + this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); + + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + this.waitingTasks.put(condition, task); + } + } } /** Determine when a condition is next true, and schedule a signal to be raised at that time. */ From b0f880933b9b01561ff053c5f0e1f1e957408575 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 28 Mar 2024 09:57:55 -0700 Subject: [PATCH 24/26] Use the term Output instead of Return for task output generic --- .../merlin/driver/engine/SimulationEngine.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index d46eacfca0..e3df17dd47 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -80,7 +80,7 @@ public final class SimulationEngine implements AutoCloseable { private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); /** Schedule a new task to be performed at the given time. */ - public SpanId scheduleTask(final Duration startTime, final TaskFactory state) { + public SpanId scheduleTask(final Duration startTime, final TaskFactory state) { if (startTime.isNegative()) throw new IllegalArgumentException("Cannot schedule a task before the start time of the simulation"); final var span = SpanId.generate(); @@ -184,9 +184,9 @@ public void stepTask(final TaskId task, final TaskFrame frame, final Dura } /** Make progress in a task by stepping its associated effect model forward. */ - private void stepEffectModel( + private void stepEffectModel( final TaskId task, - final ExecutionState progress, + final ExecutionState progress, final TaskFrame frame, final Duration currentTime ) { @@ -199,7 +199,7 @@ private void stepEffectModel( // Based on the task's return status, update its execution state and schedule its resumption. switch (status) { - case TaskStatus.Completed s -> { + case TaskStatus.Completed s -> { // Propagate completion up the span hierarchy. // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. var span = scheduler.span; @@ -223,13 +223,13 @@ private void stepEffectModel( } }); } - case TaskStatus.Delayed s -> { + case TaskStatus.Delayed s -> { if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } - case TaskStatus.CallingTask s -> { + case TaskStatus.CallingTask s -> { final var target = TaskId.generate(); SimulationEngine.this.spanContributorCount.get(scheduler.span).increment(); SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); @@ -238,7 +238,7 @@ private void stepEffectModel( this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); } - case TaskStatus.AwaitingCondition s -> { + case TaskStatus.AwaitingCondition s -> { final var condition = ConditionId.generate(); this.conditions.put(condition, s.condition()); this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); @@ -765,8 +765,8 @@ static ConditionJobId forCondition(final ConditionId condition) { } /** The state of an executing task. */ - private record ExecutionState(SpanId span, int shadowedSpans, Optional caller, Task state) { - public ExecutionState continueWith(final SpanId span, final int shadowedSpans, final Task newState) { + private record ExecutionState(SpanId span, int shadowedSpans, Optional caller, Task state) { + public ExecutionState continueWith(final SpanId span, final int shadowedSpans, final Task newState) { return new ExecutionState<>(span, shadowedSpans, this.caller, newState); } } From aae5f1acded4f1ce8335501058a944764d49d086 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 28 Mar 2024 12:19:26 -0700 Subject: [PATCH 25/26] Include full stack trace in test failures --- constraints/build.gradle | 3 +++ contrib/build.gradle | 3 +++ db-tests/build.gradle | 3 +++ e2e-tests/build.gradle | 4 ++++ examples/banananation/build.gradle | 3 +++ examples/config-with-defaults/build.gradle | 3 +++ examples/config-without-defaults/build.gradle | 3 +++ examples/foo-missionmodel/build.gradle | 3 +++ examples/minimal-mission-model/build.gradle | 3 +++ examples/streamline-demo/build.gradle | 3 +++ merlin-driver/build.gradle | 3 +++ merlin-framework/build.gradle | 3 +++ merlin-sdk/build.gradle | 3 +++ merlin-server/build.gradle | 4 ++++ parsing-utilities/build.gradle | 3 +++ scheduler-driver/build.gradle | 3 +++ 16 files changed, 50 insertions(+) diff --git a/constraints/build.gradle b/constraints/build.gradle index 8a219a80a4..5daa876e15 100644 --- a/constraints/build.gradle +++ b/constraints/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/contrib/build.gradle b/contrib/build.gradle index ebb6146fb6..673231ed0f 100644 --- a/contrib/build.gradle +++ b/contrib/build.gradle @@ -15,6 +15,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/db-tests/build.gradle b/db-tests/build.gradle index 24d3960f58..c4420085f9 100644 --- a/db-tests/build.gradle +++ b/db-tests/build.gradle @@ -18,6 +18,9 @@ jacocoTestReport { task e2eTest(type: Test) { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } dependencies { diff --git a/e2e-tests/build.gradle b/e2e-tests/build.gradle index 441935ae5d..68e0a9392e 100644 --- a/e2e-tests/build.gradle +++ b/e2e-tests/build.gradle @@ -43,6 +43,10 @@ task e2eTest(type: Test) { // Run the tests in parallel to improve performance maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 useJUnitPlatform() + + testLogging { + exceptionFormat = 'full' + } } dependencies { diff --git a/examples/banananation/build.gradle b/examples/banananation/build.gradle index 402c81ca9e..3637a0ecd4 100644 --- a/examples/banananation/build.gradle +++ b/examples/banananation/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/config-with-defaults/build.gradle b/examples/config-with-defaults/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/config-with-defaults/build.gradle +++ b/examples/config-with-defaults/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/config-without-defaults/build.gradle b/examples/config-without-defaults/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/config-without-defaults/build.gradle +++ b/examples/config-without-defaults/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/foo-missionmodel/build.gradle b/examples/foo-missionmodel/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/foo-missionmodel/build.gradle +++ b/examples/foo-missionmodel/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/minimal-mission-model/build.gradle b/examples/minimal-mission-model/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/minimal-mission-model/build.gradle +++ b/examples/minimal-mission-model/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/streamline-demo/build.gradle b/examples/streamline-demo/build.gradle index b0679446c5..48afd9f8c3 100644 --- a/examples/streamline-demo/build.gradle +++ b/examples/streamline-demo/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-driver/build.gradle b/merlin-driver/build.gradle index 76cbca76bb..acc07bd493 100644 --- a/merlin-driver/build.gradle +++ b/merlin-driver/build.gradle @@ -17,6 +17,9 @@ test { useJUnitPlatform { includeEngines 'jqwik', 'junit-jupiter' } + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-framework/build.gradle b/merlin-framework/build.gradle index a52dde0379..66a07a16c6 100644 --- a/merlin-framework/build.gradle +++ b/merlin-framework/build.gradle @@ -15,6 +15,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-sdk/build.gradle b/merlin-sdk/build.gradle index 908b26fad7..15cd7aa7bc 100644 --- a/merlin-sdk/build.gradle +++ b/merlin-sdk/build.gradle @@ -15,6 +15,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-server/build.gradle b/merlin-server/build.gradle index 7e4557f5b6..d13902ecff 100644 --- a/merlin-server/build.gradle +++ b/merlin-server/build.gradle @@ -60,6 +60,10 @@ test { environment "CONSTRAINTS_DSL_COMPILER_ROOT", projectDir.toPath().resolve('constraints-dsl-compiler') environment "CONSTRAINTS_DSL_COMPILER_COMMAND", './build/main.js' + + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/parsing-utilities/build.gradle b/parsing-utilities/build.gradle index 3eaa311b8b..cb2790ca01 100644 --- a/parsing-utilities/build.gradle +++ b/parsing-utilities/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/scheduler-driver/build.gradle b/scheduler-driver/build.gradle index fef5cca0f9..77686981ed 100644 --- a/scheduler-driver/build.gradle +++ b/scheduler-driver/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { From e5ae0f5d887717c623203d84b35de7f2343412a8 Mon Sep 17 00:00:00 2001 From: Matthew Dailis Date: Thu, 28 Mar 2024 12:25:02 -0700 Subject: [PATCH 26/26] Use unique plan name in Scheduling e2e test --- .../src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java index 2955842836..ada11145be 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java @@ -736,7 +736,7 @@ void beforeEach() throws IOException, InterruptedException { // Insert the Plan fooPlan = hasura.createPlan( fooId, - "Foo Plan - Simulation Tests", + "Foo Plan - Scheduling Tests", "720:00:00", planStartTimestamp);