From 990edf5106165543e9d984191ff17c789f31cc79 Mon Sep 17 00:00:00 2001 From: David Legg Date: Mon, 23 Oct 2023 12:43:04 -0700 Subject: [PATCH 1/2] Use skip-list multi-map to speed up job queue --- .../merlin/driver/engine/JobSchedule.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java index d4ffdd731c..28a740fb13 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java @@ -1,15 +1,13 @@ package gov.nasa.jpl.aerie.merlin.driver.engine; import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; -import org.apache.commons.lang3.tuple.Pair; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; public final class JobSchedule { /** The scheduled time for each upcoming job. */ @@ -17,43 +15,45 @@ public final class JobSchedule { /** A time-ordered queue of all tasks whose resumption time is concretely known. */ @DerivedFrom("scheduledJobs") - private final PriorityQueue> queue = new PriorityQueue<>(Comparator.comparing(Pair::getLeft)); + private final ConcurrentSkipListMap> queue = new ConcurrentSkipListMap<>(); public void schedule(final JobRef job, final TimeRef time) { final var oldTime = this.scheduledJobs.put(job, time); - if (oldTime != null) this.queue.remove(Pair.of(oldTime, job)); - this.queue.add(Pair.of(time, job)); + if (oldTime != null) removeJobFromQueue(oldTime, job); + + this.queue.compute(time, (t, jobsAtNewTime) -> { + if (jobsAtNewTime == null) jobsAtNewTime = new HashSet<>(); + jobsAtNewTime.add(job); + return jobsAtNewTime; + }); } public void unschedule(final JobRef job) { final var oldTime = this.scheduledJobs.remove(job); + if (oldTime != null) removeJobFromQueue(oldTime, job); + } - if (oldTime != null) this.queue.remove(Pair.of(oldTime, job)); + private void removeJobFromQueue(TimeRef time, JobRef job) { + var jobsAtOldTime = this.queue.get(time); + jobsAtOldTime.remove(job); + if (jobsAtOldTime.isEmpty()) { + this.queue.remove(time); + } } public Batch extractNextJobs(final Duration maximumTime) { if (this.queue.isEmpty()) return new Batch<>(maximumTime, Collections.emptySet()); - final var time = this.queue.peek().getKey(); + final var time = this.queue.firstKey(); if (time.project().longerThan(maximumTime)) { return new Batch<>(maximumTime, Collections.emptySet()); } // Ready all tasks at the soonest task time. - final var readyJobs = new HashSet(); - while (true) { - final var entry = this.queue.peek(); - if (entry == null) break; - if (entry.getLeft().compareTo(time) > 0) break; - - this.scheduledJobs.remove(entry.getRight()); - this.queue.remove(); - - readyJobs.add(entry.getRight()); - } - - return new Batch<>(time.project(), readyJobs); + final var entry = this.queue.pollFirstEntry(); + entry.getValue().forEach(this.scheduledJobs::remove); + return new Batch<>(entry.getKey().project(), entry.getValue()); } public void clear() { From 7a3b8ff082e354e0df481bc75fe02e171c63180a Mon Sep 17 00:00:00 2001 From: DavidLegg Date: Tue, 14 Nov 2023 11:56:41 -0800 Subject: [PATCH 2/2] Clarify "add job" expression Co-authored-by: Matt Dailis --- .../nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java index 28a740fb13..4f7f7bda02 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/JobSchedule.java @@ -22,11 +22,7 @@ public void schedule(final JobRef job, final TimeRef time) { if (oldTime != null) removeJobFromQueue(oldTime, job); - this.queue.compute(time, (t, jobsAtNewTime) -> { - if (jobsAtNewTime == null) jobsAtNewTime = new HashSet<>(); - jobsAtNewTime.add(job); - return jobsAtNewTime; - }); + this.queue.computeIfAbsent(time, $ -> new HashSet<>()).add(job); } public void unschedule(final JobRef job) {