From 7bfe942ce08ffbeb1cb46a99c667e23633503b59 Mon Sep 17 00:00:00 2001 From: Pradeep Mantha Date: Tue, 20 Aug 2024 21:29:11 -0700 Subject: [PATCH] scripts to launch dask clusters on slurm --- examples/pq_dask.py | 4 ++-- examples/pq_multi_dask.py | 6 +++--- examples/qugen/pq_qugen.py | 6 +----- pilot/pilot_compute_service.py | 15 +++++++++++---- pilot/plugins/dask/bootstrap_dask.py | 2 +- pilot/plugins/dask/cluster.py | 14 +++++++++++--- 6 files changed, 29 insertions(+), 18 deletions(-) diff --git a/examples/pq_dask.py b/examples/pq_dask.py index 0da7043..b211d0f 100644 --- a/examples/pq_dask.py +++ b/examples/pq_dask.py @@ -11,13 +11,13 @@ "resource": RESOURCE_URL_HPC, "working_directory": WORKING_DIRECTORY, "type": "dask", - "number_of_nodes": 10, + "number_of_nodes": 2, "cores_per_node": 10, } def start_pilot(): - pcs = PilotComputeService() + pcs = PilotComputeService(working_directory=WORKING_DIRECTORY) pcs.create_pilot(pilot_compute_description=pilot_compute_description_dask) return pcs diff --git a/examples/pq_multi_dask.py b/examples/pq_multi_dask.py index 5871c28..c4d99f3 100644 --- a/examples/pq_multi_dask.py +++ b/examples/pq_multi_dask.py @@ -50,12 +50,12 @@ def circuit(parameters): # Submit tasks to pcs tasks = [] - for i in range(1000): + for i in range(10): k = pcs.submit_task(pennylane_quantum_circuit, task_name = f"task_pennylane-{i}" ) tasks.append(k) - for i in range(1000): - k = pcs.submit_task(pennylane_quantum_circuit, task_name = f"task_pennylane-{i}", pilot=pilots[0]) + for i in range(10): + k = pcs.submit_task(pennylane_quantum_circuit, task_name = f"task_{pilots[0]}_pennylane-{i}", pilot=pilots[0]) tasks.append(k) diff --git a/examples/qugen/pq_qugen.py b/examples/qugen/pq_qugen.py index 83a1ff3..642f95c 100644 --- a/examples/qugen/pq_qugen.py +++ b/examples/qugen/pq_qugen.py @@ -80,11 +80,7 @@ def train_model(data_set_name, data, model): dask_client = dask_pilot.get_client() print(dask_client.scheduler_info()) - - # Get Dask client 
details - dask_client.run() - - a = dask_pilot.submit_task(train_model, data_set_name, data, model) + a = dask_pilot.submit_task("train_model", train_model, data_set_name, data, model) print(f"{a.result()}\n") finally: if dask_pilot: diff --git a/pilot/pilot_compute_service.py b/pilot/pilot_compute_service.py index cb28c35..8936221 100644 --- a/pilot/pilot_compute_service.py +++ b/pilot/pilot_compute_service.py @@ -91,7 +91,7 @@ def create_pilot(self, pilot_compute_description): batch_job = worker_cluster_manager.submit_job(pilot_compute_description) self.pilot_id = batch_job.get_id() - self.metrics_file_name = os.path.join(self.pcs_working_directory, f"{self.pilot_id}-metrics.csv") + self.metrics_file_name = os.path.join(self.pcs_working_directory, "metrics.csv") details = worker_cluster_manager.get_config_data() @@ -158,7 +158,7 @@ def submit_task(self, func, *args, **kwargs): if self.client is None: raise PilotAPIException("Cluster client isn't ready/provisioned yet") - print(f"Running task {task_name} with details func:{func.__name__};args {args};kwargs {kwargs}") + self.logger.info(f"Running task {task_name} with details func:{func.__name__};args {args};kwargs {kwargs}") metrics = { @@ -199,7 +199,11 @@ def task_func(metrics_fn, *args, **kwargs): return result if pilot_scheduled != 'ANY': - task_future = self.client.submit(task_func, self.metrics_file_name, *args, **kwargs, workers=pilot_scheduled) + # find all the workers in the pilot + workers = self.client.scheduler_info()['workers'] + pilot_workers = [workers[worker]['name'] for worker in workers if workers[worker]['name'].startswith(pilot_scheduled)] + + task_future = self.client.submit(task_func, self.metrics_file_name, *args, **kwargs, workers=pilot_workers) else: task_future = self.client.submit(task_func, self.metrics_file_name, *args, **kwargs) @@ -218,13 +222,16 @@ def run(self, func, *args, **kwargs): if self.client is None: raise PilotAPIException("Cluster client isn't ready/provisioned yet") - 
print(f"Running qtask with args {args}, kwargs {kwargs}") + self.logger.info(f"Running qtask with args {args}, kwargs {kwargs}") wrapper_func = self.task(func) return wrapper_func(*args, **kwargs).result() def wait_tasks(self, tasks): wait(tasks) + for task in tasks: + if task.done() and task.exception() is not None: + self.logger.info(f"Task {task} completed {task.status} with exception: {task.exception()}") diff --git a/pilot/plugins/dask/bootstrap_dask.py b/pilot/plugins/dask/bootstrap_dask.py index 8b11317..e5c7f86 100755 --- a/pilot/plugins/dask/bootstrap_dask.py +++ b/pilot/plugins/dask/bootstrap_dask.py @@ -154,7 +154,7 @@ def start_dask(self): # self.dask_process = subprocess.Popen(command, shell=True) # print("Dask started.") self.kill_dask_processes_on_nodes(self.nodes) - self.launch_dask_scheduler_via_command_line() + # self.launch_dask_scheduler_via_command_line() self.launch_dask_workers_via_command_line() def check_dask(self): diff --git a/pilot/plugins/dask/cluster.py b/pilot/plugins/dask/cluster.py index 37a47ee..4d8ad7e 100644 --- a/pilot/plugins/dask/cluster.py +++ b/pilot/plugins/dask/cluster.py @@ -63,8 +63,13 @@ def stop_existing_processes(self, process_name): print(f"Error stopping existing schedulers: {e}") def start_scheduler(self): + # Stop any existing Dask workers + self.stop_existing_processes('dask worker') + # Stop any existing Dask schedulers - self.stop_existing_processes('dask') + self.stop_existing_processes('dask scheduler') + + # Start a new Dask scheduler in the background log_file = 'dask_scheduler.log' @@ -190,7 +195,7 @@ def start_dask_workers(self): self.logger.info(f"Starting Dask workers with scheduler address: {self.host}") #Start dask workers on all nodes in background and write the worker address to a file - command = f"dask worker --nthreads {self.pilot_compute_description.get('cores_per_node', 1)} --name {self.pilot_compute_description['name']} --memory-limit 3GB {self.host} &" + command = f"dask worker --nworkers 
{self.pilot_compute_description.get('number_of_nodes',1)} --nthreads {self.pilot_compute_description.get('cores_per_node', 1)} --name {self.pilot_compute_description['name']} --memory-limit 3GB {self.host} &" self.logger.info(f"Starting worker with command: {command}") subprocess.Popen(command, shell=True) @@ -235,8 +240,11 @@ def cancel(self): time.sleep(2) + # Stop any existing Dask workers + self.stop_existing_processes('dask worker') + # Stop the scheduler - self.stop_existing_processes('dask') + self.stop_existing_processes('dask scheduler') def submit_compute_unit(function_name):