diff --git a/src/main/java/cz/cvut/kbss/termit/event/ClearLongRunningTaskQueueEvent.java b/src/main/java/cz/cvut/kbss/termit/event/ClearLongRunningTaskQueueEvent.java new file mode 100644 index 000000000..b83eda541 --- /dev/null +++ b/src/main/java/cz/cvut/kbss/termit/event/ClearLongRunningTaskQueueEvent.java @@ -0,0 +1,12 @@ +package cz.cvut.kbss.termit.event; + +import org.springframework.context.ApplicationEvent; + +/** + * Indicates that the long-running task queue should be cleared. + */ +public class ClearLongRunningTaskQueueEvent extends ApplicationEvent { + public ClearLongRunningTaskQueueEvent(Object source) { + super(source); + } +} diff --git a/src/main/java/cz/cvut/kbss/termit/rest/AdminController.java b/src/main/java/cz/cvut/kbss/termit/rest/AdminController.java index 9e3a1da40..520a49086 100644 --- a/src/main/java/cz/cvut/kbss/termit/rest/AdminController.java +++ b/src/main/java/cz/cvut/kbss/termit/rest/AdminController.java @@ -59,4 +59,15 @@ public void invalidateCaches() { LOG.debug("Cache invalidation request received from client."); adminBean.invalidateCaches(); } + + @Operation(security = {@SecurityRequirement(name = "bearer-key")}, + description = "Clears the queue of long-running tasks.") + @ApiResponse(responseCode = "204", description = "Long-running tasks queue cleared.") + @PreAuthorize("hasRole('" + SecurityConstants.ROLE_ADMIN + "')") + @DeleteMapping("/long-running-tasks") + @ResponseStatus(HttpStatus.NO_CONTENT) + public void clearLongRunningTasksQueue() { + LOG.debug("Long-running task queue clearing request received from client."); + adminBean.clearLongRunningTasksQueue(); + } } diff --git a/src/main/java/cz/cvut/kbss/termit/service/jmx/AppAdminBean.java b/src/main/java/cz/cvut/kbss/termit/service/jmx/AppAdminBean.java index d81a2ec6c..94d5db6b0 100644 --- a/src/main/java/cz/cvut/kbss/termit/service/jmx/AppAdminBean.java +++ b/src/main/java/cz/cvut/kbss/termit/service/jmx/AppAdminBean.java @@ -17,6 +17,7 @@ */ package cz.cvut.kbss.termit.service.jmx; +import cz.cvut.kbss.termit.event.ClearLongRunningTaskQueueEvent; import cz.cvut.kbss.termit.event.EvictCacheEvent; import cz.cvut.kbss.termit.event.RefreshLastModifiedEvent; import cz.cvut.kbss.termit.rest.dto.HealthInfo; @@ -67,6 +68,12 @@ public void invalidateCaches() { eventPublisher.publishEvent(new RefreshLastModifiedEvent(this)); } + @ManagedOperation(description = "Clears the queue of long-running tasks.") + public void clearLongRunningTasksQueue() { + LOG.info("Clearing long-running tasks queue..."); + eventPublisher.publishEvent(new ClearLongRunningTaskQueueEvent(this)); + } + @ManagedOperation(description = "Sends test email to the specified address.") public void sendTestEmail(String address) { final Message message = Message.to(address).subject("TermIt Test Email") diff --git a/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTasksRegistry.java b/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTasksRegistry.java index c8c6e31cf..5722d9b48 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTasksRegistry.java +++ b/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTasksRegistry.java @@ -1,16 +1,21 @@ package cz.cvut.kbss.termit.util.longrunning; +import cz.cvut.kbss.termit.event.ClearLongRunningTaskQueueEvent; import cz.cvut.kbss.termit.event.LongRunningTaskChangedEvent; import jakarta.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @Component public class LongRunningTasksRegistry { @@ -38,6 +43,28 @@ public void onTaskChanged(@Nonnull final LongRunningTask task) { eventPublisher.publishEvent(new LongRunningTaskChangedEvent(this, status)); } + @Order(Ordered.LOWEST_PRECEDENCE) + @EventListener(ClearLongRunningTaskQueueEvent.class) + public void onClearLongRunningTaskQueueEvent() { + AtomicInteger count = new AtomicInteger(); + LOG.info("Clearing long running task registry..."); + + registry.entrySet().removeIf(entry -> { + if (!entry.getValue().isRunning()) { + count.incrementAndGet(); + return true; + } + return false; + }); + performCleanup(); + + if (count.get() > 0) { + LOG.warn("Cleared {} non-running tasks from the registry", count.get()); + } else { + LOG.info("Long running task registry cleared."); + } + } + private void handleTaskChanged(@Nonnull final LongRunningTask task) { if(task.isDone()) { registry.remove(task.getUuid()); @@ -45,6 +72,10 @@ private void handleTaskChanged(@Nonnull final LongRunningTask task) { registry.put(task.getUuid(), task); } + performCleanup(); + } + + private void performCleanup() { // perform cleanup registry.forEach((key, value) -> { if (value.isDone()) { diff --git a/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java b/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java index fdb1cfc99..b3bb7af03 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java +++ b/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java @@ -1,6 +1,7 @@ package cz.cvut.kbss.termit.util.throttle; import cz.cvut.kbss.termit.TermItApplication; +import cz.cvut.kbss.termit.event.ClearLongRunningTaskQueueEvent; import cz.cvut.kbss.termit.exception.TermItException; import cz.cvut.kbss.termit.exception.ThrottleAspectException; import cz.cvut.kbss.termit.util.Configuration; @@ -18,6 +19,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Scope; +import org.springframework.context.event.EventListener; +import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationException; @@ -39,6 +42,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -182,6 +186,51 @@ private static StandardEvaluationContext makeDefaultContext() { return standardEvaluationContext; } + /** + * Prevents accepting new tasks by synchronization + * and cancels all scheduled tasks. + */ + @Order(Ordered.HIGHEST_PRECEDENCE) + @EventListener(ClearLongRunningTaskQueueEvent.class) + public void onClearLongRunningTaskQueueEvent() { + synchronized (throttledFutures) { // synchronize in the filed declaration order + synchronized (lastRun) { + synchronized (scheduledFutures) { + LOG.info("Clearing throttled tasks..."); + + long count = 0; + Iterator>> throttledIt = + throttledFutures.entrySet().iterator(); + + while(throttledIt.hasNext()) { + final Map.Entry> entry = throttledIt.next(); + final ThrottledFuture future = entry.getValue(); + final Identifier identifier = entry.getKey(); + if(future.isRunning() || future.isDone()) continue; + + // cancel the throttled future + future.cancel(false); + // cancel the scheduled future + Optional.ofNullable(scheduledFutures.get(identifier)) + .ifPresent(scheduled -> { + scheduled.cancel(false); + if (scheduled.isCancelled()) { + scheduledFutures.remove(identifier); + } + }); + if (future.isCancelled()) { + throttledIt.remove(); + } + count++; + notifyTaskChanged(future); + } + clearOldFutures(); + LOG.info("Cancelled {} pending throttled tasks", count); + } + } + } + } + /** * @return future or null * @throws TermItException when the target method throws