Skip to content

Commit

Permalink
Merge pull request #1212 from DavidLegg/feature--job-schedule-speedup
Browse files Browse the repository at this point in the history
Use skip-list multi-map to speed up job queue
  • Loading branch information
mattdailis authored Feb 12, 2024
2 parents bdcc6b0 + 7a3b8ff commit 8adac55
Showing 1 changed file with 17 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,59 +1,55 @@
package gov.nasa.jpl.aerie.merlin.driver.engine;

import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;

public final class JobSchedule<JobRef, TimeRef extends SchedulingInstant> {
/** The scheduled time for each upcoming job. */
private final Map<JobRef, TimeRef> scheduledJobs = new HashMap<>();

/** A time-ordered queue of all tasks whose resumption time is concretely known. */
@DerivedFrom("scheduledJobs")
private final PriorityQueue<Pair<TimeRef, JobRef>> queue = new PriorityQueue<>(Comparator.comparing(Pair::getLeft));
private final ConcurrentSkipListMap<TimeRef, Set<JobRef>> queue = new ConcurrentSkipListMap<>();

public void schedule(final JobRef job, final TimeRef time) {
final var oldTime = this.scheduledJobs.put(job, time);

if (oldTime != null) this.queue.remove(Pair.of(oldTime, job));
this.queue.add(Pair.of(time, job));
if (oldTime != null) removeJobFromQueue(oldTime, job);

this.queue.computeIfAbsent(time, $ -> new HashSet<>()).add(job);
}

public void unschedule(final JobRef job) {
final var oldTime = this.scheduledJobs.remove(job);
if (oldTime != null) removeJobFromQueue(oldTime, job);
}

if (oldTime != null) this.queue.remove(Pair.of(oldTime, job));
private void removeJobFromQueue(TimeRef time, JobRef job) {
var jobsAtOldTime = this.queue.get(time);
jobsAtOldTime.remove(job);
if (jobsAtOldTime.isEmpty()) {
this.queue.remove(time);
}
}

public Batch<JobRef> extractNextJobs(final Duration maximumTime) {
if (this.queue.isEmpty()) return new Batch<>(maximumTime, Collections.emptySet());

final var time = this.queue.peek().getKey();
final var time = this.queue.firstKey();
if (time.project().longerThan(maximumTime)) {
return new Batch<>(maximumTime, Collections.emptySet());
}

// Ready all tasks at the soonest task time.
final var readyJobs = new HashSet<JobRef>();
while (true) {
final var entry = this.queue.peek();
if (entry == null) break;
if (entry.getLeft().compareTo(time) > 0) break;

this.scheduledJobs.remove(entry.getRight());
this.queue.remove();

readyJobs.add(entry.getRight());
}

return new Batch<>(time.project(), readyJobs);
final var entry = this.queue.pollFirstEntry();
entry.getValue().forEach(this.scheduledJobs::remove);
return new Batch<>(entry.getKey().project(), entry.getValue());
}

public void clear() {
Expand Down

0 comments on commit 8adac55

Please sign in to comment.