Skip to content

Commit

Permalink
Merge pull request #27 from nextflow-io/patch-nomad-client
Browse files: browse the repository at this point in the history
Accommodate Nextflow CPU/Memory in Nomad Job taskResources
  • Loading branch information
abhi18av authored Mar 12, 2024
2 parents 5fa6f00 + 09a980a commit 65b9226
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 112 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.2
version=0.0.3-rc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import io.nomadproject.client.models.Job
import io.nomadproject.client.models.JobRegisterRequest
import io.nomadproject.client.models.JobRegisterResponse
import io.nomadproject.client.models.JobSummary
import io.nomadproject.client.models.Resources
import io.nomadproject.client.models.Task
import io.nomadproject.client.models.TaskGroup
import io.nomadproject.client.models.TaskGroupSummary
import io.nomadproject.client.models.VolumeMount
import io.nomadproject.client.models.VolumeRequest
import nextflow.nomad.NomadConfig
import nextflow.processor.TaskRun
import nextflow.util.MemoryUnit

/**
* Nomad Service
Expand All @@ -48,40 +51,59 @@ class NomadService implements Closeable{

/**
 * Builds the service: stores the configuration, creates an ApiClient pointed at
 * the configured client address (optionally carrying a token) and wraps it in a JobsApi.
 *
 * NOTE(review): this span looks like a rendered diff without +/- markers, so a
 * replaced line and its replacement both appear (two `ApiClient apiClient`
 * declarations, two token debug lines) - confirm against the real file.
 *
 * @param config supplies clientOpts.address and the optional clientOpts.token
 */
NomadService(NomadConfig config) {
this.config = config
ApiClient apiClient = new ApiClient()

// All three client timeouts are set to the same value (60000, in milliseconds per the names)
final CONNECTION_TIMEOUT_MILLISECONDS = 60000
final READ_TIMEOUT_MILLISECONDS = 60000
final WRITE_TIMEOUT_MILLISECONDS = 60000

// Timeouts are passed as named arguments when constructing the client
ApiClient apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
apiClient.basePath = config.clientOpts.address
log.debug "[NOMAD] Client Address: ${config.clientOpts.address}"

// The token is optional; when present it becomes the client's API key
if( config.clientOpts.token ){
// Only the first 5 characters of the token are written to the debug log
log.debug "[NOMAD BATCH] Creating Nomad connection using token: ${config.clientOpts.token?.take(5)}.."
log.debug "[NOMAD] Client Token: ${config.clientOpts.token?.take(5)}.."
apiClient.apiKey = config.clientOpts.token
}
this.jobsApi = new JobsApi(apiClient);
}

/**
 * Derives the Nomad resource request for a task from the `cpus` and `memory`
 * entries of its configuration. When an entry is missing (or otherwise falsy)
 * the defaults of 1 core and "300.MB" are used instead.
 *
 * @param task the task whose configuration is read
 * @return a Resources object with cores and memoryMB populated
 */
protected Resources getResources(TaskRun task) {
    final DEFAULT_CPUS = 1
    final DEFAULT_MEMORY = "300.MB"

    final cfg = task.getConfig()

    // CPU count: the configured value coerced to Integer, otherwise the default
    final cpuCount = cfg.get("cpus") ? (cfg.get("cpus") as Integer) : DEFAULT_CPUS

    // Memory: parse the configured value through its String form, or the default literal
    final memorySpec = cfg.get("memory") ? (cfg.get("memory") as String) : DEFAULT_MEMORY
    final memory = new MemoryUnit(memorySpec)

    // Populate the request step by step; memory is expressed in megabytes
    final resources = new Resources()
    resources.cores(cpuCount)
    resources.memoryMB(memory.toMega() as Integer)
    return resources
}

/**
 * Closeable hook for the service; currently a no-op.
 */
@Override
void close() throws IOException { /* nothing to do */ }

/**
 * Builds a Nomad batch job containing a single task group, registers it through
 * the JobsApi and returns the evaluation ID from the register response.
 *
 * NOTE(review): this span looks like a rendered diff without +/- markers: both the
 * multi-line signature (id, name, image, args, workingDir, env) and the one-line
 * signature (id, task, args, env) appear, as do both variants of the `job.name`
 * and `job.taskGroups` assignments - confirm against the real file.
 */
String submitTask(String id, String name, String image,
List<String> args,
String workingDir,
Map<String, String>env){
String submitTask(String id, TaskRun task, List<String> args, Map<String, String>env){
Job job = new Job();
job.ID = id
job.name = name
job.name = task.name
job.type = "batch"
// Placement settings come from the job options in the configuration
job.datacenters = this.config.jobOpts.datacenters
job.namespace = this.config.jobOpts.namespace

// The job carries exactly one task group
job.taskGroups = [createTaskGroup(id, name, image, args, workingDir, env)]
job.taskGroups = [createTaskGroup(task, args, env)]

JobRegisterRequest jobRegisterRequest = new JobRegisterRequest();
jobRegisterRequest.setJob(job);
// Region and namespace are passed explicitly; the remaining arguments are null
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts.region, config.jobOpts.namespace, null, null)
// Last expression is the method's return value (Groovy implicit return)
jobRegisterResponse.evalID
}

TaskGroup createTaskGroup(String id, String name, String image, List<String> args, String workingDir, Map<String, String>env){
def task = createTask(id, image, args, workingDir, env)
TaskGroup createTaskGroup(TaskRun taskRun, List<String> args, Map<String, String>env){
def task = createTask(taskRun, args, env)
def taskGroup = new TaskGroup(
name: "group",
tasks: [ task ]
Expand All @@ -98,22 +120,28 @@ class NomadService implements Closeable{
return taskGroup
}

Task createTask(String id, String image, List<String> args, String workingDir, Map<String, String>env) {
def task = new Task(
Task createTask(TaskRun task, List<String> args, Map<String, String>env) {

def imageName = task.container
def workingDir = task.workDir.toAbsolutePath().toString()
def taskResources = getResources(task)

def taskDef = new Task(
name: "nf-task",
driver: "docker",
resources: taskResources,
config: [
image: image,
image: imageName,
privileged: true,
work_dir: workingDir,
command: args.first(),
args: args.tail(),
] as Map<String,Object>,
env: env
env: env,
)
if( config.jobOpts.dockerVolume){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
task.config.mount = [
taskDef.config.mount = [
type : "volume",
target : destinationDir,
source : config.jobOpts.dockerVolume,
Expand All @@ -122,12 +150,12 @@ class NomadService implements Closeable{
}
if( config.jobOpts.volumeSpec){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
task.volumeMounts = [ new VolumeMount(
taskDef.volumeMounts = [ new VolumeMount(
destination: destinationDir,
volume: config.jobOpts.volumeSpec.name
)]
}
task
taskDef
}


Expand All @@ -152,6 +180,8 @@ class NomadService implements Closeable{
}
}



boolean checkIfRunning(String jobId){
Job job = jobsApi.getJob(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null)
job.status == "running"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.nomad.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.models.Resources
import io.nomadproject.client.models.TaskGroupSummary
import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
Expand All @@ -30,6 +31,7 @@ import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.util.Escape
import nextflow.util.MemoryUnit

import java.nio.file.Path

Expand Down Expand Up @@ -74,7 +76,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

@Override
boolean checkIfCompleted() {
if( !nomadService.checkIfCompleted(this.jobName) ){
if (!nomadService.checkIfCompleted(this.jobName)) {
return false
}

Expand Down Expand Up @@ -118,20 +120,17 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

String submitTask() {
log.debug "[NOMAD] Submitting task ${task.name} - work-dir=${task.workDirStr}"
def imageName = task.container
if( !imageName )
if (!task.container)
throw new ProcessSubmitException("Missing container image for process `$task.processor.name`")

def builder = createBashWrapper(task)
builder.build()

this.jobName = NomadHelper.sanitizeName(task.name + "-" + task.hash)

final launcher = getSubmitCommand(task)

nomadService.submitTask(this.jobName, task.name, imageName, launcher,
task.workDir.toAbsolutePath().toString(),
getEnv(task) )
final taskLauncher = getSubmitCommand(task)
final taskEnv = getEnv(task)
nomadService.submitTask(this.jobName, task, taskLauncher, taskEnv)

// submit the task execution
log.debug "[NOMAD] Submitted task ${task.name} with taskId=${this.jobName}"
Expand All @@ -157,9 +156,9 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
: new NomadScriptLauncher(task.toTaskBean())
}

protected Map<String, String>getEnv(TaskRun task){
protected Map<String, String> getEnv(TaskRun task) {
Map<String, String> ret = [:]
if( fusionEnabled() ) {
if (fusionEnabled()) {
ret += fusionLauncher().fusionEnv()
}
return ret
Expand Down
Loading

0 comments on commit 65b9226

Please sign in to comment.