Skip to content

Commit

Permalink
Remove set-based implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdailis committed Mar 27, 2024
1 parent a8c1dda commit b701007
Showing 1 changed file with 9 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public final class SimulationEngine implements AutoCloseable {
private final Map<SpanId, Span> spans = new HashMap<>();
/** A count of the direct contributors to each span, including child spans and tasks. */
private final Map<SpanId, MutableInt> spanContributorCount = new HashMap<>();
private final SpanContributors spanContributors = new SpanContributors();

/** A thread pool that modeled tasks can use to keep track of their state between steps. */
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Expand All @@ -88,14 +87,10 @@ public <Return> SpanId scheduleTask(final Duration startTime, final TaskFactory<
this.spans.put(span, new Span(Optional.empty(), startTime, Optional.empty()));

final var task = TaskId.generate();
this.spanContributors.newSpan(span);
this.spanContributors.addContributor(span, task);
this.spanContributorCount.put(span, new MutableInt(1));
this.tasks.put(task, new ExecutionState<>(span, 0, Optional.empty(), state.create(this.executor)));
this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(startTime));

this.spanContributors.checkIntegrity(spanContributorCount);

return span;
}

Expand Down Expand Up @@ -196,7 +191,7 @@ private <Return> void stepEffectModel(
final Duration currentTime
) {
// Step the modeling state forward.
final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), task, progress.caller(), frame);
final var scheduler = new EngineScheduler(currentTime, progress.shadowedSpans(), progress.span(), progress.caller(), frame);
final var status = progress.state().step(scheduler);

// TODO: Report which topics this activity wrote to at this point in time. This is useful insight for any user.
Expand All @@ -206,44 +201,19 @@ private <Return> void stepEffectModel(
if (status instanceof TaskStatus.Completed<Return>) {
// Propagate completion up the span hierarchy.
// TERMINATION: The span hierarchy is a finite tree, so eventually we find a parentless span.
{
var span = scheduler.span;
while (true) {
if (this.spanContributorCount.get(span).decrementAndGet() > 0) break;
this.spanContributorCount.remove(span);

this.spans.compute(span, (_id, $) -> $.close(currentTime));
var span = scheduler.span;
while (true) {
if (this.spanContributorCount.get(span).decrementAndGet() > 0) break;
this.spanContributorCount.remove(span);

final var span$ = this.spans.get(span).parent;
if (span$.isEmpty()) break;
this.spans.compute(span, (_id, $) -> $.close(currentTime));

span = span$.get();
}
}
final var span$ = this.spans.get(span).parent;
if (span$.isEmpty()) break;

{
var span = scheduler.span;
this.spanContributors.removeContributor(span, task);

if (!this.spanContributors.spanHasContributors(span)) {
this.spanContributors.removeSpan(span);

if (this.spans.get(span).parent.isPresent()) {
var parentSpan = this.spans.get(span).parent.get();
while (true) {
this.spanContributors.removeContributor(parentSpan, span);
if (this.spanContributors.spanHasContributors(parentSpan)) break;
this.spanContributors.removeSpan(parentSpan);
span = parentSpan;
final var parentSpan$ = this.spans.get(span).parent;
if (parentSpan$.isEmpty()) break;
parentSpan = parentSpan$.get();
}
}
}
span = span$.get();
}

this.spanContributors.checkIntegrity(this.spanContributorCount);

// Notify any blocked caller of our completion.
progress.caller().ifPresent($ -> {
Expand All @@ -260,14 +230,11 @@ private <Return> void stepEffectModel(
} else if (status instanceof TaskStatus.CallingTask<Return> s) {
final var target = TaskId.generate();
SimulationEngine.this.spanContributorCount.get(scheduler.span).increment();
SimulationEngine.this.spanContributors.addContributor(scheduler.span, target);
SimulationEngine.this.tasks.put(target, new ExecutionState<>(scheduler.span, 0, Optional.of(task), s.child().create(this.executor)));
SimulationEngine.this.blockedTasks.put(task, new MutableInt(1));
frame.signal(JobId.forTask(target));

this.tasks.put(task, progress.continueWith(scheduler.span, scheduler.shadowedSpans, s.continuation()));

SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount);
} else if (status instanceof TaskStatus.AwaitingCondition<Return> s) {
final var condition = ConditionId.generate();
this.conditions.put(condition, s.condition());
Expand Down Expand Up @@ -684,22 +651,19 @@ private final class EngineScheduler implements Scheduler {
private final Duration currentTime;
private int shadowedSpans;
private SpanId span;
private final TaskId taskId;
private final Optional<TaskId> caller;
private final TaskFrame<JobId> frame;

public EngineScheduler(
final Duration currentTime,
final int shadowedSpans,
final SpanId span,
final TaskId taskId,
final Optional<TaskId> caller,
final TaskFrame<JobId> frame)
{
this.currentTime = Objects.requireNonNull(currentTime);
this.shadowedSpans = shadowedSpans;
this.span = Objects.requireNonNull(span);
this.taskId = Objects.requireNonNull(taskId);
this.caller = Objects.requireNonNull(caller);
this.frame = Objects.requireNonNull(frame);
}
Expand Down Expand Up @@ -727,13 +691,10 @@ public <EventType> void emit(final EventType event, final Topic<EventType> topic
@Override
public void spawn(final TaskFactory<?> state) {
final var task = TaskId.generate();
SimulationEngine.this.spanContributors.addContributor(this.span, task);
SimulationEngine.this.spanContributorCount.get(this.span).increment();
SimulationEngine.this.tasks.put(task, new ExecutionState<>(this.span, 0, this.caller, state.create(SimulationEngine.this.executor)));
this.caller.ifPresent($ -> SimulationEngine.this.blockedTasks.get($).increment());
this.frame.signal(JobId.forTask(task));

SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount);
}

@Override
Expand All @@ -743,25 +704,15 @@ public void pushSpan() {
this.span = SpanId.generate();

SimulationEngine.this.spans.put(this.span, new Span(Optional.of(parentSpan), this.currentTime, Optional.empty()));
SimulationEngine.this.spanContributors.removeContributor(parentSpan, this.taskId);
SimulationEngine.this.spanContributors.addContributor(parentSpan, this.span);
SimulationEngine.this.spanContributors.newSpan(span);
SimulationEngine.this.spanContributors.addContributor(span, this.taskId);
SimulationEngine.this.spanContributorCount.put(this.span, new MutableInt(1));

SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount);
}

@Override
public void popSpan() {
SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount);

// TODO: Do we want to throw an error instead?
if (this.shadowedSpans == 0) return;
final SpanId parentSpan = SimulationEngine.this.spans.get(this.span).parent().orElseThrow();

SimulationEngine.this.spanContributors.removeContributor(this.span, this.taskId);

if (SimulationEngine.this.spanContributorCount.get(this.span).decrementAndGet() == 0) {
SimulationEngine.this.spanContributorCount.remove(this.span);
SimulationEngine.this.spans.compute(this.span, (_id, $) -> $.close(currentTime));
Expand All @@ -771,20 +722,12 @@ public void popSpan() {
SimulationEngine.this.spanContributorCount.get(parentSpan).increment();
}

if (!SimulationEngine.this.spanContributors.spanHasContributors(this.span)) {
SimulationEngine.this.spanContributors.removeContributor(parentSpan, this.span);
SimulationEngine.this.spanContributors.removeSpan(this.span);
}

// NOTE: We don't need to propagate completion any further, because the next shadowed span
// has by definition not been completed: this task may still contribute to it, and this task
// has not terminated.

this.shadowedSpans -= 1;
this.span = parentSpan;
SimulationEngine.this.spanContributors.addContributor(this.span, this.taskId);

SimulationEngine.this.spanContributors.checkIntegrity(spanContributorCount);
}
}

Expand Down Expand Up @@ -842,102 +785,4 @@ public boolean isComplete() {
return this.endOffset.isPresent();
}
}

sealed interface TaskOrSpanId {
record Task(TaskId id) implements TaskOrSpanId {}
record Span(SpanId id) implements TaskOrSpanId {}

static TaskOrSpanId of(SpanId spanId) {
return new Span(spanId);
}

static TaskOrSpanId of(TaskId taskId) {
return new Task(taskId);
}
}

public static class SpanContributors {
private final Map<SpanId, Set<TaskOrSpanId>> spanContributors;
private final List<String> operations = new ArrayList<>();

public SpanContributors() {
this.spanContributors = new LinkedHashMap<>();
}

public void newSpan(SpanId span) {
if (this.spanContributors.containsKey(span)) throw new IllegalStateException("New span clashes with existing span");
this.spanContributors.put(span, new LinkedHashSet<>());
operations.add("newSpan " + span);
}

public void removeContributor(SpanId span, TaskId taskId) {
removeContributor(span, TaskOrSpanId.of(taskId));
}

public void removeContributor(SpanId span, SpanId spanId) {
removeContributor(span, TaskOrSpanId.of(spanId));
}

private void removeContributor(SpanId span, TaskOrSpanId contributor) {
if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span");
if (!this.spanContributors.get(span).contains(contributor)) throw new IllegalArgumentException("Span did not have this contributor");
this.spanContributors.get(span).remove(contributor);
operations.add("removeContributor " + span + " " + contributor);
}

public boolean spanHasContributors(SpanId span) {
if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span");
return !spanContributors.get(span).isEmpty();
}

public void removeSpan(SpanId span) {
if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span");
if (!this.spanContributors.get(span).isEmpty()) throw new IllegalStateException("Cannot remove span that still has contributors");

this.spanContributors.remove(span);
operations.add("removeSpan " + span);
}

public void checkIntegrity(final Map<SpanId, MutableInt> spanContributorsCount) {
Set<TaskOrSpanId> contributors = new LinkedHashSet<>();
for (final var entry : this.spanContributors.entrySet()) {
if (entry.getValue().isEmpty()) throw new IllegalStateException("Empty contributor not removed");
for (final var contributor : entry.getValue()) {
if (contributors.contains(contributor)) throw new IllegalStateException("Contributor appears twice");
contributors.add(contributor);
if (!(contributor instanceof TaskOrSpanId.Span c)) continue;
if (!this.spanContributors.containsKey(c.id())) {
throw new IllegalStateException("Orphaned contributor");
}
}
}

for (final var entry : spanContributorsCount.entrySet()) {
if (!this.spanContributors.containsKey(entry.getKey())) throw new IllegalArgumentException("Mismatch!");
if (!entry.getValue().getValue().equals(this.spanContributors.get(entry.getKey()).size())) {
throw new IllegalArgumentException("Mismatch! " + entry.getValue().getValue() + " != " + this.spanContributors.get(entry.getKey()).size());
}
}

for (final var key : this.spanContributors.keySet()) {
if (!spanContributorsCount.containsKey(key)) throw new IllegalArgumentException("Mismatch!");
}

operations.clear();
}

public void addContributor(SpanId span, TaskId contributor) {
addContributor(span, TaskOrSpanId.of(contributor));
}

public void addContributor(SpanId span, SpanId contributor) {
addContributor(span, TaskOrSpanId.of(contributor));
}

private void addContributor(SpanId span, TaskOrSpanId contributor) {
if (!this.spanContributors.containsKey(span)) throw new IllegalArgumentException("No such span");
this.spanContributors.get(span).add(contributor);
operations.add("addContributor " + span + " " + contributor);
}
}
}

0 comments on commit b701007

Please sign in to comment.