From bd0a7c62c699128b9d3a3e9771acea9c09fb3178 Mon Sep 17 00:00:00 2001 From: Theresa Kamerman Date: Thu, 5 Oct 2023 09:50:11 -0700 Subject: [PATCH 1/2] Close all running threads at the end of main Without this, Javalin and the listener thread will continue running, even if the main loop has closed due to an exception. This prevents the container from exiting (meaning Docker won't restart it), and makes the worker appear alive but unresponsive to the notifications it is receiving when looking at the logs. --- .../worker/ListenSimulationCapability.java | 14 +++-- .../merlin/worker/MerlinWorkerAppDriver.java | 55 +++++++++++-------- .../worker/ListenSchedulerCapability.java | 14 +++-- .../worker/SchedulerWorkerAppDriver.java | 50 +++++++++-------- 4 files changed, 76 insertions(+), 57 deletions(-) diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/ListenSimulationCapability.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/ListenSimulationCapability.java index 946424b669..934ada2e81 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/ListenSimulationCapability.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/ListenSimulationCapability.java @@ -28,8 +28,8 @@ public ListenSimulationCapability( this.notificationQueue = notificationQueue; } - public void registerListener() { - new Thread(() -> { + public Thread registerListener() { + final var listenThread = new Thread(() -> { try (final var connection = this.dataSource.getConnection()) { try (final var listenSimulationStatusAction = new ListenSimulationStatusAction(connection)) { listenSimulationStatusAction.apply(); @@ -37,7 +37,7 @@ public void registerListener() { throw new DatabaseException("Failed to register as LISTEN to postgres database.", ex); } - while (true) { + while (!Thread.currentThread().isInterrupted()) { final var pgConnection = connection.unwrap(PGConnection.class); final var notifications = pgConnection.getNotifications(10000); if (notifications != null) { @@ -54,16 +54,20 @@ public void registerListener() { try { this.notificationQueue.put(notificationPayload); } catch (InterruptedException e) { - // We do not expect this thread to be interrupted. If it is, exit gracefully: + // This thread will be interrupted when the worker's main loop exits, so it should exit gracefully: + logger.info("Listener has been interrupted"); return; } } } } } + logger.info("Listener has received interrupted signal"); } catch (SQLException e) { throw new DatabaseException("Listener encountered exception", e); } - }).start(); + }); + listenThread.start(); + return listenThread; } } diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java index 7a24565db2..9773a81435 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java @@ -55,38 +55,45 @@ public static void main(String[] args) throws InterruptedException { configuration.untruePlanStart() ); final var planController = new LocalPlanService(stores.plans()); - final var simulationAgent = new SynchronousSimulationAgent(planController, missionModelController, configuration.simulationProgressPollPeriodMillis()); + final var simulationAgent = new SynchronousSimulationAgent( + planController, + missionModelController, + configuration.simulationProgressPollPeriodMillis()); final var notificationQueue = new LinkedBlockingQueue(); final var listenAction = new ListenSimulationCapability(hikariDataSource, notificationQueue); - listenAction.registerListener(); + final var listenThread = listenAction.registerListener(); - final var app = Javalin.create().start(8080); - app.get("/health", ctx -> ctx.status(200)); + try (final var app = Javalin.create().start(8080)) { + app.get("/health", ctx -> ctx.status(200)); - while (true) { - final var notification = notificationQueue.take(); - final var planId = new PlanId(notification.planId()); - final var datasetId = notification.datasetId(); + while (true) { + final var notification = notificationQueue.take(); + final var planId = new PlanId(notification.planId()); + final var datasetId = notification.datasetId(); - final Optional owner = stores.results().claim(planId, datasetId); - if (owner.isEmpty()) continue; + final Optional owner = stores.results().claim(planId, datasetId); + if (owner.isEmpty()) continue; - final var revisionData = new PostgresPlanRevisionData( - notification.modelRevision(), - notification.planRevision(), - notification.simulationRevision(), - notification.simulationTemplateRevision()); - final ResultsProtocol.WriterRole writer = owner.get(); - try { - simulationAgent.simulate(planId, revisionData, writer); - } catch (final Throwable ex) { - ex.printStackTrace(System.err); - writer.failWith(b -> b - .type("UNEXPECTED_SIMULATION_EXCEPTION") - .message("Something went wrong while simulating") - .trace(ex)); + final var revisionData = new PostgresPlanRevisionData( + notification.modelRevision(), + notification.planRevision(), + notification.simulationRevision(), + notification.simulationTemplateRevision()); + final ResultsProtocol.WriterRole writer = owner.get(); + try { + simulationAgent.simulate(planId, revisionData, writer); + } catch (final Throwable ex) { + ex.printStackTrace(System.err); + writer.failWith(b -> b + .type("UNEXPECTED_SIMULATION_EXCEPTION") + .message("Something went wrong while simulating") + .trace(ex)); + } } + } finally { + // Kill the listening thread + listenThread.interrupt(); } } diff --git a/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/ListenSchedulerCapability.java b/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/ListenSchedulerCapability.java index 83ae9e44ec..d6625ad409 100644 --- a/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/ListenSchedulerCapability.java +++ b/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/ListenSchedulerCapability.java @@ -28,8 +28,8 @@ public ListenSchedulerCapability( this.notificationQueue = notificationQueue; } - public void registerListener() { - new Thread(() -> { + public Thread registerListener() { + final var listenerThread = new Thread(() -> { try (final var connection = this.dataSource.getConnection()) { try (final var listenSimulationStatusAction = new ListenSchedulingRequestStatusAction(connection)) { listenSimulationStatusAction.apply(); @@ -37,7 +37,7 @@ public void registerListener() { throw new DatabaseException("Failed to register as LISTEN to postgres database.", ex); } - while (true) { + while (!Thread.currentThread().isInterrupted()) { final var pgConnection = connection.unwrap(PGConnection.class); final var notifications = pgConnection.getNotifications(10000); if (notifications != null) { @@ -54,16 +54,20 @@ public void registerListener() { try { this.notificationQueue.put(notificationPayload); } catch (InterruptedException e) { - // We do not expect this thread to be interrupted. If it is, exit gracefully: + // This thread will be interrupted when the worker's main loop exits, so it should exit gracefully: + logger.info("Listener has been interrupted"); return; } } } } } + logger.info("Listener has received interrupted signal"); } catch (SQLException e) { throw new DatabaseException("Listener encountered exception", e); } - }).start(); + }); + listenerThread.start(); + return listenerThread; } } diff --git a/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java b/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java index f1682b818b..5f504fb6db 100644 --- a/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java +++ b/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java @@ -71,30 +71,34 @@ public static void main(String[] args) throws Exception { final var notificationQueue = new LinkedBlockingQueue(); final var listenAction = new ListenSchedulerCapability(hikariDataSource, notificationQueue); - listenAction.registerListener(); - - final var app = Javalin.create().start(8080); - app.get("/health", ctx -> ctx.status(200)); - - while (true) { - final var notification = notificationQueue.take(); - final var specificationRevision = notification.specificationRevision(); - final var specificationId = new SpecificationId(notification.specificationId()); - - final Optional owner = stores.results().claim(specificationId); - if (owner.isEmpty()) continue; - - final var revisionData = new SpecificationRevisionData(specificationRevision); - final ResultsProtocol.WriterRole writer = owner.get(); - try { - scheduleAgent.schedule(new ScheduleRequest(specificationId, revisionData), writer); - } catch (final Throwable ex) { - ex.printStackTrace(System.err); - writer.failWith(b -> b - .type("UNEXPECTED_SCHEDULER_EXCEPTION") - .message("Something went wrong while scheduling") - .trace(ex)); + final var listenThread = listenAction.registerListener(); + + try(final var app = Javalin.create().start(8080)) { + app.get("/health", ctx -> ctx.status(200)); + + while (true) { + final var notification = notificationQueue.take(); + final var specificationRevision = notification.specificationRevision(); + final var specificationId = new SpecificationId(notification.specificationId()); + + final Optional owner = stores.results().claim(specificationId); + if (owner.isEmpty()) continue; + + final var revisionData = new SpecificationRevisionData(specificationRevision); + final ResultsProtocol.WriterRole writer = owner.get(); + try { + scheduleAgent.schedule(new ScheduleRequest(specificationId, revisionData), writer); + } catch (final Throwable ex) { + ex.printStackTrace(System.err); + writer.failWith(b -> b + .type("UNEXPECTED_SCHEDULER_EXCEPTION") + .message("Something went wrong while scheduling") + .trace(ex)); + } } + } finally { + // Kill the listen thread + listenThread.interrupt(); } } From e8ebffb16d74c802015fe875f9c139d0d58dae80 Mon Sep 17 00:00:00 2001 From: Theresa Kamerman Date: Thu, 5 Oct 2023 09:52:54 -0700 Subject: [PATCH 2/2] Shut worker down if listening thread goes down --- .../jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java | 6 ++++-- .../aerie/scheduler/worker/SchedulerWorkerAppDriver.java | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java index 9773a81435..daa4adf4ed 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/MerlinWorkerAppDriver.java @@ -21,6 +21,7 @@ import java.time.Instant; import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public final class MerlinWorkerAppDriver { public static void main(String[] args) throws InterruptedException { @@ -67,8 +68,9 @@ public static void main(String[] args) throws InterruptedException { try (final var app = Javalin.create().start(8080)) { app.get("/health", ctx -> ctx.status(200)); - while (true) { - final var notification = notificationQueue.take(); + while (listenThread.isAlive()) { + final var notification = notificationQueue.poll(1, TimeUnit.MINUTES); + if(notification == null) continue; final var planId = new PlanId(notification.planId()); final var datasetId = notification.datasetId(); diff --git a/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java b/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java index 5f504fb6db..7bc698ae0e 100644 --- a/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java +++ b/scheduler-worker/src/main/java/gov/nasa/jpl/aerie/scheduler/worker/SchedulerWorkerAppDriver.java @@ -5,6 +5,8 @@ import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import gov.nasa.jpl.aerie.scheduler.server.ResultsProtocol; @@ -76,8 +78,9 @@ public static void main(String[] args) throws Exception { try(final var app = Javalin.create().start(8080)) { app.get("/health", ctx -> ctx.status(200)); - while (true) { - final var notification = notificationQueue.take(); + while (listenThread.isAlive()) { + final var notification = notificationQueue.poll(1, TimeUnit.MINUTES); + if (notification == null) continue; final var specificationRevision = notification.specificationRevision(); final var specificationId = new SpecificationId(notification.specificationId());