From be33967d5faf0028c9af4087b459e6110c3fd3fc Mon Sep 17 00:00:00 2001 From: Jorge Aguilera Date: Wed, 12 Jun 2024 18:32:55 +0200 Subject: [PATCH] add constraint spec Signed-off-by: Jorge Aguilera --- .../main/nextflow/nomad/NomadConfig.groovy | 48 ++++-- .../nomad/config/ConstraintSpec.groovy | 38 +++++ .../nomad/executor/NomadService.groovy | 16 ++ .../nextflow/nomad/NomadConfigSpec.groovy | 17 ++ .../nomad/executor/NomadServiceSpec.groovy | 146 +++++++++++++++--- 5 files changed, 237 insertions(+), 28 deletions(-) create mode 100644 plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy index a5f0f3f..0716b1f 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy @@ -19,6 +19,7 @@ package nextflow.nomad import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.nomad.config.AffinitySpec +import nextflow.nomad.config.ConstraintSpec import nextflow.nomad.config.VolumeSpec /** @@ -61,6 +62,7 @@ class NomadConfig { final String dockerVolume final VolumeSpec volumeSpec final AffinitySpec affinitySpec + final ConstraintSpec constraintSpec NomadJobOpts(Map nomadJobOpts){ deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ? @@ -78,25 +80,51 @@ class NomadConfig { if( dockerVolume ){ log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead" } + + this.volumeSpec = parseVolume(nomadJobOpts) + this.affinitySpec = parseAffinity(nomadJobOpts) + this.constraintSpec = parseConstraint(nomadJobOpts) + } + + VolumeSpec parseVolume(Map nomadJobOpts){ if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){ - this.volumeSpec = new VolumeSpec() + def volumeSpec = new VolumeSpec() def closure = (nomadJobOpts.volume as Closure) - def clone = closure.rehydrate(this.volumeSpec, closure.owner, closure.thisObject) + def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) clone.resolveStrategy = Closure.DELEGATE_FIRST clone() - this.volumeSpec.validate() + volumeSpec.validate() + volumeSpec }else{ - volumeSpec = null + null } - if( nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure){ - this.affinitySpec = new AffinitySpec() + } + + AffinitySpec parseAffinity(Map nomadJobOpts) { + if (nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure) { + def affinitySpec = new AffinitySpec() def closure = (nomadJobOpts.affinity as Closure) - def clone = closure.rehydrate(this.affinitySpec, closure.owner, closure.thisObject) + def clone = closure.rehydrate(affinitySpec, closure.owner, closure.thisObject) clone.resolveStrategy = Closure.DELEGATE_FIRST clone() - this.affinitySpec.validate() - }else{ - affinitySpec = null + affinitySpec.validate() + affinitySpec + } else { + null + } + } + + ConstraintSpec parseConstraint(Map nomadJobOpts){ + if (nomadJobOpts.constraint && nomadJobOpts.constraint instanceof Closure) { + def constraintSpec = new ConstraintSpec() + def closure = (nomadJobOpts.constraint as Closure) + def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + constraintSpec.validate() + constraintSpec + } else { + null } } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy new file mode 100644 index 0000000..905b956 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy @@ -0,0 +1,38 @@ +package nextflow.nomad.config + +class ConstraintSpec { + + private String attribute + private String operator + private String value + + String getOperator(){ + return operator + } + + String getAttribute() { + return attribute + } + + String getValue() { + return value + } + + ConstraintSpec attribute(String attribute){ + this.attribute=attribute + this + } + + ConstraintSpec operator(String operator){ + this.operator = operator + this + } + + ConstraintSpec value(String value){ + this.value = value + this + } + + void validate(){ + } +} \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index d8da13b..0061488 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -23,6 +23,7 @@ import io.nomadproject.client.ApiClient import io.nomadproject.client.api.JobsApi import io.nomadproject.client.model.Affinity import io.nomadproject.client.model.AllocationListStub +import io.nomadproject.client.model.Constraint import io.nomadproject.client.model.Job import io.nomadproject.client.model.JobRegisterRequest import io.nomadproject.client.model.JobRegisterResponse @@ -198,6 +199,21 @@ class NomadService implements Closeable{ } taskDef.affinities([affinity]) } + + if( config.jobOpts.constraintSpec ){ + def constraint = new Constraint() + if(config.jobOpts.constraintSpec.attribute){ + constraint.ltarget(config.jobOpts.constraintSpec.attribute) + } + + constraint.operand(config.jobOpts.constraintSpec.operator ?: "=") + + if(config.jobOpts.constraintSpec.value){ + constraint.rtarget(config.jobOpts.constraintSpec.value) + } + taskDef.constraints([constraint]) + } + taskDef } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy index 391207c..81fcf03 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy @@ -181,4 +181,21 @@ class NomadConfigSpec extends Specification { config.jobOpts.affinitySpec.getValue() == '3' config.jobOpts.affinitySpec.getWeight() == 50 } + + void "should instantiate a constraint spec if specified"() { + when: + def config = new NomadConfig([ + jobs: [constraint : { + attribute '${meta.my_custom_value}' + operator ">" + value "3" + }] + ]) + + then: + config.jobOpts.constraintSpec + config.jobOpts.constraintSpec.getAttribute() == '${meta.my_custom_value}' + config.jobOpts.constraintSpec.getOperator() == '>' + config.jobOpts.constraintSpec.getValue() == '3' + } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 78b7243..12b7896 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -30,6 +30,8 @@ import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import spock.lang.Specification +import java.nio.file.Path + /** * Unit test for Nomad Service * @@ -224,7 +226,6 @@ class NomadServiceSpec extends Specification{ } -/* void "submit a task"(){ given: def config = new NomadConfig( @@ -239,9 +240,10 @@ class NomadServiceSpec extends Specification{ String image = "theImage" List args = ["theCommand", "theArgs"] Mapenv = [test:"test"] - Resources resources = new Resources().cores(1).memoryMB(1000) def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image getConfig() >> Mock(TaskConfig) getWorkDirStr() >> "theWorkingDir" getContainer() >> "ubuntu" @@ -250,8 +252,9 @@ class NomadServiceSpec extends Specification{ isFusionEnabled() >> false } } + getWorkDir() >> Path.of("/tmp") toTaskBean() >> Mock(TaskBean){ - getWorkDir() >> workDir + getWorkDir() >> Path.of("/tmp") getScript() >> "theScript" getShell() >> ["bash"] getInputFiles() >> [:] @@ -285,10 +288,10 @@ class NomadServiceSpec extends Specification{ body.Job.TaskGroups[0].Tasks.size() == 1 body.Job.TaskGroups[0].Tasks[0].Name == "nf-task" body.Job.TaskGroups[0].Tasks[0].Resources.Cores == 1 - body.Job.TaskGroups[0].Tasks[0].Resources.MemoryMB == 1000 + body.Job.TaskGroups[0].Tasks[0].Resources.MemoryMB == 500 body.Job.TaskGroups[0].Tasks[0].Driver == "docker" body.Job.TaskGroups[0].Tasks[0].Config.image == image - body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir + body.Job.TaskGroups[0].Tasks[0].Config.work_dir == "/tmp" body.Job.TaskGroups[0].Tasks[0].Config.command == args[0] body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1) @@ -311,16 +314,35 @@ class NomadServiceSpec extends Specification{ String name = "theName" String image = "theImage" List args = ["theCommand", "theArgs"] - String workingDir = "a/b/c" + String workingDir = "/a/b/c" Mapenv = [test:"test"] - Resources resources = new Resources().cores(1).memoryMB(1000) + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } mockWebServer.enqueue(new MockResponse() .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) .addHeader("Content-Type", "application/json")); when: - def idJob = service.submitTask(id, name, image, args, workingDir,env,resources ) + def idJob = service.submitTask(id, mockTask, args, env) def recordedRequest = mockWebServer.takeRequest(); def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) @@ -341,6 +363,8 @@ class NomadServiceSpec extends Specification{ body.Job.TaskGroups[0].Name == "group" body.Job.TaskGroups[0].Tasks.size() == 1 body.Job.TaskGroups[0].Tasks[0].Name == "nf-task" + body.Job.TaskGroups[0].Tasks[0].Resources.Cores == 1 + body.Job.TaskGroups[0].Tasks[0].Resources.MemoryMB == 500 body.Job.TaskGroups[0].Tasks[0].Driver == "docker" body.Job.TaskGroups[0].Tasks[0].Config.image == image body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir @@ -350,19 +374,22 @@ class NomadServiceSpec extends Specification{ body.Job.TaskGroups[0].Volumes.size() == 1 body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"] body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1 - body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"a", Volume:"test"] + body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"test"] } - void "should send the token"(){ + void "submit a task with an affinity"(){ given: def config = new NomadConfig( client:[ - address : "http://${mockWebServer.hostName}:${mockWebServer.port}", - token: "123456789012345678901234567" + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" ], jobs:[ - dockerVolume:'test' + affinity: { + attribute '${meta.my_custom_value}' + operator ">" + value "3" + } ] ) def service = new NomadService(config) @@ -371,16 +398,35 @@ class NomadServiceSpec extends Specification{ String name = "theName" String image = "theImage" List args = ["theCommand", "theArgs"] - String workingDir = "a/b/c" + String workingDir = "/a/b/c" Mapenv = [test:"test"] - Resources resources = new Resources().cores(1).memoryMB(1000) + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } mockWebServer.enqueue(new MockResponse() .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) .addHeader("Content-Type", "application/json")); when: - def idJob = service.submitTask(id, name, image, args, workingDir,env, resources ) + def idJob = service.submitTask(id, mockTask, args, env) def recordedRequest = mockWebServer.takeRequest(); def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) @@ -390,8 +436,72 @@ class NomadServiceSpec extends Specification{ and: recordedRequest.method == "POST" recordedRequest.path == "/v1/jobs" - recordedRequest.headers.values('X-Nomad-Token').first()=='123456789012345678901234567' + + and: + body.Job.TaskGroups[0].Tasks[0].Affinities[0].LTarget == '${meta.my_custom_value}' + } + + void "submit a task with a constraint"(){ + given: + def config = new NomadConfig( + client:[ + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + ], + jobs:[ + constraint: { + attribute '${meta.my_custom_value}' + operator ">" + value "3" + } + ] + ) + def service = new NomadService(config) + + String id = "theId" + String name = "theName" + String image = "theImage" + List args = ["theCommand", "theArgs"] + String workingDir = "/a/b/c" + Mapenv = [test:"test"] + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } + + mockWebServer.enqueue(new MockResponse() + .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) + .addHeader("Content-Type", "application/json")); + when: + + def idJob = service.submitTask(id, mockTask, args, env) + def recordedRequest = mockWebServer.takeRequest(); + def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) + + then: + idJob + + and: + recordedRequest.method == "POST" + recordedRequest.path == "/v1/jobs" + + and: + body.Job.TaskGroups[0].Tasks[0].Constraints[0].LTarget == '${meta.my_custom_value}' } - */ }