diff --git a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java index be17603c33..08af87e754 100644 --- a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java +++ b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MissionModelGenerator.java @@ -808,12 +808,14 @@ public Optional generateActivityMapper(final MissionModelRecord missio .map(effectModel -> CodeBlock .builder() .add( - "return $T.$L(() -> {$>\n$L$<});\n", + "return $T.$L(() -> $T.$L(() -> {$>\n$L$<}));\n", ModelActions.class, switch (effectModel.executor()) { case Threaded -> "threaded"; case Replaying -> "replaying"; }, + ModelActions.class, + "scoped", effectModel.returnType() .map(returnType -> CodeBlock .builder() @@ -835,6 +837,7 @@ public Optional generateActivityMapper(final MissionModelRecord missio .add( "return executor -> scheduler -> {$>\n$L$<};\n", CodeBlock.builder() + .addStatement("scheduler.pushSpan()") .addStatement("scheduler.emit($L, this.$L)", "activity", "inputTopic") .addStatement("scheduler.emit($T.UNIT, this.$L)", Unit.class, "outputTopic") .addStatement("return $T.completed($T.UNIT)", TaskStatus.class, Unit.class) diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java index 6738237a37..65a457d1b4 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java @@ -31,6 +31,8 @@ enum ContextType { Initializing, Reacting, Querying } void spawn(TaskFactory task); void call(TaskFactory task); + void pushSpan(); + void popSpan(); void delay(Duration duration); void waitUntil(Condition condition); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java index 9615830a69..5529a6dd6d 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java @@ -61,6 +61,16 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot yield during initialization"); } + @Override + public void pushSpan() { + // Do nothing. + } + + @Override + public void popSpan() { + // Do nothing. + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield during initialization"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java index 1c9019f328..bdbbabd3db 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java @@ -36,6 +36,22 @@ public static TaskFactory replaying(final Runnable task) { }); } + public static T scoped(final Supplier block) { + context.get().pushSpan(); + try { + return block.get(); + } finally { + context.get().popSpan(); + } + } + + public static void scoped(final Runnable block) { + scoped(() -> { + block.run(); + return Unit.UNIT; + }); + } + public static void emit(final T event, final Topic topic) { context.get().emit(event, topic); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java index 2425a9ab77..e2c0bf169f 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java @@ -52,6 +52,16 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot schedule tasks in a query-only context"); } + @Override + public void pushSpan() { + // Do nothing. + } + + @Override + public void popSpan() { + // Do nothing. + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield in a query-only context"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java index e9998ff256..e6e8d5bbfb 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java @@ -78,6 +78,20 @@ public void call(final TaskFactory task) { }); } + @Override + public void pushSpan() { + this.memory.doOnce(() -> { + this.scheduler.pushSpan(); + }); + } + + @Override + public void popSpan() { + this.memory.doOnce(() -> { + this.scheduler.popSpan(); + }); + } + @Override public void delay(final Duration duration) { this.memory.doOnce(() -> { diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java index 3edd568d9e..0a4984bda0 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java @@ -63,6 +63,16 @@ public void call(final TaskFactory task) { this.scheduler = this.handle.call(task); } + @Override + public void pushSpan() { + this.scheduler.pushSpan(); + } + + @Override + public void popSpan() { + this.scheduler.popSpan(); + } + @Override public void delay(final Duration duration) { this.scheduler = null; // Relinquish the current scheduler before yielding, in case an exception is thrown.