diff --git a/deployment/Environment.md b/deployment/Environment.md index aa170d81d6..e9018dd9ac 100644 --- a/deployment/Environment.md +++ b/deployment/Environment.md @@ -16,17 +16,19 @@ See the [environment variables document](https://github.com/NASA-AMMOS/aerie-gat ## Aerie Merlin -| Name | Description | Type | Default | -| -------------------- | --------------------------------------------------------------------------------------------------------------------------- | -------- | ------------------------------- | -| `JAVA_OPTS` | Configuration for Merlin's logging level and output file | `string` | log level: warn. output: stderr | -| `MERLIN_PORT` | Port number for the Merlin server | `number` | 27183 | -| `MERLIN_LOCAL_STORE` | Local storage for Merlin in the container | `string` | /usr/src/app/merlin_file_store | -| `MERLIN_DB_SERVER` | The DB instance that Merlin will connect with | `string` | | -| `MERLIN_DB_PORT` | The DB instance port number that Merlin will connect with | `number` | 5432 | -| `MERLIN_DB_USER` | Username of the DB instance | `string` | | -| `MERLIN_DB_PASSWORD` | Password of the DB instance | `string` | | -| `MERLIN_DB` | The DB for Merlin. | `string` | aerie_merlin | -| `UNTRUE_PLAN_START` | Temporary solution to provide plan start time to models, should be set to a time that models will not fail to initialize on | `string` | | +| Name | Description | Type | Default | +| ------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | -------- | ------------------------------- | +| `JAVA_OPTS` | Configuration for Merlin's logging level and output file | `string` | log level: warn. output: stderr | +| `MERLIN_PORT` | Port number for the Merlin server | `number` | 27183 | +| `MERLIN_LOCAL_STORE` | Local storage for Merlin in the container | `string` | /usr/src/app/merlin_file_store | +| `MERLIN_DB_SERVER` | The DB instance that Merlin will connect with | `string` | | +| `MERLIN_DB_PORT` | The DB instance port number that Merlin will connect with | `number` | 5432 | +| `MERLIN_DB_USER` | Username of the DB instance | `string` | | +| `MERLIN_DB_PASSWORD` | Password of the DB instance | `string` | | +| `MERLIN_DB` | The DB for Merlin. | `string` | aerie_merlin | +| `UNTRUE_PLAN_START` | Temporary solution to provide plan start time to models, should be set to a time that models will not fail to initialize on | `string` | | +| `ENABLE_CONTINUOUS_VALIDATION_THREAD` | Flag to enable a worker thread that continously computes and caches activity directive validation results | `boolean`| true | +| `VALIDATION_THREAD_POLLING_PERIOD` | Number of milliseconds the above worker thread should wait before querying the database for new, unvalidated directives | `string` | 500 | ## Aerie Merlin Worker diff --git a/deployment/hasura/migrations/AerieMerlin/32_automatic_activity_validations/down.sql b/deployment/hasura/migrations/AerieMerlin/32_automatic_activity_validations/down.sql new file mode 100644 index 0000000000..bf86e3c3bb --- /dev/null +++ b/deployment/hasura/migrations/AerieMerlin/32_automatic_activity_validations/down.sql @@ -0,0 +1,22 @@ +drop trigger validation_entry_on_insert on activity_directive; + +drop function activity_directive_validation_entry; + +create or replace function activity_directive_set_arguments_updated_at() + returns trigger + security definer + language plpgsql as $$begin + call plan_locked_exception(new.plan_id); + new.last_modified_arguments_at = now(); + return new; +end$$; + +alter table activity_directive_validations + drop column last_modified_arguments_at, + drop column status, + add column last_modified_at timestamptz not null default now(); + +comment on column activity_directive_validations.last_modified_at is e'' + 'The time at which these argument validations were last modified.'; + +call migrations.mark_migration_rolled_back('32'); diff --git a/deployment/hasura/migrations/AerieMerlin/32_automatic_activity_validations/up.sql b/deployment/hasura/migrations/AerieMerlin/32_automatic_activity_validations/up.sql new file mode 100644 index 0000000000..ecae5b91f5 --- /dev/null +++ b/deployment/hasura/migrations/AerieMerlin/32_automatic_activity_validations/up.sql @@ -0,0 +1,43 @@ +alter table activity_directive_validations + drop column last_modified_at, + add column status text not null default 'pending', + add column last_modified_arguments_at timestamptz not null; + +comment on column activity_directive_validations.last_modified_arguments_at is e'' + 'The time at which these argument validations were last modified.'; + +-- reuse exising insert empty validations row on argument update +create or replace function activity_directive_set_arguments_updated_at() + returns trigger + security definer + language plpgsql as +$$ begin + call plan_locked_exception(new.plan_id); + new.last_modified_arguments_at = now(); + + -- request new validation + update activity_directive_validations + set last_modified_arguments_at = new.last_modified_arguments_at, + status = 'pending' + where (directive_id, plan_id) = (new.id, new.plan_id); + + return new; +end $$; + +create function activity_directive_validation_entry() + returns trigger + security definer + language plpgsql as +$$ begin + insert into activity_directive_validations + (directive_id, plan_id, last_modified_arguments_at) + values (new.id, new.plan_id, new.last_modified_arguments_at); + return new; +end $$; + +create trigger validation_entry_on_insert + after insert on activity_directive + for each row +execute function activity_directive_validation_entry(); + +call migrations.mark_migration_applied('32'); diff --git a/docker-compose.yml b/docker-compose.yml index 5fb4ab55ec..6d5a43db75 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,6 +44,7 @@ services: -Dorg.slf4j.simpleLogger.log.com.zaxxer.hikari=INFO -Dorg.slf4j.simpleLogger.logFile=System.err UNTRUE_PLAN_START: "2000-01-01T11:58:55.816Z" + ENABLE_CONTINUOUS_VALIDATION_THREAD: true image: aerie_merlin ports: ["27183:27183", "5005:5005"] restart: always diff --git a/e2e-tests/docker-compose-test.yml b/e2e-tests/docker-compose-test.yml index 9ce9338ce5..7b1e7bc8e7 100644 --- a/e2e-tests/docker-compose-test.yml +++ b/e2e-tests/docker-compose-test.yml @@ -46,6 +46,7 @@ services: -Dorg.slf4j.simpleLogger.log.com.zaxxer.hikari=INFO -Dorg.slf4j.simpleLogger.logFile=System.err UNTRUE_PLAN_START: '2000-01-01T11:58:55.816Z' + ENABLE_CONTINUOUS_VALIDATION_THREAD: true image: aerie_merlin ports: ['27183:27183', '5005:5005'] restart: always diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/AutomaticValidationTests.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/AutomaticValidationTests.java new file mode 100644 index 0000000000..d59018b489 --- /dev/null +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/AutomaticValidationTests.java @@ -0,0 +1,164 @@ +package gov.nasa.jpl.aerie.e2e; + +import com.microsoft.playwright.Playwright; +import gov.nasa.jpl.aerie.e2e.types.ActivityValidation; +import gov.nasa.jpl.aerie.e2e.utils.GatewayRequests; +import gov.nasa.jpl.aerie.e2e.utils.HasuraRequests; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import javax.json.Json; +import java.io.IOException; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class AutomaticValidationTests { + // Requests + private Playwright playwright; + private HasuraRequests hasura; + + // Per-Test Data + private int modelId; + private int planId; + + @BeforeAll + void beforeAll() { + // Setup Requests + playwright = Playwright.create(); + hasura = new HasuraRequests(playwright); + } + + @AfterAll + void afterAll() { + // Cleanup Requests + hasura.close(); + playwright.close(); + } + + @BeforeEach + void beforeEach() throws IOException, InterruptedException { + // Insert the Mission Model + try (final var gateway = new GatewayRequests(playwright)) { + modelId = hasura.createMissionModel( + gateway.uploadJarFile(), + "Banananation (e2e tests)", + "aerie_e2e_tests", + "Automatic Validation Tests"); + } + // Insert the Plan + planId = hasura.createPlan( + modelId, + "Test Plan - Automatic Validation Tests", + "1212h", + "2021-01-01T00:00:00Z"); + } + + @AfterEach + void afterEach() throws IOException { + // Remove Model, Plan, and Constraint + hasura.deletePlan(planId); + hasura.deleteMissionModel(modelId); + } + + @Test + void validationSuccess() throws IOException, InterruptedException { + final var activityId = hasura.insertActivity( + planId, + "BiteBanana", + "1h", + Json.createObjectBuilder().add("biteSize", 1).build()); + Thread.sleep(1000); // TODO consider a while loop here + final var activityValidations = hasura.getActivityValidations(planId); + final ActivityValidation activityValidation = activityValidations.get((long) activityId); + assertEquals(new ActivityValidation.Success(), activityValidation); + } + + @Test + void noSuchActivityType() throws IOException, InterruptedException { + final var activityId = hasura.insertActivity( + planId, + "NopeBanana", + "1h", + Json.createObjectBuilder().build()); + Thread.sleep(1000); // TODO consider a while loop here + final var activityValidations = hasura.getActivityValidations(planId); + final ActivityValidation activityValidation = activityValidations.get((long) activityId); + assertEquals(new ActivityValidation.NoSuchActivityTypeFailure("no such activity type", "NopeBanana"), activityValidation); + } + + @Test + void validationNotice() throws IOException, InterruptedException { + final var activityId = hasura.insertActivity( + planId, + "BiteBanana", + "1h", + Json.createObjectBuilder().add("biteSize", 0).build()); + Thread.sleep(1000); // TODO consider a while loop here + final var activityValidations = hasura.getActivityValidations(planId); + final ActivityValidation activityValidation = activityValidations.get((long) activityId); + assertEquals(new ActivityValidation.ValidationFailure(List.of( + new ActivityValidation.ValidationNotice(List.of("biteSize"), "bite size must be positive")) + ), activityValidation); + } + + @Test + void instantiationError() throws IOException, InterruptedException { + final var activityId = hasura.insertActivity( + planId, + "BakeBananaBread", + "1h", + Json.createObjectBuilder() + .add("dontNeed", 0) + .add("temperature", "this is a string") + .add("tbSugar", 1) + .build()); + Thread.sleep(1000); // TODO consider a while loop here + final var activityValidations = hasura.getActivityValidations(planId); + final ActivityValidation activityValidation = activityValidations.get((long) activityId); + assertEquals(new ActivityValidation.InstantiationFailure( + List.of("dontNeed"), + List.of("glutenFree"), + List.of(new ActivityValidation.UnconstructableArgument( + "temperature", + "Expected real number, got StringValue[value=this is a string]")) + ), activityValidation); + } + + @Test + void exceptionDuringValidationHandled() throws IOException, InterruptedException { + final var exceptionActivityId = hasura.insertActivity( + planId, + "ExceptionActivity", + "1h", + Json.createObjectBuilder().add("throwException", true).build()); + + // sleep to make sure exception activity is picked up + Thread.sleep(1000); // TODO consider a while loop here + + final var biteActivityId = hasura.insertActivity( + planId, + "BiteBanana", + "1h", + Json.createObjectBuilder().add("biteSize", 1).build()); + + Thread.sleep(1000); // TODO consider a while loop here + + final var activityValidations = hasura.getActivityValidations(planId); + + final ActivityValidation exceptionValidations = activityValidations.get((long) exceptionActivityId); + final ActivityValidation biteValidations = activityValidations.get((long) biteActivityId); + + // then make sure the exception was caught and serialized, and didn't crash the worker thread + assertEquals(new ActivityValidation.ValidationFailure(List.of( + new ActivityValidation.ValidationNotice(List.of("throwException"), "Throwing runtime exception during validation"))), + exceptionValidations); + // if the above is true, bite banana will have its validation still + assertEquals(new ActivityValidation.Success(), biteValidations); + } +} diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/MissionModelTests.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/MissionModelTests.java index 66a5edc4af..99fb616742 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/MissionModelTests.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/MissionModelTests.java @@ -116,6 +116,7 @@ private ArrayList expectedActivityTypesBanananation() { new ValueSchemaStruct(Map.of( "duration", VALUE_SCHEMA_DURATION, "durationInSeconds", new ValueSchemaMeta(Map.of("unit", Json.createObjectBuilder(Map.of("value", "s")).build()), VALUE_SCHEMA_REAL))))); + activityTypes.add(new ActivityType("ExceptionActivity", Map.of("throwException", new Parameter(0, VALUE_SCHEMA_BOOLEAN)))); activityTypes.add(new ActivityType("grandchild", Map.of("counter", new Parameter(0, VALUE_SCHEMA_INT)))); activityTypes.add(new ActivityType("GrowBanana", Map.of( "quantity", new Parameter(0, VALUE_SCHEMA_INT), diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java index a351ea8277..d7590d1963 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/SchedulingTests.java @@ -324,7 +324,7 @@ void schedulingPostsSimResults() throws IOException { assertEquals("05:00:00", plantSegments.get(2).startOffset()); // GB2 end final var topics = hasura.getTopicsEvents(datasetId); - assertEquals(39, topics.size()); + assertEquals(41, topics.size()); // Assert that the keys to be inspected are included assertTrue(topics.containsKey("ActivityType.Input.GrowBanana")); assertTrue(topics.containsKey("ActivityType.Output.GrowBanana")); diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/types/ActivityValidation.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/types/ActivityValidation.java new file mode 100644 index 0000000000..599e84d394 --- /dev/null +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/types/ActivityValidation.java @@ -0,0 +1,54 @@ +package gov.nasa.jpl.aerie.e2e.types; + +import javax.json.JsonObject; +import javax.json.JsonString; +import javax.json.JsonValue; +import java.util.List; + +public sealed interface ActivityValidation { + record Pending() implements ActivityValidation {} + record Success() implements ActivityValidation {} + record InstantiationFailure(List extraneousArguments, List missingArguments, List unconstructableArguments) implements ActivityValidation {} + record ValidationFailure(List notices) implements ActivityValidation {} + record NoSuchActivityTypeFailure(String message, String activityType) implements ActivityValidation {} + + record ValidationNotice(List subjects, String message) { } + record UnconstructableArgument(String name, String failure) { } + + static ActivityValidation fromJSON(JsonObject obj) { + final var status = obj.getString("status"); + if (!status.equals("complete")) { + return new Pending(); + } + final var validations = obj.getJsonObject("validations"); + if (validations.getBoolean("success")) { + return new Success(); + } + final String type = validations.getString("type"); + final JsonObject errors = validations.getJsonObject("errors"); + return switch (type) { + case "INSTANTIATION_ERRORS" -> new InstantiationFailure( + getStringArray(errors, "extraneousArguments"), + getStringArray(errors, "missingArguments"), + errors + .asJsonObject() + .getJsonArray("unconstructableArguments") + .getValuesAs( + $ -> new UnconstructableArgument( + $.asJsonObject().getString("name"), + $.asJsonObject().getString("failure")))); + case "VALIDATION_NOTICES" -> new ValidationFailure( + errors.getJsonArray("validationNotices") + .getValuesAs( + $ -> new ValidationNotice( + getStringArray($, "subjects"), + $.asJsonObject().getString("message")))); + case "NO_SUCH_ACTIVITY_TYPE" -> new NoSuchActivityTypeFailure(errors.getJsonObject("noSuchActivityError").getString("message"), errors.getJsonObject("noSuchActivityError").getString("activity_type")); + default -> throw new RuntimeException("Unhandled error type: " + type); + }; + } + + static List getStringArray(JsonValue object, String key) { + return object.asJsonObject().getJsonArray(key).getValuesAs(subj -> ((JsonString) subj).getString()); + } +} diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java index 45204406d9..94dedae3ea 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/GQL.java @@ -210,6 +210,16 @@ query GetEffectiveActivityArgumentsBulk($modelId: ID!, $activities: [EffectiveAr success } }"""), + GET_ACTIVITY_VALIDATIONS(""" + query GetActivityValidations($planId: Int!) { + activity_directive_validations(where: {plan_id: {_eq: $planId}}) { + status + validations + plan_id + last_modified_arguments_at + directive_id + } + }"""), GET_EFFECTIVE_MODEL_ARGUMENTS(""" query GetEffectiveModelArguments($modelId: ID!, $modelArgs: ModelArguments!) { getModelEffectiveArguments(missionModelId: $modelId, modelArguments: $modelArgs) { diff --git a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/HasuraRequests.java b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/HasuraRequests.java index 6921677f47..d9d32251ad 100644 --- a/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/HasuraRequests.java +++ b/e2e-tests/src/test/java/gov/nasa/jpl/aerie/e2e/utils/HasuraRequests.java @@ -9,6 +9,7 @@ import org.apache.commons.lang3.tuple.Pair; import javax.json.Json; +import javax.json.JsonArray; import javax.json.JsonValue; import javax.json.JsonObject; import java.io.IOException; @@ -222,6 +223,22 @@ public List getEffectiveActivityArgumentsBulk( .getJsonArray("getActivityEffectiveArgumentsBulk") .getValuesAs(EffectiveActivityArguments::fromJSON); } + + public Map getActivityValidations(final int planId) throws IOException { + final var variables = Json.createObjectBuilder() + .add("planId", planId) + .build(); + final JsonArray response = makeRequest(GQL.GET_ACTIVITY_VALIDATIONS, variables) + .getJsonArray("activity_directive_validations"); + final var res = new HashMap(); + for (final var object : response) { + res.put( + (long) object.asJsonObject().getInt("directive_id"), + ActivityValidation.fromJSON(object.asJsonObject())); + } + return res; + } + //endregion //region Simulation diff --git a/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/activities/ExceptionActivity.java b/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/activities/ExceptionActivity.java new file mode 100644 index 0000000000..167daea947 --- /dev/null +++ b/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/activities/ExceptionActivity.java @@ -0,0 +1,29 @@ +package gov.nasa.jpl.aerie.banananation.activities; + +import gov.nasa.jpl.aerie.merlin.framework.annotations.ActivityType; +import gov.nasa.jpl.aerie.merlin.framework.annotations.Export.Parameter; +import gov.nasa.jpl.aerie.merlin.framework.annotations.Export.Validation; + +/** + * Conditionally throws a runtime exception at both validation time and runtime + */ +@ActivityType("ExceptionActivity") +public final class ExceptionActivity { + @Parameter + public boolean throwException = false; + + @Validation("Throws an exception if set") + @Validation.Subject("throwException") + public boolean conditionallyThrowException() { + if (this.throwException) { + throw new RuntimeException("Throwing runtime exception during validation"); + } + return true; + } + + public void run() { + if (this.throwException) { + throw new RuntimeException("Throwing runtime exception during runtime"); + } + } +} diff --git a/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/package-info.java b/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/package-info.java index 9b2b863fdd..1f27ab1fb0 100644 --- a/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/package-info.java +++ b/examples/banananation/src/main/java/gov/nasa/jpl/aerie/banananation/package-info.java @@ -25,6 +25,7 @@ @WithActivityType(DurationParameterActivity.class) @WithActivityType(ControllableDurationActivity.class) @WithActivityType(RipenBananaActivity.class) +@WithActivityType(ExceptionActivity.class) package gov.nasa.jpl.aerie.banananation; @@ -38,6 +39,7 @@ import gov.nasa.jpl.aerie.banananation.activities.DecomposingSpawnActivity; import gov.nasa.jpl.aerie.banananation.activities.DownloadBananaActivity; import gov.nasa.jpl.aerie.banananation.activities.DurationParameterActivity; +import gov.nasa.jpl.aerie.banananation.activities.ExceptionActivity; import gov.nasa.jpl.aerie.banananation.activities.GrowBananaActivity; import gov.nasa.jpl.aerie.banananation.activities.LineCountBananaActivity; import gov.nasa.jpl.aerie.banananation.activities.ParameterTestActivity; diff --git a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MapperMethodMaker.java b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MapperMethodMaker.java index c4bfa837ff..7226c983c5 100644 --- a/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MapperMethodMaker.java +++ b/merlin-framework-processor/src/main/java/gov/nasa/jpl/aerie/merlin/processor/generator/MapperMethodMaker.java @@ -159,6 +159,7 @@ public final MethodSpec makeGetValidationFailuresMethod() { return CodeBlock .builder() + .beginControlFlow("try") .addStatement( "if (!$L.$L()) notices.add(new $T($T.of($L), $S))", "input", @@ -166,7 +167,10 @@ public final MethodSpec makeGetValidationFailuresMethod() { ValidationNotice.class, List.class, subjects, - validation.failureMessage()); + validation.failureMessage()) + .nextControlFlow("catch ($T ex)", Throwable.class) + .addStatement("notices.add(new $T($T.of($L), ex.getMessage()))", ValidationNotice.class, List.class, subjects) + .endControlFlow(); }) .reduce(CodeBlock.builder(), (x, y) -> x.add(y.build())) .build()) diff --git a/merlin-server/sql/merlin/applied_migrations.sql b/merlin-server/sql/merlin/applied_migrations.sql index b792418c8f..4e505e9bb7 100644 --- a/merlin-server/sql/merlin/applied_migrations.sql +++ b/merlin-server/sql/merlin/applied_migrations.sql @@ -34,3 +34,4 @@ call migrations.mark_migration_applied('28'); call migrations.mark_migration_applied('29'); call migrations.mark_migration_applied('30'); call migrations.mark_migration_applied('31'); +call migrations.mark_migration_applied('32'); diff --git a/merlin-server/sql/merlin/tables/activity_directive.sql b/merlin-server/sql/merlin/tables/activity_directive.sql index 21890bc429..fe6cf8ff7d 100644 --- a/merlin-server/sql/merlin/tables/activity_directive.sql +++ b/merlin-server/sql/merlin/tables/activity_directive.sql @@ -186,11 +186,19 @@ comment on trigger set_timestamp on activity_directive is e'' create function activity_directive_set_arguments_updated_at() returns trigger security definer - language plpgsql as $$begin - call plan_locked_exception(new.plan_id); - new.last_modified_arguments_at = now(); + language plpgsql as +$$ begin + call plan_locked_exception(new.plan_id); + new.last_modified_arguments_at = now(); + + -- request new validation + update activity_directive_validations + set last_modified_arguments_at = new.last_modified_arguments_at, + status = 'pending' + where (directive_id, plan_id) = (new.id, new.plan_id); + return new; -end$$; +end $$; comment on function activity_directive_set_arguments_updated_at() is e'' 'Sets the last_modified_arguments_at field of an activity_directive to the current time.'; @@ -203,6 +211,22 @@ execute function activity_directive_set_arguments_updated_at(); comment on trigger set_arguments_timestamp on activity_directive is e'' 'Sets the last_modified_arguments_at field of an activity_directive to the current time.'; +create function activity_directive_validation_entry() + returns trigger + security definer + language plpgsql as +$$ begin + insert into activity_directive_validations + (directive_id, plan_id, last_modified_arguments_at) + values (new.id, new.plan_id, new.last_modified_arguments_at); + return new; +end $$; + +create trigger validation_entry_on_insert + after insert on activity_directive + for each row +execute function activity_directive_validation_entry(); + create function check_activity_directive_metadata() returns trigger security definer diff --git a/merlin-server/sql/merlin/tables/activity_directive_validations.sql b/merlin-server/sql/merlin/tables/activity_directive_validations.sql index 93f0a881f0..0feb40b104 100644 --- a/merlin-server/sql/merlin/tables/activity_directive_validations.sql +++ b/merlin-server/sql/merlin/tables/activity_directive_validations.sql @@ -1,8 +1,8 @@ create table activity_directive_validations ( directive_id integer not null, plan_id integer not null, - - last_modified_at timestamptz not null default now(), + last_modified_arguments_at timestamptz not null, + status text not null default 'pending', validations jsonb default '{}'::jsonb, constraint activity_directive_validations_natural_key @@ -21,7 +21,7 @@ comment on column activity_directive_validations.directive_id is e'' 'The activity directive these validations are extracted from.'; comment on column activity_directive_validations.plan_id is '' 'The plan associated with the activity directive these validations are extracted from.'; -comment on column activity_directive_validations.last_modified_at is e'' +comment on column activity_directive_validations.last_modified_arguments_at is e'' 'The time at which these argument validations were last modified.'; comment on column activity_directive_validations.validations is e'' 'The argument validations extracted from an activity directive.'; diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/AerieAppDriver.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/AerieAppDriver.java index 56cdb8a257..cf56b85d09 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/AerieAppDriver.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/AerieAppDriver.java @@ -14,6 +14,7 @@ import gov.nasa.jpl.aerie.merlin.server.remotes.ResultsCellRepository; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresConstraintRepository; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresMissionModelRepository; +import gov.nasa.jpl.aerie.merlin.server.services.ValidationWorker; import gov.nasa.jpl.aerie.permissions.PermissionsService; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresPlanRepository; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresResultsCellRepository; @@ -53,6 +54,16 @@ public static void main(final String[] args) { configuration.merlinFileStore(), stores.missionModels(), configuration.untruePlanStart()); + + if (configuration.enableContinuousValidationThread()) { + final var validationWorker = new ValidationWorker( + missionModelController, + configuration.validationThreadPollingPeriod()); + final var thread = new Thread(validationWorker::workerLoop); + thread.setDaemon(true); + thread.start(); + } + final var planController = new LocalPlanService(stores.plans()); final var typescriptCodeGenerationService = new TypescriptCodeGenerationServiceAdapter(missionModelController, planController); @@ -169,7 +180,9 @@ private static AppConfiguration loadConfiguration() { getEnv("MERLIN_DB", "aerie_merlin")), Instant.parse(getEnv("UNTRUE_PLAN_START", "")), URI.create(getEnv("HASURA_GRAPHQL_URL", "http://localhost:8080/v1/graphql")), - getEnv("HASURA_GRAPHQL_ADMIN_SECRET", "") + getEnv("HASURA_GRAPHQL_ADMIN_SECRET", ""), + Boolean.parseBoolean(getEnv("ENABLE_CONTINUOUS_VALIDATION_THREAD", "false")), + Integer.parseInt(getEnv("VALIDATION_THREAD_POLLING_PERIOD", "500")) ); } } diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/config/AppConfiguration.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/config/AppConfiguration.java index 3c32d4594b..347766d50d 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/config/AppConfiguration.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/config/AppConfiguration.java @@ -12,7 +12,9 @@ public record AppConfiguration ( Store store, Instant untruePlanStart, URI hasuraGraphqlURI, - String hasuraGraphQlAdminSecret + String hasuraGraphQlAdminSecret, + boolean enableContinuousValidationThread, + int validationThreadPollingPeriod ) { public AppConfiguration { Objects.requireNonNull(merlinFileStore); diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/http/ResponseSerializers.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/http/ResponseSerializers.java index 04eb3074c1..ffd1beeb14 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/http/ResponseSerializers.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/http/ResponseSerializers.java @@ -24,6 +24,7 @@ import gov.nasa.jpl.aerie.merlin.server.services.LocalMissionModelService; import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService; import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService.BulkEffectiveArgumentResponse; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService.BulkArgumentValidationResponse; import gov.nasa.jpl.aerie.merlin.server.services.UnexpectedSubtypeError; import org.apache.commons.lang3.tuple.Pair; @@ -158,6 +159,45 @@ public static JsonValue serializeBulkEffectiveArgumentResponse(BulkEffectiveArgu .build(); } + public static JsonValue serializeBulkArgumentValidationResponseList(final List responses) { + return serializeIterable(ResponseSerializers::serializeBulkArgumentValidationResponse, responses); + } + + public static JsonValue serializeBulkArgumentValidationResponse(BulkArgumentValidationResponse response) { + // TODO use pattern matching in switch statement with JDK 21 + if (response instanceof BulkArgumentValidationResponse.Success) { + return Json.createObjectBuilder() + .add("success", JsonValue.TRUE) + .build(); + } else if (response instanceof BulkArgumentValidationResponse.Validation v) { + return Json.createObjectBuilder() + .add("success", JsonValue.FALSE) + .add("type", "VALIDATION_NOTICES") + .add("errors", Json.createObjectBuilder() + .add("validationNotices", serializeIterable(ResponseSerializers::serializeValidationNotice, v.notices())) + .build()) + .build(); + } else if (response instanceof BulkArgumentValidationResponse.NoSuchActivityError e) { + return Json.createObjectBuilder() + .add("success", JsonValue.FALSE) + .add("type", "NO_SUCH_ACTIVITY_TYPE") + .add("errors", Json.createObjectBuilder() + .add("noSuchActivityError", serializeNoSuchActivityTypeException(e.ex())) + .build()) + .build(); + } else if (response instanceof BulkArgumentValidationResponse.InstantiationError f) { + return Json.createObjectBuilder(serializeInstantiationException(f.ex()).asJsonObject()) + .add("type", "INSTANTIATION_ERRORS") + .build(); + } + + // This should never happen, but we don't have exhaustive pattern matching + return Json.createObjectBuilder() + .add("success", JsonValue.FALSE) + .add("errors", String.format("Internal error: %s", response)) + .build(); + } + public static JsonValue serializeCreatedDatasetId(final long datasetId) { return Json.createObjectBuilder() .add("datasetId", datasetId) diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/ActivityDirectiveForValidation.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/ActivityDirectiveForValidation.java index 5adf6bb53a..6aefe8ca42 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/ActivityDirectiveForValidation.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/ActivityDirectiveForValidation.java @@ -2,6 +2,8 @@ import gov.nasa.jpl.aerie.merlin.driver.SerializedActivity; +import java.sql.Timestamp; + public record ActivityDirectiveForValidation ( ActivityDirectiveId id, diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/MissionModelId.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/MissionModelId.java new file mode 100644 index 0000000000..d212b7f442 --- /dev/null +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/models/MissionModelId.java @@ -0,0 +1,8 @@ +package gov.nasa.jpl.aerie.merlin.server.models; + +public record MissionModelId(long id) { + @Override + public String toString() { + return String.valueOf(this.id()); + } +} diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/MissionModelRepository.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/MissionModelRepository.java index 158f5d3a31..f19b5d3b71 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/MissionModelRepository.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/MissionModelRepository.java @@ -1,14 +1,14 @@ package gov.nasa.jpl.aerie.merlin.server.remotes; import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.Parameter; -import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.ValidationNotice; import gov.nasa.jpl.aerie.merlin.protocol.model.Resource; -import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveId; +import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; import gov.nasa.jpl.aerie.merlin.server.models.ActivityType; import gov.nasa.jpl.aerie.merlin.server.models.Constraint; +import gov.nasa.jpl.aerie.merlin.server.models.MissionModelId; import gov.nasa.jpl.aerie.merlin.server.models.MissionModelJar; -import gov.nasa.jpl.aerie.merlin.server.models.PlanId; -import gov.nasa.jpl.aerie.merlin.server.models.Timestamp; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService.BulkArgumentValidationResponse; +import org.apache.commons.lang3.tuple.Pair; import java.util.List; import java.util.Map; @@ -23,8 +23,9 @@ public interface MissionModelRepository { // Mutations void updateModelParameters(String missionModelId, final List modelParameters) throws NoSuchMissionModelException; void updateActivityTypes(String missionModelId, final Map activityTypes) throws NoSuchMissionModelException; - void updateActivityDirectiveValidations(final ActivityDirectiveId directiveId, final PlanId planId, final Timestamp argumentsModifiedTime, final List notices); void updateResourceTypes(String missionModelId, final Map> resourceTypes) throws NoSuchMissionModelException; + Map> getUnvalidatedDirectives(); + void updateDirectiveValidations(List> updates); final class NoSuchMissionModelException extends Exception {} } diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetUnvalidatedDirectivesAction.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetUnvalidatedDirectivesAction.java new file mode 100644 index 0000000000..0697906f63 --- /dev/null +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/GetUnvalidatedDirectivesAction.java @@ -0,0 +1,67 @@ +package gov.nasa.jpl.aerie.merlin.server.remotes.postgres; + +import gov.nasa.jpl.aerie.merlin.driver.SerializedActivity; +import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; +import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveId; +import gov.nasa.jpl.aerie.merlin.server.models.MissionModelId; +import gov.nasa.jpl.aerie.merlin.server.models.PlanId; +import gov.nasa.jpl.aerie.merlin.server.models.Timestamp; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.activityArgumentsP; +import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.getJsonColumn; + +public class GetUnvalidatedDirectivesAction implements AutoCloseable { + private static final String sql = """ + select ad.id, ad.plan_id, ad.type, ad.arguments, p.model_id, adv.last_modified_arguments_at + from activity_directive ad + join activity_directive_validations adv + on ad.id = adv.directive_id and ad.plan_id = adv.plan_id + join plan p + on ad.plan_id = p.id + where adv.status = 'pending'; + """; + + private final PreparedStatement statement; + + public GetUnvalidatedDirectivesAction(Connection connection) throws SQLException { + this.statement = connection.prepareStatement(sql); + } + + public Map> get() throws SQLException { + final var results = this.statement.executeQuery(); + final var map = new HashMap>(); + + while (results.next()) { + final var modelId = new MissionModelId(results.getInt("model_id")); + final var directiveId = results.getInt("id"); + final var planId = results.getInt("plan_id"); + final var type = results.getString("type"); + final var arguments = getJsonColumn(results, "arguments", activityArgumentsP) + .getSuccessOrThrow($ -> new Error("Corrupt activity arguments cannot be parsed: " + $.reason())); + final var argumentTimestamp = results.getTimestamp("last_modified_arguments_at"); + + map.computeIfAbsent(modelId, $ -> new ArrayList<>()) + .add(new ActivityDirectiveForValidation( + new ActivityDirectiveId(directiveId), + new PlanId(planId), + argumentTimestamp, + new SerializedActivity(type, arguments) + )); + } + + return map; + } + + @Override + public void close() throws SQLException { + this.statement.close(); + } +} diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresMissionModelRepository.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresMissionModelRepository.java index b111532bcb..c5bf12b48c 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresMissionModelRepository.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PostgresMissionModelRepository.java @@ -2,15 +2,15 @@ import gov.nasa.jpl.aerie.constraints.model.ConstraintType; import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.Parameter; -import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.ValidationNotice; import gov.nasa.jpl.aerie.merlin.protocol.model.Resource; -import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveId; +import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; import gov.nasa.jpl.aerie.merlin.server.models.ActivityType; import gov.nasa.jpl.aerie.merlin.server.models.Constraint; +import gov.nasa.jpl.aerie.merlin.server.models.MissionModelId; import gov.nasa.jpl.aerie.merlin.server.models.MissionModelJar; -import gov.nasa.jpl.aerie.merlin.server.models.PlanId; -import gov.nasa.jpl.aerie.merlin.server.models.Timestamp; import gov.nasa.jpl.aerie.merlin.server.remotes.MissionModelRepository; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService; +import org.apache.commons.lang3.tuple.Pair; import javax.sql.DataSource; import java.sql.SQLException; @@ -146,20 +146,22 @@ public void updateResourceTypes(final String missionModelId, final Map notices - ) - { - try (final var connection = this.dataSource.getConnection()) { - try (final var updateActivityDirectiveValidationsAction = new UpdateActivityDirectiveValidationsAction(connection)) { - updateActivityDirectiveValidationsAction.apply(directiveId.id(), planId.id(), argumentsModifiedTime, notices); - } - } catch (final SQLException ex) { - throw new DatabaseException( - "Failed to update derived data for activity directive with id `%d` and plan id '%d'".formatted(directiveId.id(), planId.id()), ex); + public Map> getUnvalidatedDirectives() { + try (final var connection = this.dataSource.getConnection(); + final var unvalidatedDirectivesAction = new GetUnvalidatedDirectivesAction(connection)) { + return unvalidatedDirectivesAction.get(); + } catch (SQLException ex) { + throw new DatabaseException("Failed to get unvalidated activity directives", ex); + } + } + + @Override + public void updateDirectiveValidations(List> updates) { + try (final var connection = this.dataSource.getConnection(); + final var updateAction = new UpdateActivityDirectiveValidationsAction(connection)) { + updateAction.apply(updates); + } catch (SQLException ex) { + throw new DatabaseException("Failed to update activity directive validations", ex); } } diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PreparedStatements.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PreparedStatements.java index 7819ed1ccf..639561e6f5 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PreparedStatements.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/PreparedStatements.java @@ -2,11 +2,11 @@ import gov.nasa.jpl.aerie.merlin.driver.SimulationFailure; import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.Parameter; -import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.ValidationNotice; import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; import gov.nasa.jpl.aerie.merlin.server.http.MerlinParsers; import gov.nasa.jpl.aerie.merlin.server.http.ResponseSerializers; import gov.nasa.jpl.aerie.merlin.server.models.Timestamp; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService; import org.intellij.lang.annotations.Language; import javax.json.Json; @@ -83,9 +83,12 @@ public static void setRequiredParameters(final PreparedStatement statement, fina statement.setString(parameter, ResponseSerializers.serializeStringList(requiredParameters).toString()); } - public static void setValidationNotices(final PreparedStatement statement, final int parameter, final List notices) - throws SQLException { - statement.setString(parameter, ResponseSerializers.serializeValidationNotices(notices).toString()); + public static void setValidationResponse( + final PreparedStatement statement, + final int parameter, + final MissionModelService.BulkArgumentValidationResponse response + ) throws SQLException { + statement.setString(parameter, ResponseSerializers.serializeBulkArgumentValidationResponse(response).toString()); } public static void setFailureReason(final PreparedStatement statement, final int parameter, final SimulationFailure reason) diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateActivityDirectiveValidationsAction.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateActivityDirectiveValidationsAction.java index 066b860443..4033597a19 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateActivityDirectiveValidationsAction.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/remotes/postgres/UpdateActivityDirectiveValidationsAction.java @@ -1,9 +1,11 @@ package gov.nasa.jpl.aerie.merlin.server.remotes.postgres; -import gov.nasa.jpl.aerie.merlin.protocol.model.InputType.ValidationNotice; -import gov.nasa.jpl.aerie.merlin.server.models.Timestamp; +import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; +import org.apache.commons.lang3.tuple.Pair; import org.intellij.lang.annotations.Language; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService.BulkArgumentValidationResponse; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -11,31 +13,39 @@ /*package-local*/ final class UpdateActivityDirectiveValidationsAction implements AutoCloseable { private static final @Language("SQL") String sql = """ - insert into activity_directive_validations - as validation (directive_id, plan_id, last_modified_at, validations) - values (?, ?, ?::timestamptz, ?::json) - on conflict (directive_id, plan_id) do update - set - last_modified_at = excluded.last_modified_at, - validations = excluded.validations - where validation.last_modified_at < excluded.last_modified_at + update activity_directive_validations + set validations = ?::jsonb, + status = 'complete' + where (directive_id, plan_id, last_modified_arguments_at) = (?, ?, ?) """; private final PreparedStatement statement; public UpdateActivityDirectiveValidationsAction(final Connection connection) throws SQLException { this.statement = connection.prepareStatement(sql); + connection.setAutoCommit(false); } - public void apply(final long directiveId, final long planId, final Timestamp argumentsModifiedTime, final List notices) - throws SQLException, FailedUpdateException - { - this.statement.setLong(1, directiveId); - this.statement.setLong(2, planId); - PreparedStatements.setTimestamp(this.statement, 3, argumentsModifiedTime); - PreparedStatements.setValidationNotices(this.statement, 4, notices); - - statement.executeUpdate(); + public void apply(List> updates) throws SQLException { + try { + for (final var update : updates) { + final var directive = update.getLeft(); + final var validation = update.getRight(); + + PreparedStatements.setValidationResponse(statement, 1, validation); + statement.setLong(2, directive.id().id()); + statement.setLong(3, directive.planId().id()); + statement.setTimestamp(4, directive.argumentsModifiedTime()); + + statement.addBatch(); + } + + statement.executeBatch(); // throws BatchUpdateException if any statement fails + statement.getConnection().commit(); + } catch (BatchUpdateException e) { + statement.getConnection().rollback(); + throw e; + } } @Override diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/LocalMissionModelService.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/LocalMissionModelService.java index 3613a4c5e5..bacf2ea303 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/LocalMissionModelService.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/LocalMissionModelService.java @@ -17,8 +17,10 @@ import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; import gov.nasa.jpl.aerie.merlin.server.models.ActivityType; import gov.nasa.jpl.aerie.merlin.server.models.Constraint; +import gov.nasa.jpl.aerie.merlin.server.models.MissionModelId; import gov.nasa.jpl.aerie.merlin.server.models.MissionModelJar; import gov.nasa.jpl.aerie.merlin.server.remotes.MissionModelRepository; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * Implements the missionModel service {@link MissionModelService} interface on a set of local domain objects. @@ -136,6 +139,43 @@ public List validateActivityArguments(final String missionMode return directiveType.getInputType().validateArguments(activity.getArguments()); } + public List validateActivityArgumentsBulk( + final MissionModelId modelId, + final List activities + ) throws NoSuchMissionModelException, MissionModelLoadException { + // load mission model once for all activities + final var modelType = this.loadMissionModelType(modelId.toString()); + final var registry = DirectiveTypeRegistry.extract(modelType); + + // map all directives to validation response + return activities.stream().map((directive) -> { + final var typeName = directive.activity().getTypeName(); + final var arguments = directive.activity().getArguments(); + + try { + final var directiveType = registry.directiveTypes().get(typeName); + if (directiveType == null) { + return new BulkArgumentValidationResponse.NoSuchActivityError(new NoSuchActivityTypeException(typeName)); + } + + final var notices = directiveType.getInputType().validateArguments(arguments); + return notices.isEmpty() + ? new BulkArgumentValidationResponse.Success() + : new BulkArgumentValidationResponse.Validation(notices); + } catch (InstantiationException e) { + return new BulkArgumentValidationResponse.InstantiationError(e); + } + }).collect(Collectors.toList()); + } + + public Map> getUnvalidatedDirectives() { + return missionModelRepository.getUnvalidatedDirectives(); + } + + public void updateDirectiveValidations(List> updates) { + missionModelRepository.updateDirectiveValidations(updates); + } + /** * Validate that a set of activity parameters conforms to the expectations of a named mission model. * @@ -310,14 +350,6 @@ public void refreshResourceTypes(final String missionModelId) } } - @Override - public void refreshActivityValidations(final String missionModelId, final ActivityDirectiveForValidation directive) - throws NoSuchMissionModelException, InstantiationException - { - final var notices = validateActivityArguments(missionModelId, directive.activity()); - this.missionModelRepository.updateActivityDirectiveValidations(directive.id(), directive.planId(), directive.argumentsModifiedTime(), notices); - } - private ModelType loadMissionModelType(final String missionModelId) throws NoSuchMissionModelException, MissionModelLoadException { diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/MissionModelService.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/MissionModelService.java index 0aca7d14cc..a027b10e9d 100644 --- a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/MissionModelService.java +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/MissionModelService.java @@ -10,8 +10,6 @@ import gov.nasa.jpl.aerie.merlin.protocol.types.InstantiationException; import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; import gov.nasa.jpl.aerie.merlin.protocol.types.ValueSchema; -import gov.nasa.jpl.aerie.merlin.server.ResultsProtocol; -import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; import gov.nasa.jpl.aerie.merlin.server.models.ActivityType; import gov.nasa.jpl.aerie.merlin.server.models.Constraint; import gov.nasa.jpl.aerie.merlin.server.models.MissionModelJar; @@ -71,8 +69,6 @@ SimulationResults runSimulation(CreateSimulationMessage message, Consumer notices) implements BulkArgumentValidationResponse { } + record NoSuchActivityError(NoSuchActivityTypeException ex) implements BulkArgumentValidationResponse { } + record InstantiationError(InstantiationException ex) implements BulkArgumentValidationResponse { } + } } diff --git a/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/ValidationWorker.java b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/ValidationWorker.java new file mode 100644 index 0000000000..62ff4a87bb --- /dev/null +++ b/merlin-server/src/main/java/gov/nasa/jpl/aerie/merlin/server/services/ValidationWorker.java @@ -0,0 +1,70 @@ +package gov.nasa.jpl.aerie.merlin.server.services; + +import gov.nasa.jpl.aerie.merlin.server.models.ActivityDirectiveForValidation; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService.NoSuchMissionModelException; +import gov.nasa.jpl.aerie.merlin.server.services.MissionModelService.BulkArgumentValidationResponse; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.stream.IntStream; +import java.util.List; + +public record ValidationWorker(LocalMissionModelService missionModelService, int pollingPeriod) { + + private static final Logger logger = LoggerFactory.getLogger(ValidationWorker.class); + + public void workerLoop() { + logger.info("validation worker starting..."); + while (!Thread.interrupted()) { + try { + Thread.sleep(pollingPeriod); + + // get unvalidated directives, batched by mission model id + final var validationRequests = missionModelService.getUnvalidatedDirectives(); + if (!validationRequests.isEmpty()) { + logger.debug( + "queried {} directives that need validations, across {} models", + validationRequests.size(), + validationRequests.keySet().size()); + } + + // spin up each mission model once and process all corresponding directive validations + for (final var entry : validationRequests.entrySet()) { + final var beginTime = System.nanoTime(); + + final var modelId = entry.getKey(); + logger.debug("processing batch for mission model: {}", modelId.toString()); + + final var unvalidatedDirectives = entry.getValue(); + final var responses = missionModelService.validateActivityArgumentsBulk(modelId, unvalidatedDirectives); + + // zip together directives and validations, since DB action needs to insert validations for a given directive + final List> zippedList = zip(unvalidatedDirectives, responses); + + // write validations out to DB + missionModelService.updateDirectiveValidations(zippedList); + + final var endTime = System.nanoTime(); + final var duration = (endTime - beginTime) / 1_000_000.0; + logger.debug("processed model batch of size {} in {} ms", unvalidatedDirectives.size(), duration); + } + + } catch (NoSuchMissionModelException ex) { + logger.error("Validation request failed due to no such mission model: {}", ex.toString()); + } catch (InterruptedException ex) { + // we were interrupted, so exit gracefully + return; + } catch (Throwable t) { + // catch all to keep validation thread from dying, which would require a merlin-server restart + logger.error("Recovering from unexpected error encountered in validation thread: ", t); + } + } + } + + private static List> zip(List left, List right) { + return IntStream.range(0, Math.min(left.size(), right.size())) + .mapToObj(i -> Pair.of(left.get(i), right.get(i))) + .toList(); + } +} diff --git a/merlin-server/src/test/java/gov/nasa/jpl/aerie/merlin/server/mocks/StubMissionModelService.java b/merlin-server/src/test/java/gov/nasa/jpl/aerie/merlin/server/mocks/StubMissionModelService.java index b0042025d1..af7f903016 100644 --- a/merlin-server/src/test/java/gov/nasa/jpl/aerie/merlin/server/mocks/StubMissionModelService.java +++ b/merlin-server/src/test/java/gov/nasa/jpl/aerie/merlin/server/mocks/StubMissionModelService.java @@ -213,9 +213,6 @@ public void refreshModelParameters(final String missionModelId) throws NoSuchMis @Override public void refreshActivityTypes(final String missionModelId) throws NoSuchMissionModelException {} - @Override - public void refreshActivityValidations(final String missionModelId, final ActivityDirectiveForValidation directive) {} - @Override public void refreshResourceTypes(final String missionModelId) throws NoSuchMissionModelException {} }