diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java index 28a997e5160..5e145773772 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java @@ -80,6 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.util.BulkRequestTarget.PID; import org.dcache.services.bulk.util.BulkRequestTargetBuilder; import org.dcache.services.bulk.util.BulkServiceStatistics; +import org.dcache.util.BoundedCachedExecutor; import org.dcache.util.list.ListDirectoryHandler; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; @@ -102,6 +103,9 @@ public final class RequestContainerJobFactory { private BulkServiceStatistics statistics; private Semaphore dirListSemaphore; private Semaphore inFlightSemaphore; + private BoundedCachedExecutor taskExecutor; + private BoundedCachedExecutor callbackExecutor; + private BoundedCachedExecutor listExecutor; public BulkRequestContainerJob createRequestJob(BulkRequest request) throws BulkServiceException { @@ -132,6 +136,9 @@ public BulkRequestContainerJob createRequestJob(BulkRequest request) containerJob.setListHandler(listHandler); containerJob.setDirListSemaphore(dirListSemaphore); containerJob.setInFlightSemaphore(inFlightSemaphore); + containerJob.setExecutor(taskExecutor); + containerJob.setListExecutor(listExecutor); + containerJob.setCallbackExecutor(callbackExecutor); containerJob.initialize(); return containerJob; } @@ -149,11 +156,21 @@ public void setActivityFactory(BulkActivityFactory activityFactory) { this.activityFactory = activityFactory; } + @Required + public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) { + this.callbackExecutor = callbackExecutor; + } + @Required public void setListHandler(ListDirectoryHandler listHandler) { this.listHandler = listHandler; } + @Required + public void setListExecutor(BoundedCachedExecutor listExecutor) { + this.listExecutor = listExecutor; + } + @Required public void setDirListSemaphore(int permits) { dirListSemaphore = new Semaphore(permits); @@ -184,6 +201,11 @@ public void setTargetStore(BulkTargetStore targetStore) { this.targetStore = targetStore; } + @Required + public void setTaskExecutor(BoundedCachedExecutor taskExecutor) { + this.taskExecutor = taskExecutor; + } + BulkActivity create(BulkRequest request) throws BulkServiceException { String rid = request.getUid(); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 1bef438619c..aa7808a04c3 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -311,19 +311,9 @@ private ListMultimap userRequests() { private ExecutorService processorExecutorService; /** - * Thread dedicated to job callbacks. + * Thread dedicated to running containerJobs. */ - private ExecutorService callbackExecutor; - - /** - * Thread dedicated to directory listing. - */ - private ExecutorService listExecutor; - - /** - * Thread dedicated to jobs. - */ - private ExecutorService executorService; + private ExecutorService containerExecutor; /** * Records number of jobs and requests processed. @@ -430,24 +420,14 @@ public int getMaxActiveRequests() { return maxActiveRequests; } - @Required - public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) { - this.callbackExecutor = callbackExecutor; - } - @Required public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) { this.completionHandler = completionHandler; } @Required - public void setExecutor(BoundedCachedExecutor pooledExecutor) { - this.executorService = pooledExecutor; - } - - @Required - public void setListExecutor(BoundedCachedExecutor listExecutor) { - this.listExecutor = listExecutor; + public void setContainerExecutor(BoundedCachedExecutor containerExecutor) { + this.containerExecutor = containerExecutor; } @Required @@ -543,14 +523,11 @@ void startJob(BulkRequestContainerJob job) { String key = job.getTarget().getKey(); LOGGER.trace("submitting job {} to executor, target {}.", key, job.getTarget()); - job.setExecutor(executorService); - job.setListExecutor(listExecutor); - job.setCallbackExecutor(callbackExecutor); job.setCallback(this); try { if (isJobValid(job)) { /* possibly cancelled in flight */ job.update(State.RUNNING); - executorService.submit(new FireAndForgetTask(job)); + containerExecutor.submit(new FireAndForgetTask(job)); } } catch (RuntimeException e) { job.getTarget().setErrorObject(e); diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 78909bb1b51..603d7a400bb 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -67,13 +67,21 @@ - + Used to execute jobs that are executed by a batch container. - - - - - + + + + + + + + + + + + + @@ -209,6 +217,9 @@ + + + @@ -243,21 +254,7 @@ - - - - - - - - - - - - - - - + @@ -299,6 +296,9 @@ + + + diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index 8a2ef72d8c6..6876b56bba5 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -69,7 +69,7 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir # bulk.limits.container-processing-threads=100 bulk.limits.incoming-request-threads=10 -bulk.limits.cancellation-threads=10 +bulk.limits.cancellation-threads=${bulk.limits.container-processing-threads} # ---- Expiration of the cache serving to front the request storage. #