diff --git a/constraints/build.gradle b/constraints/build.gradle index 8a219a80a4..5daa876e15 100644 --- a/constraints/build.gradle +++ b/constraints/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/contrib/build.gradle b/contrib/build.gradle index ebb6146fb6..673231ed0f 100644 --- a/contrib/build.gradle +++ b/contrib/build.gradle @@ -15,6 +15,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/db-tests/build.gradle b/db-tests/build.gradle index 24d3960f58..c4420085f9 100644 --- a/db-tests/build.gradle +++ b/db-tests/build.gradle @@ -18,6 +18,9 @@ jacocoTestReport { task e2eTest(type: Test) { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } dependencies { diff --git a/e2e-tests/build.gradle b/e2e-tests/build.gradle index 441935ae5d..68e0a9392e 100644 --- a/e2e-tests/build.gradle +++ b/e2e-tests/build.gradle @@ -43,6 +43,10 @@ task e2eTest(type: Test) { // Run the tests in parallel to improve performance maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 useJUnitPlatform() + + testLogging { + exceptionFormat = 'full' + } } dependencies { diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java index 2955842836..ada11145be 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java @@ -736,7 +736,7 @@ void beforeEach() throws IOException, InterruptedException { // Insert the Plan fooPlan = hasura.createPlan( fooId, - "Foo Plan - Simulation Tests", + "Foo Plan - Scheduling Tests", "720:00:00", planStartTimestamp); diff --git a/examples/banananation/build.gradle b/examples/banananation/build.gradle index 402c81ca9e..3637a0ecd4 100644 --- a/examples/banananation/build.gradle +++ b/examples/banananation/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/config-with-defaults/build.gradle b/examples/config-with-defaults/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/config-with-defaults/build.gradle +++ b/examples/config-with-defaults/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/config-without-defaults/build.gradle b/examples/config-without-defaults/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/config-without-defaults/build.gradle +++ b/examples/config-without-defaults/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/foo-missionmodel/build.gradle b/examples/foo-missionmodel/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/foo-missionmodel/build.gradle +++ b/examples/foo-missionmodel/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/minimal-mission-model/build.gradle b/examples/minimal-mission-model/build.gradle index 2fa0e7e215..1e745a759f 100644 --- a/examples/minimal-mission-model/build.gradle +++ b/examples/minimal-mission-model/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/examples/streamline-demo/build.gradle b/examples/streamline-demo/build.gradle index b0679446c5..48afd9f8c3 100644 --- a/examples/streamline-demo/build.gradle +++ b/examples/streamline-demo/build.gradle @@ -19,6 +19,9 @@ jar { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-driver/build.gradle b/merlin-driver/build.gradle index 76cbca76bb..acc07bd493 100644 --- a/merlin-driver/build.gradle +++ b/merlin-driver/build.gradle @@ -17,6 +17,9 @@ test { useJUnitPlatform { includeEngines 'jqwik', 'junit-jupiter' } + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java index 9608639f2d..bf11a8bac4 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java @@ -165,11 +165,11 @@ void simulateTask(final MissionModel missionModel, final TaskFactory TaskFactory makeTaskFactory( { // Emit the current activity (defined by directiveId) return executor -> scheduler0 -> TaskStatus.calling((TaskFactory) (executor1 -> scheduler1 -> { + scheduler1.pushSpan(); scheduler1.emit(directiveId, activityTopic); return task.create(executor1).step(scheduler1); }), scheduler2 -> { + scheduler2.popSpan(); // When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time final List> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId); // Iterate over the dependents diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java deleted file mode 100644 index da6057ea63..0000000000 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SignalId.java +++ /dev/null @@ -1,18 +0,0 @@ -package gov.nasa.jpl.aerie.merlin.driver.engine; - -/** A typed wrapper for signal IDs. */ -public sealed interface SignalId { - /** A signal controlled by a task. */ - record TaskSignalId(TaskId id) implements SignalId {} - - /** A signal controlled by a condition. */ - record ConditionSignalId(ConditionId id) implements SignalId {} - - static TaskSignalId forTask(final TaskId task) { - return new TaskSignalId(task); - } - - static ConditionSignalId forCondition(final ConditionId condition) { - return new ConditionSignalId(condition); - } -} diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index efbddb54f2..e3df17dd47 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -25,6 +25,7 @@ import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; import gov.nasa.jpl.aerie.merlin.protocol.types.TaskStatus; import gov.nasa.jpl.aerie.merlin.protocol.types.ValueSchema; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; @@ -35,6 +36,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -52,8 +55,10 @@ public final class SimulationEngine implements AutoCloseable { /** The set of all jobs waiting for time to pass. */ private final JobSchedule scheduledJobs = new JobSchedule<>(); - /** The set of all jobs waiting on a given signal. */ - private final Subscriptions waitingTasks = new Subscriptions<>(); + /** The set of all jobs waiting on a condition. */ + private final Map waitingTasks = new HashMap<>(); + /** The set of all tasks blocked on some number of subtasks. */ + private final Map blockedTasks = new HashMap<>(); /** The set of conditions depending on a given set of topics. */ private final Subscriptions, ConditionId> waitingConditions = new Subscriptions<>(); /** The set of queries depending on a given set of topics. */ @@ -66,23 +71,27 @@ public final class SimulationEngine implements AutoCloseable { /** The profiling state for each tracked resource. */ private final Map> resources = new HashMap<>(); - /** The task that spawned a given task (if any). */ - private final Map taskParent = new HashMap<>(); - /** The set of children for each task (if any). */ - @DerivedFrom("taskParent") - private final Map> taskChildren = new HashMap<>(); + /** The set of all spans of work contributed to by modeled tasks. */ + private final Map spans = new HashMap<>(); + /** A count of the direct contributors to each span, including child spans and tasks. */ + private final Map spanContributorCount = new HashMap<>(); /** A thread pool that modeled tasks can use to keep track of their state between steps. */ private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); /** Schedule a new task to be performed at the given time. */ - public TaskId scheduleTask(final Duration startTime, final TaskFactory state) { + public SpanId scheduleTask(final Duration startTime, final TaskFactory state) { if (startTime.isNegative()) throw new IllegalArgumentException("Cannot schedule a task before the start time of the simulation"); + final var span = SpanId.generate(); + this.spans.put(span, new Span(Optional.empty(), startTime, Optional.empty())); + final var task = TaskId.generate(); - this.tasks.put(task, new ExecutionState.InProgress<>(startTime, state.create(this.executor))); + this.spanContributorCount.put(span, new MutableInt(1)); + this.tasks.put(task, new ExecutionState<>(span, 0, Optional.empty(), state.create(this.executor))); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime)); - return task; + + return span; } /** Register a resource whose profile should be accumulated over time. */ @@ -105,7 +114,7 @@ public void invalidateTopic(final Topic topic, final Duration invalidationTim for (final var condition : conditions) { // If we were going to signal tasks on this condition, well, don't do that. // Schedule the condition to be rechecked ASAP. - this.scheduledJobs.unschedule(JobId.forSignal(SignalId.forCondition(condition))); + this.scheduledJobs.unschedule(JobId.forSignal(condition)); this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(invalidationTime)); } } @@ -119,8 +128,7 @@ public JobSchedule.Batch extractNextJobs(final Duration maximumTime) { // that the condition depends on, in which case we might accidentally schedule an update for a condition // that no longer exists. for (final var job : batch.jobs()) { - if (!(job instanceof JobId.SignalJobId j)) continue; - if (!(j.id() instanceof SignalId.ConditionSignalId s)) continue; + if (!(job instanceof JobId.SignalJobId s)) continue; this.conditions.remove(s.id()); this.waitingConditions.unsubscribeQuery(s.id()); @@ -156,7 +164,7 @@ public void performJob( if (job instanceof JobId.TaskJobId j) { this.stepTask(j.id(), frame, currentTime); } else if (job instanceof JobId.SignalJobId j) { - this.stepSignalledTasks(j.id(), frame); + this.stepTask(this.waitingTasks.remove(j.id()), frame, currentTime); } else if (job instanceof JobId.ConditionJobId j) { this.updateCondition(j.id(), frame, currentTime, maximumTime); } else if (job instanceof JobId.ResourceJobId j) { @@ -168,108 +176,77 @@ public void performJob( /** Perform the next step of a modeled task. */ public void stepTask(final TaskId task, final TaskFrame frame, final Duration currentTime) { - // The handler for each individual task stage is responsible - // for putting an updated lifecycle back into the task set. - var lifecycle = this.tasks.remove(task); + // The handler for the next status of the task is responsible + // for putting an updated state back into the task set. + var state = this.tasks.remove(task); - stepTaskHelper(task, frame, currentTime, lifecycle); - } - - private void stepTaskHelper( - final TaskId task, - final TaskFrame frame, - final Duration currentTime, - final ExecutionState lifecycle) - { - // Extract the current modeling state. - if (lifecycle instanceof ExecutionState.InProgress e) { - stepEffectModel(task, e, frame, currentTime); - } else if (lifecycle instanceof ExecutionState.AwaitingChildren e) { - stepWaitingTask(task, e, frame, currentTime); - } else { - // TODO: Log this issue to somewhere more general than stderr. - System.err.println("Task %s is ready but in unexpected execution state %s".formatted(task, lifecycle)); - } + stepEffectModel(task, state, frame, currentTime); } /** Make progress in a task by stepping its associated effect model forward. */ - private void stepEffectModel( + private void stepEffectModel( final TaskId task, - final ExecutionState.InProgress progress, + final ExecutionState progress, final TaskFrame frame, final Duration currentTime ) { // Step the modeling state forward. - final var scheduler = new EngineScheduler(currentTime, task, frame); + final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), progress.caller(), frame); final var status = progress.state().step(scheduler); // TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user. // TODO: Report which cells this activity read from at this point in time. This is useful insight for any user. // Based on the task's return status, update its execution state and schedule its resumption. - if (status instanceof TaskStatus.Completed) { - final var children = new LinkedList<>(this.taskChildren.getOrDefault(task, Collections.emptySet())); - - this.tasks.put(task, progress.completedAt(currentTime, children)); - this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime)); - } else if (status instanceof TaskStatus.Delayed s) { - if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); - - this.tasks.put(task, progress.continueWith(s.continuation())); - this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); - } else if (status instanceof TaskStatus.CallingTask s) { - final var target = TaskId.generate(); - SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(currentTime, s.child().create(this.executor))); - SimulationEngine.this.taskParent.put(target, task); - SimulationEngine.this.taskChildren.computeIfAbsent(task, $ -> new HashSet<>()).add(target); - frame.signal(JobId.forTask(target)); - - this.tasks.put(task, progress.continueWith(s.continuation())); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forTask(target))); - } else if (status instanceof TaskStatus.AwaitingCondition s) { - final var condition = ConditionId.generate(); - this.conditions.put(condition, s.condition()); - this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); - - this.tasks.put(task, progress.continueWith(s.continuation())); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forCondition(condition))); - } else { - throw new IllegalArgumentException("Unknown subclass of %s: %s".formatted(TaskStatus.class, status)); - } - } + switch (status) { + case TaskStatus.Completed s -> { + // Propagate completion up the span hierarchy. + // TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span. + var span = scheduler.span; + while (true) { + if (this.spanContributorCount.get(span).decrementAndGet() > 0) break; + this.spanContributorCount.remove(span); - /** Make progress in a task by checking if all of the tasks it's waiting on have completed. */ - private void stepWaitingTask( - final TaskId task, - final ExecutionState.AwaitingChildren awaiting, - final TaskFrame frame, - final Duration currentTime - ) { - // TERMINATION: We break when there are no remaining children, - // and we always remove one if we don't break for other reasons. - while (true) { - if (awaiting.remainingChildren().isEmpty()) { - this.tasks.put(task, awaiting.joinedAt(currentTime)); - frame.signal(JobId.forSignal(SignalId.forTask(task))); - break; - } + this.spans.compute(span, (_id, $) -> $.close(currentTime)); - final var nextChild = awaiting.remainingChildren().getFirst(); - if (!(this.tasks.get(nextChild) instanceof ExecutionState.Terminated)) { - this.tasks.put(task, awaiting); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forTask(nextChild))); - break; - } + final var span$ = this.spans.get(span).parent; + if (span$.isEmpty()) break; - // This child is complete, so skip checking it next time; move to the next one. - awaiting.remainingChildren().removeFirst(); - } - } + span = span$.get(); + } + + // Notify any blocked caller of our completion. + progress.caller().ifPresent($ -> { + if (this.blockedTasks.get($).decrementAndGet() == 0) { + this.blockedTasks.remove($); + this.scheduledJobs.schedule(JobId.forTask($), SubInstant.Tasks.at(currentTime)); + } + }); + } + case TaskStatus.Delayed s -> { + if (s.delay().isNegative()) throw new IllegalArgumentException("Cannot schedule a task in the past"); + + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); + } + case TaskStatus.CallingTask s -> { + final var target = TaskId.generate(); + SimulationEngine.this.spanContributorCount.get(scheduler.span).increment(); + SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor))); + SimulationEngine.this.blockedTasks.put(task, new MutableInt(1)); + frame.signal(JobId.forTask(target)); + + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + } + case TaskStatus.AwaitingCondition s -> { + final var condition = ConditionId.generate(); + this.conditions.put(condition, s.condition()); + this.scheduledJobs.schedule(JobId.forCondition(condition), SubInstant.Conditions.at(currentTime)); - /** Cause any tasks waiting on the given signal to be resumed concurrently with other jobs in the current frame. */ - public void stepSignalledTasks(final SignalId signal, final TaskFrame frame) { - final var tasks = this.waitingTasks.invalidateTopic(signal); - for (final var task : tasks) frame.signal(JobId.forTask(task)); + this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation())); + this.waitingTasks.put(condition, task); + } + } } /** Determine when a condition is next true, and schedule a signal to be raised at that time. */ @@ -289,7 +266,7 @@ public void updateCondition( final var expiry = querier.expiry.map(currentTime::plus); if (prediction.isPresent() && (expiry.isEmpty() || prediction.get().shorterThan(expiry.get()))) { - this.scheduledJobs.schedule(JobId.forSignal(SignalId.forCondition(condition)), SubInstant.Tasks.at(prediction.get())); + this.scheduledJobs.schedule(JobId.forSignal(condition), SubInstant.Tasks.at(prediction.get())); } else { // Try checking again later -- where "later" is in some non-zero amount of time! final var nextCheckTime = Duration.max(expiry.orElse(horizonTime), currentTime.plus(Duration.EPSILON)); @@ -318,85 +295,91 @@ public void updateResource( @Override public void close() { for (final var task : this.tasks.values()) { - if (task instanceof ExecutionState.InProgress r) { - r.state.release(); - } + task.state().release(); } this.executor.shutdownNow(); } - /** Determine if a given task has fully completed. */ - public boolean isTaskComplete(final TaskId task) { - return (this.tasks.get(task) instanceof ExecutionState.Terminated); - } - - private record TaskInfo( - Map taskToPlannedDirective, - Map input, - Map output + private record SpanInfo( + Map spanToPlannedDirective, + Map input, + Map output ) { - public TaskInfo() { + public SpanInfo() { this(new HashMap<>(), new HashMap<>(), new HashMap<>()); } - public boolean isActivity(final TaskId id) { - return this.input.containsKey(id.id()); + public boolean isActivity(final SpanId id) { + return this.input.containsKey(id); + } + + public boolean isDirective(SpanId id) { + return this.spanToPlannedDirective.containsKey(id); + } + + public ActivityDirectiveId getDirective(SpanId id) { + return this.spanToPlannedDirective.get(id); } - public record Trait(Iterable> topics, Topic activityTopic) implements EffectTrait> { + public record Trait(Iterable> topics, Topic activityTopic) implements EffectTrait> { @Override - public Consumer empty() { - return taskInfo -> {}; + public Consumer empty() { + return spanInfo -> {}; } @Override - public Consumer sequentially(final Consumer prefix, final Consumer suffix) { - return taskInfo -> { prefix.accept(taskInfo); suffix.accept(taskInfo); }; + public Consumer sequentially(final Consumer prefix, final Consumer suffix) { + return spanInfo -> { prefix.accept(spanInfo); suffix.accept(spanInfo); }; } @Override - public Consumer concurrently(final Consumer left, final Consumer right) { - // SAFETY: For each task, `left` commutes with `right`, because no task runs concurrently with itself. - return taskInfo -> { left.accept(taskInfo); right.accept(taskInfo); }; + public Consumer concurrently(final Consumer left, final Consumer right) { + // SAFETY: `left` and `right` should commute. HOWEVER, if a span happens to directly contain two activities + // -- that is, two activities both contribute events under the same span's provenance -- then this + // does not actually commute. + // Arguably, this is a model-specific analysis anyway, since we're looking for specific events + // and inferring model structure from them, and at this time we're only working with models + // for which every activity has a span to itself. + return spanInfo -> { left.accept(spanInfo); right.accept(spanInfo); }; } - public Consumer atom(final Event ev) { - return taskInfo -> { + public Consumer atom(final Event ev) { + return spanInfo -> { // Identify activities. ev.extract(this.activityTopic) - .ifPresent(directiveId -> taskInfo.taskToPlannedDirective.put(ev.provenance().id(), directiveId)); + .ifPresent(directiveId -> spanInfo.spanToPlannedDirective.put(ev.provenance(), directiveId)); for (final var topic : this.topics) { // Identify activity inputs. - extractInput(topic, ev, taskInfo); + extractInput(topic, ev, spanInfo); // Identify activity outputs. - extractOutput(topic, ev, taskInfo); + extractOutput(topic, ev, spanInfo); } }; } private static - void extractInput(final SerializableTopic topic, final Event ev, final TaskInfo taskInfo) { + void extractInput(final SerializableTopic topic, final Event ev, final SpanInfo spanInfo) { if (!topic.name().startsWith("ActivityType.Input.")) return; ev.extract(topic.topic()).ifPresent(input -> { final var activityType = topic.name().substring("ActivityType.Input.".length()); - taskInfo.input.put( - ev.provenance().id(), + spanInfo.input.put( + ev.provenance(), new SerializedActivity(activityType, topic.outputType().serialize(input).asMap().orElseThrow())); }); } private static - void extractOutput(final SerializableTopic topic, final Event ev, final TaskInfo taskInfo) { + void extractOutput(final SerializableTopic topic, final Event ev, final SpanInfo spanInfo) { if (!topic.name().startsWith("ActivityType.Output.")) return; ev.extract(topic.topic()).ifPresent(output -> { - taskInfo.output.put( - ev.provenance().id(), + spanInfo.output.put( + ev.provenance(), topic.outputType().serialize(output)); }); } @@ -418,14 +401,14 @@ public static SimulationResults computeResults( final TemporalEventSource timeline, final Iterable> serializableTopics ) { - // Collect per-task information from the event graph. - final var taskInfo = new TaskInfo(); + // Collect per-span information from the event graph. + final var spanInfo = new SpanInfo(); for (final var point : timeline) { if (!(point instanceof TemporalEventSource.TimePoint.Commit p)) continue; - final var trait = new TaskInfo.Trait(serializableTopics, activityTopic); - p.events().evaluate(trait, trait::atom).accept(taskInfo); + final var trait = new SpanInfo.Trait(serializableTopics, activityTopic); + p.events().evaluate(trait, trait::atom).accept(spanInfo); } // Extract profiles for every resource. @@ -458,87 +441,79 @@ public static SimulationResults computeResults( } } + // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). + final var activityParents = new HashMap(); + final var activityDirectiveIds = new HashMap(); + engine.spans.forEach((span, state) -> { + if (!spanInfo.isActivity(span)) return; + + var parent = state.parent(); + while (parent.isPresent() && !spanInfo.isActivity(parent.get()) && !spanInfo.isDirective(parent.get())) { + parent = engine.spans.get(parent.get()).parent(); + } + + if (parent.isPresent()) { + if (spanInfo.isActivity(parent.get())) { + activityParents.put(span, parent.get()); + } else if (spanInfo.isDirective(parent.get())) { + activityDirectiveIds.put(span, spanInfo.getDirective(parent.get())); + } + } + }); + + final var activityChildren = new HashMap>(); + activityParents.forEach((activity, parent) -> { + activityChildren.computeIfAbsent(parent, $ -> new LinkedList<>()).add(activity); + }); // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. - final var taskToSimulatedActivityId = new HashMap(taskInfo.taskToPlannedDirective.size()); + final var spanToSimulatedActivityId = new HashMap(activityDirectiveIds.size()); final var usedSimulatedActivityIds = new HashSet<>(); - for (final var entry : taskInfo.taskToPlannedDirective.entrySet()) { - taskToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); + for (final var entry : activityDirectiveIds.entrySet()) { + spanToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); usedSimulatedActivityIds.add(entry.getValue().id()); } long counter = 1L; - for (final var task : engine.tasks.keySet()) { - if (!taskInfo.isActivity(task)) continue; - if (taskToSimulatedActivityId.containsKey(task.id())) continue; + for (final var span : engine.spans.keySet()) { + if (!spanInfo.isActivity(span)) continue; + if (spanToSimulatedActivityId.containsKey(span)) continue; while (usedSimulatedActivityIds.contains(counter)) counter++; - taskToSimulatedActivityId.put(task.id(), new SimulatedActivityId(counter++)); + spanToSimulatedActivityId.put(span, new SimulatedActivityId(counter++)); } - // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). - final var activityParents = new HashMap(); - engine.tasks.forEach((task, state) -> { - if (!taskInfo.isActivity(task)) return; - - var parent = engine.taskParent.get(task); - while (parent != null && !taskInfo.isActivity(parent)) { - parent = engine.taskParent.get(parent); - } - - if (parent != null) { - activityParents.put(taskToSimulatedActivityId.get(task.id()), taskToSimulatedActivityId.get(parent.id())); - } - }); - - final var activityChildren = new HashMap>(); - activityParents.forEach((task, parent) -> { - activityChildren.computeIfAbsent(parent, $ -> new LinkedList<>()).add(task); - }); - final var simulatedActivities = new HashMap(); final var unfinishedActivities = new HashMap(); - engine.tasks.forEach((task, state) -> { - if (!taskInfo.isActivity(task)) return; + engine.spans.forEach((span, state) -> { + if (!spanInfo.isActivity(span)) return; - final var activityId = taskToSimulatedActivityId.get(task.id()); - final var directiveId = taskInfo.taskToPlannedDirective.get(task.id()); // will be null for non-directives + final var activityId = spanToSimulatedActivityId.get(span); + final var directiveId = activityDirectiveIds.get(span); - if (state instanceof ExecutionState.Terminated e) { - final var inputAttributes = taskInfo.input().get(task.id()); - final var outputAttributes = taskInfo.output().get(task.id()); + if (state.endOffset().isPresent()) { + final var inputAttributes = spanInfo.input().get(span); + final var outputAttributes = spanInfo.output().get(span); simulatedActivities.put(activityId, new SimulatedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), - startTime.plus(e.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), - e.joinOffset().minus(e.startOffset()), - activityParents.get(activityId), - activityChildren.getOrDefault(activityId, Collections.emptyList()), - (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId), + startTime.plus(state.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), + state.endOffset().get().minus(state.startOffset()), + spanToSimulatedActivityId.get(activityParents.get(span)), + activityChildren.getOrDefault(span, Collections.emptyList()).stream().map(spanToSimulatedActivityId::get).toList(), + (activityParents.containsKey(span)) ? Optional.empty() : Optional.of(directiveId), outputAttributes )); - } else if (state instanceof ExecutionState.InProgress e){ - final var inputAttributes = taskInfo.input().get(task.id()); - unfinishedActivities.put(activityId, new UnfinishedActivity( - inputAttributes.getTypeName(), - inputAttributes.getArguments(), - startTime.plus(e.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), - activityParents.get(activityId), - activityChildren.getOrDefault(activityId, Collections.emptyList()), - (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId) - )); - } else if (state instanceof ExecutionState.AwaitingChildren e){ - final var inputAttributes = taskInfo.input().get(task.id()); + } else { + final var inputAttributes = spanInfo.input().get(span); unfinishedActivities.put(activityId, new UnfinishedActivity( inputAttributes.getTypeName(), inputAttributes.getArguments(), - startTime.plus(e.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), - activityParents.get(activityId), - activityChildren.getOrDefault(activityId, Collections.emptyList()), - (activityParents.containsKey(activityId)) ? Optional.empty() : Optional.of(directiveId) + startTime.plus(state.startOffset().in(Duration.MICROSECONDS), ChronoUnit.MICROS), + spanToSimulatedActivityId.get(activityParents.get(span)), + activityChildren.getOrDefault(span, Collections.emptyList()).stream().map(spanToSimulatedActivityId::get).toList(), + (activityParents.containsKey(span)) ? Optional.empty() : Optional.of(directiveId) )); - } else { - throw new Error("Unexpected subtype of %s: %s".formatted(ExecutionState.class, state.getClass())); } }); @@ -585,12 +560,8 @@ public static SimulationResults computeResults( serializedTimeline); } - public Optional getTaskDuration(TaskId taskId){ - final var state = tasks.get(taskId); - if (state instanceof ExecutionState.Terminated e) { - return Optional.of(e.joinOffset().minus(e.startOffset())); - } - return Optional.empty(); + public Span getSpan(SpanId spanId) { + return this.spans.get(spanId); } @@ -680,12 +651,22 @@ private static Optional min(final Optional a, final Optional /** A handle for processing requests and effects from a modeled task. */ private final class EngineScheduler implements Scheduler { private final Duration currentTime; - private final TaskId activeTask; + private int shadowedSpans; + private SpanId span; + private final Optional caller; private final TaskFrame frame; - public EngineScheduler(final Duration currentTime, final TaskId activeTask, final TaskFrame frame) { + public EngineScheduler( + final Duration currentTime, + final int shadowedSpans, + final SpanId span, + final Optional caller, + final TaskFrame frame) + { this.currentTime = Objects.requireNonNull(currentTime); - this.activeTask = Objects.requireNonNull(activeTask); + this.shadowedSpans = shadowedSpans; + this.span = Objects.requireNonNull(span); + this.caller = Objects.requireNonNull(caller); this.frame = Objects.requireNonNull(frame); } @@ -704,7 +685,7 @@ public State get(final CellId token) { @Override public void emit(final EventType event, final Topic topic) { // Append this event to the timeline. - this.frame.emit(Event.create(topic, event, this.activeTask)); + this.frame.emit(Event.create(topic, event, this.span)); SimulationEngine.this.invalidateTopic(topic, this.currentTime); } @@ -712,11 +693,44 @@ public void emit(final EventType event, final Topic topic @Override public void spawn(final TaskFactory state) { final var task = TaskId.generate(); - SimulationEngine.this.tasks.put(task, new ExecutionState.InProgress<>(this.currentTime, state.create(SimulationEngine.this.executor))); - SimulationEngine.this.taskParent.put(task, this.activeTask); - SimulationEngine.this.taskChildren.computeIfAbsent(this.activeTask, $ -> new HashSet<>()).add(task); + SimulationEngine.this.spanContributorCount.get(this.span).increment(); + SimulationEngine.this.tasks.put(task, new ExecutionState<>(this.span, 0, this.caller, state.create(SimulationEngine.this.executor))); + this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment()); this.frame.signal(JobId.forTask(task)); } + + @Override + public void pushSpan() { + final var parentSpan = this.span; + this.shadowedSpans += 1; + this.span = SpanId.generate(); + + SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty())); + SimulationEngine.this.spanContributorCount.put(this.span, new MutableInt(1)); + } + + @Override + public void popSpan() { + // TODO: Do we want to throw an error instead? + if (this.shadowedSpans == 0) return; + final SpanId parentSpan = SimulationEngine.this.spans.get(this.span).parent().orElseThrow(); + + if (SimulationEngine.this.spanContributorCount.get(this.span).decrementAndGet() == 0) { + SimulationEngine.this.spanContributorCount.remove(this.span); + SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime)); + // Parent span contributor count remains constant, because this.span is removed, and this task is added + } else { + // Parent span contributor count increases by one, because this task is added without removing this.span + SimulationEngine.this.spanContributorCount.get(parentSpan).increment(); + } + + // NOTE: We don't need to propagate completion any further, because the next shadowed span + // has by definition not been completed: this task may still contribute to it, and this task + // has not terminated. + + this.shadowedSpans -= 1; + this.span = parentSpan; + } } /** A representation of a job processable by the {@link SimulationEngine}. */ @@ -724,8 +738,8 @@ public sealed interface JobId { /** A job to step a task. */ record TaskJobId(TaskId id) implements JobId {} - /** A job to step all tasks waiting on a signal. */ - record SignalJobId(SignalId id) implements JobId {} + /** A job to resume a task blocked on a condition. */ + record SignalJobId(ConditionId id) implements JobId {} /** A job to query a resource. */ record ResourceJobId(ResourceId id) implements JobId {} @@ -737,7 +751,7 @@ static TaskJobId forTask(final TaskId task) { return new TaskJobId(task); } - static SignalJobId forSignal(final SignalId signal) { + static SignalJobId forSignal(final ConditionId signal) { return new SignalJobId(signal); } @@ -750,40 +764,27 @@ static ConditionJobId forCondition(final ConditionId condition) { } } - /** The lifecycle stages every task passes through. */ - private sealed interface ExecutionState { - /** The task is in its primary operational phase. */ - record InProgress(Duration startOffset, Task state) - implements ExecutionState - { - public AwaitingChildren completedAt( - final Duration endOffset, - final LinkedList remainingChildren) { - return new AwaitingChildren<>(this.startOffset, endOffset, remainingChildren); - } + /** The state of an executing task. */ + private record ExecutionState(SpanId span, int shadowedSpans, Optional caller, Task state) { + public ExecutionState continueWith(final SpanId span, final int shadowedSpans, final Task newState) { + return new ExecutionState<>(span, shadowedSpans, this.caller, newState); + } + } - public InProgress continueWith(final Task newState) { - return new InProgress<>(this.startOffset, newState); - } + /** The span of time over which a subtree of tasks has acted. */ + public record Span(Optional parent, Duration startOffset, Optional endOffset) { + /** Close out a span, marking it as inactive past the given time. */ + public Span close(final Duration endOffset) { + if (this.endOffset.isPresent()) throw new Error("Attempt to close an already-closed span"); + return new Span(this.parent, this.startOffset, Optional.of(endOffset)); } - /** The task has completed its primary operation, but has unfinished children. */ - record AwaitingChildren( - Duration startOffset, - Duration endOffset, - LinkedList remainingChildren - ) implements ExecutionState - { - public Terminated joinedAt(final Duration joinOffset) { - return new Terminated<>(this.startOffset, this.endOffset, joinOffset); - } + public Optional duration() { + return this.endOffset.map($ -> $.minus(this.startOffset)); } - /** The task and all its delegated children have completed. */ - record Terminated( - Duration startOffset, - Duration endOffset, - Duration joinOffset - ) implements ExecutionState {} + public boolean isComplete() { + return this.endOffset.isPresent(); + } } } diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java new file mode 100644 index 0000000000..96f5795b0c --- /dev/null +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SpanId.java @@ -0,0 +1,10 @@ +package gov.nasa.jpl.aerie.merlin.driver.engine; + +import java.util.UUID; + +/** A typed wrapper for span IDs. */ +public record SpanId(String id) { + public static SpanId generate() { + return new SpanId(UUID.randomUUID().toString()); + } +} diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java index e9f334a7da..7fd8840319 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/timeline/Event.java @@ -1,6 +1,6 @@ package gov.nasa.jpl.aerie.merlin.driver.timeline; -import gov.nasa.jpl.aerie.merlin.driver.engine.TaskId; +import gov.nasa.jpl.aerie.merlin.driver.engine.SpanId; import gov.nasa.jpl.aerie.merlin.protocol.driver.Topic; import java.util.Objects; @@ -16,7 +16,7 @@ private Event(final Event.GenericEvent inner) { } public static - Event create(final Topic topic, final EventType event, final TaskId provenance) { + Event create(final Topic topic, final EventType event, final SpanId provenance) { return new Event(new Event.GenericEvent<>(topic, event, provenance)); } @@ -34,7 +34,7 @@ public Topic topic() { return this.inner.topic(); } - public TaskId provenance() { + public SpanId provenance() { return this.inner.provenance(); } @@ -43,7 +43,7 @@ public String toString() { return "<@%s, %s>".formatted(System.identityHashCode(this.inner.topic), this.inner.event); } - private record GenericEvent(Topic topic, EventType event, TaskId provenance) { + private record GenericEvent(Topic topic, EventType event, SpanId provenance) { private GenericEvent { Objects.requireNonNull(topic); Objects.requireNonNull(event); diff --git a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java index fac9709efb..748256a98b 100644 --- a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java +++ b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/AnchorSimulationTest.java @@ -1083,6 +1083,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> $ -> { + $.pushSpan(); $.emit(this, delayedActivityDirectiveInputTopic); return TaskStatus.delayed(oneMinute, $$ -> { $$.emit(Unit.UNIT, delayedActivityDirectiveOutputTopic); @@ -1108,6 +1109,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> scheduler -> { + scheduler.pushSpan(); scheduler.emit(this, decomposingActivityDirectiveInputTopic); return TaskStatus.delayed( Duration.ZERO, diff --git a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java index 44e8cbed5e..1b435dfbce 100644 --- a/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java +++ b/merlin-driver/src/test/java/gov/nasa/jpl/aerie/merlin/driver/engine/TaskFrameTest.java @@ -28,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public final class TaskFrameTest { - private static final TaskId ORIGIN = TaskId.generate(); + private static final SpanId ORIGIN = SpanId.generate(); // This regression test identified a bug in the LiveCells-chain-avoidance optimization in TaskFrame. @Test diff --git a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java index be17603c33..08af87e754 100644 --- a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java +++ b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java @@ -808,12 +808,14 @@ public Optional generateActivityMapper(final MissionModelRecord missio .map(effectModel -> CodeBlock .builder() .add( - "return $T.$L(() -> {$>\n$L$<});\n", + "return $T.$L(() -> $T.$L(() -> {$>\n$L$<}));\n", ModelActions.class, switch (effectModel.executor()) { case Threaded -> "threaded"; case Replaying -> "replaying"; }, + ModelActions.class, + "scoped", effectModel.returnType() .map(returnType -> CodeBlock .builder() @@ -835,6 +837,7 @@ public Optional generateActivityMapper(final MissionModelRecord missio .add( "return executor -> scheduler -> {$>\n$L$<};\n", CodeBlock.builder() + .addStatement("scheduler.pushSpan()") .addStatement("scheduler.emit($L, this.$L)", "activity", "inputTopic") .addStatement("scheduler.emit($T.UNIT, this.$L)", Unit.class, "outputTopic") .addStatement("return $T.completed($T.UNIT)", TaskStatus.class, Unit.class) diff --git a/merlin-framework/build.gradle b/merlin-framework/build.gradle index a52dde0379..66a07a16c6 100644 --- a/merlin-framework/build.gradle +++ b/merlin-framework/build.gradle @@ -15,6 +15,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java index 6738237a37..65a457d1b4 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java @@ -31,6 +31,8 @@ enum ContextType { Initializing, Reacting, Querying } void spawn(TaskFactory task); void call(TaskFactory task); + void pushSpan(); + void popSpan(); void delay(Duration duration); void waitUntil(Condition condition); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java index 9615830a69..5529a6dd6d 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java @@ -61,6 +61,16 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot yield during initialization"); } + @Override + public void pushSpan() { + // Do nothing. + } + + @Override + public void popSpan() { + // Do nothing. + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield during initialization"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java index 1c9019f328..bdbbabd3db 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java @@ -36,6 +36,22 @@ public static TaskFactory replaying(final Runnable task) { }); } + public static T scoped(final Supplier block) { + context.get().pushSpan(); + try { + return block.get(); + } finally { + context.get().popSpan(); + } + } + + public static void scoped(final Runnable block) { + scoped(() -> { + block.run(); + return Unit.UNIT; + }); + } + public static void emit(final T event, final Topic topic) { context.get().emit(event, topic); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java index 2425a9ab77..e2c0bf169f 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java @@ -52,6 +52,16 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot schedule tasks in a query-only context"); } + @Override + public void pushSpan() { + // Do nothing. + } + + @Override + public void popSpan() { + // Do nothing. + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield in a query-only context"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java index e9998ff256..e6e8d5bbfb 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java @@ -78,6 +78,20 @@ public void call(final TaskFactory task) { }); } + @Override + public void pushSpan() { + this.memory.doOnce(() -> { + this.scheduler.pushSpan(); + }); + } + + @Override + public void popSpan() { + this.memory.doOnce(() -> { + this.scheduler.popSpan(); + }); + } + @Override public void delay(final Duration duration) { this.memory.doOnce(() -> { diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java index 3edd568d9e..0a4984bda0 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java @@ -63,6 +63,16 @@ public void call(final TaskFactory task) { this.scheduler = this.handle.call(task); } + @Override + public void pushSpan() { + this.scheduler.pushSpan(); + } + + @Override + public void popSpan() { + this.scheduler.popSpan(); + } + @Override public void delay(final Duration duration) { this.scheduler = null; // Relinquish the current scheduler before yielding, in case an exception is thrown. diff --git a/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java b/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java index ac3942dcd0..b961a41be8 100644 --- a/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java +++ b/merlin-framework/src/test/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTaskTest.java @@ -31,6 +31,16 @@ public void emit(final Event event, final Topic topic) { public void spawn(final TaskFactory task) { throw new UnsupportedOperationException(); } + + @Override + public void pushSpan() { + throw new UnsupportedOperationException(); + } + + @Override + public void popSpan() { + throw new UnsupportedOperationException(); + } }; final var pool = Executors.newCachedThreadPool(); diff --git a/merlin-sdk/build.gradle b/merlin-sdk/build.gradle index 908b26fad7..15cd7aa7bc 100644 --- a/merlin-sdk/build.gradle +++ b/merlin-sdk/build.gradle @@ -15,6 +15,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java index 035658c136..c788a508d5 100644 --- a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java +++ b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/driver/Scheduler.java @@ -8,4 +8,8 @@ public interface Scheduler { void emit(Event event, Topic topic); void spawn(TaskFactory task); + + void pushSpan(); + + void popSpan(); } diff --git a/merlin-server/build.gradle b/merlin-server/build.gradle index 7e4557f5b6..d13902ecff 100644 --- a/merlin-server/build.gradle +++ b/merlin-server/build.gradle @@ -60,6 +60,10 @@ test { environment "CONSTRAINTS_DSL_COMPILER_ROOT", projectDir.toPath().resolve('constraints-dsl-compiler') environment "CONSTRAINTS_DSL_COMPILER_COMMAND", './build/main.js' + + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/parsing-utilities/build.gradle b/parsing-utilities/build.gradle index 3eaa311b8b..cb2790ca01 100644 --- a/parsing-utilities/build.gradle +++ b/parsing-utilities/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/scheduler-driver/build.gradle b/scheduler-driver/build.gradle index fef5cca0f9..77686981ed 100644 --- a/scheduler-driver/build.gradle +++ b/scheduler-driver/build.gradle @@ -12,6 +12,9 @@ java { test { useJUnitPlatform() + testLogging { + exceptionFormat = 'full' + } } jacocoTestReport { diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java index 2cf23d0897..5c6112af55 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java @@ -8,7 +8,7 @@ import gov.nasa.jpl.aerie.merlin.driver.StartOffsetReducer; import gov.nasa.jpl.aerie.merlin.driver.engine.JobSchedule; import gov.nasa.jpl.aerie.merlin.driver.engine.SimulationEngine; -import gov.nasa.jpl.aerie.merlin.driver.engine.TaskId; +import gov.nasa.jpl.aerie.merlin.driver.engine.SpanId; import gov.nasa.jpl.aerie.merlin.driver.timeline.LiveCells; import gov.nasa.jpl.aerie.merlin.driver.timeline.TemporalEventSource; import gov.nasa.jpl.aerie.merlin.protocol.driver.Topic; @@ -52,10 +52,10 @@ public class ResumableSimulationDriver implements AutoCloseable { private final Topic activityTopic = new Topic<>(); //mapping each activity name to its task id (in String form) in the simulation engine - private final Map plannedDirectiveToTask; + private final Map plannedDirectiveToTask; //subset of plannedDirectiveToTask to check for scheduling dependent tasks - private final Map toCheckForDependencyScheduling; + private final Map toCheckForDependencyScheduling; //simulation results so far private SimulationResults lastSimResults; @@ -317,7 +317,7 @@ private void simulateSchedule(final Map if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask .values() .stream() - .allMatch(engine::isTaskComplete)) { + .allMatch($ -> engine.getSpan($).isComplete())) { allTaskFinished = true; } @@ -340,7 +340,7 @@ private void simulateSchedule(final Map public Optional getActivityDuration(ActivityDirectiveId activityDirectiveId){ //potential cause of non presence: (1) activity is outside plan bounds (2) activity has not been simulated yet if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty(); - return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId)); + return engine.getSpan(plannedDirectiveToTask.get(activityDirectiveId)).duration(); } private Set getSuccessorsToSchedule(final SimulationEngine engine) { @@ -348,7 +348,7 @@ private Set getSuccessorsToSchedule(final SimulationEngine final var iterator = toCheckForDependencyScheduling.entrySet().iterator(); while(iterator.hasNext()){ final var taskToCheck = iterator.next(); - if(engine.isTaskComplete(taskToCheck.getValue())){ + if(engine.getSpan(taskToCheck.getValue()).isComplete()){ toSchedule.add(taskToCheck.getKey()); iterator.remove(); } @@ -395,6 +395,7 @@ private static TaskFactory makeTaskFactory( final TaskFactory task, final Topic activityTopic) { return executor -> scheduler -> { + scheduler.pushSpan(); scheduler.emit(directiveId, activityTopic); return task.create(executor).step(scheduler); }; diff --git a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java index 24f668ef9f..c419caedd3 100644 --- a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java +++ b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java @@ -666,6 +666,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> $ -> { + $.pushSpan(); $.emit(this, delayedActivityDirectiveInputTopic); return TaskStatus.delayed(oneMinute, $$ -> { $$.emit(Unit.UNIT, delayedActivityDirectiveOutputTopic); @@ -691,6 +692,7 @@ public OutputType getOutputType() { @Override public TaskFactory getTaskFactory(final Object o, final Object o2) { return executor -> scheduler -> { + scheduler.pushSpan(); scheduler.emit(this, decomposingActivityDirectiveInputTopic); return TaskStatus.delayed( Duration.ZERO,