From 6c6ff7c27c53627f8832aa943d113251f8868e35 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Mon, 3 Apr 2023 08:25:02 -0500 Subject: [PATCH 1/9] Tracking number of task on wach worker to schedule tasks --- .../core/backends/mpi/core/communication.py | 4 +- .../core/backends/mpi/core/controller/api.py | 69 ++++++++++++++++++- unidist/core/backends/mpi/core/monitor.py | 6 ++ 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/unidist/core/backends/mpi/core/communication.py b/unidist/core/backends/mpi/core/communication.py index ce65912f..3e3c3d4e 100755 --- a/unidist/core/backends/mpi/core/communication.py +++ b/unidist/core/backends/mpi/core/communication.py @@ -181,7 +181,7 @@ def mpi_send_object(comm, data, dest_rank): comm.send(data, dest=dest_rank) -def mpi_isend_object(comm, data, dest_rank): +def mpi_isend_object(comm, data, dest_rank, tag=0): """ Send Python object to another MPI rank in a non-blocking way. @@ -199,7 +199,7 @@ def mpi_isend_object(comm, data, dest_rank): object A handler to MPI_Isend communication result. """ - return comm.isend(data, dest=dest_rank) + return comm.isend(data, dest=dest_rank, tag=tag) def mpi_send_buffer(comm, buffer_size, buffer, dest_rank): diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index 11a7a6d5..80c8b474 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -8,7 +8,9 @@ import atexit import signal import asyncio +import time from collections import defaultdict +import threading try: import mpi4py @@ -49,6 +51,54 @@ topology = dict() # The global variable is responsible for if MPI backend has already been initialized is_mpi_initialized = False +threads=[] +BACKOFF = 0.001 +exitFlag = False +def _getopt_backoff(options): + backoff = options.get("backoff") + if backoff is None: + backoff = BACKOFF + return float(backoff) + + +class Backoff: + def __init__(self, seconds=BACKOFF): + self.tval = 0.0 + self.tmax = max(float(seconds), 0.0) + self.tmin = self.tmax / (1 << 10) + + def reset(self): + self.tval = 0.0 + + def sleep(self): + time.sleep(self.tval) + self.tval = min(self.tmax, max(self.tmin, self.tval * 2)) + + +class myThread(threading.Thread): + def __init__(self, threadID, name, comm): + threading.Thread.__init__(self, daemon=True) + self.threadID = threadID + self.name = name + self.comm = comm + + def run(self): + print("Starting " + self.name) + poll_tasks_completed(self.name, self.comm) + print("Exiting " + self.name) + + +def poll_tasks_completed(threadName, comm): + global task_per_worker,exitFlag + backoff = Backoff() + while not exitFlag: + if comm.iprobe(source=communication.MPIRank.MONITOR, tag=1): + task_completed = comm.recv(source=communication.MPIRank.MONITOR, tag=1) + task_per_worker[task_completed] -= 1 + backoff.reset() + else: + backoff.sleep() + def init(): @@ -122,6 +172,13 @@ def init(): # path for spawned MPI processes to be merged with the parent communicator if parent_comm != MPI.COMM_NULL: comm = parent_comm.Merge(high=True) + global task_per_worker + if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: + thread = myThread(1, "tName",comm) + thread.start() + threads.append(thread) + world_size = comm.Get_size() + task_per_worker = {k: 0 for k in range(2,world_size)} mpi_state = communication.MPIState.get_instance( comm, comm.Get_rank(), comm.Get_size() @@ -179,7 +236,11 @@ def shutdown(): ----- Sends cancelation operation to all workers and monitor processes. 
""" + global exitFlag,threads + exitFlag = True mpi_state = communication.MPIState.get_instance() + for thread in threads: + thread.join() # Send shutdown commands to all ranks for rank_id in range(communication.MPIRank.MONITOR, mpi_state.world_size): # We use a blocking send here because we have to wait for @@ -366,9 +427,11 @@ def submit(task, *args, num_returns=1, **kwargs): # Initiate reference count based cleanup # if all the tasks were completed garbage_collector.regular_cleanup() - - dest_rank = RoundRobin.get_instance().schedule_rank() - + global task_per_worker + # dest_rank = RoundRobin.get_instance().schedule_rank() + worker_with_min_tasks = min(task_per_worker, key=task_per_worker.get) + task_per_worker[worker_with_min_tasks] += 1 + dest_rank = worker_with_min_tasks output_ids = object_store.generate_output_data_id( dest_rank, garbage_collector, num_returns ) diff --git a/unidist/core/backends/mpi/core/monitor.py b/unidist/core/backends/mpi/core/monitor.py index 5a749fac..ee2a13d7 100755 --- a/unidist/core/backends/mpi/core/monitor.py +++ b/unidist/core/backends/mpi/core/monitor.py @@ -66,6 +66,12 @@ def monitor_loop(): # Proceed the request if operation_type == common.Operation.TASK_DONE: task_counter.increment() + communication.mpi_isend_object( + mpi_state.comm, + source_rank, + communication.MPIRank.ROOT, + 1 + ) elif operation_type == common.Operation.GET_TASK_COUNT: # We use a blocking send here because the receiver is waiting for the result. communication.mpi_send_object( From 4d137ce52e9fd2c36e78732c8779e3308d8e40f1 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Mon, 3 Apr 2023 16:38:42 -0500 Subject: [PATCH 2/9] making scheduler class --- .../backends/mpi/core/controller/actor.py | 8 +-- .../core/backends/mpi/core/controller/api.py | 30 ++++---- .../backends/mpi/core/controller/common.py | 71 +++++++++++++------ 3 files changed, 69 insertions(+), 40 deletions(-) diff --git a/unidist/core/backends/mpi/core/controller/actor.py b/unidist/core/backends/mpi/core/controller/actor.py index 1fb8ef03..8e887d44 100644 --- a/unidist/core/backends/mpi/core/controller/actor.py +++ b/unidist/core/backends/mpi/core/controller/actor.py @@ -11,7 +11,7 @@ from unidist.core.backends.mpi.core.controller.garbage_collector import ( garbage_collector, ) -from unidist.core.backends.mpi.core.controller.common import push_data, RoundRobin +from unidist.core.backends.mpi.core.controller.common import push_data, Scheduler class ActorMethod: @@ -91,7 +91,7 @@ def __init__(self, cls, *args, owner_rank=None, handler_id=None, **kwargs): self._args = args self._kwargs = kwargs self._owner_rank = ( - RoundRobin.get_instance().schedule_rank() + Scheduler.get_instance().schedule_rank() if owner_rank is None else owner_rank ) @@ -103,7 +103,7 @@ def __init__(self, cls, *args, owner_rank=None, handler_id=None, **kwargs): object_store.put_data_owner(self._handler_id, self._owner_rank) # reserve a rank for actor execution only - RoundRobin.get_instance().reserve_rank(self._owner_rank) + Scheduler.get_instance().reserve_rank(self._owner_rank) # submit `ACTOR_CREATE` task to a worker only once if owner_rank is None and handler_id is None: @@ -186,4 +186,4 @@ def __del__(self): """ This is defined to release the rank reserved for the actor when it gets out of scope. 
""" - RoundRobin.get_instance().release_rank(self._owner_rank) + Scheduler.get_instance().release_rank(self._owner_rank) diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index 80c8b474..98a73953 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -26,7 +26,7 @@ from unidist.core.backends.mpi.core.controller.common import ( request_worker_data, push_data, - RoundRobin, + Scheduler, ) import unidist.core.backends.mpi.core.common as common import unidist.core.backends.mpi.core.communication as communication @@ -89,12 +89,13 @@ def run(self): def poll_tasks_completed(threadName, comm): - global task_per_worker,exitFlag + global exitFlag + scheduler = Scheduler.get_instance() backoff = Backoff() while not exitFlag: if comm.iprobe(source=communication.MPIRank.MONITOR, tag=1): - task_completed = comm.recv(source=communication.MPIRank.MONITOR, tag=1) - task_per_worker[task_completed] -= 1 + task_completed_rank = comm.recv(source=communication.MPIRank.MONITOR, tag=1) + scheduler.decrement_tasks_on_worker(task_completed_rank) backoff.reset() else: backoff.sleep() @@ -172,17 +173,17 @@ def init(): # path for spawned MPI processes to be merged with the parent communicator if parent_comm != MPI.COMM_NULL: comm = parent_comm.Merge(high=True) - global task_per_worker + + + + mpi_state = communication.MPIState.get_instance( + comm, comm.Get_rank(), comm.Get_size() + ) if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: thread = myThread(1, "tName",comm) thread.start() threads.append(thread) world_size = comm.Get_size() - task_per_worker = {k: 0 for k in range(2,world_size)} - - mpi_state = communication.MPIState.get_instance( - comm, comm.Get_rank(), comm.Get_size() - ) global topology if not topology: @@ -427,11 +428,10 @@ def submit(task, *args, num_returns=1, **kwargs): # Initiate reference count based cleanup # if all the tasks were completed garbage_collector.regular_cleanup() - global task_per_worker - # dest_rank = RoundRobin.get_instance().schedule_rank() - worker_with_min_tasks = min(task_per_worker, key=task_per_worker.get) - task_per_worker[worker_with_min_tasks] += 1 - dest_rank = worker_with_min_tasks + + scheduler = Scheduler.get_instance() + dest_rank = scheduler.schedule_rank() + scheduler.increment_tasks_on_worker(dest_rank) output_ids = object_store.generate_output_data_id( dest_rank, garbage_collector, num_returns ) diff --git a/unidist/core/backends/mpi/core/controller/common.py b/unidist/core/backends/mpi/core/controller/common.py index 46659a3e..c8732e35 100644 --- a/unidist/core/backends/mpi/core/controller/common.py +++ b/unidist/core/backends/mpi/core/controller/common.py @@ -20,13 +20,18 @@ initial_worker_number = 2 -class RoundRobin: +class Scheduler: __instance = None def __init__(self): self.reserved_ranks = [] - self.rank_to_schedule = itertools.cycle( - ( + self.task_per_worker = {k: 0 for k in range(initial_worker_number,communication.MPIState.get_instance().world_size)} + l= range( + initial_worker_number, + communication.MPIState.get_instance().world_size, + ) + + self.rank_to_schedule = [ rank for rank in range( initial_worker_number, @@ -35,23 +40,23 @@ def __init__(self): # check if a rank to schedule is not equal to the rank # of the current process to not get into recursive scheduling if rank != communication.MPIState.get_instance().rank - ) - ) + ] + print(self.rank_to_schedule) logger.debug( - f"RoundRobin init for 
{communication.MPIState.get_instance().rank} rank" + f"Scheduler init for {communication.MPIState.get_instance().rank} rank" ) @classmethod def get_instance(cls): """ - Get instance of ``RoundRobin``. + Get instance of ``Scheduler``. Returns ------- - RoundRobin + Scheduler """ if cls.__instance is None: - cls.__instance = RoundRobin() + cls.__instance = Scheduler() return cls.__instance def schedule_rank(self): @@ -63,17 +68,9 @@ def schedule_rank(self): int A rank number. """ - next_rank = None - - # Go rank by rank to find the first one non-reserved - for _ in range( - initial_worker_number, communication.MPIState.get_instance().world_size - ): - rank = next(self.rank_to_schedule) - if rank not in self.reserved_ranks: - next_rank = rank - break + next_rank = min(self.rank_to_schedule, key=self.task_per_worker.get,default=None) + if next_rank is None: raise Exception("All ranks blocked") @@ -91,9 +88,13 @@ def reserve_rank(self, rank): rank : int A rank number. """ + + self.rank_to_schedule.remove(rank) self.reserved_ranks.append(rank) + print(self.reserved_ranks) + print("======") logger.debug( - f"RoundRobin reserve rank {rank} for actor " + f"Scheduler reserve rank {rank} for actor " + f"on worker with rank {communication.MPIState.get_instance().rank}" ) @@ -108,11 +109,39 @@ def release_rank(self, rank): rank : int A rank number. """ + print("======") self.reserved_ranks.remove(rank) + self.rank_to_schedule.append(rank) logger.debug( - f"RoundRobin release rank {rank} reserved for actor " + f"Scheduler release rank {rank} reserved for actor " + f"on worker with rank {communication.MPIState.get_instance().rank}" ) + + def increment_tasks_on_worker(self, rank): + """ + Increments the count of tasks submitted to a worker. + + This helps to track tasks submitted per workers + + Parameters + ---------- + rank : int + A rank number. + """ + self.task_per_worker[rank] += 1 + + def decrement_tasks_on_worker(self, rank): + """ + Decrement the count of tasks submitted to a worker. + + This helps to track tasks submitted per workers + + Parameters + ---------- + rank : int + A rank number. + """ + self.task_per_worker[rank] -= 1 def request_worker_data(data_id): From 2019d78ab40e9b561110b9e236a600c3b75a7d33 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Tue, 4 Apr 2023 04:18:48 -0500 Subject: [PATCH 3/9] removing rank from rank_to_schedule only if rank already present --- unidist/core/backends/mpi/core/controller/common.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/unidist/core/backends/mpi/core/controller/common.py b/unidist/core/backends/mpi/core/controller/common.py index c8732e35..819b89ff 100644 --- a/unidist/core/backends/mpi/core/controller/common.py +++ b/unidist/core/backends/mpi/core/controller/common.py @@ -41,7 +41,6 @@ def __init__(self): # of the current process to not get into recursive scheduling if rank != communication.MPIState.get_instance().rank ] - print(self.rank_to_schedule) logger.debug( f"Scheduler init for {communication.MPIState.get_instance().rank} rank" ) @@ -68,7 +67,6 @@ def schedule_rank(self): int A rank number. """ - next_rank = min(self.rank_to_schedule, key=self.task_per_worker.get,default=None) if next_rank is None: @@ -88,11 +86,9 @@ def reserve_rank(self, rank): rank : int A rank number. 
""" - - self.rank_to_schedule.remove(rank) + if rank in self.rank_to_schedule: + self.rank_to_schedule.remove(rank) self.reserved_ranks.append(rank) - print(self.reserved_ranks) - print("======") logger.debug( f"Scheduler reserve rank {rank} for actor " + f"on worker with rank {communication.MPIState.get_instance().rank}" @@ -109,7 +105,7 @@ def release_rank(self, rank): rank : int A rank number. """ - print("======") + self.reserved_ranks.remove(rank) self.rank_to_schedule.append(rank) logger.debug( From aeb22bdfa9bed720a616d603a1ad2c1d2af9f6a2 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Tue, 4 Apr 2023 04:27:10 -0500 Subject: [PATCH 4/9] static check changes Signed-off-by: arunjose696 --- .../core/backends/mpi/core/communication.py | 3 ++ .../core/backends/mpi/core/controller/api.py | 20 ++++---- .../backends/mpi/core/controller/common.py | 47 ++++++++++--------- unidist/core/backends/mpi/core/monitor.py | 5 +- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/unidist/core/backends/mpi/core/communication.py b/unidist/core/backends/mpi/core/communication.py index 3e3c3d4e..bf2dddc0 100755 --- a/unidist/core/backends/mpi/core/communication.py +++ b/unidist/core/backends/mpi/core/communication.py @@ -193,6 +193,9 @@ def mpi_isend_object(comm, data, dest_rank, tag=0): Data to send. dest_rank : int Target MPI process to transfer data. + tag : int + To recive only data with a label + Used when background thread polls for data with a specific label Returns ------- diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index 98a73953..b8e22198 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -51,9 +51,11 @@ topology = dict() # The global variable is responsible for if MPI backend has already been initialized is_mpi_initialized = False -threads=[] +threads = [] BACKOFF = 0.001 exitFlag = False + + def _getopt_backoff(options): backoff = options.get("backoff") if backoff is None: @@ -80,7 +82,7 @@ def __init__(self, threadID, name, comm): threading.Thread.__init__(self, daemon=True) self.threadID = threadID self.name = name - self.comm = comm + self.comm = comm def run(self): print("Starting " + self.name) @@ -97,11 +99,10 @@ def poll_tasks_completed(threadName, comm): task_completed_rank = comm.recv(source=communication.MPIRank.MONITOR, tag=1) scheduler.decrement_tasks_on_worker(task_completed_rank) backoff.reset() - else: + else: backoff.sleep() - def init(): """ Initialize MPI processes. @@ -173,17 +174,14 @@ def init(): # path for spawned MPI processes to be merged with the parent communicator if parent_comm != MPI.COMM_NULL: comm = parent_comm.Merge(high=True) - - mpi_state = communication.MPIState.get_instance( comm, comm.Get_rank(), comm.Get_size() ) - if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: - thread = myThread(1, "tName",comm) + if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: + thread = myThread(1, "tName", comm) thread.start() threads.append(thread) - world_size = comm.Get_size() global topology if not topology: @@ -237,7 +235,7 @@ def shutdown(): ----- Sends cancelation operation to all workers and monitor processes. 
""" - global exitFlag,threads + global exitFlag, threads exitFlag = True mpi_state = communication.MPIState.get_instance() for thread in threads: @@ -428,7 +426,7 @@ def submit(task, *args, num_returns=1, **kwargs): # Initiate reference count based cleanup # if all the tasks were completed garbage_collector.regular_cleanup() - + scheduler = Scheduler.get_instance() dest_rank = scheduler.schedule_rank() scheduler.increment_tasks_on_worker(dest_rank) diff --git a/unidist/core/backends/mpi/core/controller/common.py b/unidist/core/backends/mpi/core/controller/common.py index 819b89ff..e0fdb22e 100644 --- a/unidist/core/backends/mpi/core/controller/common.py +++ b/unidist/core/backends/mpi/core/controller/common.py @@ -4,8 +4,6 @@ """Common functionality related to `controller`.""" -import itertools - from unidist.core.backends.common.data_id import is_data_id import unidist.core.backends.mpi.core.common as common import unidist.core.backends.mpi.core.communication as communication @@ -25,21 +23,22 @@ class Scheduler: def __init__(self): self.reserved_ranks = [] - self.task_per_worker = {k: 0 for k in range(initial_worker_number,communication.MPIState.get_instance().world_size)} - l= range( - initial_worker_number, - communication.MPIState.get_instance().world_size, - ) - + self.task_per_worker = { + k: 0 + for k in range( + initial_worker_number, communication.MPIState.get_instance().world_size + ) + } + self.rank_to_schedule = [ - rank - for rank in range( - initial_worker_number, - communication.MPIState.get_instance().world_size, - ) - # check if a rank to schedule is not equal to the rank - # of the current process to not get into recursive scheduling - if rank != communication.MPIState.get_instance().rank + rank + for rank in range( + initial_worker_number, + communication.MPIState.get_instance().world_size, + ) + # check if a rank to schedule is not equal to the rank + # of the current process to not get into recursive scheduling + if rank != communication.MPIState.get_instance().rank ] logger.debug( f"Scheduler init for {communication.MPIState.get_instance().rank} rank" @@ -67,8 +66,10 @@ def schedule_rank(self): int A rank number. """ - next_rank = min(self.rank_to_schedule, key=self.task_per_worker.get,default=None) - + next_rank = min( + self.rank_to_schedule, key=self.task_per_worker.get, default=None + ) + if next_rank is None: raise Exception("All ranks blocked") @@ -87,7 +88,7 @@ def reserve_rank(self, rank): A rank number. """ if rank in self.rank_to_schedule: - self.rank_to_schedule.remove(rank) + self.rank_to_schedule.remove(rank) self.reserved_ranks.append(rank) logger.debug( f"Scheduler reserve rank {rank} for actor " @@ -105,14 +106,14 @@ def release_rank(self, rank): rank : int A rank number. """ - + self.reserved_ranks.remove(rank) self.rank_to_schedule.append(rank) logger.debug( f"Scheduler release rank {rank} reserved for actor " + f"on worker with rank {communication.MPIState.get_instance().rank}" ) - + def increment_tasks_on_worker(self, rank): """ Increments the count of tasks submitted to a worker. @@ -125,7 +126,7 @@ def increment_tasks_on_worker(self, rank): A rank number. """ self.task_per_worker[rank] += 1 - + def decrement_tasks_on_worker(self, rank): """ Decrement the count of tasks submitted to a worker. @@ -137,7 +138,7 @@ def decrement_tasks_on_worker(self, rank): rank : int A rank number. 
""" - self.task_per_worker[rank] -= 1 + self.task_per_worker[rank] -= 1 def request_worker_data(data_id): diff --git a/unidist/core/backends/mpi/core/monitor.py b/unidist/core/backends/mpi/core/monitor.py index ee2a13d7..eaf7fa1f 100755 --- a/unidist/core/backends/mpi/core/monitor.py +++ b/unidist/core/backends/mpi/core/monitor.py @@ -67,10 +67,7 @@ def monitor_loop(): if operation_type == common.Operation.TASK_DONE: task_counter.increment() communication.mpi_isend_object( - mpi_state.comm, - source_rank, - communication.MPIRank.ROOT, - 1 + mpi_state.comm, source_rank, communication.MPIRank.ROOT, 1 ) elif operation_type == common.Operation.GET_TASK_COUNT: # We use a blocking send here because the receiver is waiting for the result. From 301dd08771647089228141731c1a8eae980aabd1 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Wed, 5 Apr 2023 05:14:08 -0500 Subject: [PATCH 5/9] pr comments --- unidist/config/__init__.py | 3 +- unidist/config/backends/mpi/__init__.py | 4 +-- unidist/config/backends/mpi/envvars.py | 7 ++++ .../core/backends/mpi/core/controller/api.py | 32 ++++++++----------- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/unidist/config/__init__.py b/unidist/config/__init__.py index 78bd3ae7..7c893386 100644 --- a/unidist/config/__init__.py +++ b/unidist/config/__init__.py @@ -13,7 +13,7 @@ RayObjectStoreMemory, ) from .backends.dask import DaskMemoryLimit, IsDaskCluster, DaskSchedulerAddress -from .backends.mpi import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold +from .backends.mpi import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, BackOff from .parameter import ValueSource __all__ = [ @@ -31,4 +31,5 @@ "MpiHosts", "ValueSource", "MpiPickleThreshold", + "BackOff", ] diff --git a/unidist/config/backends/mpi/__init__.py b/unidist/config/backends/mpi/__init__.py index 3e523bf3..d3f5f9d2 100644 --- a/unidist/config/backends/mpi/__init__.py +++ b/unidist/config/backends/mpi/__init__.py @@ -4,6 +4,6 @@ """Config entities specific for MPI backend which can be used for unidist behavior tuning.""" -from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold +from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, BackOff -__all__ = ["IsMpiSpawnWorkers", "MpiHosts", "MpiPickleThreshold"] +__all__ = ["IsMpiSpawnWorkers", "MpiHosts", "MpiPickleThreshold", "BackOff"] diff --git a/unidist/config/backends/mpi/envvars.py b/unidist/config/backends/mpi/envvars.py index 4a8c130a..43c2d279 100644 --- a/unidist/config/backends/mpi/envvars.py +++ b/unidist/config/backends/mpi/envvars.py @@ -25,3 +25,10 @@ class MpiPickleThreshold(EnvironmentVariable, type=int): default = 1024**2 // 4 # 0.25 MiB varname = "UNIDIST_MPI_PICKLE_THRESHOLD" + + +class BackOff(EnvironmentVariable, type=int): + """Minimum buffer size for serialization with pickle 5 protocol""" + + default = 0.001 + varname = "BackOff" diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index b8e22198..ab67a30a 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -37,6 +37,7 @@ MpiHosts, ValueSource, MpiPickleThreshold, + BackOff, ) @@ -51,16 +52,11 @@ topology = dict() # The global variable is responsible for if MPI backend has already been initialized is_mpi_initialized = False +# List is used to keep keep track of th threads started so they could be later joined threads = [] -BACKOFF = 0.001 -exitFlag = False - - -def _getopt_backoff(options): - backoff = 
options.get("backoff") - if backoff is None: - backoff = BACKOFF - return float(backoff) +BACKOFF = BackOff.get_value_source() +# The global variable acts as a flag which when set true the function executing in background thread stops +exit_flag = False class Backoff: @@ -77,24 +73,22 @@ def sleep(self): self.tval = min(self.tmax, max(self.tmin, self.tval * 2)) -class myThread(threading.Thread): - def __init__(self, threadID, name, comm): +class Poller(threading.Thread): + def __init__(self, thread_id, name, comm): threading.Thread.__init__(self, daemon=True) - self.threadID = threadID + self.thread_id = thread_id self.name = name self.comm = comm def run(self): - print("Starting " + self.name) poll_tasks_completed(self.name, self.comm) - print("Exiting " + self.name) def poll_tasks_completed(threadName, comm): - global exitFlag + global exit_flag scheduler = Scheduler.get_instance() backoff = Backoff() - while not exitFlag: + while not exit_flag: if comm.iprobe(source=communication.MPIRank.MONITOR, tag=1): task_completed_rank = comm.recv(source=communication.MPIRank.MONITOR, tag=1) scheduler.decrement_tasks_on_worker(task_completed_rank) @@ -179,7 +173,7 @@ def init(): comm, comm.Get_rank(), comm.Get_size() ) if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: - thread = myThread(1, "tName", comm) + thread = Poller(1, "Thread_Poll_Tasks", comm) thread.start() threads.append(thread) @@ -235,8 +229,8 @@ def shutdown(): ----- Sends cancelation operation to all workers and monitor processes. """ - global exitFlag, threads - exitFlag = True + global exit_flag, threads + exit_flag = True mpi_state = communication.MPIState.get_instance() for thread in threads: thread.join() From 939f2c74674c898cca5c6af771c05183c6f1f5c1 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Wed, 5 Apr 2023 05:19:09 -0500 Subject: [PATCH 6/9] mend --- unidist/config/backends/mpi/envvars.py | 2 +- unidist/core/backends/mpi/core/communication.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/unidist/config/backends/mpi/envvars.py b/unidist/config/backends/mpi/envvars.py index 43c2d279..12e2e133 100644 --- a/unidist/config/backends/mpi/envvars.py +++ b/unidist/config/backends/mpi/envvars.py @@ -28,7 +28,7 @@ class MpiPickleThreshold(EnvironmentVariable, type=int): class BackOff(EnvironmentVariable, type=int): - """Minimum buffer size for serialization with pickle 5 protocol""" + """Backoff value for sleeping background threads when thread idle""" default = 0.001 varname = "BackOff" diff --git a/unidist/core/backends/mpi/core/communication.py b/unidist/core/backends/mpi/core/communication.py index bf2dddc0..d62838ac 100755 --- a/unidist/core/backends/mpi/core/communication.py +++ b/unidist/core/backends/mpi/core/communication.py @@ -194,8 +194,8 @@ def mpi_isend_object(comm, data, dest_rank, tag=0): dest_rank : int Target MPI process to transfer data. tag : int - To recive only data with a label - Used when background thread polls for data with a specific label + To recieve only data with a label. + Used when background thread polls for data with a specific label. 
Returns ------- From e360efc7a8beec1ac22a4763720065e510584d7f Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Thu, 13 Apr 2023 01:37:56 -0500 Subject: [PATCH 7/9] changing task tracking to GC not using background thread --- .../core/backends/mpi/core/controller/api.py | 8 +++--- .../backends/mpi/core/controller/common.py | 6 +++++ .../mpi/core/controller/garbage_collector.py | 10 +++++++- unidist/core/backends/mpi/core/monitor.py | 25 +++++++++++++++---- 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index ab67a30a..54a68f7b 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -172,10 +172,10 @@ def init(): mpi_state = communication.MPIState.get_instance( comm, comm.Get_rank(), comm.Get_size() ) - if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: - thread = Poller(1, "Thread_Poll_Tasks", comm) - thread.start() - threads.append(thread) + # if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: + # thread = Poller(1, "Thread_Poll_Tasks", comm) + # thread.start() + # threads.append(thread) global topology if not topology: diff --git a/unidist/core/backends/mpi/core/controller/common.py b/unidist/core/backends/mpi/core/controller/common.py index e0fdb22e..932422ba 100644 --- a/unidist/core/backends/mpi/core/controller/common.py +++ b/unidist/core/backends/mpi/core/controller/common.py @@ -140,6 +140,12 @@ def decrement_tasks_on_worker(self, rank): """ self.task_per_worker[rank] -= 1 + def decrement_done_tasks(self, tasks_done): + self.task_per_worker = { + key: self.task_per_worker[key] - tasks_done.get(key, 0) + for key in self.task_per_worker + } + def request_worker_data(data_id): """ diff --git a/unidist/core/backends/mpi/core/controller/garbage_collector.py b/unidist/core/backends/mpi/core/controller/garbage_collector.py index 8bc74cad..86b210e3 100644 --- a/unidist/core/backends/mpi/core/controller/garbage_collector.py +++ b/unidist/core/backends/mpi/core/controller/garbage_collector.py @@ -11,7 +11,10 @@ from unidist.core.backends.mpi.core.async_operations import AsyncOperations from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer from unidist.core.backends.mpi.core.controller.object_store import object_store -from unidist.core.backends.mpi.core.controller.common import initial_worker_number +from unidist.core.backends.mpi.core.controller.common import ( + initial_worker_number, + Scheduler, +) logger = common.get_logger("utils", "utils.log") @@ -131,6 +134,11 @@ def regular_cleanup(self): mpi_state.comm, communication.MPIRank.MONITOR, ) + tasks_completed = communication.recv_simple_operation( + mpi_state.comm, + communication.MPIRank.MONITOR, + ) + Scheduler.get_instance().decrement_done_tasks(tasks_completed) logger.debug( "Submitted task count {} vs executed task count {}".format( diff --git a/unidist/core/backends/mpi/core/monitor.py b/unidist/core/backends/mpi/core/monitor.py index eaf7fa1f..accf0488 100755 --- a/unidist/core/backends/mpi/core/monitor.py +++ b/unidist/core/backends/mpi/core/monitor.py @@ -19,12 +19,20 @@ mpi4py.rc(recv_mprobe=False, initialize=False) from mpi4py import MPI # noqa: E402 +initial_worker_number = 2 + class TaskCounter: __instance = None def __init__(self): self.task_counter = 0 + self.task_done_per_worker_unsend = { + k: 0 + for k in range( + initial_worker_number, communication.MPIState.get_instance().world_size + ) + } 
@classmethod def get_instance(cls): @@ -39,9 +47,10 @@ def get_instance(cls): cls.__instance = TaskCounter() return cls.__instance - def increment(self): + def increment(self, rank): """Increment task counter by one.""" self.task_counter += 1 + self.task_done_per_worker_unsend[rank] += 1 def monitor_loop(): @@ -65,10 +74,8 @@ def monitor_loop(): # Proceed the request if operation_type == common.Operation.TASK_DONE: - task_counter.increment() - communication.mpi_isend_object( - mpi_state.comm, source_rank, communication.MPIRank.ROOT, 1 - ) + task_counter.increment(source_rank) + elif operation_type == common.Operation.GET_TASK_COUNT: # We use a blocking send here because the receiver is waiting for the result. communication.mpi_send_object( @@ -76,6 +83,14 @@ def monitor_loop(): task_counter.task_counter, source_rank, ) + communication.mpi_send_object( + mpi_state.comm, + task_counter.task_done_per_worker_unsend, + source_rank, + ) + task_counter.task_done_per_worker_unsend = dict.fromkeys( + task_counter.task_done_per_worker_unsend, 0 + ) elif operation_type == common.Operation.CANCEL: async_operations.finish() if not MPI.Is_finalized(): From 9a3ec5ab7e9d0c918303d34063e00f58eb78de48 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Thu, 13 Apr 2023 11:32:18 -0500 Subject: [PATCH 8/9] having only one mpisend from monitor --- .../backends/mpi/core/controller/garbage_collector.py | 9 ++++----- unidist/core/backends/mpi/core/monitor.py | 11 +++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/unidist/core/backends/mpi/core/controller/garbage_collector.py b/unidist/core/backends/mpi/core/controller/garbage_collector.py index 86b210e3..1cbe7438 100644 --- a/unidist/core/backends/mpi/core/controller/garbage_collector.py +++ b/unidist/core/backends/mpi/core/controller/garbage_collector.py @@ -130,14 +130,13 @@ def regular_cleanup(self): common.Operation.GET_TASK_COUNT, communication.MPIRank.MONITOR, ) - executed_task_counter = communication.recv_simple_operation( - mpi_state.comm, - communication.MPIRank.MONITOR, - ) - tasks_completed = communication.recv_simple_operation( + + info_tasks = communication.recv_simple_operation( mpi_state.comm, communication.MPIRank.MONITOR, ) + executed_task_counter = info_tasks["executed_task_counter"] + tasks_completed = info_tasks["tasks_completed"] Scheduler.get_instance().decrement_done_tasks(tasks_completed) logger.debug( diff --git a/unidist/core/backends/mpi/core/monitor.py b/unidist/core/backends/mpi/core/monitor.py index accf0488..98ed2a23 100755 --- a/unidist/core/backends/mpi/core/monitor.py +++ b/unidist/core/backends/mpi/core/monitor.py @@ -78,14 +78,13 @@ def monitor_loop(): elif operation_type == common.Operation.GET_TASK_COUNT: # We use a blocking send here because the receiver is waiting for the result. 
+ info_tasks = { + "executed_task_counter": task_counter.task_counter, + "tasks_completed": task_counter.task_done_per_worker_unsend, + } communication.mpi_send_object( mpi_state.comm, - task_counter.task_counter, - source_rank, - ) - communication.mpi_send_object( - mpi_state.comm, - task_counter.task_done_per_worker_unsend, + info_tasks, source_rank, ) task_counter.task_done_per_worker_unsend = dict.fromkeys( From 79d5bea05156ec642e7be2429bb18313a7ed4201 Mon Sep 17 00:00:00 2001 From: arunjose696 Date: Wed, 19 Apr 2023 05:18:14 -0500 Subject: [PATCH 9/9] tracking tasks outside the condition --- unidist/config/__init__.py | 3 +- unidist/config/backends/mpi/__init__.py | 4 +- unidist/config/backends/mpi/envvars.py | 7 --- .../core/backends/mpi/core/communication.py | 7 +-- .../core/backends/mpi/core/controller/api.py | 54 ------------------- .../mpi/core/controller/garbage_collector.py | 35 ++++++------ 6 files changed, 22 insertions(+), 88 deletions(-) diff --git a/unidist/config/__init__.py b/unidist/config/__init__.py index 7c893386..78bd3ae7 100644 --- a/unidist/config/__init__.py +++ b/unidist/config/__init__.py @@ -13,7 +13,7 @@ RayObjectStoreMemory, ) from .backends.dask import DaskMemoryLimit, IsDaskCluster, DaskSchedulerAddress -from .backends.mpi import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, BackOff +from .backends.mpi import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold from .parameter import ValueSource __all__ = [ @@ -31,5 +31,4 @@ "MpiHosts", "ValueSource", "MpiPickleThreshold", - "BackOff", ] diff --git a/unidist/config/backends/mpi/__init__.py b/unidist/config/backends/mpi/__init__.py index d3f5f9d2..3e523bf3 100644 --- a/unidist/config/backends/mpi/__init__.py +++ b/unidist/config/backends/mpi/__init__.py @@ -4,6 +4,6 @@ """Config entities specific for MPI backend which can be used for unidist behavior tuning.""" -from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, BackOff +from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold -__all__ = ["IsMpiSpawnWorkers", "MpiHosts", "MpiPickleThreshold", "BackOff"] +__all__ = ["IsMpiSpawnWorkers", "MpiHosts", "MpiPickleThreshold"] diff --git a/unidist/config/backends/mpi/envvars.py b/unidist/config/backends/mpi/envvars.py index 12e2e133..4a8c130a 100644 --- a/unidist/config/backends/mpi/envvars.py +++ b/unidist/config/backends/mpi/envvars.py @@ -25,10 +25,3 @@ class MpiPickleThreshold(EnvironmentVariable, type=int): default = 1024**2 // 4 # 0.25 MiB varname = "UNIDIST_MPI_PICKLE_THRESHOLD" - - -class BackOff(EnvironmentVariable, type=int): - """Backoff value for sleeping background threads when thread idle""" - - default = 0.001 - varname = "BackOff" diff --git a/unidist/core/backends/mpi/core/communication.py b/unidist/core/backends/mpi/core/communication.py index d62838ac..ce65912f 100755 --- a/unidist/core/backends/mpi/core/communication.py +++ b/unidist/core/backends/mpi/core/communication.py @@ -181,7 +181,7 @@ def mpi_send_object(comm, data, dest_rank): comm.send(data, dest=dest_rank) -def mpi_isend_object(comm, data, dest_rank, tag=0): +def mpi_isend_object(comm, data, dest_rank): """ Send Python object to another MPI rank in a non-blocking way. @@ -193,16 +193,13 @@ def mpi_isend_object(comm, data, dest_rank, tag=0): Data to send. dest_rank : int Target MPI process to transfer data. - tag : int - To recieve only data with a label. - Used when background thread polls for data with a specific label. Returns ------- object A handler to MPI_Isend communication result. 
""" - return comm.isend(data, dest=dest_rank, tag=tag) + return comm.isend(data, dest=dest_rank) def mpi_send_buffer(comm, buffer_size, buffer, dest_rank): diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index 54a68f7b..66af98b4 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -8,9 +8,7 @@ import atexit import signal import asyncio -import time from collections import defaultdict -import threading try: import mpi4py @@ -37,7 +35,6 @@ MpiHosts, ValueSource, MpiPickleThreshold, - BackOff, ) @@ -52,49 +49,6 @@ topology = dict() # The global variable is responsible for if MPI backend has already been initialized is_mpi_initialized = False -# List is used to keep keep track of th threads started so they could be later joined -threads = [] -BACKOFF = BackOff.get_value_source() -# The global variable acts as a flag which when set true the function executing in background thread stops -exit_flag = False - - -class Backoff: - def __init__(self, seconds=BACKOFF): - self.tval = 0.0 - self.tmax = max(float(seconds), 0.0) - self.tmin = self.tmax / (1 << 10) - - def reset(self): - self.tval = 0.0 - - def sleep(self): - time.sleep(self.tval) - self.tval = min(self.tmax, max(self.tmin, self.tval * 2)) - - -class Poller(threading.Thread): - def __init__(self, thread_id, name, comm): - threading.Thread.__init__(self, daemon=True) - self.thread_id = thread_id - self.name = name - self.comm = comm - - def run(self): - poll_tasks_completed(self.name, self.comm) - - -def poll_tasks_completed(threadName, comm): - global exit_flag - scheduler = Scheduler.get_instance() - backoff = Backoff() - while not exit_flag: - if comm.iprobe(source=communication.MPIRank.MONITOR, tag=1): - task_completed_rank = comm.recv(source=communication.MPIRank.MONITOR, tag=1) - scheduler.decrement_tasks_on_worker(task_completed_rank) - backoff.reset() - else: - backoff.sleep() def init(): @@ -172,10 +126,6 @@ def init(): mpi_state = communication.MPIState.get_instance( comm, comm.Get_rank(), comm.Get_size() ) - # if rank == 0 and not threads and parent_comm == MPI.COMM_NULL: - # thread = Poller(1, "Thread_Poll_Tasks", comm) - # thread.start() - # threads.append(thread) global topology if not topology: @@ -229,11 +179,7 @@ def shutdown(): ----- Sends cancelation operation to all workers and monitor processes. """ - global exit_flag, threads - exit_flag = True mpi_state = communication.MPIState.get_instance() - for thread in threads: - thread.join() # Send shutdown commands to all ranks for rank_id in range(communication.MPIRank.MONITOR, mpi_state.world_size): # We use a blocking send here because we have to wait for diff --git a/unidist/core/backends/mpi/core/controller/garbage_collector.py b/unidist/core/backends/mpi/core/controller/garbage_collector.py index 1cbe7438..b49a6f20 100644 --- a/unidist/core/backends/mpi/core/controller/garbage_collector.py +++ b/unidist/core/backends/mpi/core/controller/garbage_collector.py @@ -115,30 +115,29 @@ def regular_cleanup(self): async_operations = AsyncOperations.get_instance() # Check completion status of previous async MPI routines async_operations.check() + mpi_state = communication.MPIState.get_instance() + # Compare submitted and executed tasks + # We use a blocking send here because we have to wait for + # completion of the communication, which is necessary for the pipeline to continue. 
+ communication.mpi_send_object( + mpi_state.comm, + common.Operation.GET_TASK_COUNT, + communication.MPIRank.MONITOR, + ) + info_tasks = communication.recv_simple_operation( + mpi_state.comm, + communication.MPIRank.MONITOR, + ) + executed_task_counter = info_tasks["executed_task_counter"] + tasks_completed = info_tasks["tasks_completed"] + Scheduler.get_instance().decrement_done_tasks(tasks_completed) if len(self._cleanup_list) > self._cleanup_list_threshold: if self._cleanup_counter % self._cleanup_threshold == 0: timestamp_snapshot = time.perf_counter() + if (timestamp_snapshot - self._timestamp) > self._time_threshold: logger.debug("Cleanup counter {}".format(self._cleanup_counter)) - mpi_state = communication.MPIState.get_instance() - # Compare submitted and executed tasks - # We use a blocking send here because we have to wait for - # completion of the communication, which is necessary for the pipeline to continue. - communication.mpi_send_object( - mpi_state.comm, - common.Operation.GET_TASK_COUNT, - communication.MPIRank.MONITOR, - ) - - info_tasks = communication.recv_simple_operation( - mpi_state.comm, - communication.MPIRank.MONITOR, - ) - executed_task_counter = info_tasks["executed_task_counter"] - tasks_completed = info_tasks["tasks_completed"] - Scheduler.get_instance().decrement_done_tasks(tasks_completed) - logger.debug( "Submitted task count {} vs executed task count {}".format( self._task_counter, executed_task_counter
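
For reference, the following is a minimal, standalone sketch (not part of the patches above, and not a unidist API) of the least-loaded scheduling scheme these commits implement: the controller keeps an outstanding-task count per worker rank, submits each new task to the rank with the smallest count, and periodically subtracts the per-worker completion counts reported back by the monitor. The names LeastLoadedScheduler, schedule_rank, and apply_completions are illustrative only.

# Minimal, standalone sketch of the least-loaded scheduling idea in this
# patch series. Names are illustrative; they are not unidist APIs.

class LeastLoadedScheduler:
    """Pick the worker rank with the fewest outstanding (submitted - done) tasks."""

    def __init__(self, worker_ranks):
        # Outstanding task count per schedulable worker rank.
        self.tasks_per_worker = {rank: 0 for rank in worker_ranks}
        self.reserved = set()  # ranks reserved, e.g. for actors

    def schedule_rank(self):
        candidates = [r for r in self.tasks_per_worker if r not in self.reserved]
        if not candidates:
            raise RuntimeError("All ranks are reserved")
        rank = min(candidates, key=self.tasks_per_worker.get)
        self.tasks_per_worker[rank] += 1  # count the task as submitted
        return rank

    def apply_completions(self, done_per_worker):
        # Periodic feedback (in the patches, piggybacked on the monitor's
        # GET_TASK_COUNT reply): subtract what each worker has finished.
        for rank, done in done_per_worker.items():
            self.tasks_per_worker[rank] -= done


if __name__ == "__main__":
    scheduler = LeastLoadedScheduler(worker_ranks=[2, 3, 4])
    placements = [scheduler.schedule_rank() for _ in range(5)]
    print("placements:", placements)                 # [2, 3, 4, 2, 3]
    scheduler.apply_completions({2: 2})              # rank 2 finished two tasks
    print("next pick:", scheduler.schedule_rank())   # 2 (least loaded again)

The sketch mirrors the final shape of the series: schedule_rank plays the role of Scheduler.schedule_rank plus increment_tasks_on_worker, and apply_completions corresponds to decrement_done_tasks fed from the monitor's combined executed_task_counter / tasks_completed reply.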