diff --git a/lib/build.gradle.kts b/lib/build.gradle.kts index 63c5cccd..3c694ff9 100644 --- a/lib/build.gradle.kts +++ b/lib/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "org.veupathdb.lib" -version = "1.7.5" +version = "1.8.0" dependencies { @@ -23,7 +23,7 @@ dependencies { implementation("org.veupathdb.lib:jackson-singleton:3.1.1") // DB - implementation("com.zaxxer:HikariCP:5.0.1") + implementation("com.zaxxer:HikariCP:5.1.0") implementation("org.postgresql:postgresql:42.7.3") // S3 @@ -31,7 +31,7 @@ dependencies { api("org.veupathdb.lib.s3:workspaces:4.1.2") // Rabbit - implementation("org.veupathdb.lib:rabbit-job-queue:1.2.1") + implementation("org.veupathdb.lib:rabbit-job-queue:2.0.0") // Metrics implementation("io.prometheus:simpleclient:0.16.0") diff --git a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/config/AsyncQueueConfig.kt b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/config/AsyncQueueConfig.kt index c21b67a8..91b30908 100644 --- a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/config/AsyncQueueConfig.kt +++ b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/config/AsyncQueueConfig.kt @@ -1,7 +1,11 @@ package org.veupathdb.lib.compute.platform.config +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes + private const val DefaultRabbitMQPort = 5672 private const val DefaultWorkerCount = 5 +private const val DefaultMessageAckTimeoutMinutes = 30 /** * Configuration entry for a single RabbitMQ queue. @@ -29,6 +33,14 @@ private const val DefaultWorkerCount = 5 * queue. * * Default value is `5`. + * + * @param messageAckTimeout Timeout window in which a queue message must be + * acknowledged. RabbitMQ will kill channels on which a message has not been + * acknowledged within this window. + * + * This value *MUST* be at least 5 minutes. + * + * Default value is 30 minutes. */ class AsyncQueueConfig( internal val id: String, @@ -37,6 +49,7 @@ class AsyncQueueConfig( internal val host: String, internal val port: Int, internal val workers: Int, + internal val messageAckTimeout: Duration, ) { /** @@ -55,7 +68,7 @@ class AsyncQueueConfig( * @param host Hostname for the target RabbitMQ instance. */ constructor(id: String, username: String, password: String, host: String) : - this(id, username, password, host, DefaultRabbitMQPort, DefaultWorkerCount) + this(id, username, password, host, DefaultRabbitMQPort, DefaultWorkerCount, DefaultMessageAckTimeoutMinutes.minutes) /** * Constructs a new [AsyncQueueConfig] instance. @@ -78,7 +91,7 @@ class AsyncQueueConfig( * Default value is `5`. */ constructor(id: String, username: String, password: String, host: String, workers: Int) : - this(id, username, password, host, DefaultRabbitMQPort, workers) + this(id, username, password, host, DefaultRabbitMQPort, workers, DefaultMessageAckTimeoutMinutes.minutes) companion object { @JvmStatic @@ -135,6 +148,17 @@ class AsyncQueueConfig( */ var workers = DefaultWorkerCount + /** + * Message acknowledgement timeout value to set for this queue. + * + * This value should be long enough to accommodate the longest expected job + * runtimes for the queue. RabbitMQ itself will disconnect the queue if a + * message is not acknowledged within the configured timeout window. + * + * Default value is 30 minutes. + */ + var messageAckTimeout = DefaultMessageAckTimeoutMinutes.minutes + /** * Sets the unique identifier for the queue. */ @@ -183,6 +207,17 @@ class AsyncQueueConfig( return this } + /** + * Sets the [messageAckTimeout] value to the given duration. + */ + fun messageAckTimeout(timeout: Duration) = apply { this.messageAckTimeout = timeout } + + /** + * Sets the [messageAckTimeout] value to a duration of the given value in + * minutes. + */ + fun messageAckTimeoutMinutes(timeout: Int) = apply { this.messageAckTimeout = timeout.minutes } + fun build(): AsyncQueueConfig { // We check null and blank because these are likely coming from env vars // and docker compose will set blank values for vars defined in the @@ -208,7 +243,11 @@ class AsyncQueueConfig( if (host!!.isBlank()) throw IllegalStateException("Cannot build an AsyncQueueConfig instance with a blank host!") - return AsyncQueueConfig(id!!, username!!, password!!, host!!, port, workers) + if (messageAckTimeout < 5.minutes) + throw IllegalStateException("Message ack timeout values less than 5 minutes are likely to cause undefined" + + " behavior in RabbitMQ") + + return AsyncQueueConfig(id!!, username!!, password!!, host!!, port, workers, messageAckTimeout) } } } \ No newline at end of file diff --git a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/queues/QueueWrapper.kt b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/queues/QueueWrapper.kt index 4e46e39b..86315bf7 100644 --- a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/queues/QueueWrapper.kt +++ b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/queues/QueueWrapper.kt @@ -4,17 +4,15 @@ import com.fasterxml.jackson.databind.JsonNode import org.slf4j.LoggerFactory import org.veupathdb.lib.compute.platform.JobManager import org.veupathdb.lib.compute.platform.config.AsyncQueueConfig -import org.veupathdb.lib.compute.platform.intern.db.QueueDB import org.veupathdb.lib.compute.platform.intern.jobs.JobExecContext import org.veupathdb.lib.compute.platform.intern.jobs.JobExecutors import org.veupathdb.lib.compute.platform.intern.metrics.JobMetrics import org.veupathdb.lib.compute.platform.intern.metrics.QueueMetrics -import org.veupathdb.lib.compute.platform.intern.s3.S3 import org.veupathdb.lib.compute.platform.job.PlatformJobResultStatus import org.veupathdb.lib.hash_id.HashID -import org.veupathdb.lib.rabbit.jobs.QueueConfig -import org.veupathdb.lib.rabbit.jobs.QueueDispatcher -import org.veupathdb.lib.rabbit.jobs.QueueWorker +import org.veupathdb.lib.rabbit.jobs.JobQueueDispatcher +import org.veupathdb.lib.rabbit.jobs.JobQueueExecutor +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification import org.veupathdb.lib.rabbit.jobs.model.JobDispatch import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification @@ -33,31 +31,36 @@ internal class QueueWrapper(conf: AsyncQueueConfig) { val name = conf.id - private val dispatch: QueueDispatcher - private val handler: QueueWorker + private val dispatch: JobQueueDispatcher + private val handler: JobQueueExecutor init { Log.info("initializing queue wrapper for queue {}", name) - val qc = QueueConfig().also { - it.hostname = conf.host - it.hostPort = conf.port - it.username = conf.username - it.password = conf.password - it.workers = conf.workers - - it.jobQueueName = "${conf.id}_jobs" - it.successQueueName = "${conf.id}_success" - it.errorQueueName = "${conf.id}_error" - } + val qc = QueueConfig() + .connection { + hostname = conf.host + hostPort = conf.port + username = conf.username + password = conf.password + } + .executor { + workers = conf.workers + maxJobExecutionTime = conf.messageAckTimeout + } + .apply { + jobQueueName = "${conf.id}_jobs" + successQueueName = "${conf.id}_success" + errorQueueName = "${conf.id}_error" + } // Setup dispatch end of queue the wrapper - dispatch = QueueDispatcher(qc) + dispatch = JobQueueDispatcher(qc) dispatch.onError(this::onError) dispatch.onSuccess(this::onSuccess) // Setup worker end of the queue wrapper - handler = QueueWorker(qc) + handler = JobQueueExecutor(qc) handler.onJob(this::onJob) } @@ -98,7 +101,7 @@ internal class QueueWrapper(conf: AsyncQueueConfig) { } } catch (e: Throwable) { Log.error("job execution failed with an exception for job ${job.jobID}", e) - handler.sendError(ErrorNotification(job.jobID, 1, e.message)) + handler.sendError(ErrorNotification(jobID = job.jobID, code = 1, message = e.message)) } } } \ No newline at end of file diff --git a/readme.adoc b/readme.adoc index 1fa5565c..b22f37d2 100644 --- a/readme.adoc +++ b/readme.adoc @@ -29,7 +29,7 @@ image::docs/assets/overview.png[] [source, kotlin] ---- dependencies { - implementation("org.veupathdb.lib:compute-platform:1.6.1") + implementation("org.veupathdb.lib:compute-platform:1.8.0") } ---- diff --git a/test/docker-compose.test.yml b/test/docker-compose.test.yml index b8a3f95f..fb779797 100644 --- a/test/docker-compose.test.yml +++ b/test/docker-compose.test.yml @@ -23,7 +23,7 @@ services: POSTGRES_PASSWORD: password rabbit: - image: rabbitmq:3.11.10-management-alpine + image: rabbitmq:3.13.3-management-alpine ports: - "5672:5672" diff --git a/test/src/main/kotlin/lcp/main.kt b/test/src/main/kotlin/lcp/main.kt index 485bff33..7431d9e2 100644 --- a/test/src/main/kotlin/lcp/main.kt +++ b/test/src/main/kotlin/lcp/main.kt @@ -6,12 +6,21 @@ import org.veupathdb.lib.compute.platform.config.* import org.veupathdb.lib.compute.platform.job.JobContext import org.veupathdb.lib.compute.platform.job.JobExecutor import org.veupathdb.lib.compute.platform.job.JobResult +import org.veupathdb.lib.s3.s34k.S3Api +import org.veupathdb.lib.s3.s34k.S3Config +import org.veupathdb.lib.s3.s34k.fields.BucketName +import kotlin.time.Duration.Companion.minutes private val Log = LoggerFactory.getLogger("main.kt") fun main() { + // Init minio + setupS3ForTests() + + // Init test target initPlatform() + // Run test Log.info("{}", AsyncPlatform.listJobReferences()) } @@ -23,10 +32,10 @@ private fun initPlatform() { .port(5432) .username("postgres") .password("password") - .poolSize(1) + .poolSize(5) .build()) .s3Config(AsyncS3Config("localhost", 9000, false, "derp", "minioadmin", "minioadmin", "flumps")) - .addQueue(AsyncQueueConfig("queue", "guest", "guest", "localhost", 5672, 1)) + .addQueue(AsyncQueueConfig("queue", "guest", "guest", "localhost", 5672, 1, 5.minutes)) .jobConfig(AsyncJobConfig({ Executor() }, 1)) .localWorkspaceRoot("/tmp/florp") .build()) @@ -39,4 +48,9 @@ class Executor : JobExecutor { log.info("I'm job {}!", ctx.jobID) return JobResult.success() } +} + +private fun setupS3ForTests() { + val client = S3Api.newClient(S3Config("localhost", 9000u, false, "minioadmin", "minioadmin")) + client.buckets.createIfNotExists(BucketName("derp")) } \ No newline at end of file