-
Notifications
You must be signed in to change notification settings - Fork 8
XSEDE Tutorial Part 3: BigJob
This page contains material for part 3 of the XSEDE 2012 Tutorial. It covers BigJob, a SAGA-based Pilot-Job implementation, how it is installed and used to submit jobs to and transfer files across XSEDE resources.
BigJob, a SAGA-based Pilot-Job, is a general purpose Pilot-Job framework. Pilot-Jobs support the use of container jobs with sophisticated workflow management to coordinate the launch and interaction of actual computational tasks within the container. This results in the decoupling of workload submission from resource assignment, allowing a flexible execution model that enables the distributed scale-out of applications on multiple and possibly heterogeneous resources. It allows the execution of jobs without the necessity to queue each individual job.
Additional information about BigJob can be found on the website: http://saga-project.github.com/BigJob/. A comprehensive API documentation is available at http://saga-project.github.com/BigJob/apidoc/.
Pilot description defines the resource specification for managing the jobs on that resource. The following are the resource specifications that need to be provided:
- service_url - specifies the SAGA Bliss job adaptor and resource hostname on which jobs can be executed. For remote hosts password less login need to be enabled.
- number_of_processes - specifies the total number of processes need to be allocated to run the jobs.
- queue - specifies the job queue to be used.
- working_directory - specifies the directory in which the Pilot-Job agent executes
- walltime - specifies the number of minutes the resources are requested.
- file_transfers - specifies the files that need to be transferred in order to execute the jobs successfully. Generally files common to all the jobs need to be listed here.
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10
})
The Compute Unit Description allows to specify the actual job parameters and data needed to execute the job.
- executable - specifies the executable.
- arguments - specifies the list of arguments to be passed to executable.
- environment - specifies the list of environment variables to be set for the successful of job execution.
- working_directory - specifies the directory in which the job has to execute. If not specified Pilot-Job creates a default directory.
- number_of_processes - specifies the number of processes to be assigned for the job execution.
- spmd_variation - specifies the type of job. By default it is single job.
- output - specifies the file in which the standard output of the job execution to be stored.
- error - specifies the file in which the standard error of the job execution to be stored.
- file_transfers - specifies the files that need to be transferred in order to execute the job successfully. Generally files specific to the job need to be listed here.
compute_unit_description = { "executable": "/bin/echo",
"arguments": ["Hello","$ENV1","$ENV2"],
"environment": ['ENV1=env_arg1','ENV2=env_arg2'],
"number_of_processes": 4,
"spmd_variation":"mpi",
"output": "stdout.txt",
"error": "stderr.txt"
}
For the hands-on part of this tutorial, you have to be logged in to Lonestar and have your virtualenv activated. Part 2 of this tutorial explains this in more detail.
BigJob is available via PyPi and can be installed via easy install by typing:
pip install BigJob
To make sure that your installation works, run the following command to check if the BigJob module can be imported by the interpreter:
python -c "import bigjob; print bigjob.version"
Prior to running these examples, you will need to create a directory called 'agent' in the same location that you are running your scripts from. BigJob uses this as its working directory. For this tutorial, we'll create the agent directory in the $HOME
directory by typing:
mkdir $HOME/agent
Note: For larger jobs on your own account, it is recommended to run your scripts out of $SCRATCH or $WORK, since the agent directory grows quickly.
The below example submits N jobs using SAGA Pilot-Job. It demonstrates the mapping of a simple echo job using all of the parameters of a Compute Unit Description.
In your $HOME directory, open a new file simple_ensembles.py with your favorite editor (e.g., vim) and paste the following content:
import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"
if __name__ == "__main__":
pilot_compute_service = PilotComputeService(COORDINATION_URL)
resource_desc = { "service_url": "sge://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10
}
pilot_compute_service.create_pilot(pilot_compute_description=resource_desc)
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
print ("Finished Pilot-Job setup. Submitting compute units")
# submit compute units
for i in range(NUMBER_JOBS):
compute_unit_description = {
"executable": "/bin/echo",
"arguments": ["Hello","$ENV1","$ENV2"],
"environment": ['ENV1=env_arg1','ENV2=env_arg2'],
"number_of_processes": 4,
"spmd_variation":"mpi",
"output": "stdout.txt",
"error": "stderr.txt",
}
compute_data_service.submit_compute_unit(compute_unit_description)
print ("Waiting for compute units to complete")
compute_data_service.wait()
print ("Terminate Pilot Jobs")
compute_data_service.cancel()
pilot_compute_service.cancel()
If you run the script, what do you get?
python simple_ensembles.py
You will have to go into the agent directory, then the directory named after the pilot-service, and then the sub-job directories.
The below example demonstrates Mandelbrot execution via Pilot-Job. Make sure you have all the setup needed mentioned in section 5 of SAGA Bliss tutorial to execute Mandelbrot.
Cut and paste code below into bj_mandelbrot.py
. How does the execution of bj_mandelbrot.py differ from saga_mandelbrot.py (part 2). Hint: Try to find the execution time difference via SAGA Bliss Mandelbrot and Pilot-Job Mandelbrot.
import os, time, sys
from PIL import Image
import bliss.saga as saga
from pilot import PilotComputeService, ComputeDataService, State
# the dimension (in pixel) of the whole fractal
imgx = 8192
imgy = 8192
# the number of tiles in X and Y direction
tilesx = 2
tilesy = 2
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"
if __name__ == "__main__":
pilot_compute_service = PilotComputeService(COORDINATION_URL)
pilot_compute_description=[]
# copy image tiles back to our 'local' directory
dirname = 'sftp://localhost/%s/PJ-mbrot/' % os.getenv('SCRATCH')
workdir = saga.filesystem.Directory(dirname, saga.filesystem.Create)
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"processes_per_node":12,
"working_directory": workdir.get_url().path,
"walltime":10
})
for pcd in pilot_compute_description:
pilot_compute_service.create_pilot(pilot_compute_description=pcd)
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
print ("Finished Pilot-Job setup. Submitting compute units")
# submit compute units
for x in range(0, tilesx):
for y in range(0, tilesy):
# describe a single Mandelbrot job. we're using the
# directory created above as the job's working directory
outputfile = 'tile_x%s_y%s.png' % (x,y)
compute_unit_description = {
"executable": "python",
"arguments": [os.getenv("HOME")+'/mandelbrot.py', str(imgx), str(imgy),
str(imgx/tilesx*x), str(imgx/tilesx*(x+1)),
str(imgy/tilesy*y), str(imgy/tilesy*(y+1)),
outputfile],
"number_of_processes": 1,
"working_directory":workdir.get_url().path,
"output": "stdout_x%s_y%s.txt" % (x,y),
"error": "stderr_x%s_y%s.txt" % (x,y),
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
print ("Waiting for compute units to complete")
compute_data_service.wait()
# Preparing the final image
for image in workdir.list('*.png'):
print ' * Copying %s/%s back to %s' % (workdir.get_url(), image, os.getcwd())
workdir.copy(image, 'sftp://localhost/%s/' % os.getcwd())
# stitch together the final image
fullimage = Image.new('RGB',(imgx, imgy),(255,255,255))
print ' * Stitching together the whole fractal: mandelbrot_full.png'
for x in range(0, tilesx):
for y in range(0, tilesy):
partimage = Image.open('tile_x%s_y%s.png' % (x, y))
fullimage.paste(partimage, (imgx/tilesx*x, imgy/tilesy*y, imgx/tilesx*(x+1), imgy/tilesy*(y+1)) )
fullimage.save("mandelbrot_full.png", "PNG")
print ("Terminate Pilot Jobs")
compute_data_service.cancel()
pilot_compute_service.cancel()
The below example submit set of echo jobs(A) using SAGA Pilot-Job, and for every successful job (with state Done) it submit another /bin/echo job to the same Pilot-Job.
In your $HOME directory, open a new file chained_example.py with your favorite editor (e.g., vim) and paste the following content:
import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
import pdb
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"
if __name__ == "__main__":
pilot_compute_service = PilotComputeService(COORDINATION_URL)
pilot_compute_description=[]
all_A_cus = []
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10,
})
for pcd in pilot_compute_description:
pilot_compute_service.create_pilot(pilot_compute_description=pcd)
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
print ("Finished Pilot-Job setup. Submit compute units")
# submit Set A compute units
for i in range(NUMBER_JOBS):
compute_unit_description = { "executable": "/bin/echo",
"arguments": ["Hello","$ENV1","$ENV2"],
"environment": ['ENV1=env_arg1','ENV2=env_arg2'],
"number_of_processes": 4,
"spmd_variation":"mpi",
"output": "A_stdout.txt",
"error": "A_stderr.txt"
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
all_A_cus.append(compute_unit)
# Chaining tasks i.e submit a compute unit, when compute unit from A is successfully executed.
while 1:
for i in all_A_cus:
if i.get_state() == "Done":
compute_unit_description = { "executable": "/bin/echo",
"arguments": ["$ENV1","$ENV2"],
"environment": ['ENV1=task_B'+str(i),'ENV2=after_task_A'+str(i)],
"number_of_processes": 2,
"spmd_variation":"mpi",
"output": "B_stdout.txt",
"error": "B_stderr.txt"
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
all_A_cus.remove(i)
if len(all_A_cus) == 0:
break
# Wait for set B jobs.
compute_data_service.wait()
print ("Terminate Pilot Jobs")
compute_data_service.cancel()
pilot_compute_service.cancel()
The script provides a simple workflow which submit a set of jobs(A) and jobs(B) and wait until they are completed and then submits set of jobs(C). It demonstrates synchronization mechanisms provided by SAGA Pilot-API.
In your $HOME directory, open a new file coupled_ensembles.py with your favorite editor (e.g., vim) and paste the following content:
import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"
if __name__ == "__main__":
pilot_compute_service = PilotComputeService(COORDINATION_URL)
pilot_compute_description=[]
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10
})
for pcd in pilot_compute_description:
pilot_compute_service.create_pilot(pilot_compute_description=pcd)
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
print ("Finished Pilot-Job setup. Submitting compute units")
# submit a set of CUs, call it A
for i in range(NUMBER_JOBS):
compute_unit_description = { "executable": "/bin/echo",
"arguments": ["Hello","$ENV1","$ENV2"],
"environment": ['ENV1=env_arg1','ENV2=env_arg2'],
"number_of_processes": 4,
"spmd_variation":"mpi",
"output": "A_stdout.txt",
"error": "A_stderr.txt"
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
# submit a set of CUs, call it B
for i in range(NUMBER_JOBS):
compute_unit_description = { "executable": "/bin/date",
"arguments": [],
"environment": {},
"number_of_processes": 1,
"spmd_variation":"single",
"output": "B_stdout.txt",
"error": "B_stderr.txt",
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
print ("Wait for CUs of task set A & B to complete")
compute_data_service.wait()
# submit a set of CUs, call it C
for i in range(NUMBER_JOBS):
compute_unit_description = { "executable": "/bin/echo",
"arguments": ["Hello","$ENV1","$ENV2"],
"environment": ['ENV1=env_arg1','ENV2=env_arg2'],
"number_of_processes": 1,
"spmd_variation":"single",
"output": "C_stdout.txt",
"error": "C_stderr.txt",
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
print ("Wait for CUs of task set C to complete")
compute_data_service.wait()
print ("Terminate Pilot Jobs")
compute_data_service.cancel()
pilot_compute_service.cancel()
The script provides an example to submit jobs to multiple pilots. The Pilot-API manages the jobs across multiple pilots launched on same/different machines.
In your $HOME directory, open a new file scaling_jobs.py with your favorite editor (e.g., vim) and paste the following content:
import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
COORDINATION_URL = "redis://[email protected]:6379"
### This is the number of jobs you want to run
NUMBER_JOBS=24
if __name__ == "__main__":
pilot_compute_service = PilotComputeService(COORDINATION_URL)
pilot_compute_description=[]
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10,
})
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10
})
for pcd in pilot_compute_description:
pilot_compute_service.create_pilot(pilot_compute_description=pcd)
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
print ("Finished Pilot-Job setup. Submitting compute units")
# submit compute units
for i in range(NUMBER_JOBS):
compute_unit_description = {"executable": "/bin/echo",
"arguments": ["Hello","$ENV1","$ENV2"],
"environment": ['ENV1=env_arg1','ENV2=env_arg2'],
"number_of_processes": 4,
"spmd_variation":"mpi",
"output": "stdout.txt",
"error": "stderr.txt",
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
print ("Waiting for compute units to complete")
compute_data_service.wait()
print ("Terminate Pilot Jobs")
compute_data_service.cancel()
pilot_compute_service.cancel()
The example demonstrates associating data required for the successful execution of executable. The Pilot-API is responsible for moving the necessary data to the executable working directory.
Create a test.txt file in $HOME directory
cat /etc/motd > $HOME/test.txt
In your $HOME directory, open a new file compute_data.py with your favorite editor (e.g., vim) and paste the following content:
import sys
import os
import time
import logging
from pilot import PilotComputeService, ComputeDataService, State
COORDINATION_URL = "redis://[email protected]:6379"
if __name__ == "__main__":
pilot_compute_service = PilotComputeService(COORDINATION_URL)
pilot_compute_description = []
# create pilot job service and initiate a pilot job
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
"number_of_processes": 12,
"allocation": "XSEDE12-SAGA",
"queue": "development",
"working_directory": os.getenv("HOME")+"/agent",
"walltime":10,
})
for pcd in pilot_compute_description:
pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pcd)
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
# Submit 24 compute units
for i in range(24):
compute_unit_description = { "executable": "/bin/cat",
"arguments": ["test.txt"],
"number_of_processes": 1,
"output": "stdout.txt",
"error": "stderr.txt",
"file_transfer": ["ssh://" + os.getenv("HOME") + "/test.txt > BIGJOB_WORK_DIR"]
}
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
logging.debug("Finished submission. Waiting for completion of CU")
compute_data_service.wait()
logging.debug("Terminate Pilot Compute Service")
compute_data_service.cancel()
pilot_compute_service.cancel()
For more complex example please have a look at **https://github.com/saga-project/experiments-SC12/blob/master/scripts/generic/bfast_perf_pd.py [(Running Bfast genomic application)]