diff --git a/proactive/ProactiveGateway.py b/proactive/ProactiveGateway.py index 66aea7e..577971f 100644 --- a/proactive/ProactiveGateway.py +++ b/proactive/ProactiveGateway.py @@ -21,19 +21,22 @@ class ProActiveGateway: """ + This class provides a client for interacting with the ProActive scheduler and resource manager. + It offers methods to create and manage jobs and tasks, handle data transfers, and execute workflows. + Simple client for the ProActive scheduler REST API See also https://try.activeeon.com/doc/rest/ """ def __init__(self, base_url, debug=False, javaopts=[], log4j_props_file=None, log4py_props_file=None): """ - Create a Proactive Gateway + Initializes a new instance of the ProActiveGateway class. - :param base_url: The Proactive server base URL - :param debug: If set True, the gateway will be created with the debug mode - :param javaopts: Some additional java options - :param log4j_props_file: The log4j properties file path - :param log4py_props_file: The log4py properties file path + :param base_url: The base URL of the ProActive server. + :param debug: If True, enables debug mode which provides additional logging information. + :param javaopts: A list of additional options to pass to the Java virtual machine. + :param log4j_props_file: The path to the log4j properties file for configuring logging. + :param log4py_props_file: The path to the log4py properties file for configuring logging. """ self.root_dir = os.path.dirname(os.path.abspath(__file__)) self.current_path = self.root_dir + "/java/lib/*" @@ -88,12 +91,12 @@ def __init__(self, base_url, debug=False, javaopts=[], log4j_props_file=None, lo def connect(self, username=None, password=None, credentials_path=None, insecure=True): """ - Connect to a Proactive server + Connects to the ProActive server using either provided credentials or a credentials file. - :param username: A valid username - :param password: A valid password - :param credentials_path: A credentials file path - :param insecure: If set True, the gateway will connect in insecure mode + :param username: The username for authentication. + :param password: The password for authentication. + :param credentials_path: The path to a credentials file. + :param insecure: If True, connects without verifying the SSL certificate. Use with caution. """ credentials_file = None if credentials_path is not None: @@ -115,15 +118,15 @@ def connect(self, username=None, password=None, credentials_path=None, insecure= def isConnected(self): """ - Verify if the gateway is connected to the ProActive server + Checks if the gateway is currently connected to the ProActive server. - :return: True or False + :return: True if connected, False otherwise. """ return self.proactive_scheduler_client.isConnected() def disconnect(self): """ - Disconnect the gateway from the ProActive server + Disconnects the gateway from the ProActive server and cleans up resources. """ self.logger.debug('Disconnecting from the ProActive server') self.proactive_scheduler_client.disconnect() @@ -141,7 +144,7 @@ def reconnect(self): def terminate(self): """ - Terminate the connection + Terminates the connection to the ProActive server and cleans up resources. """ self.proactive_rest_api.disconnect() self.proactive_scheduler_client.terminate() @@ -155,7 +158,7 @@ def terminate(self): def close(self): """ - Cleanup: disconnect and terminate + A convenience method for disconnecting and then terminating the gateway. """ self.disconnect() self.terminate() @@ -172,13 +175,13 @@ def getBucket(self, bucket_name): def submitWorkflowFromCatalog(self, bucket_name, workflow_name, workflow_variables={}, workflow_generic_info={}): """ - Submit a job from the catalog to the scheduler + Submits a job from the ProActive catalog to the scheduler. - :param bucket_name: The bucket in which the workflow is saved - :param workflow_name: The workflow name - :param workflow_variables: The workflow input variables - :param workflow_generic_info: The workflow generic information - :return: The submitted job id + :param bucket_name: The name of the bucket containing the workflow. + :param workflow_name: The name of the workflow to be submitted. + :param workflow_variables: A dictionary of variables to be passed to the workflow. + :param workflow_generic_info: A dictionary of generic information to be associated with the job. + :return: The ID of the submitted job. """ workflow_variables_java_map = MapConverter().convert(workflow_variables, self.runtime_gateway._gateway_client) workflow_generic_info_java_map = MapConverter().convert(workflow_generic_info, self.runtime_gateway._gateway_client) @@ -228,10 +231,10 @@ def createTask(self, language=None, task_name=''): def createPythonTask(self, task_name=''): """ - Create a workflow Python task + Creates a new task designed to execute Python scripts. - :param task_name: The task name - :return: A Python ProactiveTask object + :param task_name: The name of the task to be created. + :return: A ProactiveTask object set up for Python script execution. """ self.logger.info('Creating a Python task') return ProactivePythonTask(task_name) @@ -329,10 +332,10 @@ def createPostScript(self, language=None): def createJob(self, job_name=''): """ - Create a Proactive Job + Creates a new job with the specified name. - :param job_name: The job name - :return: A ProactiveJob object + :param job_name: The name of the job to be created. + :return: A ProactiveJob object representing the newly created job. """ self.logger.info('Creating a job') return ProactiveJob(job_name) @@ -350,11 +353,11 @@ def buildJob(self, job_model, debug=False): def submitJob(self, job_model, debug=False): """ - Submit a job to the Proactive Scheduler + Submits a job to the ProActive Scheduler. - :param job_model: A valid job model - :param debug: If set True, the submitted job will be printed for a debugging purpose - :return: The submitted job ID + :param job_model: The job model to be submitted. + :param debug: If True, prints the job configuration for debugging purposes. + :return: The ID of the submitted job. """ proactive_job = self.buildJob(job_model, debug) self.logger.info('Submitting the job' + job_model.getJobName()) @@ -463,38 +466,38 @@ def getTaskStatus(self, job_id, task_name): def getJobState(self, job_id): """ - Get the job state + Retrieves the current state of the specified job. - :param job_id: A valid job ID - :return: The job state + :param job_id: The ID of the job. + :return: The state of the job. """ return self.proactive_scheduler_client.getJobState(str(job_id)) def getJobStatus(self, job_id): """ - Get the job status + Retrieves the status of the specified job. - :param job_id: A valid job ID - :return: The job status + :param job_id: The ID of the job to check. + :return: The status of the job as a string. """ return str(self.getJobState(str(job_id)).getJobInfo().getStatus().toString()) def isJobFinished(self, job_id): """ - Verify if a job is finished + Checks if the specified job has finished execution. - :param job_id: A valid job ID - :return: True or False + :param job_id: The ID of the job to check. + :return: True if the job is finished, False otherwise. """ return self.proactive_scheduler_client.isJobFinished(str(job_id)) def isTaskFinished(self, job_id, task_name): """ - Verify if a task is finished + Checks if the specified task within a job has finished execution. - :param job_id: A valid job ID - :param task_name: A valid task name - :return: True or False + :param job_id: The ID of the job containing the task. + :param task_name: The name of the task to check. + :return: True if the task is finished, False otherwise. """ return self.proactive_scheduler_client.isTaskFinished(str(job_id), task_name) @@ -509,32 +512,32 @@ def getJobInfo(self, job_id): def waitForJob(self, job_id, timeout=60000): """ - Wait until the job is finished + Waits for a job to finish execution within the specified timeout. - :param job_id: A valid job ID - :param timeout: A timeout in milliseconds - :return: The job info + :param job_id: The ID of the job to wait for. + :param timeout: The timeout in milliseconds. Default is 60000 milliseconds (1 minute). + :return: The job info upon completion. """ return self.proactive_scheduler_client.waitForJob(str(job_id), timeout) def getAllJobs(self, max_number_of_jobs=1000, my_jobs_only=False, pending=False, running=True, finished=False, withIssuesOnly=False, child_jobs=True, job_name=None, project_name=None, user_name=None, tenant=None, parent_id=None): """ - Get all jobs from the ProActive scheduler - - :param my_jobs_only: Get only my jobs - :param pending: Include jobs in PENDING state - :param running: Include jobs in RUNNING state - :param finished: Include jobs in FINISHED state - :param withIssuesOnly: Include only jobs with issues - :param child_jobs: Include child jobs - :param job_name: Get jobs with specific job name - :param project_name: Get jobs with specific project name - :param user_name: Get jobs of specific user - :param tenant: Get jobs having specific tenant - :param parent_id: Get jobs related to a specific parent job id - :param max_number_of_jobs: The maximal number of retrieved jobs - :return: A list of jobs + Retrieves a list of jobs from the ProActive scheduler based on the specified filters. + + :param max_number_of_jobs: The maximum number of jobs to retrieve. + :param my_jobs_only: If True, only retrieves jobs submitted by the current user. + :param pending: If True, includes jobs in PENDING state. + :param running: If True, includes jobs in RUNNING state. + :param finished: If True, includes jobs in FINISHED state. + :param withIssuesOnly: If True, only includes jobs that have issues. + :param child_jobs: If True, includes child jobs. + :param job_name: Filters jobs by name. + :param project_name: Filters jobs by project name. + :param user_name: Filters jobs by the submitting user's name. + :param tenant: Filters jobs by tenant. + :param parent_id: Filters jobs by parent job ID. + :return: A list of jobs matching the specified filters. """ job_filter_criteria = self.runtime_gateway.jvm.org.ow2.proactive.scheduler.common.JobFilterCriteriaBuilder().myJobsOnly(my_jobs_only).pending(pending).running(running).finished(finished).withIssuesOnly(withIssuesOnly).childJobs(child_jobs).jobName(job_name).projectName(project_name).userName(user_name).tenant(tenant).parentId(parent_id).build() jobs_page = self.proactive_scheduler_client.getJobs(0, max_number_of_jobs, job_filter_criteria, None) @@ -546,15 +549,12 @@ def __decode__(value): def getJobOutput(self, job_id, timeout=-1): """ - Retrieves the output of a specified job from the ProActive Scheduler. - This method supports both blocking and non-blocking execution modes. - - In blocking mode (default), it waits indefinitely until the job execution completes and then fetches the job's output. - - In non-blocking mode, specified by a non-negative timeout value, it attempts to retrieve the job's output within the given timeout period. - Parameters: - - job_id: The unique identifier of the job whose output is to be fetched. - - timeout: The maximum time in seconds to wait for job completion before attempting to fetch the output. A negative value indicates an indefinite wait (blocking mode). - Returns: - - The full log output of the job as a string. + Retrieves the output of the specified job. Can operate in blocking or non-blocking mode. + + :param job_id: The unique identifier of the job whose output is to be fetched. + :param timeout: The maximum time in seconds to wait for job completion before fetching the output. + A negative value indicates an indefinite wait (blocking mode). + :return: The full log output of the job as a string. """ if timeout < 0: self.logger.debug("Waiting job execution to be finished...") @@ -568,11 +568,11 @@ def getJobOutput(self, job_id, timeout=-1): def getJobResult(self, job_id, timeout=60000): """ - Get the job result + Retrieves the result of a completed job. - :param job_id: A valid job ID - :param timeout: A timeout in milliseconds - :return: The job results + :param job_id: The ID of the job to fetch the result for. + :param timeout: The timeout in milliseconds for waiting for the job to finish. + :return: The result of the job if available within the timeout period. """ self.logger.debug('Getting job\'s results') job_result = self.proactive_scheduler_client.waitForJob(str(job_id), timeout) @@ -587,12 +587,12 @@ def getJobResult(self, job_id, timeout=60000): def getTaskResult(self, job_id, task_name, timeout=60000): """ - Get the task result + Retrieves the result of a specified task from a job. - :param job_id: A valid job ID - :param task_name: A valid task name - :param timeout: A timeout in milliseconds - :return: The task results + :param job_id: The ID of the job containing the task. + :param task_name: The name of the task to fetch the result for. + :param timeout: The timeout in milliseconds for waiting for the task to finish. + :return: The result of the task if available within the timeout period. """ self.logger.debug('Getting results of the task \'' + task_name + '\'') task_result = self.proactive_scheduler_client.waitForTask(str(job_id), task_name, timeout) @@ -632,11 +632,11 @@ def printTaskOutput(self, job_id, task_name, timeout=60000): def exportJob2XML(self, job_model, debug=False): """ - Export the job to an XML file + Exports the specified job to an XML representation. - :param job_model: A valid job model - :param debug: If set True, the created job will be printed for a debugging purpose - :return: The created job as an XML script + :param job_model: The job model to export. + :param debug: If True, prints the job XML for debugging purposes. + :return: The XML representation of the job. """ proactive_job = self.buildJob(job_model, debug) self.logger.info('Transforming the job \'' + job_model.getJobName() + '\' to an XML string') @@ -645,11 +645,11 @@ def exportJob2XML(self, job_model, debug=False): def saveJob2XML(self, job_model, xml_file_path, debug=False): """ - Save a job to an external XML file + Saves the specified job model to an XML file. - :param job_model: A valid job model - :param xml_file_path: A valid XML file path - :param debug: If set True, the created job will be printed for a debugging purpose + :param job_model: The job model to save. + :param xml_file_path: The file path where the XML should be saved. + :param debug: If True, prints the job XML to the console for debugging purposes. """ self.logger.info('Saving the job \'' + job_model.getJobName() + '\' to the XML file \'' + xml_file_path + '\'') job_xml_data = self.exportJob2XML(job_model, debug)