
XSEDE Tutorial Part 3: BigJob

melrom edited this page Nov 2, 2012 · 83 revisions

This page contains material for part 3 of the XSEDE 2012 Tutorial. It covers BigJob, a SAGA-based Pilot-Job implementation: how to install it and how to use it to submit jobs to, and transfer files across, XSEDE resources.

Direct Link to the BigJob slides

Introduction to BigJob

BigJob is a general-purpose, SAGA-based Pilot-Job framework. Pilot-Jobs support the use of container jobs with sophisticated workflow management to coordinate the launch and interaction of actual computational tasks within the container. This decouples workload submission from resource assignment, allowing a flexible execution model that enables the distributed scale-out of applications on multiple and possibly heterogeneous resources. Jobs can then be executed without each individual job having to wait in the queue.
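The decoupling described above can be illustrated without BigJob at all. The following is only an analogy built on Python's standard library: a thread pool stands in for the pilot (the container that is acquired once), and plain callables stand in for compute units. The function name run_in_pilot and the parameter slots are made up for this sketch and are not part of the Pilot-API.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_pilot(tasks, slots):
    """Run callables inside one 'container' that owns `slots` workers.

    The pool plays the role of a pilot: it is acquired once, and tasks are
    scheduled into it without any further per-task resource request.
    """
    with ThreadPoolExecutor(max_workers=slots) as pilot:
        futures = [pilot.submit(task) for task in tasks]
        # Collect results in submission order.
        return [future.result() for future in futures]


# Eight small tasks share a "pilot" with four slots.
tasks = [lambda n=n: "task %d done" % n for n in range(8)]
results = run_in_pilot(tasks, slots=4)
print(results[0])
```

In BigJob the container is a real batch job on the resource, so the queue waiting time is paid once for the pilot rather than once per task.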

Additional information about BigJob can be found on the website: http://saga-project.github.com/BigJob/. Comprehensive API documentation is available at http://saga-project.github.com/BigJob/apidoc/.

Pilot Description

The Pilot Description defines the resource specification for the pilot that manages the jobs on a resource. The following parameters need to be provided:

  • service_url - specifies the SAGA Bliss job adaptor and the hostname of the resource on which jobs are executed. For remote hosts, passwordless login needs to be enabled.
  • number_of_processes - specifies the total number of processes that need to be allocated to run the jobs.
  • queue - specifies the job queue to be used.
  • working_directory - specifies the directory in which the Pilot-Job agent executes.
  • walltime - specifies the number of minutes for which the resources are requested.
  • file_transfer - specifies the files that need to be transferred in order to execute the jobs successfully. Generally, files common to all the jobs are listed here.
import os

# list of pilot descriptions, one entry per pilot
pilot_compute_description = []
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                   "number_of_processes": 12,
                                   "allocation": "XSEDE12-SAGA",
                                   "queue": "development",
                                   "working_directory": os.getenv("HOME")+"/agent",
                                   "walltime":10
                                })
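Before handing a description to BigJob, it can be useful to check it locally. The helper below is a hypothetical sketch, not part of BigJob: the name check_pilot_description and the choice of which keys count as required are assumptions made for illustration, based on the keys that every script in this tutorial sets.

```python
# Keys that every pilot description in this tutorial provides (an assumption,
# not a rule enforced by BigJob itself).
REQUIRED_KEYS = ("service_url", "number_of_processes", "working_directory", "walltime")


def check_pilot_description(description):
    """Return a list of problems found in a pilot description dict."""
    problems = ["missing key: %s" % key for key in REQUIRED_KEYS
                if key not in description]
    if description.get("number_of_processes", 1) < 1:
        problems.append("number_of_processes must be at least 1")
    if description.get("walltime", 1) <= 0:
        problems.append("walltime must be a positive number of minutes")
    return problems


description = {"service_url": "sge://localhost",
               "number_of_processes": 12,
               "allocation": "XSEDE12-SAGA",
               "queue": "development",
               "working_directory": "/tmp/agent",
               "walltime": 10}
print(check_pilot_description(description) or "description looks complete")
```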

Compute Unit Description

The Compute Unit Description allows the user to specify the actual job parameters and data needed to execute the job.

  • executable - specifies the executable.
  • arguments - specifies the list of arguments to be passed to executable.
  • environment - specifies the list of environment variables to be set for the successful execution of the job.
  • working_directory - specifies the directory in which the job has to execute. If not specified, the Pilot-Job creates a default directory.
  • number_of_processes - specifies the number of processes to be assigned for the job execution.
  • spmd_variation - specifies the type of job. By default, it is a single job.
  • output - specifies the file in which the standard output of the job execution is stored.
  • error - specifies the file in which the standard error of the job execution is stored.
  • file_transfer - specifies the files that need to be transferred in order to execute the job successfully. Generally files specific to the job need to be listed here.
compute_unit_description = { "executable": "/bin/echo",
                             "arguments": ["Hello","$ENV1","$ENV2"],
                             "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                             "number_of_processes": 4,            
                             "spmd_variation":"mpi",
                             "output": "stdout.txt",
                             "error": "stderr.txt"
                           }    
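To see how the arguments and environment entries of the description above fit together, the following sketch approximates the command line locally. It is an illustration only: preview_command is a made-up helper, and the substitution is done with Python's string.Template rather than by BigJob or a shell, so the actual expansion on the resource may differ in detail.

```python
from string import Template


def preview_command(cu_description):
    """Approximate the command line a compute unit would run (illustration only)."""
    # Turn ['ENV1=env_arg1', ...] into {'ENV1': 'env_arg1', ...}.
    env = dict(item.split("=", 1) for item in cu_description.get("environment", []))
    # Substitute $NAME references in each argument; unknown names are left untouched.
    args = [Template(arg).safe_substitute(env)
            for arg in cu_description.get("arguments", [])]
    return " ".join([cu_description["executable"]] + args)


cu = {"executable": "/bin/echo",
      "arguments": ["Hello", "$ENV1", "$ENV2"],
      "environment": ["ENV1=env_arg1", "ENV2=env_arg2"]}
print(preview_command(cu))  # prints: /bin/echo Hello env_arg1 env_arg2
```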

Environment Setup and Installation

For the hands-on part of this tutorial, you have to be logged in to Lonestar and have your virtualenv activated. Part 2 of this tutorial explains this in more detail.

Install BigJob

BigJob is available via PyPI and can be installed with pip by typing:

pip install BigJob

To make sure that your installation works, run the following command to check if the BigJob module can be imported by the interpreter:

python -c "import bigjob; print bigjob.version"
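The same check can be scripted, which is handy when several modules need to be verified at once. This is a minimal sketch using only the standard library; module_available is a name chosen for this example. The tutorial scripts import from both the bigjob and the pilot modules, so both are checked here.

```python
import importlib


def module_available(name):
    """Return True if `name` can be imported by the current interpreter."""
    try:
        importlib.import_module(name)
    except ImportError:
        return False
    return True


for name in ("bigjob", "pilot"):
    print("%s importable: %s" % (name, module_available(name)))
```

If either line reports False, check that the virtualenv from part 2 is activated before re-running the installation.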

Create BigJob Agent Directory

Prior to running these examples, you will need to create a directory called 'agent' in the same location that you are running your scripts from. BigJob uses this as its working directory. For this tutorial, we'll create the agent directory in the $HOME directory by typing:

mkdir $HOME/agent

Note: For larger jobs on your own account, it is recommended to run your scripts out of $SCRATCH or $WORK, since the agent directory grows quickly.
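The directory can also be created from Python, for example at the top of a script. The sketch below is an assumption-laden convenience, not something BigJob requires: the helper names are invented, and the preference order $SCRATCH, then $WORK, then $HOME simply follows the note above. The tutorial scripts themselves use $HOME/agent.

```python
import os


def agent_directory(env=None):
    """Pick a location for the agent directory: $SCRATCH, then $WORK, then $HOME."""
    env = os.environ if env is None else env
    for variable in ("SCRATCH", "WORK", "HOME"):
        base = env.get(variable)
        if base:
            return os.path.join(base, "agent")
    raise RuntimeError("none of SCRATCH, WORK or HOME is set")


def ensure_directory(path):
    """Create `path` if it does not exist yet and return it."""
    if not os.path.isdir(path):
        os.makedirs(path)
    return path
```

A script would then call ensure_directory(agent_directory()) and pass the returned path as the working_directory of the pilot description.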

Hands-On: Submitting Multiple Jobs to Resources using BigJob

Simple Ensemble

The example below submits N jobs using a SAGA Pilot-Job. It demonstrates the mapping of a simple echo job using all of the parameters of a Compute Unit Description.

In your $HOME directory, open a new file simple_ensembles.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
	
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://localhost"

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(COORDINATION_URL)

    pilot_compute_description = { "service_url": "sge://localhost",
                                  "number_of_processes": 12,
                                  "allocation": "XSEDE12-SAGA",
                                  "queue": "development",                                      
                                  "working_directory": os.getenv("HOME")+"/agent",
                                  "walltime":10
                                }

    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submitting compute units")

    # submit compute units
    for i in range(NUMBER_JOBS):
        compute_unit_description = {
                "executable": "/bin/echo",
                "arguments": ["Hello","$ENV1","$ENV2"],
                "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                "number_of_processes": 4,            
                "spmd_variation":"mpi",
                "output": "stdout.txt",
                "error": "stderr.txt",
                }    
        compute_data_service.submit_compute_unit(compute_unit_description)

    print ("Waiting for compute units to complete")
    compute_data_service.wait()

    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Execute the script using the command:

python simple_ensembles.py

If you run the script, what do you get? To find out, go into the working directory (which is $HOME/agent in this case), then into the directory named after the pilot-service, and then into the compute unit directories associated with that pilot-service.
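Walking through the directories by hand gets tedious with many compute units. The sketch below collects every stdout.txt found anywhere below the agent directory. It assumes only what is described above (output files live somewhere under the working directory); the function name collect_outputs is made up for this example.

```python
import os


def collect_outputs(agent_dir, filename="stdout.txt"):
    """Return {path: contents} for every file called `filename` below agent_dir."""
    outputs = {}
    for root, _dirs, files in os.walk(agent_dir):
        if filename in files:
            path = os.path.join(root, filename)
            with open(path) as handle:
                outputs[path] = handle.read()
    return outputs


# Print each output file found under $HOME/agent (nothing is printed if the
# directory does not exist or holds no output yet).
agent_dir = os.path.join(os.path.expanduser("~"), "agent")
for path, text in sorted(collect_outputs(agent_dir).items()):
    print("%s: %s" % (path, text.strip()))
```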

Mandelbrot Example

The example below demonstrates the execution of Mandelbrot via a Pilot-Job. Make sure you have completed the setup described in section 5 of the SAGA Bliss tutorial, which is needed to execute Mandelbrot.

Copy and paste the code below into bj_mandelbrot.py.

import os, time, sys
from PIL import Image
import bliss.saga as saga 
from pilot import PilotComputeService, ComputeDataService, State

# the dimension (in pixel) of the whole fractal
imgx = 8192 
imgy = 8192

# the number of tiles in X and Y direction
tilesx = 2
tilesy = 2

	
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://localhost"

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    
    # create the working directory for the pilot and the image tiles in $SCRATCH
    dirname = 'sftp://localhost/%s/PJ-mbrot/' % os.getenv('SCRATCH')
    workdir = saga.filesystem.Directory(dirname, saga.filesystem.Create)

    pilot_compute_description={ "service_url": "sge://localhost",
                                "number_of_processes": 12,
                                "allocation": "XSEDE12-SAGA",
                                "queue": "development",
                                "processes_per_node":12,
                                "working_directory": workdir.get_url().path,
                                "walltime":10
                              }

    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submitting compute units")

    # submit compute units
    for x in range(0, tilesx):
        for y in range(0, tilesy):                
            # describe a single Mandelbrot job. we're using the 
            # directory created above as the job's working directory
            outputfile = 'tile_x%s_y%s.png' % (x,y)
                
            compute_unit_description = {
                        "executable": "python",
                        # integer division (//) keeps the tile bounds whole numbers
                        "arguments": [os.getenv("HOME")+'/mandelbrot.py', str(imgx), str(imgy), 
                                        str(imgx//tilesx*x), str(imgx//tilesx*(x+1)),
                                        str(imgy//tilesy*y), str(imgy//tilesy*(y+1)),
                                        outputfile],
                        "number_of_processes": 1,    
                        "working_directory":workdir.get_url().path,        
                        "output": "stdout_x%s_y%s.txt" % (x,y),
                        "error": "stderr_x%s_y%s.txt" % (x,y),
                        }    
            compute_data_service.submit_compute_unit(compute_unit_description)

    print ("Waiting for compute units to complete")
    compute_data_service.wait()
                
    # Preparing the final image
    for image in workdir.list('*.png'):
        print (' * Copying %s/%s back to %s' % (workdir.get_url(), image, os.getcwd()))
        workdir.copy(image, 'sftp://localhost/%s/' % os.getcwd()) 

    # stitch together the final image
    fullimage = Image.new('RGB',(imgx, imgy),(255,255,255))
    print (' * Stitching together the whole fractal: mandelbrot_full.png')
    for x in range(0, tilesx):
        for y in range(0, tilesy):
            partimage = Image.open('tile_x%s_y%s.png' % (x, y))
            fullimage.paste(partimage, (imgx//tilesx*x, imgy//tilesy*y, imgx//tilesx*(x+1), imgy//tilesy*(y+1)) )
    fullimage.save("mandelbrot_full.png", "PNG")

    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Execute the bj_mandelbrot.py script using the command:

python bj_mandelbrot.py

How does the execution of bj_mandelbrot.py differ from that of saga_mandelbrot.py (part 2)? Hint: Compare the execution times of the SAGA Bliss Mandelbrot and the Pilot-Job Mandelbrot. What causes the time difference?
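The tiling arithmetic in bj_mandelbrot.py can be tried on its own. The sketch below reproduces the bounds that the script passes to mandelbrot.py for each tile. The function name tile_bounds and the labels x_begin, x_end, y_begin and y_end are chosen for this example; the meaning of those arguments inside mandelbrot.py is assumed from their position.

```python
def tile_bounds(imgx, imgy, tilesx, tilesy):
    """Return (x, y, x_begin, x_end, y_begin, y_end) for every tile."""
    bounds = []
    for x in range(tilesx):
        for y in range(tilesy):
            bounds.append((x, y,
                           imgx // tilesx * x, imgx // tilesx * (x + 1),
                           imgy // tilesy * y, imgy // tilesy * (y + 1)))
    return bounds


# Same image size and tile counts as in bj_mandelbrot.py.
for tile in tile_bounds(8192, 8192, 2, 2):
    print("tile_x%s_y%s.png covers x %s..%s, y %s..%s" % tile)
```

With a 2 x 2 tiling, each of the four compute units renders a 4096 x 4096 pixel tile, and the same bounds are used again when the tiles are pasted into the full image.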

Chained Ensembles

The example below submits a set of echo jobs (set A) using a SAGA Pilot-Job, and for every successful job (with state Done) it submits another /bin/echo job (set B) to the same Pilot-Job.

In your $HOME directory, open a new file chained_ensemble.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State


### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://localhost"

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(COORDINATION_URL)

    pilot_compute_description = { "service_url": "sge://localhost",
                                  "number_of_processes": 12,
                                  "allocation": "XSEDE12-SAGA",
                                  "queue": "development",
                                  "working_directory": os.getenv("HOME")+"/agent",
                                  "walltime":10,
                                }

    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submit compute units")
    # submit Set A compute units
    all_A_cus = []
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/echo",
                                      "arguments": ["Hello","$ENV1","$ENV2"],
                                      "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                      "number_of_processes": 1,            
                                      "output": "A_stdout.txt",
                                      "error": "A_stderr.txt"
                                    }    
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
        all_A_cus.append(compute_unit) # Store all the compute units.

    # Chaining tasks, i.e. submit a set B compute unit whenever a compute unit from set A has finished successfully.

    while all_A_cus:
        # iterate over a copy, because finished compute units are removed from the list
        for i in list(all_A_cus):
            if i.get_state() == "Done":
                compute_unit_description = { "executable": "/bin/echo",
                                             "arguments": ["$ENV1","$ENV2"],
                                             "environment": ['ENV1=task_B:','ENV2=after_task_A'+str(i)],
                                             "number_of_processes": 1,
                                             "output": "B_stdout.txt",
                                             "error": "B_stderr.txt"
                                           }
                compute_data_service.submit_compute_unit(compute_unit_description)
                all_A_cus.remove(i)
        time.sleep(1)  # avoid polling the state in a tight loop
 
    # Wait for set B jobs.
    compute_data_service.wait()

    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Execute the script using the command:

python chained_ensemble.py

Can you identify whether each job of set B executed after a job of set A? Go into the working directory (which is $HOME/agent in this case), then into the directory named after the pilot-service, and then into the compute unit directories associated with that pilot-service. Check the output files.
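The chaining pattern itself does not depend on BigJob. As an analogy only, the sketch below uses a standard-library thread pool in place of the pilot and plain functions in place of compute units; task_a, task_b and run_chain are names invented for this example. Each B task is submitted as soon as its A task has completed.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def task_a(n):
    return "A%d" % n


def task_b(parent):
    return "B after %s" % parent


def run_chain(number_jobs, slots=4):
    """Submit set A, then submit one B task for each A task as it completes."""
    with ThreadPoolExecutor(max_workers=slots) as pool:
        a_futures = [pool.submit(task_a, n) for n in range(number_jobs)]
        b_futures = []
        # as_completed yields each A future once it has finished, so there is
        # no list that changes while it is iterated and no busy-waiting.
        for finished in as_completed(a_futures):
            b_futures.append(pool.submit(task_b, finished.result()))
        return sorted(future.result() for future in b_futures)


print(run_chain(4))
```

The Pilot-API script polls get_state() instead, because compute units run remotely and report their state through the coordination service.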

Coupled Ensemble

This script implements a simple workflow: it submits two sets of jobs (A and B), waits until they have completed, and then submits a third set of jobs (C). It demonstrates the synchronization mechanisms provided by the SAGA Pilot-API.

In your $HOME directory, open a new file coupled_ensemble.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
    	
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://localhost"
    
if __name__ == "__main__":
    
    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    
    pilot_compute_description = { "service_url": "sge://localhost",
                                  "number_of_processes": 12,
                                  "allocation": "XSEDE12-SAGA",
                                  "queue": "development",
                                  "working_directory": os.getenv("HOME")+"/agent",
                                  "walltime":10
                                }
    
    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)
    
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    
    print ("Finished Pilot-Job setup. Submitting compute units")
    
    # submit a set of CUs, call it A
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/echo",
                                     "arguments": ["Hello","$ENV1","$ENV2"],
                                     "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                     "number_of_processes": 1,            
                                     "output": "A_stdout.txt",
                                     "error": "A_stderr.txt"
                                    }    
        compute_data_service.submit_compute_unit(compute_unit_description)
    
    
    # submit a set of CUs, call it B
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/date",
                                     "arguments": [],
                                     "environment": [],
                                     "number_of_processes": 1,
                                     "output": "B_stdout.txt",
                                     "error": "B_stderr.txt",
                                    }
        compute_data_service.submit_compute_unit(compute_unit_description)
     
    print ("Wait for CUs of task set A & B to complete")
    compute_data_service.wait()
    
    # submit a set of CUs, call it C
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/echo",
                                     "arguments": ["Hello","$ENV1","$ENV2"],
                                     "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                     "number_of_processes": 1,
                                     "spmd_variation":"single",
                                     "output": "C_stdout.txt",
                                     "error": "C_stderr.txt",
                                    }
        compute_data_service.submit_compute_unit(compute_unit_description)
    
    print ("Wait for CUs of task set C to complete")
    compute_data_service.wait()
    
    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Execute the script using the command:

python coupled_ensemble.py

Can you identify whether the jobs of set C executed after the jobs of sets A and B? Go into the working directory (which is $HOME/agent in this case), then into the directory named after the pilot-service, and then into the compute unit directories associated with that pilot-service. Check the time stamps of the compute unit output files.
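The time stamp comparison can be automated. The following sketch uses file modification times as a proxy for execution order, which is an assumption: it holds only if the output files are written when the compute units run and are not touched afterwards. The helper names mtime_range and c_ran_after_a_and_b are made up for this example.

```python
import os


def mtime_range(agent_dir, filename):
    """Return (oldest, newest) modification time of all `filename` files, or None."""
    times = [os.path.getmtime(os.path.join(root, filename))
             for root, _dirs, files in os.walk(agent_dir) if filename in files]
    return (min(times), max(times)) if times else None


def c_ran_after_a_and_b(agent_dir):
    """True if every C output is at least as new as every A and B output."""
    a = mtime_range(agent_dir, "A_stdout.txt")
    b = mtime_range(agent_dir, "B_stdout.txt")
    c = mtime_range(agent_dir, "C_stdout.txt")
    if None in (a, b, c):
        raise ValueError("expected A, B and C output files below %s" % agent_dir)
    # Newest A/B output must not be newer than the oldest C output.
    return max(a[1], b[1]) <= c[0]
```

Calling c_ran_after_a_and_b on the agent directory after a run of coupled_ensemble.py should return True if the wait() call between the two submission phases worked as a barrier.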

Submitting jobs to multiple Pilot jobs

This script shows how to submit jobs to multiple pilots. The Pilot-API manages the jobs across multiple pilots, which may be launched on the same machine or on different machines.

In your $HOME directory, open a new file mulitple_pilotjobs.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
    
COORDINATION_URL = "redis://localhost"
    	
### This is the number of jobs you want to run
NUMBER_JOBS=24
    
if __name__ == "__main__":
    
    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description=[]
    
    pilot_compute_description.append({ "service_url": "sge://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10,
                                     })
    
    pilot_compute_description.append({ "service_url": "sge://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10
                                     })
    
    for pcd in pilot_compute_description:
        pilot_compute_service.create_pilot(pilot_compute_description=pcd)
    
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    
    print ("Finished Pilot-Job setup. Submitting compute units")
    
    # submit compute units
    for i in range(NUMBER_JOBS):
        compute_unit_description = {"executable": "/bin/echo",
                                    "arguments": ["Hello","$ENV1","$ENV2"],
                                    "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                    "number_of_processes": 1,            
                                    "output": "stdout.txt",
                                    "error": "stderr.txt",
                                    }    
        compute_data_service.submit_compute_unit(compute_unit_description)
    
    print ("Waiting for compute units to complete")
    compute_data_service.wait()
    
    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Execute the script using the command:

python mulitple_pilotjobs.py

Can you identify which jobs have been scheduled to which Pilot-Job? Go into the working directory (which is $HOME/agent in this case). For each Pilot-Job, a unique directory is created. The Pilot-Job directory contains the directories of the compute units scheduled to that Pilot-Job.
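A short script can summarize the distribution. The sketch below relies on the layout described above, one directory per Pilot-Job with one subdirectory per compute unit, and makes no assumption about how those directories are named. The function name units_per_pilot is invented for this example.

```python
import os


def units_per_pilot(agent_dir):
    """Map each directory directly below agent_dir to the names of its subdirectories."""
    mapping = {}
    for pilot in sorted(os.listdir(agent_dir)):
        pilot_path = os.path.join(agent_dir, pilot)
        if os.path.isdir(pilot_path):
            mapping[pilot] = sorted(entry for entry in os.listdir(pilot_path)
                                    if os.path.isdir(os.path.join(pilot_path, entry)))
    return mapping
```

For example, printing len(units) for each pilot, units pair in units_per_pilot(os.path.join(os.getenv("HOME"), "agent")).items() shows how the 24 compute units were split between the two pilots. Directories left over from earlier runs in the same agent directory are counted as well.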

Compute-Data

This example demonstrates how to associate the data required for the successful execution of an executable. The Pilot-API is responsible for moving the necessary data to the working directory of the executable.

Create a test.txt file in your $HOME directory by typing:

cat /etc/motd > $HOME/test.txt

In your $HOME directory, open a new file compute_data.py with your favorite editor (e.g., vim) and paste the following content:

import sys
import os
import time
import logging
from pilot import PilotComputeService, ComputeDataService, State
    
COORDINATION_URL = "redis://localhost"
    
if __name__ == "__main__":      
    # create pilot job service and initiate a pilot job
    pilot_compute_service = PilotComputeService(COORDINATION_URL)

    pilot_compute_description = { "service_url": "sge://localhost",
                                  "number_of_processes": 12,
                                  "allocation": "XSEDE12-SAGA",
                                  "queue": "development",
                                  "working_directory": os.getenv("HOME")+"/agent",
                                  "walltime":10,
                                }
    
    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)
    
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submitting compute units")
        
    # Submit 8 compute units
    for i in range(8):
        compute_unit_description = { "executable": "/bin/cat",
                                     "arguments": ["test.txt"],
                                     "number_of_processes": 1,
                                     "output": "stdout.txt",
                                     "error": "stderr.txt",   
                                     "file_transfer": ["ssh://" + os.getenv("HOME") + "/test.txt > SUBJOB_WORK_DIR"]
                                    }    
        compute_data_service.submit_compute_unit(compute_unit_description)
    
    print("Finished submission. Waiting for completion of CU")
    compute_data_service.wait()
            
    print ("Terminate Pilot Compute Service")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Execute the script using the command:

python compute_data.py

Can you extend the script to use multiple Pilot-Jobs and see how the data is moved along with the compute units? Hint: use the mulitple_pilotjobs.py example.
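The file_transfer entry in compute_data.py has the form "source > target". When experimenting with several transfers, it can help to split such entries and inspect both halves before submitting. The helper below is a hypothetical sketch for that purpose; split_transfer is not a BigJob function, the example path is a placeholder, and how BigJob itself parses these entries is not shown here.

```python
def split_transfer(spec):
    """Split a 'source > target' transfer entry into (source, target)."""
    source, separator, target = spec.partition(">")
    if not separator:
        raise ValueError("expected 'source > target', got %r" % spec)
    return source.strip(), target.strip()


# Placeholder path; the tutorial script builds it from $HOME.
print(split_transfer("ssh:///home/user/test.txt > SUBJOB_WORK_DIR"))
```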