From be306f450e3300782cd3d065eb46a6faefa6b07d Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Thu, 5 Oct 2023 07:12:44 -0500 Subject: [PATCH] dcache-bulk: refine container executor model Motivation: In https://rb.dcache.org/r/14115 master@8a5c358af45586383d87873952407c646f81f4c6 the container executor model was made more like SRM by using an unbounded cached executor. While this brought Bulk performance in line with SRM, it also drives memory usage to be very near to physical memory. In JFR, after a sustained usage/submission of about 1700 requests of 10K targets each over 6 hours, one can observe this warning (see attached). However, a totally unbounded executor is not necessary. All that is required is an executor which has enough slots to accommodate the in-flight semaphore. Modification: Split the pooled executor into a main and a callback executor, each initialized with max threads equal to the in-flight semaphore value. Use the callback executor exclusively for message/activity callbacks (instead of direct executor). Result: Same performance as before, but with a total memory footprint at a little more than half available physical memory instead of being very close to it. Target: master Request: 9.2 Requires-notes: yes Patch: https://rb.dcache.org/r/14118/ Acked-by: Tigran Acked-by: Lea --- .../bulk/job/BulkRequestContainerJob.java | 12 ++++++---- .../manager/ConcurrentRequestManager.java | 24 +++++++++++++++---- .../org/dcache/services/bulk/bulk.xml | 10 ++++++++ 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index f99aa52c3c2..b1444ab34b2 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -73,7 +73,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import com.google.common.collect.Range; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import diskCacheV111.util.CacheException; import diskCacheV111.util.FsPath; import diskCacheV111.util.NamespaceHandlerAware; @@ -480,7 +479,7 @@ public void failure(int rc, Object error) { remove(); } } - }, MoreExecutors.directExecutor()); + }, callbackExecutor); } /** @@ -514,7 +513,7 @@ public void failure(int rc, Object error) { remove(); } } - }, MoreExecutors.directExecutor()); + }, callbackExecutor); } /** @@ -567,7 +566,7 @@ private void performActivity(boolean async) throws InterruptedException { try { activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes); if (async) { - activityFuture.addListener(() -> handleCompletion(), executor); + activityFuture.addListener(() -> handleCompletion(), callbackExecutor); } } catch (BulkServiceException | UnsupportedOperationException e) { LOGGER.error("{}, perform failed for {}: {}", ruid, target, e.getMessage()); @@ -669,6 +668,7 @@ private void storeOrUpdate(Throwable error) throws InterruptedException { private SignalAware callback; private Thread runThread; private ExecutorService executor; + private ExecutorService callbackExecutor; private Semaphore dirListSemaphore; private Semaphore inFlightSemaphore; @@ -839,6 +839,10 @@ public void setListHandler(ListDirectoryHandler listHandler) { this.listHandler = listHandler; } + public void setCallbackExecutor(ExecutorService callbackExecutor) { + this.callbackExecutor = callbackExecutor; + } + public void setExecutor(ExecutorService executor) { this.executor = executor; } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 7179b18d0df..951841c7143 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -92,6 +92,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.services.bulk.util.BulkRequestTarget.State; import org.dcache.services.bulk.util.BulkServiceStatistics; +import org.dcache.util.BoundedCachedExecutor; import org.dcache.util.FireAndForgetTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -309,10 +310,15 @@ private ListMultimap userRequests() { */ private ExecutorService processorExecutorService; + /** + * Thread dedicated to job callbacks. + */ + private ExecutorService callbackExecutor; + /** * Thread dedicated to jobs. */ - private ExecutorService pooledExecutorService; + private ExecutorService executorService; /** * Records number of jobs and requests processed. @@ -357,7 +363,6 @@ public void initialize() throws Exception { schedulerProvider.initialize(); processor = new ConcurrentRequestProcessor(schedulerProvider.getRequestScheduler()); processorExecutorService = Executors.newSingleThreadScheduledExecutor(); - pooledExecutorService = Executors.newCachedThreadPool(); processorFuture = processorExecutorService.submit(processor); } @@ -420,11 +425,21 @@ public int getMaxActiveRequests() { return maxActiveRequests; } + @Required + public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) { + this.callbackExecutor = callbackExecutor; + } + @Required public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) { this.completionHandler = completionHandler; } + @Required + public void setExecutor(BoundedCachedExecutor pooledExecutor) { + this.executorService = pooledExecutor; + } + @Required public void setTargetStore(BulkTargetStore targetStore) { this.targetStore = targetStore; @@ -518,12 +533,13 @@ void startJob(BulkRequestContainerJob job) { String key = job.getTarget().getKey(); LOGGER.trace("submitting job {} to executor, target {}.", key, job.getTarget()); - job.setExecutor(pooledExecutorService); + job.setExecutor(executorService); + job.setCallbackExecutor(callbackExecutor); job.setCallback(this); try { if (isJobValid(job)) { /* possibly cancelled in flight */ job.update(State.RUNNING); - pooledExecutorService.submit(new FireAndForgetTask(job)); + executorService.submit(new FireAndForgetTask(job)); } } catch (RuntimeException e) { job.getTarget().setErrorObject(e); diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 097bea036be..ae4331a8a30 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -243,6 +243,16 @@ + + + + + + + + + +