Skip to content

Commit

Permalink
scripts to launch dask clusters on slurm
Browse files Browse the repository at this point in the history
  • Loading branch information
pradeepmantha committed Aug 21, 2024
1 parent 87778e6 commit 7bfe942
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 18 deletions.
4 changes: 2 additions & 2 deletions examples/pq_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
"resource": RESOURCE_URL_HPC,
"working_directory": WORKING_DIRECTORY,
"type": "dask",
"number_of_nodes": 10,
"number_of_nodes": 2,
"cores_per_node": 10,
}


def start_pilot():
pcs = PilotComputeService()
pcs = PilotComputeService(working_directory=WORKING_DIRECTORY)
pcs.create_pilot(pilot_compute_description=pilot_compute_description_dask)
return pcs

Expand Down
6 changes: 3 additions & 3 deletions examples/pq_multi_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ def circuit(parameters):

# Submit tasks to pcs
tasks = []
for i in range(1000):
for i in range(10):
k = pcs.submit_task(pennylane_quantum_circuit, task_name = f"task_pennylane-{i}" )
tasks.append(k)

for i in range(1000):
k = pcs.submit_task(pennylane_quantum_circuit, task_name = f"task_pennylane-{i}", pilot=pilots[0])
for i in range(10):
k = pcs.submit_task(pennylane_quantum_circuit, task_name = f"task_{pilots[0]}_pennylane-{i}", pilot=pilots[0])
tasks.append(k)


Expand Down
6 changes: 1 addition & 5 deletions examples/qugen/pq_qugen.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ def train_model(data_set_name, data, model):
dask_client = dask_pilot.get_client()
print(dask_client.scheduler_info())


# Get Dask client details
dask_client.run()

a = dask_pilot.submit_task(train_model, data_set_name, data, model)
a = dask_pilot.submit_task("train_model", train_model, data_set_name, data, model)
print(f"{a.result()}\n")
finally:
if dask_pilot:
Expand Down
15 changes: 11 additions & 4 deletions pilot/pilot_compute_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def create_pilot(self, pilot_compute_description):
batch_job = worker_cluster_manager.submit_job(pilot_compute_description)
self.pilot_id = batch_job.get_id()

self.metrics_file_name = os.path.join(self.pcs_working_directory, f"{self.pilot_id}-metrics.csv")
self.metrics_file_name = os.path.join(self.pcs_working_directory, "metrics.csv")


details = worker_cluster_manager.get_config_data()
Expand Down Expand Up @@ -158,7 +158,7 @@ def submit_task(self, func, *args, **kwargs):
if self.client is None:
raise PilotAPIException("Cluster client isn't ready/provisioned yet")

print(f"Running task {task_name} with details func:{func.__name__};args {args};kwargs {kwargs}")
self.logger.info(f"Running task {task_name} with details func:{func.__name__};args {args};kwargs {kwargs}")


metrics = {
Expand Down Expand Up @@ -199,7 +199,11 @@ def task_func(metrics_fn, *args, **kwargs):
return result

if pilot_scheduled != 'ANY':
task_future = self.client.submit(task_func, self.metrics_file_name, *args, **kwargs, workers=pilot_scheduled)
# find all the workers in the pilot
workers = self.client.scheduler_info()['workers']
pilot_workers = [workers[worker]['name'] for worker in workers if workers[worker]['name'].startswith(pilot_scheduled)]

task_future = self.client.submit(task_func, self.metrics_file_name, *args, **kwargs, workers=pilot_workers)
else:
task_future = self.client.submit(task_func, self.metrics_file_name, *args, **kwargs)

Expand All @@ -218,13 +222,16 @@ def run(self, func, *args, **kwargs):
if self.client is None:
raise PilotAPIException("Cluster client isn't ready/provisioned yet")

print(f"Running qtask with args {args}, kwargs {kwargs}")
self.logger.info(f"Running qtask with args {args}, kwargs {kwargs}")
wrapper_func = self.task(func)
return wrapper_func(*args, **kwargs).result()

def wait_tasks(self, tasks):
    """Block until every task future in *tasks* has finished.

    After waiting, scan the futures and log any that completed with an
    exception. Failures are only logged — nothing is re-raised here, so
    callers must inspect the futures themselves if they need the errors.

    Args:
        tasks: iterable of task futures returned by submit_task
            (presumably dask.distributed futures — confirm against the
            module's `wait` import).
    """
    wait(tasks)

    # Surface failures explicitly: a future that finished with an exception
    # would otherwise stay silent, since the exception only re-raises when
    # someone calls .result() on it.
    for task in tasks:
        if task.done() and task.exception() is not None:
            self.logger.info(f"Task {task} completed {task.status} with exception: {task.exception()}")



Expand Down
2 changes: 1 addition & 1 deletion pilot/plugins/dask/bootstrap_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def start_dask(self):
# self.dask_process = subprocess.Popen(command, shell=True)
# print("Dask started.")
self.kill_dask_processes_on_nodes(self.nodes)
self.launch_dask_scheduler_via_command_line()
# self.launch_dask_scheduler_via_command_line()
self.launch_dask_workers_via_command_line()

def check_dask(self):
Expand Down
14 changes: 11 additions & 3 deletions pilot/plugins/dask/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ def stop_existing_processes(self, process_name):
print(f"Error stopping existing schedulers: {e}")

def start_scheduler(self):
# Stop any existing Dask workers
self.stop_existing_processes('dask worker')

# Stop any existing Dask schedulers
self.stop_existing_processes('dask')
self.stop_existing_processes('dask scheduler')



# Start a new Dask scheduler in the background
log_file = 'dask_scheduler.log'
Expand Down Expand Up @@ -190,7 +195,7 @@ def start_dask_workers(self):
self.logger.info(f"Starting Dask workers with scheduler address: {self.host}")

#Start dask workers on all nodes in background and write the worker address to a file
command = f"dask worker --nthreads {self.pilot_compute_description.get('cores_per_node', 1)} --name {self.pilot_compute_description['name']} --memory-limit 3GB {self.host} &"
command = f"dask worker --nworkers {self.pilot_compute_description.get('number_of_nodes',1)} --nthreads {self.pilot_compute_description.get('cores_per_node', 1)} --name {self.pilot_compute_description['name']} --memory-limit 3GB {self.host} &"
self.logger.info(f"Starting worker with command: {command}")
subprocess.Popen(command, shell=True)

Expand Down Expand Up @@ -235,8 +240,11 @@ def cancel(self):

time.sleep(2)

# Stop any existing Dask workers
self.stop_existing_processes('dask worker')

# Stop the scheduler
self.stop_existing_processes('dask')
self.stop_existing_processes('dask scheduler')


def submit_compute_unit(function_name):
Expand Down

0 comments on commit 7bfe942

Please sign in to comment.