diff --git a/unidist/core/backends/mpi/core/communication.py b/unidist/core/backends/mpi/core/communication.py index ce65912f..218075ba 100755 --- a/unidist/core/backends/mpi/core/communication.py +++ b/unidist/core/backends/mpi/core/communication.py @@ -94,11 +94,24 @@ class MPIState: __instance = None - def __init__(self, comm, rank, world_sise): + def __init__(self, comm, host_comm): # attributes get actual values when MPI is initialized in `init` function self.comm = comm - self.rank = rank - self.world_size = world_sise + self.host_comm = host_comm + self.rank = comm.Get_rank() + self.host_rank = host_comm.Get_rank() + self.host = socket.gethostbyname(socket.gethostname()) + self.world_size = comm.Get_size() + self.topology = self.__get_topology() + self.workers = [] + for hosted_ranks in self.topology.values(): + self.workers.extend( + [ + r + for i, r in enumerate(hosted_ranks) + if r != MPIRank.ROOT and i != MPIRank.MONITOR + ] + ) @classmethod def get_instance(cls, *args): @@ -119,39 +132,40 @@ def get_instance(cls, *args): cls.__instance = MPIState(*args) return cls.__instance + def __get_topology(self): + """ + Get topology of MPI cluster. -class MPIRank: - """Class that describes ranks assignment.""" + Returns + ------- + dict + Dictionary, containing workers ranks assignments by IP-addresses in + the form: `{"node_ip0": [rank_2, rank_3, ...], "node_ip1": [rank_i, ...], ...}`. + """ + cluster_info = self.comm.allgather((self.host, self.rank, self.host_rank)) + topology = defaultdict(lambda: [""] * len(cluster_info)) - ROOT = 0 - MONITOR = 1 - FIRST_WORKER = 2 + for host, rank, host_rank in cluster_info: + topology[host][host_rank] = rank + for host in topology: + topology[host] = [r for r in topology[host] if r != ""] -def get_topology(): - """ - Get topology of MPI cluster. + return dict(topology) - Returns - ------- - dict - Dictionary, containing workers ranks assignments by IP-addresses in - the form: `{"node_ip0": [rank_2, rank_3, ...], "node_ip1": [rank_i, ...], ...}`. - """ - mpi_state = MPIState.get_instance() - comm = mpi_state.comm - rank = mpi_state.rank + def get_monitor_by_worker_rank(self, rank): + for hosted_ranks in self.topology.values(): + if rank in hosted_ranks: + return hosted_ranks[MPIRank.MONITOR] + raise ValueError("Unknown rank of workers") - hostname = socket.gethostname() - host = socket.gethostbyname(hostname) - cluster_info = comm.allgather((host, rank)) - topology = defaultdict(list) - for host, rank in cluster_info: - if rank not in [MPIRank.ROOT, MPIRank.MONITOR]: - topology[host].append(rank) +class MPIRank: + """Class that describes ranks assignment.""" - return dict(topology) + ROOT = 0 + MONITOR = 1 + FIRST_WORKER = 2 # ---------------------------- # diff --git a/unidist/core/backends/mpi/core/controller/api.py b/unidist/core/backends/mpi/core/controller/api.py index 34a817f6..40b03354 100644 --- a/unidist/core/backends/mpi/core/controller/api.py +++ b/unidist/core/backends/mpi/core/controller/api.py @@ -137,7 +137,6 @@ def init(): # path to dynamically spawn MPI processes if rank == 0 and parent_comm == MPI.COMM_NULL: if IsMpiSpawnWorkers.get(): - nprocs_to_spawn = CpuCount.get() + 1 # +1 for monitor process args = _get_py_flags() args += ["-c"] py_str = [ @@ -157,15 +156,20 @@ def init(): py_str = "; ".join(py_str) args += [py_str] + cpu_count = CpuCount.get() hosts = MpiHosts.get() info = MPI.Info.Create() + host_list = hosts.split(",") if hosts is not None else ["localhost"] + hosts_count = len(host_list) + # +1 for monitor process on each host + nprocs_to_spawn = cpu_count + len(host_list) if hosts: if "Open MPI" in MPI.Get_library_version(): host_list = str(hosts).split(",") workers_per_host = [ - int(nprocs_to_spawn / len(host_list)) - + (1 if i < nprocs_to_spawn % len(host_list) else 0) - for i in range(len(host_list)) + int(nprocs_to_spawn / hosts_count) + + (1 if i < nprocs_to_spawn % hosts_count else 0) + for i in range(hosts_count) ] hosts = ",".join( [ @@ -193,13 +197,9 @@ def init(): if parent_comm != MPI.COMM_NULL: comm = parent_comm.Merge(high=True) - mpi_state = communication.MPIState.get_instance( - comm, comm.Get_rank(), comm.Get_size() - ) + host_comm = comm.Split_type(MPI.COMM_TYPE_SHARED) - global topology - if not topology: - topology = communication.get_topology() + mpi_state = communication.MPIState.get_instance(comm, host_comm) global is_mpi_initialized if not is_mpi_initialized: @@ -210,8 +210,7 @@ def init(): signal.signal(signal.SIGTERM, _termination_handler) signal.signal(signal.SIGINT, _termination_handler) return - - if mpi_state.rank == communication.MPIRank.MONITOR: + elif mpi_state.host_rank == communication.MPIRank.MONITOR: from unidist.core.backends.mpi.core.monitor import monitor_loop monitor_loop() @@ -221,11 +220,7 @@ def init(): if not IsMpiSpawnWorkers.get(): sys.exit() return - - if mpi_state.rank not in ( - communication.MPIRank.ROOT, - communication.MPIRank.MONITOR, - ): + else: from unidist.core.backends.mpi.core.worker.loop import worker_loop asyncio.run(worker_loop()) @@ -287,13 +282,19 @@ def cluster_resources(): Dictionary with cluster nodes info in the form `{"node_ip0": {"CPU": x0}, "node_ip1": {"CPU": x1}, ...}`. """ - global topology - if not topology: + mpi_state = communication.MPIState.get_instance() + if mpi_state is None: raise RuntimeError("'unidist.init()' has not been called yet") cluster_resources = defaultdict(dict) - for host, ranks_list in topology.items(): - cluster_resources[host]["CPU"] = len(ranks_list) + for host, ranks_list in mpi_state.topology.items(): + cluster_resources[host]["CPU"] = len( + [ + r + for r in ranks_list + if r not in (communication.MPIRank.ROOT, communication.MPIRank.MONITOR) + ] + ) return dict(cluster_resources) @@ -407,17 +408,18 @@ def wait(data_ids, num_returns=1): "num_returns": pending_returns, } mpi_state = communication.MPIState.get_instance() + root_monitor = mpi_state.get_monitor_by_worker_rank(communication.MPIRank.ROOT) # We use a blocking send and recv here because we have to wait for # completion of the communication, which is necessary for the pipeline to continue. communication.send_simple_operation( mpi_state.comm, operation_type, operation_data, - communication.MPIRank.MONITOR, + root_monitor, ) data = communication.recv_simple_operation( mpi_state.comm, - communication.MPIRank.MONITOR, + root_monitor, ) ready.extend(data["ready"]) not_ready = data["not_ready"] diff --git a/unidist/core/backends/mpi/core/controller/common.py b/unidist/core/backends/mpi/core/controller/common.py index 46659a3e..9975e60f 100644 --- a/unidist/core/backends/mpi/core/controller/common.py +++ b/unidist/core/backends/mpi/core/controller/common.py @@ -14,32 +14,23 @@ logger = common.get_logger("common", "common.log") -# initial worker number is equal to rank 2 because -# rank 0 is for controller process -# rank 1 is for monitor process -initial_worker_number = 2 - class RoundRobin: __instance = None def __init__(self): self.reserved_ranks = [] + mpi_state = communication.MPIState.get_instance() self.rank_to_schedule = itertools.cycle( ( rank - for rank in range( - initial_worker_number, - communication.MPIState.get_instance().world_size, - ) + for rank in mpi_state.workers # check if a rank to schedule is not equal to the rank # of the current process to not get into recursive scheduling - if rank != communication.MPIState.get_instance().rank + if rank != mpi_state.rank ) ) - logger.debug( - f"RoundRobin init for {communication.MPIState.get_instance().rank} rank" - ) + logger.debug(f"RoundRobin init for {mpi_state.rank} rank") @classmethod def get_instance(cls): @@ -64,11 +55,10 @@ def schedule_rank(self): A rank number. """ next_rank = None + mpi_state = communication.MPIState.get_instance() # Go rank by rank to find the first one non-reserved - for _ in range( - initial_worker_number, communication.MPIState.get_instance().world_size - ): + for _ in mpi_state.workers: rank = next(self.rank_to_schedule) if rank not in self.reserved_ranks: next_rank = rank diff --git a/unidist/core/backends/mpi/core/controller/garbage_collector.py b/unidist/core/backends/mpi/core/controller/garbage_collector.py index 8bc74cad..5925fea5 100644 --- a/unidist/core/backends/mpi/core/controller/garbage_collector.py +++ b/unidist/core/backends/mpi/core/controller/garbage_collector.py @@ -11,7 +11,6 @@ from unidist.core.backends.mpi.core.async_operations import AsyncOperations from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer from unidist.core.backends.mpi.core.controller.object_store import object_store -from unidist.core.backends.mpi.core.controller.common import initial_worker_number logger = common.get_logger("utils", "utils.log") @@ -63,7 +62,7 @@ def _send_cleanup_request(self, cleanup_list): # Cache serialized list of data IDs s_cleanup_list = SimpleDataSerializer().serialize_pickle(cleanup_list) async_operations = AsyncOperations.get_instance() - for rank_id in range(initial_worker_number, mpi_state.world_size): + for rank_id in mpi_state.workers: if rank_id != mpi_state.rank: h_list = communication.isend_serialized_operation( mpi_state.comm, @@ -119,17 +118,20 @@ def regular_cleanup(self): logger.debug("Cleanup counter {}".format(self._cleanup_counter)) mpi_state = communication.MPIState.get_instance() + root_monitor = mpi_state.get_monitor_by_worker_rank( + communication.MPIRank.ROOT + ) # Compare submitted and executed tasks # We use a blocking send here because we have to wait for # completion of the communication, which is necessary for the pipeline to continue. communication.mpi_send_object( mpi_state.comm, common.Operation.GET_TASK_COUNT, - communication.MPIRank.MONITOR, + root_monitor, ) executed_task_counter = communication.recv_simple_operation( mpi_state.comm, - communication.MPIRank.MONITOR, + root_monitor, ) logger.debug( diff --git a/unidist/core/backends/mpi/core/worker/task_store.py b/unidist/core/backends/mpi/core/worker/task_store.py index bf99c9e4..3315330f 100644 --- a/unidist/core/backends/mpi/core/worker/task_store.py +++ b/unidist/core/backends/mpi/core/worker/task_store.py @@ -268,11 +268,14 @@ async def execute(): # Monitor the task execution # We use a blocking send here because we have to wait for # completion of the communication, which is necessary for the pipeline to continue. + root_monitor = mpi_state.get_monitor_by_worker_rank( + communication.MPIRank.ROOT + ) communication.send_simple_operation( communication.MPIState.get_instance().comm, common.Operation.TASK_DONE, completed_data_ids, - communication.MPIRank.MONITOR, + root_monitor, ) async_task = asyncio.create_task(execute()) @@ -336,11 +339,14 @@ async def execute(): # Monitor the task execution. # We use a blocking send here because we have to wait for # completion of the communication, which is necessary for the pipeline to continue. + root_monitor = mpi_state.get_monitor_by_worker_rank( + communication.MPIRank.ROOT + ) communication.send_simple_operation( communication.MPIState.get_instance().comm, common.Operation.TASK_DONE, completed_data_ids, - communication.MPIRank.MONITOR, + root_monitor, ) def process_task_request(self, request):