Accommodate Nextflow CPU/Memory in Nomad Job taskResources #27

Merged · 6 commits · Mar 12, 2024
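This PR makes the plugin read the cpus and memory directives of each Nextflow task and carry them into the Nomad task's resources, falling back to 1 CPU and 300 MB when they are not set. A minimal sketch of a process that would exercise the new mapping follows; the process name, container image, and script are illustrative only:

```groovy
// Illustrative Nextflow process (name, image, and script are hypothetical).
// With this change, `cpus` feeds Resources.cores and `memory` is converted to
// megabytes for Resources.memoryMB on the generated Nomad task; unset values
// default to 1 CPU and 300 MB.
process sayHello {
    cpus 2
    memory '2.GB'
    container 'quay.io/nextflow/bash'

    script:
    """
    echo 'Hello from Nomad'
    """
}
```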
gradle.properties (1 addition, 1 deletion)
@@ -1 +1 @@
-version=0.0.2
+version=0.0.3-rc
NomadService.groovy
@@ -25,12 +25,15 @@ import io.nomadproject.client.models.Job
 import io.nomadproject.client.models.JobRegisterRequest
 import io.nomadproject.client.models.JobRegisterResponse
 import io.nomadproject.client.models.JobSummary
+import io.nomadproject.client.models.Resources
 import io.nomadproject.client.models.Task
 import io.nomadproject.client.models.TaskGroup
 import io.nomadproject.client.models.TaskGroupSummary
 import io.nomadproject.client.models.VolumeMount
 import io.nomadproject.client.models.VolumeRequest
 import nextflow.nomad.NomadConfig
+import nextflow.processor.TaskRun
+import nextflow.util.MemoryUnit

 /**
  * Nomad Service
@@ -48,40 +51,59 @@ class NomadService implements Closeable{

     NomadService(NomadConfig config) {
         this.config = config
-        ApiClient apiClient = new ApiClient()
+
+        final CONNECTION_TIMEOUT_MILLISECONDS = 60000
+        final READ_TIMEOUT_MILLISECONDS = 60000
+        final WRITE_TIMEOUT_MILLISECONDS = 60000
+
+        ApiClient apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
         apiClient.basePath = config.clientOpts.address
+        log.debug "[NOMAD] Client Address: ${config.clientOpts.address}"
+
         if( config.clientOpts.token ){
-            log.debug "[NOMAD BATCH] Creating Nomad connection using token: ${config.clientOpts.token?.take(5)}.."
+            log.debug "[NOMAD] Client Token: ${config.clientOpts.token?.take(5)}.."
             apiClient.apiKey = config.clientOpts.token
         }
         this.jobsApi = new JobsApi(apiClient);
     }

+    protected Resources getResources(TaskRun task) {
+        final DEFAULT_CPUS = 1
+        final DEFAULT_MEMORY = "300.MB"
+
+        final taskCfg = task.getConfig()
+        final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer
+        final taskMemory = taskCfg.get("memory") ? new MemoryUnit( taskCfg.get("memory") as String ) : new MemoryUnit(DEFAULT_MEMORY)
+
+        final res = new Resources()
+                .cores(taskCores)
+                .memoryMB(taskMemory.toMega() as Integer)
+
+        return res
+    }
+
     @Override
     void close() throws IOException {
     }

-    String submitTask(String id, String name, String image,
-                      List<String> args,
-                      String workingDir,
-                      Map<String, String>env){
+    String submitTask(String id, TaskRun task, List<String> args, Map<String, String>env){
         Job job = new Job();
         job.ID = id
-        job.name = name
+        job.name = task.name
         job.type = "batch"
         job.datacenters = this.config.jobOpts.datacenters
         job.namespace = this.config.jobOpts.namespace

-        job.taskGroups = [createTaskGroup(id, name, image, args, workingDir, env)]
+        job.taskGroups = [createTaskGroup(task, args, env)]

         JobRegisterRequest jobRegisterRequest = new JobRegisterRequest();
         jobRegisterRequest.setJob(job);
         JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts.region, config.jobOpts.namespace, null, null)
         jobRegisterResponse.evalID
     }

-    TaskGroup createTaskGroup(String id, String name, String image, List<String> args, String workingDir, Map<String, String>env){
-        def task = createTask(id, image, args, workingDir, env)
+    TaskGroup createTaskGroup(TaskRun taskRun, List<String> args, Map<String, String>env){
+        def task = createTask(taskRun, args, env)
         def taskGroup = new TaskGroup(
                 name: "group",
                 tasks: [ task ]
@@ -98,22 +120,28 @@ class NomadService implements Closeable{
         return taskGroup
     }

-    Task createTask(String id, String image, List<String> args, String workingDir, Map<String, String>env) {
-        def task = new Task(
+    Task createTask(TaskRun task, List<String> args, Map<String, String>env) {
+
+        def imageName = task.container
+        def workingDir = task.workDir.toAbsolutePath().toString()
+        def taskResources = getResources(task)
+
+        def taskDef = new Task(
                 name: "nf-task",
                 driver: "docker",
+                resources: taskResources,
                 config: [
-                        image: image,
+                        image: imageName,
                         privileged: true,
                         work_dir: workingDir,
                         command: args.first(),
                         args: args.tail(),
                 ] as Map<String,Object>,
-                env: env
+                env: env,
         )
         if( config.jobOpts.dockerVolume){
             String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
-            task.config.mount = [
+            taskDef.config.mount = [
                     type : "volume",
                     target : destinationDir,
                     source : config.jobOpts.dockerVolume,
@@ -122,12 +150,12 @@
         }
         if( config.jobOpts.volumeSpec){
             String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
-            task.volumeMounts = [ new VolumeMount(
+            taskDef.volumeMounts = [ new VolumeMount(
                     destination: destinationDir,
                     volume: config.jobOpts.volumeSpec.name
             )]
         }
-        task
+        taskDef
     }


@@ -152,6 +180,8 @@
         }
     }

+
+
     boolean checkIfRunning(String jobId){
         Job job = jobsApi.getJob(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null)
         job.status == "running"
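To make the mapping concrete, here is a standalone sketch (not part of the diff) of what the new getResources() builds for a task that requests 2 CPUs and 2 GB of memory; the literal values are hypothetical:

```groovy
import io.nomadproject.client.models.Resources
import nextflow.util.MemoryUnit

// Hypothetical values standing in for task.config.cpus / task.config.memory.
def taskCores  = 2
def taskMemory = new MemoryUnit('2.GB')

// Same fluent construction used by getResources(); 2.GB becomes 2048 via toMega().
def res = new Resources()
        .cores(taskCores)
        .memoryMB(taskMemory.toMega() as Integer)

println "cores=${taskCores} memoryMB=${taskMemory.toMega()}"
```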
NomadTaskHandler.groovy
@@ -19,6 +19,7 @@ package nextflow.nomad.executor

 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
+import io.nomadproject.client.models.Resources
 import io.nomadproject.client.models.TaskGroupSummary
 import nextflow.exception.ProcessSubmitException
 import nextflow.exception.ProcessUnrecoverableException
@@ -30,6 +31,7 @@ import nextflow.processor.TaskHandler
 import nextflow.processor.TaskRun
 import nextflow.processor.TaskStatus
 import nextflow.util.Escape
+import nextflow.util.MemoryUnit

 import java.nio.file.Path

@@ -74,7 +76,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

     @Override
     boolean checkIfCompleted() {
-        if( !nomadService.checkIfCompleted(this.jobName) ){
+        if (!nomadService.checkIfCompleted(this.jobName)) {
             return false
         }

@@ -118,20 +120,17 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

     String submitTask() {
         log.debug "[NOMAD] Submitting task ${task.name} - work-dir=${task.workDirStr}"
-        def imageName = task.container
-        if( !imageName )
+        if (!task.container)
             throw new ProcessSubmitException("Missing container image for process `$task.processor.name`")

         def builder = createBashWrapper(task)
         builder.build()

         this.jobName = NomadHelper.sanitizeName(task.name + "-" + task.hash)

-        final launcher = getSubmitCommand(task)
-
-        nomadService.submitTask(this.jobName, task.name, imageName, launcher,
-                task.workDir.toAbsolutePath().toString(),
-                getEnv(task) )
+        final taskLauncher = getSubmitCommand(task)
+        final taskEnv = getEnv(task)
+        nomadService.submitTask(this.jobName, task, taskLauncher, taskEnv)

         // submit the task execution
         log.debug "[NOMAD] Submitted task ${task.name} with taskId=${this.jobName}"
@@ -157,9 +156,9 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
                 : new NomadScriptLauncher(task.toTaskBean())
     }

-    protected Map<String, String>getEnv(TaskRun task){
+    protected Map<String, String> getEnv(TaskRun task) {
         Map<String, String> ret = [:]
-        if( fusionEnabled() ) {
+        if (fusionEnabled()) {
             ret += fusionLauncher().fusionEnv()
         }
         return ret
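For completeness, a rough sketch of how the registered job could be inspected to confirm the requested resources were applied, reusing the jobsApi.getJob call that already appears in NomadService; the getter names on the generated client models are assumed and are not part of this PR:

```groovy
// Would live inside NomadService, next to checkIfRunning(); jobName is hypothetical.
Job job = jobsApi.getJob(jobName, config.jobOpts.region, config.jobOpts.namespace,
        null, null, null, null, null, null, null)
def registered = job.taskGroups.first().tasks.first().resources
log.debug "[NOMAD] Registered resources: cores=${registered.cores} memoryMB=${registered.memoryMB}"
```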