Skip to content

Commit

Permalink
pending refinements to ray implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
drelu committed Jun 23, 2024
1 parent f63e2a6 commit 4d30530
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 50 deletions.
5 changes: 4 additions & 1 deletion examples/pq_ray.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
print("PYTHONPATH:", os.getenv('PYTHONPATH'))
import sys
import socket
import time
import pennylane as qml
import pilot.pilot_compute_service
import ray
import pilot
from pilot.pilot_compute_service import PilotComputeService

RESOURCE_URL_HPC = "slurm://localhost"
Expand All @@ -19,6 +21,7 @@
"walltime": 30,
"type": "ray",
"project": "m4408",
"conda_environment": "/pscratch/sd/l/luckow/conda/pilot-quantum",
"scheduler_script_commands": ["#SBATCH --constraint=cpu"]
}

Expand Down
92 changes: 53 additions & 39 deletions pilot/job/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,62 +77,73 @@ def __init__(self, job_description, resource_url):
self.pilot_compute_description['queue'] = job_description['queue']

logging.debug("Queue: %s"%self.pilot_compute_description['queue'])

# Defaults
self.pilot_compute_description['number_of_nodes'] = 1
self.pilot_compute_description['cores_per_node'] = 8

if 'qos' in job_description:
self.pilot_compute_description['qos'] = job_description['qos']

self.pilot_compute_description.update(job_description)

if 'project' in job_description:
self.pilot_compute_description['project'] = job_description['project']
self.pilot_compute_description['number_cores'] = self.pilot_compute_description['cores_per_node'] * self.pilot_compute_description['number_of_nodes']

if 'reservation' in job_description:
self.pilot_compute_description['reservation'] = job_description['reservation']
self.working_directory = self.pilot_compute_description["working_directory"]

self.pilot_compute_description['working_directory'] = os.getcwd()
if 'working_directory' in job_description:
self.pilot_compute_description['working_directory'] = job_description['working_directory']
### convert walltime in minutes to SLURM representation of time ###
walltime_slurm = "01:00:00"
if "walltime" in self.pilot_compute_description:
hrs = math.floor(int(self.pilot_compute_description["walltime"]) / 60)
minu = int(self.pilot_compute_description["walltime"]) % 60
walltime_slurm = "" + str(hrs) + ":" + str(minu) + ":00"
self.pilot_compute_description["walltime_slurm"]=walltime_slurm

self.pilot_compute_description['output'] = os.path.join(
self.pilot_compute_description['working_directory'],
"pq-%s.stdout"%self.job_uuid_short)
self.pilot_compute_description['working_directory'],
"%s.stdout"%self.job_uuid_short)
self.pilot_compute_description['error'] = os.path.join(self.pilot_compute_description['working_directory'], "%s.stderr"%self.job_uuid_short)

if 'output' in job_description:
self.pilot_compute_description['output'] = job_description['output']

if 'error' not in job_description:
self.pilot_compute_description['error'] = os.path.join(self.pilot_compute_description['working_directory'], "pq-%s.stderr"%self.job_uuid_short)
# if 'qos' in job_description:
# self.pilot_compute_description['qos'] = job_description['qos']

if 'error' in job_description:
self.pilot_compute_description['error'] = job_description['error']

if 'walltime' in job_description:
self.pilot_compute_description['walltime'] = job_description['walltime']
# if 'project' in job_description:
# self.pilot_compute_description['project'] = job_description['project']

# if 'reservation' in job_description:
# self.pilot_compute_description['reservation'] = job_description['reservation']

#if 'number_cores' in job_description:
# self.pilot_compute_description['number_cores'] = job_description['number_cores']
# self.pilot_compute_description['working_directory'] = os.getcwd()
# if 'working_directory' in job_description:
# self.pilot_compute_description['working_directory'] = job_description['working_directory']

self.pilot_compute_description['cores_per_node']=48
if 'cores_per_node' in job_description:
self.pilot_compute_description['cores_per_node'] = int(job_description['cores_per_node'])

# if 'output' in job_description:
# self.pilot_compute_description['output'] = job_description['output']

self.pilot_compute_description['number_of_nodes'] = 1
if 'number_of_nodes' in job_description:
self.pilot_compute_description['number_of_nodes'] = int(job_description['number_of_nodes'])
# if 'error' not in job_description:
# self.pilot_compute_description['error'] = os.path.join(self.pilot_compute_description['working_directory'], "pq-%s.stderr"%self.job_uuid_short)

self.pilot_compute_description['number_cores']=self.pilot_compute_description['cores_per_node'] * self.pilot_compute_description['number_of_nodes']
# if 'error' in job_description:
# self.pilot_compute_description['error'] = job_description['error']

# if 'walltime' in job_description:
# self.pilot_compute_description['walltime'] = job_description['walltime']

self.working_directory = self.pilot_compute_description["working_directory"]
### convert walltime in minutes to SLURM representation of time ###
walltime_slurm = "01:00:00"
if "walltime" in self.pilot_compute_description:
hrs = math.floor(int(self.pilot_compute_description["walltime"]) / 60)
minu = int(self.pilot_compute_description["walltime"]) % 60
walltime_slurm = "" + str(hrs) + ":" + str(minu) + ":00"
self.pilot_compute_description["walltime_slurm"]=walltime_slurm

self.pilot_compute_description["scheduler_script_commands"] = \
job_description.get("scheduler_script_commands", [])
# #if 'number_cores' in job_description:
# # self.pilot_compute_description['number_cores'] = job_description['number_cores']


# if 'cores_per_node' in job_description:
# self.pilot_compute_description['cores_per_node'] = int(job_description['cores_per_node'])


# if 'number_of_nodes' in job_description:
# self.pilot_compute_description['number_of_nodes'] = int(job_description['number_of_nodes'])


# self.pilot_compute_description["scheduler_script_commands"] = \
# job_description.get("scheduler_script_commands", [])



Expand Down Expand Up @@ -172,6 +183,9 @@ def run(self):
for sc in self.pilot_compute_description["scheduler_script_commands"]:
tmp.write("%s\n" % sc)

if "conda_environment" in self.pilot_compute_description:
tmp.write("conda activate %s\n"%self.pilot_compute_description["conda_environment"])

tmp.write("cd %s\n"%self.pilot_compute_description["working_directory"])
tmp.write("%s\n"%self.command)

Expand Down
1 change: 1 addition & 0 deletions pilot/plugins/dask/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def _setup_job(self, pilot_compute_description):

return js, jd


def submit_job(self, pilot_compute_description):
try:
self.pilot_compute_description = pilot_compute_description
Expand Down
20 changes: 12 additions & 8 deletions pilot/plugins/ray/bootstrap_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self,
self.job_id[:12] + "-" + self.job_timestamp + "_output.log"), "w")
self.job_error = open(os.path.join(self.working_directory,
"pilot-agent-" +
self.job_id[:12] + "-" + self.job_timestamp + "_output.log"), "w")
self.job_id[:12] + "-" + self.job_timestamp + "_error.log"), "w")
self.ray_headnode_address = ""
self.ray_process = None
self.cores_per_node=int(cores_per_node)
Expand Down Expand Up @@ -179,10 +179,10 @@ def start_ray(self):

self.ray_headnode_address = self.nodes[0]
# TODO clean conda env
cmd = "conda activate pilot-quantum; ray stop; export RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1; ray start --head --dashboard-host=%s --num-cpus=0 --num-gpus=0 --ray-client-server-port=10001"%(self.ray_headnode_address)
cmd = "ray stop; export RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1; ray start --head --dashboard-host=%s --num-cpus=0 --num-gpus=0 --ray-client-server-port=10001"%(self.ray_headnode_address)
print("Start Ray Head Node with command: %s"%(cmd))
result=execute_ssh_command(host=self.ray_headnode_address,
user=getpass.getuser(), command=cmd, arguments=None,
user=getpass.getuser(), command=cmd, arguments=None,
working_directory=self.working_directory,
job_output=self.job_output, job_error=self.job_error)

Expand All @@ -204,8 +204,7 @@ def start_ray(self):

#ray.shutdown()
#time.sleep(5)



print("Ray started.")


Expand Down Expand Up @@ -321,6 +320,13 @@ def stop(self):
ray_config_filename = "ray_config_" + run_timestamp
performance_trace_file = open(os.path.join(working_directory,
performance_trace_filename), "a")


# print conda environment script is running in
print("Conda Environment: %s"%(os.environ["CONDA_DEFAULT_ENV"]))
# print python interpreter path
print("Python Interpreter: %s"%(sys.executable))

start = time.time()
#performance_trace_file.write("start_time, %.5f"%(time.time()))

Expand All @@ -337,9 +343,7 @@ def stop(self):
ray_cluster.shutdown()
if options.clean:
pass
# directory = "/tmp/zookeeper/"
# logging.debug("delete: " + directory)
# shutil.rmtree(directory)

sys.exit(0)

print("Finished launching of Ray Cluster - Sleeping now")
Expand Down
6 changes: 4 additions & 2 deletions pilot/plugins/ray/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def _setup_job(self, pilot_compute_description):

return js, jd

# Ray 2.12.0
# Ray 2.30.0
def submit_job(self,
pilot_compute_description=None
):
Expand Down Expand Up @@ -145,7 +145,9 @@ def wait(self):
except IOError as e:
print("Ray Client Connect Attempt {} failed".format(i))
time.sleep(5)
elif state == "Failed":
elif state.lower() == "queue":
pass
elif state.lower() == "failed" or state.lower() == "unknown":
break
time.sleep(6)

Expand Down

0 comments on commit 4d30530

Please sign in to comment.