Skip to content

XSEDE Tutorial Part 3: BigJob

pradeepmantha edited this page Jul 12, 2012 · 83 revisions

This page contains material for part 3 of the XSEDE 2012 Tutorial. It covers BigJob, a SAGA-based pilot-job implementation, how it is installed and used to submit jobs to and transfer files across XSEDE resources.

Introduction to BigJob

BigJob, a SAGA-based Pilot-Job, is a general purpose pilot-job framework. Pilot-Jobs support the use of container jobs with sophisticated workflow management to coordinate the launch and interaction of actual computational tasks within the container. This results in the decoupling of workload submission from resource assignment, allowing a flexible execution model that enables the distributed scale-out of applications on multiple and possibly heterogeneous resources. It allows the execution of jobs without the necessity to queue each individual job.

Additional information about BigJob can be found on the website: http://saga-project.github.com/BigJob/. A comprehensive API documentation is available at http://saga-project.github.com/BigJob/apidoc/.

Pilot Compute Description

Pilot Compute description defines the resource specification for managing the jobs on that resources. The following are the resource specification need to be provided.

  • service_url - Provides the SAGA Bliss job adaptor and resource hostname on which jobs can be executed. For remote hosts password less login need to be enabled.
  • number_of_processes - It specifies the total number of processes need to be allocated to run the jobs.
  • queue - Job queue to be used.
  • working_directory - Directory in which the PilotJob agent executes
  • walltime - Specifies the number of minutes the resources are requested.
  • file_transfers - Files that need to be transferred in order to execute the jobs successfully. Generally files common to all the jobs need to be listed here.
pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                   "number_of_processes": 12,
                                   "allocation": "XSEDE12-SAGA",
                                   "queue": "development",
                                   "working_directory": os.getenv("HOME")+"/agent",
                                   "walltime":10
                                })

Compute Unit Description

Compute Unit Description allows to specify the actual job parameters and data needed to execute the job.

  • executable - specifies executable of the job.
  • arguments - specifies the list of arguments to be passed to executable.
  • environment - specifies the list of environment variables to be set for the successful of job execution.
  • working_directory - specifies the directory in which the job has to execute. If not specified PilotJob creates a default directory.
  • number_of_processes - specifies the number of processers to be assigned for the job execution.
  • spmd_variation - specifies the type of job. By default it is single job.
  • output - specifies the file in which the standard output of the job execution to be stored.
  • error - specifies the file in which the standard error of the job execution to be stored.
  • file_transfers - Files that need to be transferred in order to execute the job successfully. Generally files specific to the job need to be listed here.
compute_unit_description = { "executable": "/bin/echo",
                             "arguments": ["Hello","$ENV1","$ENV2"],
                             "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                             "number_of_processes": 4,            
                             "spmd_variation":"mpi",
                             "output": "stdout.txt",
                             "error": "stderr.txt"
                           }    

Environment Setup and Installation

For the hands-on part of this tutorial, you have to be logged in to Lonestar and have your virtualenv activated. Part 2 of this tutorial explains this in more detail.

Install BigJob

BigJob is available via PyPi and can be installed via easy install by typing:

pip install BigJob

To make sure that your installation works, run the following command to check if the BigJob module can be imported by the interpreter:

python -c "import bigjob; print bigjob.version"

Create BigJob Agent Directory

Prior to running these examples, you will need to create a directory called 'agent' in the same location that you are running your scripts from. BigJob uses this as its working directory. For this tutorial, we'll create the agent directory in the $HOME directory by typing:

mkdir $HOME/agent

Note: For larger jobs on your own account, it is recommended to run your scripts out of $SCRATCH or $WORK, since the agent directory grows quickly.

Hands-On: Submitting Multiple Jobs to Resources using BigJob

Simple Ensembles

The below example submits N jobs using SAGA PilotJob. It demonstrates mapping of a simple echo job using all parameters of a compute unit description.

In your $HOME directory, open a new file simple_ensembles.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
	
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description=[]

    pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",                                      
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10
                                     })

    for pcd in pilot_compute_description:
        pilot_compute_service.create_pilot(pilot_compute_description=pcd)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submitting compute units")

    # submit compute units
    for i in range(NUMBER_JOBS):
        compute_unit_description = {
                "executable": "/bin/echo",
                "arguments": ["Hello","$ENV1","$ENV2"],
                "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                "number_of_processes": 4,            
                "spmd_variation":"mpi",
                "output": "stdout.txt",
                "error": "stderr.txt",
                }    
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)

    print ("Waiting for compute units to complete")
    compute_data_service.wait()

    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

MandelBrot

The below example executes Mandelbrot execution via PilotJob. Make sure you have all the setup needed mentioned in section 5 of **https://github.com/saga-project/BigJob/wiki/XSEDE-Tutorial-Part-2%3A-SAGA SAGA Bliss tutorial to execute Mandelbrot.

Try to find the execution time difference via SAGA Bliss Mandelbrot and PilotJob Mandelbrot.

import os, time, sys
from PIL import Image
import bliss.saga as saga 
from pilot import PilotComputeService, ComputeDataService, State

# the dimension (in pixel) of the whole fractal
imgx = 8192 
imgy = 8192

# the number of tiles in X and Y direction
tilesx = 2
tilesy = 2

	
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description=[]
    
    # copy image tiles back to our 'local' directory
    dirname = 'sftp://localhost/%s/PJ-mbrot/' % os.getenv('SCRATCH')
    workdir = saga.filesystem.Directory(dirname, saga.filesystem.Create)

    pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",
                                       "processes_per_node":12,
                                       "working_directory": workdir.get_url().path,
                                       "walltime":10
                                     })

    for pcd in pilot_compute_description:
        pilot_compute_service.create_pilot(pilot_compute_description=pcd)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submitting compute units")

    # submit compute units
    for x in range(0, tilesx):
            for y in range(0, tilesy):                
                # describe a single Mandelbrot job. we're using the 
                # directory created above as the job's working directory
                outputfile = 'tile_x%s_y%s.png' % (x,y)
                
                compute_unit_description = {
                        "executable": "python",
                        "arguments": [os.getenv("HOME")+'/mandelbrot.py', str(imgx), str(imgy), 
                                        str(imgx/tilesx*x), str(imgx/tilesx*(x+1)),
                                        str(imgy/tilesy*y), str(imgy/tilesy*(y+1)),
                                        outputfile],
                        "number_of_processes": 1,    
                        "working_directory":workdir.get_url().path,        
                        "output": "stdout_x%s_y%s.txt" % (x,y),
                        "error": "stderr_x%s_y%s.txt" % (x,y),
                        }    
                compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)

    print ("Waiting for compute units to complete")
    compute_data_service.wait()
                


    for image in workdir.list('*.png'):
        print ' * Copying %s/%s back to %s' % (workdir.get_url(), image, os.getcwd())
        workdir.copy(image, 'sftp://localhost/%s/' % os.getcwd()) 

    # stitch together the final image
    fullimage = Image.new('RGB',(imgx, imgy),(255,255,255))
    print ' * Stitching together the whole fractal: mandelbrot_full.png'
    for x in range(0, tilesx):
        for y in range(0, tilesy):
            partimage = Image.open('tile_x%s_y%s.png' % (x, y))
            fullimage.paste(partimage, (imgx/tilesx*x, imgy/tilesy*y, imgx/tilesx*(x+1), imgy/tilesy*(y+1)) )
    fullimage.save("mandelbrot_full.png", "PNG")

    print ("Waiting for compute units to complete")
    compute_data_service.wait()

    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Chained Ensembles

The below example submit set of echo jobs(A) using SAGA PilotJob, and for every successful job (with state Done) it submit another /bin/echo job to the same PilotJob.

In your $HOME directory, open a new file chained_example.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
import pdb


### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description=[]
    all_A_cus = []

    pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10,
                                     })

    for pcd in pilot_compute_description:
        pilot_compute_service.create_pilot(pilot_compute_description=pcd)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)

    print ("Finished Pilot-Job setup. Submit compute units")
    # submit Set A compute units
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/echo",
                                      "arguments": ["Hello","$ENV1","$ENV2"],
                                      "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                      "number_of_processes": 4,            
                                      "spmd_variation":"mpi",
                                      "output": "A_stdout.txt",
                                      "error": "A_stderr.txt"
                                    }    
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
        all_A_cus.append(compute_unit)

    # Chaining tasks i.e submit a compute unit, when compute unit from A is successfully executed.

    while 1:
        for i in all_A_cus:
            if i.get_state() == "Done":
                compute_unit_description = { "executable": "/bin/echo",
                                             "arguments": ["$ENV1","$ENV2"],
                                             "environment": ['ENV1=task_B'+str(i),'ENV2=after_task_A'+str(i)],
                                             "number_of_processes": 2,
                                             "spmd_variation":"mpi",
                                             "output": "B_stdout.txt",
                                             "error": "B_stderr.txt"
                                           }
                compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
                all_A_cus.remove(i)
    
        if len(all_A_cus) == 0:
            break
 
    # Wait for set B jobs.
    compute_data_service.wait()

    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Coupled Ensembles

The script provides a simple workflow which submit a set of jobs(A) and jobs(B) and wait until they are completed and then submits set of jobs(C). It demonstrates synchronization mechanisms provided by SAGA Pilot-API.

In your $HOME directory, open a new file coupled_ensembles.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
    	
### This is the number of jobs you want to run
NUMBER_JOBS=4
COORDINATION_URL = "redis://[email protected]:6379"
    
if __name__ == "__main__":
    
    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description=[]
    
    pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10
                                     })
    
    for pcd in pilot_compute_description:
        pilot_compute_service.create_pilot(pilot_compute_description=pcd)
    
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    
    print ("Finished Pilot-Job setup. Submitting compute units")
    
    # submit a set of CUs, call it A
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/echo",
                                     "arguments": ["Hello","$ENV1","$ENV2"],
                                     "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                     "number_of_processes": 4,            
                                     "spmd_variation":"mpi",
                                     "output": "A_stdout.txt",
                                     "error": "A_stderr.txt"
                                    }    
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
    
    
    # submit a set of CUs, call it B
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/date",
                                     "arguments": [],
                                     "environment": {},
                                     "number_of_processes": 1,
                                     "spmd_variation":"single",
                                     "output": "B_stdout.txt",
                                     "error": "B_stderr.txt",
                                    }
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
     
    print ("Wait for CUs of task set A & B to complete")
    compute_data_service.wait()
    
    # submit a set of CUs, call it C
    for i in range(NUMBER_JOBS):
        compute_unit_description = { "executable": "/bin/echo",
                                     "arguments": ["Hello","$ENV1","$ENV2"],
                                     "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                     "number_of_processes": 1,
                                     "spmd_variation":"single",
                                     "output": "C_stdout.txt",
                                     "error": "C_stderr.txt",
                                    }
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
    
    print ("Wait for CUs of task set C to complete")
    compute_data_service.wait()
    
    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Scaling jobs

The script provides an example to submit jobs to multiple pilots. The Pilot-API manages the jobs across multiple pilots launched on same/different machines.

In your $HOME directory, open a new file scaling_jobs.py with your favorite editor (e.g., vim) and paste the following content:

import os
import time
import sys
from pilot import PilotComputeService, ComputeDataService, State
    
COORDINATION_URL = "redis://[email protected]:6379"
    	
    
### This is the number of jobs you want to run
NUMBER_JOBS=24
    
if __name__ == "__main__":
    
    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description=[]
    
    pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
    	                               "queue": "development",
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10,
                                     })
    
    pilot_compute_description.append({ "service_url": "sge+ssh://localhost",
                                       "number_of_processes": 12,
                                       "allocation": "XSEDE12-SAGA",
                                       "queue": "development",
                                       "working_directory": os.getenv("HOME")+"/agent",
                                       "walltime":10
                                     })
    
    for pcd in pilot_compute_description:
        pilot_compute_service.create_pilot(pilot_compute_description=pcd)
    
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    
    print ("Finished Pilot-Job setup. Submitting compute units")
    
    # submit compute units
    for i in range(NUMBER_JOBS):
        compute_unit_description = {"executable": "/bin/echo",
                                    "arguments": ["Hello","$ENV1","$ENV2"],
                                    "environment": ['ENV1=env_arg1','ENV2=env_arg2'],
                                    "number_of_processes": 4,            
                                    "spmd_variation":"mpi",
                                    "output": "stdout.txt",
                                    "error": "stderr.txt",
                                    }    
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
    
    print ("Waiting for compute units to complete")
    compute_data_service.wait()
    
    print ("Terminate Pilot Jobs")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()

Compute Data

The example demonstrates the usage of In your $HOME directory, open a new file ensemble_data.py with your favorite editor (e.g., vim) and paste the following content:

import sys
import os
import time
import logging
from pilot import PilotComputeService, ComputeDataService, State
    
COORDINATION_URL = "redis://[email protected]:6379"
    
if __name__ == "__main__":      
    pilot_compute_service = PilotComputeService(COORDINATION_URL)
    pilot_compute_description = []
    
    # create pilot job service and initiate a pilot job
    pilot_compute_description.append({ "service_url": 'ssh://localhost',
                                       "number_of_processes": 12,                             
                                       "working_directory": os.getenv("HOME")+"/agent"
                                    })
    pilot_compute_description.append({ "service_url": 'ssh://ranger',
                                       "number_of_processes": 16,
                                       "working_directory": os.getenv("HOME")+"/agent"
                                    })
    
    for pcd in pilot_compute_description:
        pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pcd)
    
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
        
    # Submit 24 compute units
    for i in range(24):
        compute_unit_description = { "executable": "/bin/cat",
                                     "arguments": ["test.txt"],
                                     "number_of_processes": 1,
                                     "output": "stdout.txt",
                                     "error": "stderr.txt",   
                                     "file_transfer": ["ssh://" + os.path.dirname(os.path.abspath(__file__)) + "/test.txt > BIGJOB_WORK_DIR"]
                                    }    
        compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
    
    logging.debug("Finished submission. Waiting for completion of CU")
    compute_data_service.wait()
            
    logging.debug("Terminate Pilot Compute Service")
    compute_data_service.cancel()    
    pilot_compute_service.cancel()