Skip to content

Commit

Permalink
implement plan.events(...) procedural scheduling query
Browse files Browse the repository at this point in the history
  • Loading branch information
pranav-super committed Nov 4, 2024
1 parent 8118519 commit d2096f6
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import gov.nasa.ammos.aerie.procedural.timeline.Interval;
import gov.nasa.ammos.aerie.procedural.timeline.collections.Directives;
import gov.nasa.ammos.aerie.procedural.timeline.collections.ExternalEvents;
import gov.nasa.ammos.aerie.procedural.timeline.ops.SerialSegmentOps;
import gov.nasa.ammos.aerie.procedural.timeline.payloads.Segment;
import gov.nasa.ammos.aerie.procedural.timeline.plan.EventQuery;
import gov.nasa.ammos.aerie.procedural.timeline.plan.Plan;
import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue;
Expand Down Expand Up @@ -51,4 +53,10 @@ public <V, TL extends SerialSegmentOps<V, TL>> TL resource(
{
throw new NotImplementedError();
}

@NotNull
@Override
public ExternalEvents events(@NotNull final EventQuery query) {
throw new NotImplementedError();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package gov.nasa.ammos.aerie.procedural.timeline.collections

import gov.nasa.ammos.aerie.procedural.timeline.Timeline
import gov.nasa.ammos.aerie.procedural.timeline.BaseTimeline
import gov.nasa.ammos.aerie.procedural.timeline.payloads.activities.Instance
import gov.nasa.ammos.aerie.procedural.timeline.ops.*
import gov.nasa.ammos.aerie.procedural.timeline.ops.coalesce.CoalesceNoOp
import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalEvent
import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalSource
import gov.nasa.ammos.aerie.procedural.timeline.util.preprocessList

/**
* A timeline of external events.
*/
data class ExternalEvents(private val timeline: Timeline<ExternalEvent, ExternalEvents>):
Timeline<ExternalEvent, ExternalEvents> by timeline,
NonZeroDurationOps<ExternalEvent, ExternalEvents>,
ParallelOps<ExternalEvent, ExternalEvents>
{
constructor(vararg events: ExternalEvent): this(events.asList())
constructor(events: List<ExternalEvent>): this(BaseTimeline(::ExternalEvents, preprocessList(events, null)))

/** Filter by one or more types. */
fun filterByType(vararg types: String) = filter { it.type in types }

/** Filter by one or more event sources. */
fun filterBySource(vararg sources: ExternalSource) = filter { it.source in sources }

/** Filter by one or more derivation groups. */
fun filterByDerivationGroup(vararg groups: String) = filter { it.source.derivationGroup in groups }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package gov.nasa.ammos.aerie.procedural.timeline.payloads

import gov.nasa.ammos.aerie.procedural.timeline.Interval
import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue

/** An external event instance. */
data class ExternalEvent(
/** The string name of this event. */
@JvmField
val key: String,
/** The type of the event. */
@JvmField
val type: String,
/** The source this event comes from. */
@JvmField
val source: ExternalSource,
override val interval: Interval,
): IntervalLike<ExternalEvent> {
override fun withNewInterval(i: Interval) = ExternalEvent(key, type, source, i)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gov.nasa.ammos.aerie.procedural.timeline.payloads

/**
* An external source instance. Used for querying purposes - see EventQuery.kt.
* The included fields represent the primary key used to identify External Sources.
*/
data class ExternalSource(
/** The string name of this source. */
@JvmField
val key: String,
/** The derivation group that this source is a member of. */
@JvmField
val derivationGroup: String,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package gov.nasa.ammos.aerie.procedural.timeline.plan

import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalSource

/** Fields for filtering events as they are queried. */
data class EventQuery(
/**
* A nullable list of derivation groups; the event must belong to one of them if present.
*
* If null, all derivation groups are allowed.
*/
val derivationGroups: List<String>?,

/**
* A nullable list of eventTypes; the event must belong to one of them if present.
*
* If null, all types are allowed.
*/
val eventTypes: List<String>?,

/**
* A nullable list of sources (described as a tuple of the source's (key, derivation group name)); the event must
* belong to one of them if present.
*
* If null, all sources are allowed.
*/
val sources: List<ExternalSource>?,
) {
constructor(derivationGroup: String?, eventType: String?, source: ExternalSource?): this(
derivationGroup?.let { listOf(it) },
eventType?.let { listOf(it) },
source?.let { listOf(it) }
)
constructor(): this(null as String?, null, null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import gov.nasa.ammos.aerie.procedural.timeline.payloads.activities.AnyDirective
import gov.nasa.ammos.aerie.procedural.timeline.collections.Directives
import gov.nasa.ammos.aerie.procedural.timeline.ops.SerialSegmentOps
import gov.nasa.ammos.aerie.procedural.timeline.payloads.Segment
import gov.nasa.ammos.aerie.procedural.timeline.collections.ExternalEvents
import java.time.Instant

/** An interface for querying plan information and simulation results. */
Expand Down Expand Up @@ -41,4 +42,9 @@ interface Plan {
* @param name string name of the resource
*/
fun <V: Any, TL: SerialSegmentOps<V, TL>> resource(name: String, deserializer: (List<Segment<SerializedValue>>) -> TL): TL

/** Get external events associated with this plan. */
fun events(query: EventQuery): ExternalEvents
/** Get all external events across all derivation groups associated with this plan. */
fun events() = events(EventQuery())
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.nasa.jpl.aerie.scheduler.goals;

import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalEvent;
import gov.nasa.jpl.aerie.merlin.driver.MissionModel;
import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue;
import gov.nasa.ammos.aerie.procedural.scheduling.ProcedureMapper;
Expand Down Expand Up @@ -43,7 +44,8 @@ public void run(
final MissionModel<?> missionModel,
final Function<String, ActivityType> lookupActivityType,
final SimulationFacade simulationFacade,
final DirectiveIdGenerator idGenerator
final DirectiveIdGenerator idGenerator,
Map<String, List<ExternalEvent>> eventsByDerivationGroup
) {
final ProcedureMapper<?> procedureMapper;
try {
Expand All @@ -57,6 +59,7 @@ public void run(
final var planAdapter = new SchedulerToProcedurePlanAdapter(
plan,
planHorizon,
eventsByDerivationGroup,
problem.getDiscreteExternalProfiles(),
problem.getRealExternalProfiles()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.nasa.jpl.aerie.scheduler.model;

import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalEvent;
import gov.nasa.jpl.aerie.constraints.model.DiscreteProfile;
import gov.nasa.jpl.aerie.constraints.model.LinearProfile;
import gov.nasa.jpl.aerie.merlin.driver.MissionModel;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class Problem {

private final Map<String, LinearProfile> realExternalProfiles = new HashMap<>();
private final Map<String, DiscreteProfile> discreteExternalProfiles = new HashMap<>();
private Map<String, List<ExternalEvent>> eventsByDerivationGroup = new HashMap<>();

/**
* the initial seed plan to start scheduling from
Expand Down Expand Up @@ -171,6 +173,10 @@ public void setExternalProfile(final Map<String, LinearProfile> realExternalProf
this.discreteExternalProfiles.putAll(discreteExternalProfiles);
}

public void setEventsByDerivationGroup(final Map<String, List<ExternalEvent>> events) {
this.eventsByDerivationGroup = events;
}

public Map<String, LinearProfile> getRealExternalProfiles(){
return this.realExternalProfiles;
}
Expand All @@ -179,6 +185,8 @@ public Map<String, DiscreteProfile> getDiscreteExternalProfiles(){
return this.discreteExternalProfiles;
}

public Map<String, List<ExternalEvent>> getEventsByDerivationGroup() { return this.eventsByDerivationGroup; }

public void setGoals(List<Goal> goals){
goalsOrderedByPriority.clear();
goalsOrderedByPriority.addAll(goals);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static gov.nasa.jpl.aerie.merlin.protocol.types.Duration.MICROSECOND;
import static gov.nasa.jpl.aerie.merlin.protocol.types.Duration.ZERO;
import static gov.nasa.jpl.aerie.merlin.protocol.types.Duration.min;

/**
* prototype scheduling algorithm that schedules activities for a plan
Expand Down Expand Up @@ -323,7 +322,16 @@ private void satisfyGoal(Goal goal) throws SchedulingInterruptedException{
satisfyOptionGoal(optionGoal);
} else if (goal instanceof Procedure procedure) {
if (!analysisOnly) {
procedure.run(problem, plan.getEvaluation(), plan, problem.getMissionModel(), this.problem::getActivityType, this.simulationFacade, this.idGenerator);
procedure.run(
problem,
plan.getEvaluation(),
plan,
problem.getMissionModel(),
this.problem::getActivityType,
this.simulationFacade,
this.idGenerator,
this.problem.getEventsByDerivationGroup()
);
}
} else {
satisfyGoalGeneral(goal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package gov.nasa.jpl.aerie.scheduler.plan

import gov.nasa.ammos.aerie.procedural.timeline.Interval
import gov.nasa.ammos.aerie.procedural.timeline.collections.Directives
import gov.nasa.ammos.aerie.procedural.timeline.collections.ExternalEvents
import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalEvent
import gov.nasa.ammos.aerie.procedural.timeline.ops.SerialSegmentOps
import gov.nasa.ammos.aerie.procedural.timeline.payloads.Segment
import gov.nasa.ammos.aerie.procedural.timeline.payloads.activities.Directive
import gov.nasa.ammos.aerie.procedural.timeline.payloads.activities.DirectiveStart
import gov.nasa.ammos.aerie.procedural.timeline.payloads.activities.DirectiveStart.Anchor.AnchorPoint.Companion.anchorToStart
import gov.nasa.ammos.aerie.procedural.timeline.plan.EventQuery
import gov.nasa.ammos.aerie.procedural.timeline.util.duration.minus
import gov.nasa.ammos.aerie.procedural.timeline.util.duration.plus
import gov.nasa.jpl.aerie.constraints.model.DiscreteProfile
Expand All @@ -23,8 +26,9 @@ import gov.nasa.jpl.aerie.scheduler.model.Plan as SchedulerPlan
data class SchedulerToProcedurePlanAdapter(
private val schedulerPlan: SchedulerPlan,
private val planningHorizon: PlanningHorizon,
private val eventsByDerivationGroup: Map<String, List<ExternalEvent>>,
private val discreteExternalResources: Map<String, DiscreteProfile>,
private val realExternalResources: Map<String, LinearProfile>,
private val realExternalResources: Map<String, LinearProfile>
): TimelinePlan, SchedulerPlan by schedulerPlan {
override fun totalBounds() = Interval.between(Duration.ZERO, planningHorizon.aerieHorizonDuration)

Expand Down Expand Up @@ -55,6 +59,16 @@ data class SchedulerToProcedurePlanAdapter(
return Directives(result)
}

override fun events(query: EventQuery): ExternalEvents {
var result = if (query.derivationGroups != null) query.derivationGroups!!.flatMap {
eventsByDerivationGroup[it]
?: throw Error("derivation group either doesn't exist or isn't associated with plan: $it")
}
else eventsByDerivationGroup.values.flatten()
if (query.eventTypes != null) result = result.filter { it.type in query.eventTypes!! }
if (query.sources != null) result = result.filter { it.source in query.sources!! }
return ExternalEvents(result)
}
override fun <V : Any, TL : SerialSegmentOps<V, TL>> resource(
name: String,
deserializer: (List<Segment<SerializedValue>>) -> TL
Expand Down
1 change: 1 addition & 0 deletions scheduler-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation project(':permissions')
implementation project(':constraints')
implementation project(':scheduler-driver')
implementation project(':procedural:timeline')
implementation project(':procedural:scheduling')
implementation project(':type-utils')

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package gov.nasa.jpl.aerie.scheduler.server.services;

import gov.nasa.ammos.aerie.procedural.timeline.Interval;
import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalEvent;
import gov.nasa.ammos.aerie.procedural.timeline.payloads.ExternalSource;
import gov.nasa.jpl.aerie.constraints.model.DiscreteProfile;
import gov.nasa.jpl.aerie.constraints.model.LinearProfile;
import gov.nasa.jpl.aerie.json.BasicParsers;
Expand Down Expand Up @@ -60,7 +63,9 @@
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -979,8 +984,7 @@ public Optional<List<DatasetMetadata>> getExternalDatasets(final PlanId planId)

@Override
public ExternalProfiles getExternalProfiles(final PlanId planId)
throws MerlinServiceException, IOException
{
throws MerlinServiceException, IOException {
final Map<String, LinearProfile> realProfiles = new HashMap<>();
final Map<String, DiscreteProfile> discreteProfiles = new HashMap<>();
final var resourceTypes = new ArrayList<ResourceType>();
Expand All @@ -1004,7 +1008,48 @@ public ExternalProfiles getExternalProfiles(final PlanId planId)
}
}
return new ExternalProfiles(realProfiles, discreteProfiles, resourceTypes);
}
}

@Override
public Map<String, List<ExternalEvent>> getExternalEvents(final PlanId planId, final Instant horizonStart)
throws MerlinServiceException, IOException {
final var derivationGroupsRequest = """
query DerivationGroupsForPlan {
plan_derivation_group(where: {plan_id: {_eq: %d}}) {
derivation_group_name
}
}
""".formatted(planId.id());
final JsonObject derivationGroupsResponse = postRequest(derivationGroupsRequest).get();
final var derivationGroups = Json.createArrayBuilder(
derivationGroupsResponse.getJsonObject("data").getJsonArray("plan_derivation_group")
.stream().map($ -> $.asJsonObject().getString("derivation_group_name")).toList()
).build();

final var eventsRequest = """
query DerivedEventsForPlan {
derived_events(where: {derivation_group_name: {_in: %s}}) {
source_key
event_type_name
event_key
duration
derivation_group_name
source_range
start_time
valid_at
}
}""".formatted(derivationGroups);
final JsonObject eventsResponse = postRequest(eventsRequest).get();

final var data = eventsResponse.getJsonObject("data").getJsonArray("derived_events");
final var unorganized = parseExternalEvents(data, horizonStart);
final var result = new HashMap<String, List<ExternalEvent>>();
for (final var event: unorganized) {
final var list = result.computeIfAbsent(event.source.derivationGroup, $ -> new ArrayList<>());
list.add(event);
}
return result;
}

private Collection<ResourceType> extractResourceTypes(final ProfileSet profileSet){
final var resourceTypes = new ArrayList<ResourceType>();
Expand Down Expand Up @@ -1085,7 +1130,7 @@ private ProfileSet parseProfiles(JsonArray dataset){
return new ProfileSet(realProfiles, discreteProfiles);
}

public <Dynamics> ResourceProfile<Optional<Dynamics>> parseProfile(JsonObject profile, JsonParser<Dynamics> dynamicsParser){
private <Dynamics> ResourceProfile<Optional<Dynamics>> parseProfile(JsonObject profile, JsonParser<Dynamics> dynamicsParser){
// Profile segments are stored with their start offset relative to simulation start
// We must convert these to durations describing how long each segment lasts
final var type = chooseP(discreteValueSchemaTypeP, realValueSchemaTypeP).parse(profile.getJsonObject("type")).getSuccessOrThrow();
Expand Down Expand Up @@ -1130,6 +1175,27 @@ public <Dynamics> ResourceProfile<Optional<Dynamics>> parseProfile(JsonObject pr
return ResourceProfile.of(type, segments);
}

private List<ExternalEvent> parseExternalEvents(final JsonArray eventsJson, final Instant horizonStart) {
final var result = new ArrayList<ExternalEvent>();
for (final var eventJson : eventsJson) {
final var e = eventJson.asJsonObject();
final var start = new Duration(
horizonStart.until(ZonedDateTime.parse(e.getString("start_time")).toInstant(), ChronoUnit.MICROS)
);
final var end = start.plus(Duration.fromString(e.getString("duration")));
result.add(new ExternalEvent(
e.getString("event_key"),
e.getString("event_type_name"),
new ExternalSource(
e.getString("source_key"),
e.getString("derivation_group_name")
),
Interval.between(start, end)
));
}
return result;
}

private Map<ActivityInstanceId, ActivityInstance> parseSimulatedActivities(JsonArray simulatedActivitiesArray, Instant simulationStart)
throws InvalidJsonException
{
Expand Down
Loading

0 comments on commit d2096f6

Please sign in to comment.