diff --git a/deployment/hasura/metadata/databases/tables/merlin/dataset/span.yaml b/deployment/hasura/metadata/databases/tables/merlin/dataset/span.yaml index 7d286e08c8..95bd612db3 100644 --- a/deployment/hasura/metadata/databases/tables/merlin/dataset/span.yaml +++ b/deployment/hasura/metadata/databases/tables/merlin/dataset/span.yaml @@ -9,7 +9,7 @@ object_relationships: manual_configuration: column_mapping: dataset_id: dataset_id - parent_id: id + parent_id: span_id insertion_order: null remote_table: name: span @@ -20,7 +20,7 @@ array_relationships: manual_configuration: column_mapping: dataset_id: dataset_id - id: parent_id + span_id: parent_id insertion_order: null remote_table: name: span diff --git a/deployment/hasura/metadata/databases/tables/sequencing/activity_instance_commands.yaml b/deployment/hasura/metadata/databases/tables/sequencing/activity_instance_commands.yaml index 01f9d33795..4f393e17dc 100644 --- a/deployment/hasura/metadata/databases/tables/sequencing/activity_instance_commands.yaml +++ b/deployment/hasura/metadata/databases/tables/sequencing/activity_instance_commands.yaml @@ -11,7 +11,7 @@ object_relationships: using: manual_configuration: column_mapping: - activity_instance_id: id + activity_instance_id: span_id remote_table: name: span schema: merlin diff --git a/deployment/hasura/migrations/Aerie/8_span_event_linkage/down.sql b/deployment/hasura/migrations/Aerie/8_span_event_linkage/down.sql new file mode 100644 index 0000000000..372f81022e --- /dev/null +++ b/deployment/hasura/migrations/Aerie/8_span_event_linkage/down.sql @@ -0,0 +1,51 @@ +-- event table revert +alter table merlin.event + drop column span_id; + +-- span table revert +alter table merlin.span + alter column span_id add generated by default as identity; + +select setval(pg_get_serial_sequence('merlin.span', 'span_id'), coalesce(max(span_id),0) + 1, false) FROM merlin.span; + +with conflicts as ( + with subTable as ( + select p.span_id, + p.dataset_id, + ROW_NUMBER() over(partition by p.span_id order by p.dataset_id) as rk + from merlin.span p) + select s.* + from subTable s + where s.rk > 1) +update merlin.span s + set span_id = nextval(pg_get_serial_sequence('merlin.span', 'span_id')) + from conflicts c + where (c.span_id, c.dataset_id) = (s.span_id, s.dataset_id); + +alter table merlin.span + rename column span_id to id; + +alter table merlin.span + alter column id set generated always; + +drop view merlin.simulated_activity; +create view merlin.simulated_activity as +( + select span.id as id, + sd.id as simulation_dataset_id, + span.parent_id as parent_id, + span.start_offset as start_offset, + span.duration as duration, + span.attributes as attributes, + span.type as activity_type_name, + (span.attributes#>>'{directiveId}')::integer as directive_id, + sd.simulation_start_time + span.start_offset as start_time, + sd.simulation_start_time + span.start_offset + span.duration as end_time + from merlin.span span + join merlin.dataset d on span.dataset_id = d.id + join merlin.simulation_dataset sd on d.id = sd.dataset_id + join merlin.simulation s on s.id = sd.simulation_id +); + +call migrations.mark_migration_rolled_back('8') + diff --git a/deployment/hasura/migrations/Aerie/8_span_event_linkage/up.sql b/deployment/hasura/migrations/Aerie/8_span_event_linkage/up.sql new file mode 100644 index 0000000000..f7609c2f29 --- /dev/null +++ b/deployment/hasura/migrations/Aerie/8_span_event_linkage/up.sql @@ -0,0 +1,31 @@ +-- Span Table update +alter table merlin.span + rename column id to span_id; + +alter table merlin.span + alter column span_id drop identity; + +drop view merlin.simulated_activity; +create view merlin.simulated_activity as +( +select span.span_id as id, + sd.id as simulation_dataset_id, + span.parent_id as parent_id, + span.start_offset as start_offset, + span.duration as duration, + span.attributes as attributes, + span.type as activity_type_name, + (span.attributes#>>'{directiveId}')::integer as directive_id, + sd.simulation_start_time + span.start_offset as start_time, + sd.simulation_start_time + span.start_offset + span.duration as end_time +from merlin.span span + join merlin.dataset d on span.dataset_id = d.id + join merlin.simulation_dataset sd on d.id = sd.dataset_id + join merlin.simulation s on s.id = sd.simulation_id + ); + +-- event table update +alter table merlin.event + add column span_id integer; + +call migrations.mark_migration_applied('8') diff --git a/deployment/postgres-init-db/sql/applied_migrations.sql b/deployment/postgres-init-db/sql/applied_migrations.sql index ce2b97f16e..064eafae07 100644 --- a/deployment/postgres-init-db/sql/applied_migrations.sql +++ b/deployment/postgres-init-db/sql/applied_migrations.sql @@ -10,3 +10,4 @@ call migrations.mark_migration_applied('4'); call migrations.mark_migration_applied('5'); call migrations.mark_migration_applied('6'); call migrations.mark_migration_applied('7'); +call migrations.mark_migration_applied('8'); diff --git a/deployment/postgres-init-db/sql/tables/merlin/dataset/event.sql b/deployment/postgres-init-db/sql/tables/merlin/dataset/event.sql index 25918edf25..23354f767b 100644 --- a/deployment/postgres-init-db/sql/tables/merlin/dataset/event.sql +++ b/deployment/postgres-init-db/sql/tables/merlin/dataset/event.sql @@ -6,6 +6,7 @@ create table merlin.event ( value jsonb, topic_index integer not null, + span_id integer, constraint event_natural_key primary key (dataset_id, real_time, transaction_index, causal_time) @@ -26,6 +27,8 @@ comment on column merlin.event.value is e'' 'The value of this event as a json blob'; comment on column merlin.event.topic_index is e'' 'The topic of this event'; +comment on column merlin.event.span_id is e'' + 'The span of this event'; create function merlin.event_integrity_function() returns trigger diff --git a/deployment/postgres-init-db/sql/tables/merlin/dataset/span.sql b/deployment/postgres-init-db/sql/tables/merlin/dataset/span.sql index 944c459643..be4ff29071 100644 --- a/deployment/postgres-init-db/sql/tables/merlin/dataset/span.sql +++ b/deployment/postgres-init-db/sql/tables/merlin/dataset/span.sql @@ -1,5 +1,5 @@ create table merlin.span ( - id integer generated always as identity, + span_id integer not null, dataset_id integer not null, parent_id integer null, @@ -9,8 +9,9 @@ create table merlin.span ( type text not null, attributes jsonb not null, + constraint span_synthetic_key - primary key (dataset_id, id) + primary key (dataset_id, span_id) ) partition by list (dataset_id); @@ -18,8 +19,8 @@ comment on table merlin.span is e'' 'A temporal window of interest. A span may be refined by its children, providing additional information over ' 'more specific windows.'; -comment on column merlin.span.id is e'' - 'The synthetic identifier for this span.'; +comment on column merlin.span.span_id is e'' + 'The id for this span.'; comment on column merlin.span.dataset_id is e'' 'The dataset this span is part of.'; comment on column merlin.span.parent_id is e'' diff --git a/deployment/postgres-init-db/sql/views/merlin/simulated_activity.sql b/deployment/postgres-init-db/sql/views/merlin/simulated_activity.sql index e268a2cd11..4ae080a713 100644 --- a/deployment/postgres-init-db/sql/views/merlin/simulated_activity.sql +++ b/deployment/postgres-init-db/sql/views/merlin/simulated_activity.sql @@ -1,6 +1,6 @@ create view merlin.simulated_activity as ( - select span.id as id, + select span.span_id as id, sd.id as simulation_dataset_id, span.parent_id as parent_id, span.start_offset as start_offset, diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java index 0ab5fcccba..93e4909dfa 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java @@ -524,7 +524,7 @@ mutation CreateSimulationTemplate($simulationTemplate: simulation_template_inser INSERT_SPAN(""" mutation InsertSpan($span: span_insert_input!){ span: insert_span_one(object: $span) { - id + span_id } }"""), SCHEDULE(""" diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationResults.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationResults.java index 0c5fc342bc..426668496a 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationResults.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationResults.java @@ -1,5 +1,6 @@ package gov.nasa.jpl.aerie.merlin.driver; +import gov.nasa.jpl.aerie.merlin.driver.engine.EventRecord; import gov.nasa.jpl.aerie.merlin.driver.engine.ProfileSegment; import gov.nasa.jpl.aerie.merlin.driver.timeline.EventGraph; import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; @@ -22,7 +23,7 @@ public final class SimulationResults { public final Map simulatedActivities; public final Map unfinishedActivities; public final List> topics; - public final Map>>> events; + public final Map>> events; public SimulationResults( final Map>>> realProfiles, @@ -32,7 +33,7 @@ public SimulationResults( final Instant startTime, final Duration duration, final List> topics, - final SortedMap>>> events) + final SortedMap>> events) { this.startTime = startTime; this.duration = duration; diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/EventRecord.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/EventRecord.java new file mode 100644 index 0000000000..910523f61f --- /dev/null +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/EventRecord.java @@ -0,0 +1,6 @@ +package gov.nasa.jpl.aerie.merlin.driver.engine; + +import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; +import java.util.Optional; + +public record EventRecord(int topicId, Optional spanId, SerializedValue value) {} diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index 67488e7782..9b87ae83be 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -531,7 +532,7 @@ public record SimulationActivityExtract( Map simulatedActivities, Map unfinishedActivities){} - private static SpanInfo computeTaskInfo( + private static SpanInfo computeSpanInfo( final TemporalEventSource timeline, final Topic activityTopic, final Iterable> serializableTopics @@ -560,10 +561,44 @@ public static SimulationActivityExtract computeActivitySimulationResults( engine, startTime, elapsedTime, - computeTaskInfo(timeline, activityTopic, serializableTopics) + computeSpanInfo(timeline, activityTopic, serializableTopics) ); } + private static HashMap spanToActivityDirectiveId( + final SimulationEngine engine, + final SpanInfo spanInfo + ){ + final var activityDirectiveIds = new HashMap(); + engine.spans.forEach((span, state) -> { + if (!spanInfo.isActivity(span)) return; + if (spanInfo.isDirective(span)) activityDirectiveIds.put(span, spanInfo.getDirective(span)); + }); + return activityDirectiveIds; + } + + private static HashMap spanToSimulatedActivities( + final SimulationEngine engine, + final SpanInfo spanInfo + ){ + final var activityDirectiveIds = spanToActivityDirectiveId(engine, spanInfo); + final var spanToSimulatedActivityId = new HashMap(activityDirectiveIds.size()); + final var usedSimulatedActivityIds = new HashSet<>(); + for (final var entry : activityDirectiveIds.entrySet()) { + spanToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); + usedSimulatedActivityIds.add(entry.getValue().id()); + } + long counter = 1L; + for (final var span : engine.spans.keySet()) { + if (!spanInfo.isActivity(span)) continue; + if (spanToSimulatedActivityId.containsKey(span)) continue; + + while (usedSimulatedActivityIds.contains(counter)) counter++; + spanToSimulatedActivityId.put(span, new SimulatedActivityId(counter++)); + } + return spanToSimulatedActivityId; + } + /** * Computes only activity-related results when resources are not needed */ @@ -575,12 +610,10 @@ public static SimulationActivityExtract computeActivitySimulationResults( ){ // Identify the nearest ancestor *activity* (excluding intermediate anonymous tasks). final var activityParents = new HashMap(); - final var activityDirectiveIds = new HashMap(); + final var activityDirectiveIds = spanToActivityDirectiveId(engine, spanInfo); engine.spans.forEach((span, state) -> { if (!spanInfo.isActivity(span)) return; - if (spanInfo.isDirective(span)) activityDirectiveIds.put(span, spanInfo.getDirective(span)); - var parent = state.parent(); while (parent.isPresent() && !spanInfo.isActivity(parent.get())) { parent = engine.spans.get(parent.get()).parent(); @@ -597,20 +630,7 @@ public static SimulationActivityExtract computeActivitySimulationResults( }); // Give every task corresponding to a child activity an ID that doesn't conflict with any root activity. - final var spanToSimulatedActivityId = new HashMap(activityDirectiveIds.size()); - final var usedSimulatedActivityIds = new HashSet<>(); - for (final var entry : activityDirectiveIds.entrySet()) { - spanToSimulatedActivityId.put(entry.getKey(), new SimulatedActivityId(entry.getValue().id())); - usedSimulatedActivityIds.add(entry.getValue().id()); - } - long counter = 1L; - for (final var span : engine.spans.keySet()) { - if (!spanInfo.isActivity(span)) continue; - if (spanToSimulatedActivityId.containsKey(span)) continue; - - while (usedSimulatedActivityIds.contains(counter)) counter++; - spanToSimulatedActivityId.put(span, new SimulatedActivityId(counter++)); - } + final var spanToSimulatedActivityId = spanToSimulatedActivities(engine, spanInfo); final var simulatedActivities = new HashMap(); final var unfinishedActivities = new HashMap(); @@ -687,7 +707,7 @@ public static SimulationResults computeResults( final Set resourceNames ) { // Collect per-task information from the event graph. - final var taskInfo = computeTaskInfo(timeline, activityTopic, serializableTopics); + final var spanInfo = computeSpanInfo(timeline, activityTopic, serializableTopics); // Extract profiles for every resource. final var realProfiles = new HashMap>>>(); @@ -719,7 +739,7 @@ public static SimulationResults computeResults( } } - final var activityResults = computeActivitySimulationResults(engine, startTime, elapsedTime, taskInfo); + final var activityResults = computeActivitySimulationResults(engine, startTime, elapsedTime, spanInfo); final List> topics = new ArrayList<>(); final var serializableTopicToId = new HashMap, Integer>(); @@ -728,7 +748,8 @@ public static SimulationResults computeResults( topics.add(Triple.of(topics.size(), serializableTopic.name(), serializableTopic.outputType().getSchema())); } - final var serializedTimeline = new TreeMap>>>(); + final var spanToActivities = spanToSimulatedActivities(engine,spanInfo); + final var serializedTimeline = new TreeMap>>(); var time = Duration.ZERO; for (var point : timeline.points()) { if (point instanceof TemporalEventSource.TimePoint.Delta delta) { @@ -736,11 +757,34 @@ public static SimulationResults computeResults( } else if (point instanceof TemporalEventSource.TimePoint.Commit commit) { final var serializedEventGraph = commit.events().substitute( event -> { - EventGraph> output = EventGraph.empty(); + // TODO can we do this more efficiently? + EventGraph output = EventGraph.empty(); for (final var serializableTopic : serializableTopics) { Optional serializedEvent = trySerializeEvent(event, serializableTopic); if (serializedEvent.isPresent()) { - output = EventGraph.concurrently(output, EventGraph.atom(Pair.of(serializableTopicToId.get(serializableTopic), serializedEvent.get()))); + // If the event's `provenance` has no simulated activity id, search its ancestors to find the nearest + // simulated activity id, if one exists + if (!spanToActivities.containsKey(event.provenance())) { + var spanId = Optional.of(event.provenance()); + + while (true) { + if (spanToActivities.containsKey(spanId.get())) { + spanToActivities.put(event.provenance(), spanToActivities.get(spanId.get())); + break; + } + spanId = engine.getSpan(spanId.get()).parent(); + if (spanId.isEmpty()) { + break; + } + } + } + var activitySpanID = Optional.ofNullable(spanToActivities.get(event.provenance()).id()); + output = EventGraph.concurrently( + output, + EventGraph.atom( + new EventRecord(serializableTopicToId.get(serializableTopic), + activitySpanID, + serializedEvent.get()))); } } return output; diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSimulationEventsAction.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSimulationEventsAction.java index 8ddb50b30c..fb85f50a94 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSimulationEventsAction.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSimulationEventsAction.java @@ -1,5 +1,6 @@ package gov.nasa.jpl.aerie.merlin.server.remotes.postgres; +import gov.nasa.jpl.aerie.merlin.driver.engine.EventRecord; import gov.nasa.jpl.aerie.merlin.driver.timeline.EventGraph; import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; @@ -16,6 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; @@ -29,7 +31,8 @@ e.transaction_index, e.causal_time, e.topic_index, - e.value + e.value, + e.span_id from merlin.event as e where e.dataset_id = ? @@ -41,14 +44,14 @@ public GetSimulationEventsAction(final Connection connection) throws SQLExceptio this.statement = connection.prepareStatement(this.sql); } - public SortedMap>>> get(final long datasetId) throws SQLException + public SortedMap>> get(final long datasetId) throws SQLException { this.statement.setLong(1, datasetId); final var resultSet = this.statement.executeQuery(); final var transactionsByTimePoint = readResultSet(resultSet); - final var eventPoints = new TreeMap>>>(); + final var eventPoints = new TreeMap>>(); transactionsByTimePoint.forEach((time, transactions) -> { transactions.forEach(($, value) -> { try { @@ -63,24 +66,27 @@ public SortedMap>>> get return eventPoints; } - private static Map>>>> + private static Map>>> readResultSet(final ResultSet resultSet) throws SQLException { - final var nodesByTimePoint = new HashMap>>>>(); + final var nodesByTimePoint = new HashMap>>>(); while (resultSet.next()) { final var timePoint = parseOffset(resultSet, 1); final var transactionIndex = resultSet.getInt(2); final var causalTime = resultSet.getString(3); final var topicIndex = resultSet.getInt(4); final var serializedValue = parseSerializedValue(resultSet.getString(5)); + final Optional spanId = resultSet.getObject(6) == null ? Optional.empty() : Optional.of( + Long.valueOf(resultSet.getLong(6))); nodesByTimePoint .computeIfAbsent(timePoint, x -> new TreeMap<>()) .computeIfAbsent(transactionIndex, x -> new ArrayList<>()) .add(Pair.of( - causalTime, - Pair.of( + causalTime, + new EventRecord( topicIndex, + spanId, serializedValue ) ) diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSpanRecords.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSpanRecords.java index 303a4f1e98..6b31e9821e 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSpanRecords.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetSpanRecords.java @@ -21,7 +21,7 @@ /*package-local*/ final class GetSpanRecords implements AutoCloseable { private final @Language("SQL") String sql = """ select - a.id, + a.span_id, a.type, a.parent_id, a.start_offset, diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/InsertSimulationEventsAction.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/InsertSimulationEventsAction.java index fe61567ef1..7b0f9c7a04 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/InsertSimulationEventsAction.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/InsertSimulationEventsAction.java @@ -1,8 +1,8 @@ package gov.nasa.jpl.aerie.merlin.server.remotes.postgres; +import gov.nasa.jpl.aerie.merlin.driver.engine.EventRecord; import gov.nasa.jpl.aerie.merlin.driver.timeline.EventGraph; import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; -import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; import gov.nasa.jpl.aerie.merlin.server.models.Timestamp; import org.apache.commons.lang3.tuple.Pair; import org.intellij.lang.annotations.Language; @@ -19,8 +19,8 @@ /*package-local*/ final class InsertSimulationEventsAction implements AutoCloseable { @Language("SQL") private static final String sql = """ - insert into merlin.event (dataset_id, real_time, transaction_index, causal_time, topic_index, value) - values (?, ?::timestamptz - ?::timestamptz, ?, ?, ?, ?::jsonb) + insert into merlin.event (dataset_id, real_time, transaction_index, causal_time, topic_index, value, span_id) + values (?, ?::timestamptz - ?::timestamptz, ?, ?, ?, ?::jsonb,?) """; private final PreparedStatement statement; @@ -31,7 +31,7 @@ public InsertSimulationEventsAction(final Connection connection) throws SQLExcep public void apply( final long datasetId, - final Map>>> eventPoints, + final Map>> eventPoints, final Timestamp simulationStart ) throws SQLException { for (final var eventPoint : eventPoints.entrySet()) { @@ -51,21 +51,21 @@ private static void batchInsertEventGraph( final Duration duration, final int transactionIndex, final Timestamp simulationStart, - final List>> flattenedEventGraph, + final List> flattenedEventGraph, final PreparedStatement statement ) throws SQLException { - for (final Pair> entry : flattenedEventGraph) { + for (final Pair entry : flattenedEventGraph) { final var causalTime = entry.getLeft(); - final Pair event = entry.getRight(); + final EventRecord event = entry.getRight(); statement.setLong(1, datasetId); setTimestamp(statement, 2, simulationStart.plusMicros(duration.in(MICROSECONDS))); setTimestamp(statement, 3, simulationStart); statement.setInt(4, transactionIndex); statement.setString(5, causalTime); - statement.setInt(6, event.getLeft()); - statement.setString(7, serializedValueP.unparse(event.getRight()).toString()); - + statement.setInt(6, event.topicId()); + statement.setString(7, serializedValueP.unparse(event.value()).toString()); + statement.setLong(8, event.spanId().isPresent() ? event.spanId().get() : null); statement.addBatch(); } } diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostSpansAction.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostSpansAction.java index 269d24b425..11ec124c47 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostSpansAction.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostSpansAction.java @@ -20,8 +20,8 @@ /*package-local*/ final class PostSpansAction implements AutoCloseable { private static final @Language("SQL") String sql = """ - insert into merlin.span (dataset_id, start_offset, duration, type, attributes) - values (?, ?::timestamptz - ?::timestamptz, ?::timestamptz - ?::timestamptz, ?, ?::jsonb) + insert into merlin.span (span_id,dataset_id,parent_id, start_offset, duration, type, attributes) + values (?,?,?, ?::timestamptz - ?::timestamptz, ?::timestamptz - ?::timestamptz, ?, ?::jsonb) """; private final PreparedStatement statement; @@ -30,11 +30,12 @@ public PostSpansAction(final Connection connection) throws SQLException { this.statement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); } - public Map apply( + public void apply( final long datasetId, final Map spans, final Timestamp simulationStart ) throws SQLException { + final var ids = spans.keySet().stream().toList(); for (final var id : ids) { final var act = spans.get(id); @@ -45,32 +46,30 @@ public Map apply( return new Timestamp(actEnd); }); - statement.setLong(1, datasetId); - setTimestamp(statement, 2, startTimestamp); - setTimestamp(statement, 3, simulationStart); + statement.setLong(1, id); + statement.setLong(2, datasetId); + if (act.parentId().isPresent()){ + statement.setLong(3,act.parentId().get()); + }else{ + statement.setNull(3,Types.BIGINT); + } + setTimestamp(statement, 4, startTimestamp); + setTimestamp(statement, 5, simulationStart); if (endTimestamp.isPresent()) { - setTimestamp(statement, 4, endTimestamp.get()); + setTimestamp(statement, 6, endTimestamp.get()); } else { - statement.setNull(4, Types.TIMESTAMP_WITH_TIMEZONE); + statement.setNull(6, Types.TIMESTAMP_WITH_TIMEZONE); } - setTimestamp(statement, 5, startTimestamp); - statement.setString(6, act.type()); - statement.setString(7, buildAttributes(act.attributes().directiveId(), act.attributes().arguments(), act.attributes().computedAttributes())); + setTimestamp(statement, 7, startTimestamp); + statement.setString(8, act.type()); + statement.setString(9, buildAttributes(act.attributes().directiveId(), act.attributes().arguments(), act.attributes().computedAttributes())); + statement.addBatch(); } statement.executeBatch(); - final var resultSet = statement.getGeneratedKeys(); - - final var simIdToPostgresId = new HashMap(ids.size()); - for (final var id : ids) { - if (!resultSet.next()) throw new Error("Not enough generated IDs returned from batch insertion."); - simIdToPostgresId.put(id, resultSet.getLong(1)); - } - - return simIdToPostgresId; } private String buildAttributes(final Optional directiveId, final Map arguments, final Optional returnValue) { diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresResultsCellRepository.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresResultsCellRepository.java index 1ca3b2b311..312200f0ed 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresResultsCellRepository.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresResultsCellRepository.java @@ -7,6 +7,7 @@ import gov.nasa.jpl.aerie.merlin.driver.SimulationFailure; import gov.nasa.jpl.aerie.merlin.driver.SimulationResults; import gov.nasa.jpl.aerie.merlin.driver.UnfinishedActivity; +import gov.nasa.jpl.aerie.merlin.driver.engine.EventRecord; import gov.nasa.jpl.aerie.merlin.driver.timeline.EventGraph; import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; @@ -30,11 +31,14 @@ import java.sql.Connection; import java.sql.SQLException; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.SortedMap; +import java.util.function.Function; import java.util.stream.Collectors; public final class PostgresResultsCellRepository implements ResultsCellRepository { @@ -298,7 +302,7 @@ private static List> getSimulationTopics(Co } } - private static SortedMap>>> + private static SortedMap>> getSimulationEvents( final Connection connection, final long datasetId @@ -388,7 +392,7 @@ private static void insertSimulationTopics( private static void insertSimulationEvents( Connection connection, long datasetId, - Map>>> events, + Map>> events, Timestamp simulationStart) throws SQLException { try ( @@ -407,7 +411,6 @@ private static void postActivities( ) throws SQLException { try ( final var postActivitiesAction = new PostSpansAction(connection); - final var updateSimulatedActivityParentsAction = new UpdateSimulatedActivityParentsAction(connection) ) { final var simulatedActivityRecords = simulatedActivities.entrySet().stream() .collect(Collectors.toMap( @@ -420,16 +423,46 @@ private static void postActivities( e -> unfinishedActivityToRecord(e.getValue()))); allActivityRecords.putAll(simulatedActivityRecords); - final var simIdToPgId = postActivitiesAction.apply( + // Sorts the map by SpanRecord parent ID to ensure foreign key constraints are met. + // Entries with null parent IDs are placed first to avoid foreign key violations + // for the "span_has_parent_span" constraint. + final var sortedAllActivityRecords = topoSort(allActivityRecords, $ -> $.parentId().stream().toList()); + + postActivitiesAction.apply( datasetId, - allActivityRecords, + sortedAllActivityRecords, simulationStart); + } + } - updateSimulatedActivityParentsAction.apply( - datasetId, - allActivityRecords, - simIdToPgId); + /** + * Take an unsorted map and produce a sorted LinkedHashMap where nodes always + * come after their dependencies. + * @param nodes a map from keys to values - the keys are used to define dependencies + * @param dependencies - for a given value, what are the keys of its dependencies? + * @return a sorted LinkedHashMap where nodes always come after their dependencies + * @throws IllegalArgumentException if a cycle is found + */ + private static LinkedHashMap topoSort(Map nodes, Function> dependencies) { + final var worklist = new ArrayList<>(nodes.entrySet()); + final var sortedMap = new LinkedHashMap(); + while (!worklist.isEmpty()) { + var madeProgress = false; + for (int i = worklist.size() - 1; i >= 0; i--) { + final var entry = worklist.get(i); + // A node is ready to be added to the output if all of its dependencies are already in the output + if (dependencies.apply(entry.getValue()).stream().allMatch(sortedMap::containsKey)) { + sortedMap.put(entry.getKey(), entry.getValue()); + worklist.remove(i); + madeProgress = true; + } + } + // If no nodes were added to the output in this round, there must be a cycle in the remaining nodes + if (!madeProgress) { + throw new IllegalArgumentException("Cycle detected in input to topoSort:" + worklist); + } } + return sortedMap; } private static SpanRecord simulatedActivityToRecord(final SimulatedActivity activity) { diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateSimulatedActivityParentsAction.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateSimulatedActivityParentsAction.java deleted file mode 100644 index ff806194b1..0000000000 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateSimulatedActivityParentsAction.java +++ /dev/null @@ -1,54 +0,0 @@ -package gov.nasa.jpl.aerie.merlin.server.remotes.postgres; - -import org.intellij.lang.annotations.Language; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Map; - -/*package-local*/ final class UpdateSimulatedActivityParentsAction implements AutoCloseable { - private final @Language("SQL") String sql = """ - update merlin.span - set parent_id = ? - where dataset_id = ? - and id = ? - """; - - private final PreparedStatement statement; - - public UpdateSimulatedActivityParentsAction(final Connection connection) throws SQLException { - this.statement = connection.prepareStatement(sql); - } - - public void apply( - final long datasetId, - final Map simulatedActivities, - final Map simIdToPgId - ) throws SQLException { - for (final var entry : simulatedActivities.entrySet()) { - final var activity = entry.getValue(); - final var id = entry.getKey(); - if (activity.parentId().isEmpty()) continue; - - this.statement.setLong(1, simIdToPgId.get(activity.parentId().get())); - this.statement.setLong(2, datasetId); - this.statement.setLong(3, simIdToPgId.get(id)); - - this.statement.addBatch(); - } - try { - final var results = this.statement.executeBatch(); - for (final var result : results) { - if (result != 1) throw new FailedUpdateException("merlin.span"); - } - } finally { - this.statement.clearBatch(); - } - } - - @Override - public void close() throws SQLException { - this.statement.close(); - } -} diff --git a/scheduler-server/src/main/java/gov/nasa/jpl/aerie/scheduler/server/services/GraphQLMerlinService.java b/scheduler-server/src/main/java/gov/nasa/jpl/aerie/scheduler/server/services/GraphQLMerlinService.java index b467cf4073..4c30ad8629 100644 --- a/scheduler-server/src/main/java/gov/nasa/jpl/aerie/scheduler/server/services/GraphQLMerlinService.java +++ b/scheduler-server/src/main/java/gov/nasa/jpl/aerie/scheduler/server/services/GraphQLMerlinService.java @@ -10,6 +10,7 @@ import gov.nasa.jpl.aerie.merlin.driver.SimulatedActivityId; import gov.nasa.jpl.aerie.merlin.driver.SimulationResults; import gov.nasa.jpl.aerie.merlin.driver.UnfinishedActivity; +import gov.nasa.jpl.aerie.merlin.driver.engine.EventRecord; import gov.nasa.jpl.aerie.merlin.driver.engine.ProfileSegment; import gov.nasa.jpl.aerie.merlin.driver.timeline.EventGraph; import gov.nasa.jpl.aerie.merlin.protocol.model.SchedulerModel; @@ -895,7 +896,7 @@ private Map getSpans(DatasetId datasetI parent_id type start_offset - id + span_id } } """.formatted(datasetId.id()); @@ -1489,7 +1490,7 @@ private void insertSimulationTopics( private void insertSimulationEvents( DatasetId datasetId, - Map>>> eventPoints) throws MerlinServiceException, IOException + Map>> eventPoints) throws MerlinServiceException, IOException { final var req = """ mutation($events:[event_insert_input!]!){ @@ -1518,20 +1519,21 @@ private JsonArrayBuilder batchInsertEventGraph( final long datasetId, final Duration duration, final int transactionIndex, - final List>> flattenedEventGraph + final List> flattenedEventGraph ) { final var events = Json.createArrayBuilder(); - for (final Pair> entry : flattenedEventGraph) { + for (final Pair entry : flattenedEventGraph) { final var causalTime = entry.getLeft(); - final Pair event = entry.getRight(); + final EventRecord event = entry.getRight(); events.add( Json.createObjectBuilder() .add("dataset_id",datasetId) .add("real_time", graphQLIntervalFromDuration(duration).toString()) .add("transaction_index", transactionIndex) .add("causal_time", causalTime) - .add("topic_index", event.getLeft()) - .add("value", serializedValueP.unparse(event.getRight())) + .add("topic_index", event.topicId()) + .add("value", serializedValueP.unparse(event.value())) + .add("span_id", event.spanId().get()) .build() ); } @@ -1555,20 +1557,18 @@ private void postActivities( e -> e.getKey().id(), e -> unfinishedActivityToRecord(e.getValue(), simulationActivityDirectiveIdToMerlinActivityDirectiveId))); allActivityRecords.putAll(simulatedActivityRecords); - final var simIdToPgId = postSpans( + postSpans( datasetId, allActivityRecords, simulationStart); updateSimulatedActivityParentsAction( datasetId, - simulatedActivityRecords, - simIdToPgId); + simulatedActivityRecords); } public void updateSimulatedActivityParentsAction( final DatasetId datasetId, - final Map simulatedActivities, - final Map simIdToPgId + final Map simulatedActivities ) throws MerlinServiceException, IOException { final var req = """ @@ -1587,8 +1587,8 @@ public void updateSimulatedActivityParentsAction( updates.add(Json.createObjectBuilder() .add("where", Json.createObjectBuilder() .add("dataset_id",Json.createObjectBuilder().add("_eq", datasetId.id()).build()) - .add("id", Json.createObjectBuilder().add("_eq", simIdToPgId.get(id)).build())) - .add("_set", Json.createObjectBuilder().add("parent_id", simIdToPgId.get(activity.parentId().get()))) + .add("span_id", Json.createObjectBuilder().add("_eq", id).build())) + .add("_set", Json.createObjectBuilder().add("parent_id", activity.parentId().get())) .build()); updateCounter++; } @@ -1640,7 +1640,7 @@ private static SpanRecord unfinishedActivityToRecord(final UnfinishedActivity ac Optional.empty())); } - public HashMap postSpans(final DatasetId datasetId, + public void postSpans(final DatasetId datasetId, final Map spans, final Instant simulationStart ) throws MerlinServiceException, IOException @@ -1649,7 +1649,7 @@ public HashMap postSpans(final DatasetId datasetId, mutation($spans:[span_insert_input!]!) { insert_span(objects: $spans) { returning { - id + span_id } } } @@ -1658,8 +1658,10 @@ public HashMap postSpans(final DatasetId datasetId, final var ids = spans.keySet().stream().toList(); for (final var id : ids) { final var act = spans.get(id); + final var startTime = graphQLIntervalFromDuration(simulationStart, act.start); spansJson.add(Json.createObjectBuilder() + .add("span_id",id) .add("dataset_id", datasetId.id()) .add("start_offset", startTime.toString()) .add("duration", act.duration.isPresent() ? graphQLIntervalFromDuration(act.duration().get()).toString() : "null") @@ -1670,14 +1672,7 @@ public HashMap postSpans(final DatasetId datasetId, final var arguments = Json.createObjectBuilder() .add("spans", spansJson) .build(); - final JsonObject response; - response = postRequest(req, arguments).get(); - final var returnedIds = response.getJsonObject("data").getJsonObject("insert_span").getJsonArray("returning"); - final var simIdToPostgresId = new HashMap(ids.size()); - for (int i = 0; i< ids.size(); ++i) { - simIdToPostgresId.put(ids.get(i), (long) returnedIds.get(i).asJsonObject().getInt("id")); - } - return simIdToPostgresId; + postRequest(req, arguments).get(); } private JsonValue buildAttributes(final Optional directiveId, final Map arguments, final Optional returnValue) { diff --git a/sequencing-server/src/lib/batchLoaders/simulatedActivityBatchLoader.ts b/sequencing-server/src/lib/batchLoaders/simulatedActivityBatchLoader.ts index 0e0462ba3f..5fe0dbb508 100644 --- a/sequencing-server/src/lib/batchLoaders/simulatedActivityBatchLoader.ts +++ b/sequencing-server/src/lib/batchLoaders/simulatedActivityBatchLoader.ts @@ -42,7 +42,7 @@ export const simulatedActivitiesBatchLoader: BatchLoader< simulation_start_time dataset { spans { - id + span_id attributes start_offset duration @@ -70,7 +70,7 @@ export const simulatedActivitiesBatchLoader: BatchLoader< const simulatedActivities: GraphQLSimulatedActivityInstance[] = spans.map(span => { return { - id: span.id, + id: span.span_id, simulation_dataset_id: simulation_dataset.id, plan_id: simulation_dataset.simulation.plan.id, model_id: simulation_dataset.simulation.plan.model_id, @@ -131,8 +131,8 @@ export const simulatedActivityInstanceBySimulatedActivityIdBatchLoader: BatchLoa } } dataset { - spans: spans(where: { id: { _eq: $simulatedActivityId } }) { - id + spans: spans(where: { span_id: { _eq: $simulatedActivityId } }) { + span_id attributes start_offset duration @@ -152,7 +152,7 @@ export const simulatedActivityInstanceBySimulatedActivityIdBatchLoader: BatchLoa return Promise.all( keys.map(async ({ simulationDatasetId, simulatedActivityId }) => { const simulation_dataset = result.find(res => - res.data?.simulation_dataset?.dataset?.spans?.some(span => span.id === simulatedActivityId), + res.data?.simulation_dataset?.dataset?.spans?.some(span => span.span_id === simulatedActivityId), )?.data.simulation_dataset; if (simulation_dataset === undefined) { return new ErrorWithStatusCode(`No simulation_dataset with id: ${simulationDatasetId}`, 404); @@ -175,7 +175,7 @@ export const simulatedActivityInstanceBySimulatedActivityIdBatchLoader: BatchLoa const span = spans[0]; const simulatedActivity: GraphQLSimulatedActivityInstance = { - id: span.id, + id: span.span_id, simulation_dataset_id: simulation_dataset.id, plan_id: simulation_dataset.simulation.plan.id, model_id: simulation_dataset.simulation.plan.model_id, @@ -236,7 +236,7 @@ export interface GQLSpan< ActivityArguments extends Record = Record, ActivityComputedAttributes extends Record = Record, > { - id: number; + span_id: number; attributes: GraphQLSimulatedActivityAttributes; start_offset: string; duration: string;