Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accommodate Nextflow CPU/Memory in Nomad Job taskResources #27

Merged
merged 6 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.2
version=0.0.3-rc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.nomadproject.client.models.Job
import io.nomadproject.client.models.JobRegisterRequest
import io.nomadproject.client.models.JobRegisterResponse
import io.nomadproject.client.models.JobSummary
import io.nomadproject.client.models.Resources
import io.nomadproject.client.models.Task
import io.nomadproject.client.models.TaskGroup
import io.nomadproject.client.models.TaskGroupSummary
Expand All @@ -50,8 +51,10 @@ class NomadService implements Closeable{
this.config = config
ApiClient apiClient = new ApiClient()
apiClient.basePath = config.clientOpts.address
log.debug "[NOMAD] Client Address: ${config.clientOpts.address}"

if( config.clientOpts.token ){
log.debug "[NOMAD BATCH] Creating Nomad connection using token: ${config.clientOpts.token?.take(5)}.."
log.debug "[NOMAD] Client Token: ${config.clientOpts.token?.take(5)}.."
apiClient.apiKey = config.clientOpts.token
}
this.jobsApi = new JobsApi(apiClient);
Expand All @@ -64,24 +67,25 @@ class NomadService implements Closeable{
String submitTask(String id, String name, String image,
List<String> args,
String workingDir,
Map<String, String>env){
Map<String, String>env,
Resources resources){
Job job = new Job();
job.ID = id
job.name = name
job.type = "batch"
job.datacenters = this.config.jobOpts.datacenters
job.namespace = this.config.jobOpts.namespace

job.taskGroups = [createTaskGroup(id, name, image, args, workingDir, env)]
job.taskGroups = [createTaskGroup(id, name, image, args, workingDir, env, resources)]

JobRegisterRequest jobRegisterRequest = new JobRegisterRequest();
jobRegisterRequest.setJob(job);
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts.region, config.jobOpts.namespace, null, null)
jobRegisterResponse.evalID
}

TaskGroup createTaskGroup(String id, String name, String image, List<String> args, String workingDir, Map<String, String>env){
def task = createTask(id, image, args, workingDir, env)
TaskGroup createTaskGroup(String id, String name, String image, List<String> args, String workingDir, Map<String, String>env, Resources resources){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not expose Nomad-specific objects outside of the service.

I mean, Resources is strongly related to the Service but not to the Task. Maybe we can use some TaskResource class as the parameter and let the Service do the conversion to a Nomad Resources object.

It sounds a little more complicated/elaborate, but this way we keep both concepts separate (Nextflow vs Nomad).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @jagedn, okay, so I've tried to refactor the code to keep the different layers separate. Let me know what you think about the current structure.

I do think that we can explore the idea of a specific "translation" class which covers all aspects of building a NomadJobDefinition from a NextflowTask; however, I suggest that we continue with the current iterations and revisit this once the plugin's design has stabilised - what do you think?

def task = createTask(id, image, args, workingDir, env, resources)
def taskGroup = new TaskGroup(
name: "group",
tasks: [ task ]
Expand All @@ -98,18 +102,20 @@ class NomadService implements Closeable{
return taskGroup
}

Task createTask(String id, String image, List<String> args, String workingDir, Map<String, String>env) {
Task createTask(String id, String image, List<String> args, String workingDir, Map<String, String>env, Resources taskResources) {

def task = new Task(
name: "nf-task",
driver: "docker",
resources: taskResources,
config: [
image: image,
privileged: true,
work_dir: workingDir,
command: args.first(),
args: args.tail(),
] as Map<String,Object>,
env: env
env: env,
)
if( config.jobOpts.dockerVolume){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.nomad.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.models.Resources
import io.nomadproject.client.models.TaskGroupSummary
import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
Expand All @@ -30,6 +31,7 @@ import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.util.Escape
import nextflow.util.MemoryUnit

import java.nio.file.Path

Expand Down Expand Up @@ -74,7 +76,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

@Override
boolean checkIfCompleted() {
if( !nomadService.checkIfCompleted(this.jobName) ){
if (!nomadService.checkIfCompleted(this.jobName)) {
return false
}

Expand Down Expand Up @@ -119,7 +121,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
String submitTask() {
log.debug "[NOMAD] Submitting task ${task.name} - work-dir=${task.workDirStr}"
def imageName = task.container
if( !imageName )
if (!imageName)
throw new ProcessSubmitException("Missing container image for process `$task.processor.name`")

def builder = createBashWrapper(task)
Expand All @@ -131,7 +133,8 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

nomadService.submitTask(this.jobName, task.name, imageName, launcher,
task.workDir.toAbsolutePath().toString(),
getEnv(task) )
getEnv(task), getResources(task))


// submit the task execution
log.debug "[NOMAD] Submitted task ${task.name} with taskId=${this.jobName}"
Expand All @@ -157,14 +160,28 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
: new NomadScriptLauncher(task.toTaskBean())
}

protected Map<String, String>getEnv(TaskRun task){
protected Map<String, String> getEnv(TaskRun task) {
Map<String, String> ret = [:]
if( fusionEnabled() ) {
if (fusionEnabled()) {
ret += fusionLauncher().fusionEnv()
}
return ret
}



/**
 * Builds the Nomad {@code Resources} request for a task from its configuration.
 *
 * Reads the {@code cpus} and {@code memory} entries of the task config. When
 * {@code cpus} is missing (or otherwise falsy) a single core is requested; when
 * {@code memory} is missing (or otherwise falsy) {@code 300.MB} is requested.
 *
 * @param task the task whose configuration is inspected
 * @return a {@code Resources} object with {@code cores} and {@code memoryMB} set
 */
protected Resources getResources(TaskRun task) {
    final cfg = task.getConfig()

    // CPU cores: fall back to one core when the setting is absent
    Integer cpuCount = 1
    if (cfg.get("cpus")) {
        cpuCount = cfg.get("cpus") as Integer
    }

    // Memory: the configured value is re-parsed from its string form
    MemoryUnit memory
    if (cfg.get("memory")) {
        memory = new MemoryUnit(cfg.get("memory") as String)
    } else {
        memory = new MemoryUnit("300.MB")
    }

    // Nomad takes the memory amount as a whole number of megabytes
    final Integer memoryInMega = memory.toMega() as Integer

    return new Resources()
            .cores(cpuCount)
            .memoryMB(memoryInMega)
}

protected String taskState0(String taskName) {
final now = System.currentTimeMillis()
final delta = now - timestamp;
Expand Down
4 changes: 2 additions & 2 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class NomadDSLSpec extends Dsl2Spec{

then:
thrown(AbortRunException) //it fails because no real task is executed
submitted
summary
// submitted
// summary
abhi18av marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package nextflow.nomad.executor

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import io.nomadproject.client.models.Resources
import nextflow.nomad.NomadConfig
import nextflow.util.MemoryUnit
import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
import spock.lang.Specification
Expand Down Expand Up @@ -58,13 +60,14 @@ class NomadServiceSpec extends Specification{
List<String> args = ["theCommand", "theArgs"]
String workingDir = "theWorkingDir"
Map<String, String>env = [test:"test"]
Resources resources = new Resources().cores(1).memoryMB(1000)

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, name, image, args, workingDir,env)
def idJob = service.submitTask(id, name, image, args, workingDir,env,resources)
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

Expand All @@ -85,6 +88,8 @@ class NomadServiceSpec extends Specification{
body.Job.TaskGroups[0].Name == "group"
body.Job.TaskGroups[0].Tasks.size() == 1
body.Job.TaskGroups[0].Tasks[0].Name == "nf-task"
body.Job.TaskGroups[0].Tasks[0].Resources.Cores == 1
body.Job.TaskGroups[0].Tasks[0].Resources.MemoryMB == 1000
body.Job.TaskGroups[0].Tasks[0].Driver == "docker"
body.Job.TaskGroups[0].Tasks[0].Config.image == image
body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir
Expand All @@ -94,60 +99,6 @@ class NomadServiceSpec extends Specification{
!body.Job.TaskGroups[0].Tasks[0].Config.mount
}

void "submit a task with docker volume"(){
given:
def config = new NomadConfig(
client:[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed this test as we decided not to continue with docker volume.

],
jobs:[
dockerVolume:'test'
]
)
def service = new NomadService(config)

String id = "theId"
String name = "theName"
String image = "theImage"
List<String> args = ["theCommand", "theArgs"]
String workingDir = "a/b/c"
Map<String, String>env = [test:"test"]

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, name, image, args, workingDir,env)
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

then:
idJob

and:
recordedRequest.method == "POST"
recordedRequest.path == "/v1/jobs"

and:
body.Job
body.Job.ID == id
body.Job.Name == name
body.Job.Datacenters == []
body.Job.Type == "batch"
body.Job.TaskGroups.size() == 1
body.Job.TaskGroups[0].Name == "group"
body.Job.TaskGroups[0].Tasks.size() == 1
body.Job.TaskGroups[0].Tasks[0].Name == "nf-task"
body.Job.TaskGroups[0].Tasks[0].Driver == "docker"
body.Job.TaskGroups[0].Tasks[0].Config.image == image
body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir
body.Job.TaskGroups[0].Tasks[0].Config.command == args[0]
body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1)

body.Job.TaskGroups[0].Tasks[0].Config.mount == [type:"volume", target:"a", source:"test", readonly:false]
}

void "should check the state"(){
given:
def config = new NomadConfig(
Expand Down Expand Up @@ -208,13 +159,14 @@ class NomadServiceSpec extends Specification{
List<String> args = ["theCommand", "theArgs"]
String workingDir = "a/b/c"
Map<String, String>env = [test:"test"]
Resources resources = new Resources().cores(1).memoryMB(1000)

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, name, image, args, workingDir,env)
def idJob = service.submitTask(id, name, image, args, workingDir,env,resources )
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

Expand Down Expand Up @@ -253,7 +205,7 @@ class NomadServiceSpec extends Specification{
def config = new NomadConfig(
client:[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}",
token: "1234"
token: "123456789012345678901234567"
],
jobs:[
dockerVolume:'test'
Expand All @@ -267,13 +219,14 @@ class NomadServiceSpec extends Specification{
List<String> args = ["theCommand", "theArgs"]
String workingDir = "a/b/c"
Map<String, String>env = [test:"test"]
Resources resources = new Resources().cores(1).memoryMB(1000)

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, name, image, args, workingDir,env)
def idJob = service.submitTask(id, name, image, args, workingDir,env, resources )
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

Expand All @@ -283,6 +236,6 @@ class NomadServiceSpec extends Specification{
and:
recordedRequest.method == "POST"
recordedRequest.path == "/v1/jobs"
recordedRequest.headers.values('X-Nomad-Token').first()=='1234'
recordedRequest.headers.values('X-Nomad-Token').first()=='123456789012345678901234567'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import nextflow.exception.ProcessSubmitException
import nextflow.executor.Executor
import nextflow.nomad.NomadConfig
import nextflow.processor.TaskBean
import nextflow.processor.TaskConfig
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.script.ProcessConfig
import spock.lang.Specification

import java.nio.file.Files
Expand Down Expand Up @@ -61,6 +63,7 @@ class NomadTaskHandlerSpec extends Specification{
new File(workDir.toFile(), TaskRun.CMD_INFILE).text = "infile"

def mockTask = Mock(TaskRun){
getConfig() >> Mock(TaskConfig)
getWorkDir() >> workDir
getContainer() >> "ubuntu"
getProcessor() >> Mock(TaskProcessor){
Expand Down