Skip to content

Commit

Permalink
Improved docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewssobral committed Mar 27, 2024
1 parent 111cbca commit 6c6bafc
Showing 1 changed file with 90 additions and 90 deletions.
180 changes: 90 additions & 90 deletions proactive/ProactiveGateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@

class ProActiveGateway:
"""
This class provides a client for interacting with the ProActive scheduler and resource manager.
It offers methods to create and manage jobs and tasks, handle data transfers, and execute workflows.
Simple client for the ProActive scheduler REST API
See also https://try.activeeon.com/doc/rest/
"""

def __init__(self, base_url, debug=False, javaopts=[], log4j_props_file=None, log4py_props_file=None):
"""
Create a Proactive Gateway
Initializes a new instance of the ProActiveGateway class.
:param base_url: The Proactive server base URL
:param debug: If set True, the gateway will be created with the debug mode
:param javaopts: Some additional java options
:param log4j_props_file: The log4j properties file path
:param log4py_props_file: The log4py properties file path
:param base_url: The base URL of the ProActive server.
:param debug: If True, enables debug mode which provides additional logging information.
:param javaopts: A list of additional options to pass to the Java virtual machine.
:param log4j_props_file: The path to the log4j properties file for configuring logging.
:param log4py_props_file: The path to the log4py properties file for configuring logging.
"""
self.root_dir = os.path.dirname(os.path.abspath(__file__))
self.current_path = self.root_dir + "/java/lib/*"
Expand Down Expand Up @@ -88,12 +91,12 @@ def __init__(self, base_url, debug=False, javaopts=[], log4j_props_file=None, lo

def connect(self, username=None, password=None, credentials_path=None, insecure=True):
"""
Connect to a Proactive server
Connects to the ProActive server using either provided credentials or a credentials file.
:param username: A valid username
:param password: A valid password
:param credentials_path: A credentials file path
:param insecure: If set True, the gateway will connect in insecure mode
:param username: The username for authentication.
:param password: The password for authentication.
:param credentials_path: The path to a credentials file.
:param insecure: If True, connects without verifying the SSL certificate. Use with caution.
"""
credentials_file = None
if credentials_path is not None:
Expand All @@ -115,15 +118,15 @@ def connect(self, username=None, password=None, credentials_path=None, insecure=

def isConnected(self):
"""
Verify if the gateway is connected to the ProActive server
Checks if the gateway is currently connected to the ProActive server.
:return: True or False
:return: True if connected, False otherwise.
"""
return self.proactive_scheduler_client.isConnected()

def disconnect(self):
"""
Disconnect the gateway from the ProActive server
Disconnects the gateway from the ProActive server and cleans up resources.
"""
self.logger.debug('Disconnecting from the ProActive server')
self.proactive_scheduler_client.disconnect()
Expand All @@ -141,7 +144,7 @@ def reconnect(self):

def terminate(self):
"""
Terminate the connection
Terminates the connection to the ProActive server and cleans up resources.
"""
self.proactive_rest_api.disconnect()
self.proactive_scheduler_client.terminate()
Expand All @@ -155,7 +158,7 @@ def terminate(self):

def close(self):
"""
Cleanup: disconnect and terminate
A convenience method for disconnecting and then terminating the gateway.
"""
self.disconnect()
self.terminate()
Expand All @@ -172,13 +175,13 @@ def getBucket(self, bucket_name):

def submitWorkflowFromCatalog(self, bucket_name, workflow_name, workflow_variables={}, workflow_generic_info={}):
"""
Submit a job from the catalog to the scheduler
Submits a job from the ProActive catalog to the scheduler.
:param bucket_name: The bucket in which the workflow is saved
:param workflow_name: The workflow name
:param workflow_variables: The workflow input variables
:param workflow_generic_info: The workflow generic information
:return: The submitted job id
:param bucket_name: The name of the bucket containing the workflow.
:param workflow_name: The name of the workflow to be submitted.
:param workflow_variables: A dictionary of variables to be passed to the workflow.
:param workflow_generic_info: A dictionary of generic information to be associated with the job.
:return: The ID of the submitted job.
"""
workflow_variables_java_map = MapConverter().convert(workflow_variables, self.runtime_gateway._gateway_client)
workflow_generic_info_java_map = MapConverter().convert(workflow_generic_info, self.runtime_gateway._gateway_client)
Expand Down Expand Up @@ -228,10 +231,10 @@ def createTask(self, language=None, task_name=''):

def createPythonTask(self, task_name=''):
"""
Create a workflow Python task
Creates a new task designed to execute Python scripts.
:param task_name: The task name
:return: A Python ProactiveTask object
:param task_name: The name of the task to be created.
:return: A ProactiveTask object set up for Python script execution.
"""
self.logger.info('Creating a Python task')
return ProactivePythonTask(task_name)
Expand Down Expand Up @@ -329,10 +332,10 @@ def createPostScript(self, language=None):

def createJob(self, job_name=''):
"""
Create a Proactive Job
Creates a new job with the specified name.
:param job_name: The job name
:return: A ProactiveJob object
:param job_name: The name of the job to be created.
:return: A ProactiveJob object representing the newly created job.
"""
self.logger.info('Creating a job')
return ProactiveJob(job_name)
Expand All @@ -350,11 +353,11 @@ def buildJob(self, job_model, debug=False):

def submitJob(self, job_model, debug=False):
"""
Submit a job to the Proactive Scheduler
Submits a job to the ProActive Scheduler.
:param job_model: A valid job model
:param debug: If set True, the submitted job will be printed for a debugging purpose
:return: The submitted job ID
:param job_model: The job model to be submitted.
:param debug: If True, prints the job configuration for debugging purposes.
:return: The ID of the submitted job.
"""
proactive_job = self.buildJob(job_model, debug)
self.logger.info('Submitting the job' + job_model.getJobName())
Expand Down Expand Up @@ -463,38 +466,38 @@ def getTaskStatus(self, job_id, task_name):

def getJobState(self, job_id):
"""
Get the job state
Retrieves the current state of the specified job.
:param job_id: A valid job ID
:return: The job state
:param job_id: The ID of the job.
:return: The state of the job.
"""
return self.proactive_scheduler_client.getJobState(str(job_id))

def getJobStatus(self, job_id):
"""
Get the job status
Retrieves the status of the specified job.
:param job_id: A valid job ID
:return: The job status
:param job_id: The ID of the job to check.
:return: The status of the job as a string.
"""
return str(self.getJobState(str(job_id)).getJobInfo().getStatus().toString())

def isJobFinished(self, job_id):
"""
Verify if a job is finished
Checks if the specified job has finished execution.
:param job_id: A valid job ID
:return: True or False
:param job_id: The ID of the job to check.
:return: True if the job is finished, False otherwise.
"""
return self.proactive_scheduler_client.isJobFinished(str(job_id))

def isTaskFinished(self, job_id, task_name):
"""
Verify if a task is finished
Checks if the specified task within a job has finished execution.
:param job_id: A valid job ID
:param task_name: A valid task name
:return: True or False
:param job_id: The ID of the job containing the task.
:param task_name: The name of the task to check.
:return: True if the task is finished, False otherwise.
"""
return self.proactive_scheduler_client.isTaskFinished(str(job_id), task_name)

Expand All @@ -509,32 +512,32 @@ def getJobInfo(self, job_id):

def waitForJob(self, job_id, timeout=60000):
"""
Wait until the job is finished
Waits for a job to finish execution within the specified timeout.
:param job_id: A valid job ID
:param timeout: A timeout in milliseconds
:return: The job info
:param job_id: The ID of the job to wait for.
:param timeout: The timeout in milliseconds. Default is 60000 milliseconds (1 minute).
:return: The job info upon completion.
"""
return self.proactive_scheduler_client.waitForJob(str(job_id), timeout)

def getAllJobs(self, max_number_of_jobs=1000, my_jobs_only=False, pending=False, running=True, finished=False,
withIssuesOnly=False, child_jobs=True, job_name=None, project_name=None, user_name=None, tenant=None, parent_id=None):
"""
Get all jobs from the ProActive scheduler
:param my_jobs_only: Get only my jobs
:param pending: Include jobs in PENDING state
:param running: Include jobs in RUNNING state
:param finished: Include jobs in FINISHED state
:param withIssuesOnly: Include only jobs with issues
:param child_jobs: Include child jobs
:param job_name: Get jobs with specific job name
:param project_name: Get jobs with specific project name
:param user_name: Get jobs of specific user
:param tenant: Get jobs having specific tenant
:param parent_id: Get jobs related to a specific parent job id
:param max_number_of_jobs: The maximal number of retrieved jobs
:return: A list of jobs
Retrieves a list of jobs from the ProActive scheduler based on the specified filters.
:param max_number_of_jobs: The maximum number of jobs to retrieve.
:param my_jobs_only: If True, only retrieves jobs submitted by the current user.
:param pending: If True, includes jobs in PENDING state.
:param running: If True, includes jobs in RUNNING state.
:param finished: If True, includes jobs in FINISHED state.
:param withIssuesOnly: If True, only includes jobs that have issues.
:param child_jobs: If True, includes child jobs.
:param job_name: Filters jobs by name.
:param project_name: Filters jobs by project name.
:param user_name: Filters jobs by the submitting user's name.
:param tenant: Filters jobs by tenant.
:param parent_id: Filters jobs by parent job ID.
:return: A list of jobs matching the specified filters.
"""
job_filter_criteria = self.runtime_gateway.jvm.org.ow2.proactive.scheduler.common.JobFilterCriteriaBuilder().myJobsOnly(my_jobs_only).pending(pending).running(running).finished(finished).withIssuesOnly(withIssuesOnly).childJobs(child_jobs).jobName(job_name).projectName(project_name).userName(user_name).tenant(tenant).parentId(parent_id).build()
jobs_page = self.proactive_scheduler_client.getJobs(0, max_number_of_jobs, job_filter_criteria, None)
Expand All @@ -546,15 +549,12 @@ def __decode__(value):

def getJobOutput(self, job_id, timeout=-1):
"""
Retrieves the output of a specified job from the ProActive Scheduler.
This method supports both blocking and non-blocking execution modes.
- In blocking mode (default), it waits indefinitely until the job execution completes and then fetches the job's output.
- In non-blocking mode, specified by a non-negative timeout value, it attempts to retrieve the job's output within the given timeout period.
Parameters:
- job_id: The unique identifier of the job whose output is to be fetched.
- timeout: The maximum time in seconds to wait for job completion before attempting to fetch the output. A negative value indicates an indefinite wait (blocking mode).
Returns:
- The full log output of the job as a string.
Retrieves the output of the specified job. Can operate in blocking or non-blocking mode.
:param job_id: The unique identifier of the job whose output is to be fetched.
:param timeout: The maximum time in seconds to wait for job completion before fetching the output.
A negative value indicates an indefinite wait (blocking mode).
:return: The full log output of the job as a string.
"""
if timeout < 0:
self.logger.debug("Waiting job execution to be finished...")
Expand All @@ -568,11 +568,11 @@ def getJobOutput(self, job_id, timeout=-1):

def getJobResult(self, job_id, timeout=60000):
"""
Get the job result
Retrieves the result of a completed job.
:param job_id: A valid job ID
:param timeout: A timeout in milliseconds
:return: The job results
:param job_id: The ID of the job to fetch the result for.
:param timeout: The timeout in milliseconds for waiting for the job to finish.
:return: The result of the job if available within the timeout period.
"""
self.logger.debug('Getting job\'s results')
job_result = self.proactive_scheduler_client.waitForJob(str(job_id), timeout)
Expand All @@ -587,12 +587,12 @@ def getJobResult(self, job_id, timeout=60000):

def getTaskResult(self, job_id, task_name, timeout=60000):
"""
Get the task result
Retrieves the result of a specified task from a job.
:param job_id: A valid job ID
:param task_name: A valid task name
:param timeout: A timeout in milliseconds
:return: The task results
:param job_id: The ID of the job containing the task.
:param task_name: The name of the task to fetch the result for.
:param timeout: The timeout in milliseconds for waiting for the task to finish.
:return: The result of the task if available within the timeout period.
"""
self.logger.debug('Getting results of the task \'' + task_name + '\'')
task_result = self.proactive_scheduler_client.waitForTask(str(job_id), task_name, timeout)
Expand Down Expand Up @@ -632,11 +632,11 @@ def printTaskOutput(self, job_id, task_name, timeout=60000):

def exportJob2XML(self, job_model, debug=False):
"""
Export the job to an XML file
Exports the specified job to an XML representation.
:param job_model: A valid job model
:param debug: If set True, the created job will be printed for a debugging purpose
:return: The created job as an XML script
:param job_model: The job model to export.
:param debug: If True, prints the job XML for debugging purposes.
:return: The XML representation of the job.
"""
proactive_job = self.buildJob(job_model, debug)
self.logger.info('Transforming the job \'' + job_model.getJobName() + '\' to an XML string')
Expand All @@ -645,11 +645,11 @@ def exportJob2XML(self, job_model, debug=False):

def saveJob2XML(self, job_model, xml_file_path, debug=False):
"""
Save a job to an external XML file
Saves the specified job model to an XML file.
:param job_model: A valid job model
:param xml_file_path: A valid XML file path
:param debug: If set True, the created job will be printed for a debugging purpose
:param job_model: The job model to save.
:param xml_file_path: The file path where the XML should be saved.
:param debug: If True, prints the job XML to the console for debugging purposes.
"""
self.logger.info('Saving the job \'' + job_model.getJobName() + '\' to the XML file \'' + xml_file_path + '\'')
job_xml_data = self.exportJob2XML(job_model, debug)
Expand Down

0 comments on commit 6c6bafc

Please sign in to comment.