diff --git a/examples/pq_ray.py b/examples/pq_ray.py index f7151a8..7041d0d 100644 --- a/examples/pq_ray.py +++ b/examples/pq_ray.py @@ -1,9 +1,11 @@ import os +print("PYTHONPATH:", os.getenv('PYTHONPATH')) +import sys import socket import time import pennylane as qml -import pilot.pilot_compute_service import ray +import pilot from pilot.pilot_compute_service import PilotComputeService RESOURCE_URL_HPC = "slurm://localhost" @@ -19,6 +21,7 @@ "walltime": 30, "type": "ray", "project": "m4408", + "conda_environment": "/pscratch/sd/l/luckow/conda/pilot-quantum", "scheduler_script_commands": ["#SBATCH --constraint=cpu"] } diff --git a/pilot/job/slurm.py b/pilot/job/slurm.py index f35a28d..4981b5c 100644 --- a/pilot/job/slurm.py +++ b/pilot/job/slurm.py @@ -77,62 +77,73 @@ def __init__(self, job_description, resource_url): self.pilot_compute_description['queue'] = job_description['queue'] logging.debug("Queue: %s"%self.pilot_compute_description['queue']) + + # Defaults + self.pilot_compute_description['number_of_nodes'] = 1 + self.pilot_compute_description['cores_per_node'] = 8 - if 'qos' in job_description: - self.pilot_compute_description['qos'] = job_description['qos'] - + self.pilot_compute_description.update(job_description) - if 'project' in job_description: - self.pilot_compute_description['project'] = job_description['project'] + self.pilot_compute_description['number_cores'] = self.pilot_compute_description['cores_per_node'] * self.pilot_compute_description['number_of_nodes'] - if 'reservation' in job_description: - self.pilot_compute_description['reservation'] = job_description['reservation'] + self.working_directory = self.pilot_compute_description["working_directory"] - self.pilot_compute_description['working_directory'] = os.getcwd() - if 'working_directory' in job_description: - self.pilot_compute_description['working_directory'] = job_description['working_directory'] + ### convert walltime in minutes to SLURM representation of time ### + walltime_slurm = "01:00:00" + if "walltime" in self.pilot_compute_description: + hrs = math.floor(int(self.pilot_compute_description["walltime"]) / 60) + minu = int(self.pilot_compute_description["walltime"]) % 60 + walltime_slurm = "" + str(hrs) + ":" + str(minu) + ":00" + self.pilot_compute_description["walltime_slurm"]=walltime_slurm self.pilot_compute_description['output'] = os.path.join( - self.pilot_compute_description['working_directory'], - "pq-%s.stdout"%self.job_uuid_short) + self.pilot_compute_description['working_directory'], + "%s.stdout"%self.job_uuid_short) + self.pilot_compute_description['error'] = os.path.join(self.pilot_compute_description['working_directory'], "%s.stderr"%self.job_uuid_short) - if 'output' in job_description: - self.pilot_compute_description['output'] = job_description['output'] - if 'error' not in job_description: - self.pilot_compute_description['error'] = os.path.join(self.pilot_compute_description['working_directory'], "pq-%s.stderr"%self.job_uuid_short) + # if 'qos' in job_description: + # self.pilot_compute_description['qos'] = job_description['qos'] - if 'error' in job_description: - self.pilot_compute_description['error'] = job_description['error'] - if 'walltime' in job_description: - self.pilot_compute_description['walltime'] = job_description['walltime'] + # if 'project' in job_description: + # self.pilot_compute_description['project'] = job_description['project'] + # if 'reservation' in job_description: + # self.pilot_compute_description['reservation'] = job_description['reservation'] - #if 'number_cores' in job_description: - # self.pilot_compute_description['number_cores'] = job_description['number_cores'] + # self.pilot_compute_description['working_directory'] = os.getcwd() + # if 'working_directory' in job_description: + # self.pilot_compute_description['working_directory'] = job_description['working_directory'] - self.pilot_compute_description['cores_per_node']=48 - if 'cores_per_node' in job_description: - self.pilot_compute_description['cores_per_node'] = int(job_description['cores_per_node']) + + # if 'output' in job_description: + # self.pilot_compute_description['output'] = job_description['output'] - self.pilot_compute_description['number_of_nodes'] = 1 - if 'number_of_nodes' in job_description: - self.pilot_compute_description['number_of_nodes'] = int(job_description['number_of_nodes']) + # if 'error' not in job_description: + # self.pilot_compute_description['error'] = os.path.join(self.pilot_compute_description['working_directory'], "pq-%s.stderr"%self.job_uuid_short) - self.pilot_compute_description['number_cores']=self.pilot_compute_description['cores_per_node'] * self.pilot_compute_description['number_of_nodes'] + # if 'error' in job_description: + # self.pilot_compute_description['error'] = job_description['error'] + + # if 'walltime' in job_description: + # self.pilot_compute_description['walltime'] = job_description['walltime'] - self.working_directory = self.pilot_compute_description["working_directory"] - ### convert walltime in minutes to SLURM representation of time ### - walltime_slurm = "01:00:00" - if "walltime" in self.pilot_compute_description: - hrs = math.floor(int(self.pilot_compute_description["walltime"]) / 60) - minu = int(self.pilot_compute_description["walltime"]) % 60 - walltime_slurm = "" + str(hrs) + ":" + str(minu) + ":00" - self.pilot_compute_description["walltime_slurm"]=walltime_slurm - self.pilot_compute_description["scheduler_script_commands"] = \ - job_description.get("scheduler_script_commands", []) + # #if 'number_cores' in job_description: + # # self.pilot_compute_description['number_cores'] = job_description['number_cores'] + + + # if 'cores_per_node' in job_description: + # self.pilot_compute_description['cores_per_node'] = int(job_description['cores_per_node']) + + + # if 'number_of_nodes' in job_description: + # self.pilot_compute_description['number_of_nodes'] = int(job_description['number_of_nodes']) + + + # self.pilot_compute_description["scheduler_script_commands"] = \ + # job_description.get("scheduler_script_commands", []) @@ -172,6 +183,9 @@ def run(self): for sc in self.pilot_compute_description["scheduler_script_commands"]: tmp.write("%s\n" % sc) + if "conda_environment" in self.pilot_compute_description: + tmp.write("conda activate %s\n"%self.pilot_compute_description["conda_environment"]) + tmp.write("cd %s\n"%self.pilot_compute_description["working_directory"]) tmp.write("%s\n"%self.command) diff --git a/pilot/plugins/dask/cluster.py b/pilot/plugins/dask/cluster.py index 05a171b..287b29b 100644 --- a/pilot/plugins/dask/cluster.py +++ b/pilot/plugins/dask/cluster.py @@ -63,6 +63,7 @@ def _setup_job(self, pilot_compute_description): return js, jd + def submit_job(self, pilot_compute_description): try: self.pilot_compute_description = pilot_compute_description diff --git a/pilot/plugins/ray/bootstrap_ray.py b/pilot/plugins/ray/bootstrap_ray.py index 3cb36f4..282efdd 100755 --- a/pilot/plugins/ray/bootstrap_ray.py +++ b/pilot/plugins/ray/bootstrap_ray.py @@ -62,7 +62,7 @@ def __init__(self, self.job_id[:12] + "-" + self.job_timestamp + "_output.log"), "w") self.job_error = open(os.path.join(self.working_directory, "pilot-agent-" + - self.job_id[:12] + "-" + self.job_timestamp + "_output.log"), "w") + self.job_id[:12] + "-" + self.job_timestamp + "_error.log"), "w") self.ray_headnode_address = "" self.ray_process = None self.cores_per_node=int(cores_per_node) @@ -179,10 +179,10 @@ def start_ray(self): self.ray_headnode_address = self.nodes[0] # TODO clean conda env - cmd = "conda activate pilot-quantum; ray stop; export RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1; ray start --head --dashboard-host=%s --num-cpus=0 --num-gpus=0 --ray-client-server-port=10001"%(self.ray_headnode_address) + cmd = "ray stop; export RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1; ray start --head --dashboard-host=%s --num-cpus=0 --num-gpus=0 --ray-client-server-port=10001"%(self.ray_headnode_address) print("Start Ray Head Node with command: %s"%(cmd)) result=execute_ssh_command(host=self.ray_headnode_address, - user=getpass.getuser(), command=cmd, arguments=None, + user=getpass.getuser(), command=cmd, arguments=None, working_directory=self.working_directory, job_output=self.job_output, job_error=self.job_error) @@ -204,8 +204,7 @@ def start_ray(self): #ray.shutdown() #time.sleep(5) - - + print("Ray started.") @@ -321,6 +320,13 @@ def stop(self): ray_config_filename = "ray_config_" + run_timestamp performance_trace_file = open(os.path.join(working_directory, performance_trace_filename), "a") + + + # print conda environment script is running in + print("Conda Environment: %s"%(os.environ["CONDA_DEFAULT_ENV"])) + # print python interpreter path + print("Python Interpreter: %s"%(sys.executable)) + start = time.time() #performance_trace_file.write("start_time, %.5f"%(time.time())) @@ -337,9 +343,7 @@ def stop(self): ray_cluster.shutdown() if options.clean: pass - # directory = "/tmp/zookeeper/" - # logging.debug("delete: " + directory) - # shutil.rmtree(directory) + sys.exit(0) print("Finished launching of Ray Cluster - Sleeping now") diff --git a/pilot/plugins/ray/cluster.py b/pilot/plugins/ray/cluster.py index ccb5296..ed52d9d 100644 --- a/pilot/plugins/ray/cluster.py +++ b/pilot/plugins/ray/cluster.py @@ -99,7 +99,7 @@ def _setup_job(self, pilot_compute_description): return js, jd - # Ray 2.12.0 + # Ray 2.30.0 def submit_job(self, pilot_compute_description=None ): @@ -145,7 +145,9 @@ def wait(self): except IOError as e: print("Ray Client Connect Attempt {} failed".format(i)) time.sleep(5) - elif state == "Failed": + elif state.lower() == "queue": + pass + elif state.lower() == "failed" or state.lower() == "unknown": break time.sleep(6)