FEAT-#308: Add monitor to each host in the cluster
Signed-off-by: Kirill Suvorov <[email protected]>
Retribution98 committed Jun 16, 2023
1 parent 4da67e4 commit 2b2d95b
Showing 5 changed files with 87 additions and 73 deletions.
70 changes: 42 additions & 28 deletions unidist/core/backends/mpi/core/communication.py
@@ -94,11 +94,24 @@ class MPIState:
 
     __instance = None
 
-    def __init__(self, comm, rank, world_sise):
+    def __init__(self, comm, host_comm):
         # attributes get actual values when MPI is initialized in `init` function
         self.comm = comm
-        self.rank = rank
-        self.world_size = world_sise
+        self.host_comm = host_comm
+        self.rank = comm.Get_rank()
+        self.host_rank = host_comm.Get_rank()
+        self.host = socket.gethostbyname(socket.gethostname())
+        self.world_size = comm.Get_size()
+        self.topology = self.__get_topology()
+        self.workers = []
+        for hosted_ranks in self.topology.values():
+            self.workers.extend(
+                [
+                    r
+                    for i, r in enumerate(hosted_ranks)
+                    if r != MPIRank.ROOT and i != MPIRank.MONITOR
+                ]
+            )
 
     @classmethod
     def get_instance(cls, *args):
@@ -119,39 +132,40 @@ def get_instance(cls, *args):
             cls.__instance = MPIState(*args)
         return cls.__instance
 
-
-class MPIRank:
-    """Class that describes ranks assignment."""
-
-    ROOT = 0
-    MONITOR = 1
-    FIRST_WORKER = 2
-
-
-def get_topology():
-    """
-    Get topology of MPI cluster.
-
-    Returns
-    -------
-    dict
-        Dictionary, containing workers ranks assignments by IP-addresses in
-        the form: `{"node_ip0": [rank_2, rank_3, ...], "node_ip1": [rank_i, ...], ...}`.
-    """
-    mpi_state = MPIState.get_instance()
-    comm = mpi_state.comm
-    rank = mpi_state.rank
-
-    hostname = socket.gethostname()
-    host = socket.gethostbyname(hostname)
-    cluster_info = comm.allgather((host, rank))
-    topology = defaultdict(list)
-
-    for host, rank in cluster_info:
-        if rank not in [MPIRank.ROOT, MPIRank.MONITOR]:
-            topology[host].append(rank)
-
-    return dict(topology)
+    def __get_topology(self):
+        """
+        Get topology of MPI cluster.
+
+        Returns
+        -------
+        dict
+            Dictionary, containing workers ranks assignments by IP-addresses in
+            the form: `{"node_ip0": [rank_2, rank_3, ...], "node_ip1": [rank_i, ...], ...}`.
+        """
+        cluster_info = self.comm.allgather((self.host, self.rank, self.host_rank))
+        topology = defaultdict(lambda: [""] * len(cluster_info))
+
+        for host, rank, host_rank in cluster_info:
+            topology[host][host_rank] = rank
+
+        for host in topology:
+            topology[host] = [r for r in topology[host] if r != ""]
+
+        return dict(topology)
+
+    def get_monitor_by_worker_rank(self, rank):
+        for hosted_ranks in self.topology.values():
+            if rank in hosted_ranks:
+                return hosted_ranks[MPIRank.MONITOR]
+        raise ValueError("Unknown rank of workers")
+
+
+class MPIRank:
+    """Class that describes ranks assignment."""
+
+    ROOT = 0
+    MONITOR = 1
+    FIRST_WORKER = 2
 
 
 # ---------------------------- #
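For reference, the sketch below is not part of the commit; it mimics the per-host layout the new MPIState builds. `topology` maps each node IP to its ranks ordered by host rank, the slot at index `MPIRank.MONITOR` on every host is that host's monitor, and `workers` collects everything else except the global root. The IP addresses and rank numbers are hypothetical.

# Illustrative sketch only, not part of the commit: mimics the per-host rank
# layout that MPIState.__get_topology() produces. Host IPs and rank numbers
# are hypothetical.
ROOT, MONITOR = 0, 1  # MPIRank.ROOT and MPIRank.MONITOR

# {host_ip: ranks ordered by host rank}; index MONITOR on every host is its monitor
topology = {
    "192.168.0.1": [0, 1, 2, 3],  # rank 0 is the global root, rank 1 its monitor
    "192.168.0.2": [4, 5, 6, 7],  # rank 5 is the monitor of the second host
}

def get_monitor_by_worker_rank(rank):
    # Same lookup as the new MPIState.get_monitor_by_worker_rank()
    for hosted_ranks in topology.values():
        if rank in hosted_ranks:
            return hosted_ranks[MONITOR]
    raise ValueError("Unknown rank of workers")

# Workers exclude the global root and the monitor slot of every host,
# mirroring how MPIState.workers is filled in __init__
workers = [
    r
    for hosted_ranks in topology.values()
    for i, r in enumerate(hosted_ranks)
    if r != ROOT and i != MONITOR
]

print(workers)                        # [2, 3, 4, 6, 7]
print(get_monitor_by_worker_rank(3))  # 1 -> monitor on the first host
print(get_monitor_by_worker_rank(6))  # 5 -> monitor on the second host

Note that rank 4, which has host rank 0 on the second host, still counts as a worker because only the global root is excluded from the workers list.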
48 changes: 25 additions & 23 deletions unidist/core/backends/mpi/core/controller/api.py
@@ -137,7 +136,6 @@ def init():
     # path to dynamically spawn MPI processes
     if rank == 0 and parent_comm == MPI.COMM_NULL:
         if IsMpiSpawnWorkers.get():
-            nprocs_to_spawn = CpuCount.get() + 1  # +1 for monitor process
             args = _get_py_flags()
             args += ["-c"]
             py_str = [
@@ -157,15 +156,20 @@ def init():
             py_str = "; ".join(py_str)
             args += [py_str]
 
+            cpu_count = CpuCount.get()
             hosts = MpiHosts.get()
             info = MPI.Info.Create()
+            host_list = hosts.split(",") if hosts is not None else ["localhost"]
+            hosts_count = len(host_list)
+            # +1 for monitor process on each host
+            nprocs_to_spawn = cpu_count + len(host_list)
             if hosts:
                 if "Open MPI" in MPI.Get_library_version():
-                    host_list = str(hosts).split(",")
                     workers_per_host = [
-                        int(nprocs_to_spawn / len(host_list))
-                        + (1 if i < nprocs_to_spawn % len(host_list) else 0)
-                        for i in range(len(host_list))
+                        int(nprocs_to_spawn / hosts_count)
+                        + (1 if i < nprocs_to_spawn % hosts_count else 0)
+                        for i in range(hosts_count)
                     ]
                     hosts = ",".join(
                         [
@@ -193,13 +197,9 @@ def init():
     if parent_comm != MPI.COMM_NULL:
         comm = parent_comm.Merge(high=True)
 
-    mpi_state = communication.MPIState.get_instance(
-        comm, comm.Get_rank(), comm.Get_size()
-    )
+    host_comm = comm.Split_type(MPI.COMM_TYPE_SHARED)
 
-    global topology
-    if not topology:
-        topology = communication.get_topology()
+    mpi_state = communication.MPIState.get_instance(comm, host_comm)
 
     global is_mpi_initialized
     if not is_mpi_initialized:
@@ -210,8 +210,7 @@ def init():
         signal.signal(signal.SIGTERM, _termination_handler)
         signal.signal(signal.SIGINT, _termination_handler)
         return
-
-    if mpi_state.rank == communication.MPIRank.MONITOR:
+    elif mpi_state.host_rank == communication.MPIRank.MONITOR:
         from unidist.core.backends.mpi.core.monitor import monitor_loop
 
         monitor_loop()
@@ -221,11 +220,7 @@ def init():
         if not IsMpiSpawnWorkers.get():
             sys.exit()
         return
-
-    if mpi_state.rank not in (
-        communication.MPIRank.ROOT,
-        communication.MPIRank.MONITOR,
-    ):
+    else:
         from unidist.core.backends.mpi.core.worker.loop import worker_loop
 
         asyncio.run(worker_loop())
@@ -287,13 +282,19 @@ def cluster_resources():
         Dictionary with cluster nodes info in the form
         `{"node_ip0": {"CPU": x0}, "node_ip1": {"CPU": x1}, ...}`.
     """
-    global topology
-    if not topology:
+    mpi_state = communication.MPIState.get_instance()
+    if mpi_state is None:
         raise RuntimeError("'unidist.init()' has not been called yet")
 
     cluster_resources = defaultdict(dict)
-    for host, ranks_list in topology.items():
-        cluster_resources[host]["CPU"] = len(ranks_list)
+    for host, ranks_list in mpi_state.topology.items():
+        cluster_resources[host]["CPU"] = len(
+            [
+                r
+                for r in ranks_list
+                if r not in (communication.MPIRank.ROOT, communication.MPIRank.MONITOR)
+            ]
+        )
 
     return dict(cluster_resources)

@@ -407,17 +408,18 @@ def wait(data_ids, num_returns=1):
             "num_returns": pending_returns,
         }
         mpi_state = communication.MPIState.get_instance()
+        root_monitor = mpi_state.get_monitor_by_worker_rank(communication.MPIRank.ROOT)
         # We use a blocking send and recv here because we have to wait for
        # completion of the communication, which is necessary for the pipeline to continue.
         communication.send_simple_operation(
             mpi_state.comm,
             operation_type,
             operation_data,
-            communication.MPIRank.MONITOR,
+            root_monitor,
         )
         data = communication.recv_simple_operation(
             mpi_state.comm,
-            communication.MPIRank.MONITOR,
+            root_monitor,
         )
         ready.extend(data["ready"])
         not_ready = data["not_ready"]
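The new spawn arithmetic can be checked in isolation. The snippet below is illustrative only and assumes a hypothetical configuration of 16 CPUs and 3 hosts passed via MpiHosts; it reproduces nprocs_to_spawn and the per-host split used to build the Open MPI host string.

# Illustrative only: hypothetical 16-CPU, 3-host configuration.
cpu_count = 16
host_list = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
hosts_count = len(host_list)

# +1 process per host for its monitor, as in the updated init()
nprocs_to_spawn = cpu_count + hosts_count

workers_per_host = [
    int(nprocs_to_spawn / hosts_count)
    + (1 if i < nprocs_to_spawn % hosts_count else 0)
    for i in range(hosts_count)
]

print(nprocs_to_spawn)   # 19
print(workers_per_host)  # [7, 6, 6]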
22 changes: 6 additions & 16 deletions unidist/core/backends/mpi/core/controller/common.py
@@ -14,32 +14,23 @@
 
 logger = common.get_logger("common", "common.log")
 
-# initial worker number is equal to rank 2 because
-# rank 0 is for controller process
-# rank 1 is for monitor process
-initial_worker_number = 2
-
 
 class RoundRobin:
     __instance = None
 
     def __init__(self):
         self.reserved_ranks = []
+        mpi_state = communication.MPIState.get_instance()
         self.rank_to_schedule = itertools.cycle(
             (
                 rank
-                for rank in range(
-                    initial_worker_number,
-                    communication.MPIState.get_instance().world_size,
-                )
+                for rank in mpi_state.workers
                 # check if a rank to schedule is not equal to the rank
                 # of the current process to not get into recursive scheduling
-                if rank != communication.MPIState.get_instance().rank
+                if rank != mpi_state.rank
             )
         )
-        logger.debug(
-            f"RoundRobin init for {communication.MPIState.get_instance().rank} rank"
-        )
+        logger.debug(f"RoundRobin init for {mpi_state.rank} rank")
 
     @classmethod
     def get_instance(cls):
@@ -64,11 +55,10 @@ def schedule_rank(self):
             A rank number.
         """
         next_rank = None
+        mpi_state = communication.MPIState.get_instance()
 
         # Go rank by rank to find the first one non-reserved
-        for _ in range(
-            initial_worker_number, communication.MPIState.get_instance().world_size
-        ):
+        for _ in mpi_state.workers:
             rank = next(self.rank_to_schedule)
             if rank not in self.reserved_ranks:
                 next_rank = rank
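RoundRobin now draws candidate ranks from MPIState.workers, which already excludes the root and every per-host monitor, instead of range(initial_worker_number, world_size). Below is a minimal sketch of the selection logic with hypothetical rank numbers and a simplified break once a candidate is found; it is not part of the commit.

import itertools

# Illustrative sketch only: RoundRobin now cycles over MPIState.workers
# (the root and every per-host monitor are already filtered out) instead of
# range(initial_worker_number, world_size). Rank numbers are hypothetical.
current_rank = 2           # rank of the process doing the scheduling
workers = [2, 3, 4, 6, 7]  # e.g. MPIState.workers for a two-host cluster
reserved_ranks = [3]       # ranks temporarily excluded from scheduling

rank_to_schedule = itertools.cycle(
    rank for rank in workers if rank != current_rank
)

# Search loop in the spirit of RoundRobin.schedule_rank(): take the first
# non-reserved candidate, trying at most len(workers) times.
next_rank = None
for _ in workers:
    rank = next(rank_to_schedule)
    if rank not in reserved_ranks:
        next_rank = rank
        break

print(next_rank)  # 4 -> the first non-reserved worker after skipping 3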
10 changes: 6 additions & 4 deletions unidist/core/backends/mpi/core/controller/garbage_collector.py
@@ -11,7 +11,6 @@
 from unidist.core.backends.mpi.core.async_operations import AsyncOperations
 from unidist.core.backends.mpi.core.serialization import SimpleDataSerializer
 from unidist.core.backends.mpi.core.controller.object_store import object_store
-from unidist.core.backends.mpi.core.controller.common import initial_worker_number
 
 
 logger = common.get_logger("utils", "utils.log")
@@ -63,7 +62,7 @@ def _send_cleanup_request(self, cleanup_list):
         # Cache serialized list of data IDs
         s_cleanup_list = SimpleDataSerializer().serialize_pickle(cleanup_list)
         async_operations = AsyncOperations.get_instance()
-        for rank_id in range(initial_worker_number, mpi_state.world_size):
+        for rank_id in mpi_state.workers:
             if rank_id != mpi_state.rank:
                 h_list = communication.isend_serialized_operation(
                     mpi_state.comm,
@@ -119,17 +118,20 @@ def regular_cleanup(self):
         logger.debug("Cleanup counter {}".format(self._cleanup_counter))
 
         mpi_state = communication.MPIState.get_instance()
+        root_monitor = mpi_state.get_monitor_by_worker_rank(
+            communication.MPIRank.ROOT
+        )
         # Compare submitted and executed tasks
         # We use a blocking send here because we have to wait for
         # completion of the communication, which is necessary for the pipeline to continue.
         communication.mpi_send_object(
             mpi_state.comm,
             common.Operation.GET_TASK_COUNT,
-            communication.MPIRank.MONITOR,
+            root_monitor,
         )
         executed_task_counter = communication.recv_simple_operation(
             mpi_state.comm,
-            communication.MPIRank.MONITOR,
+            root_monitor,
         )
 
         logger.debug(
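With one monitor per host, the garbage collector addresses its GET_TASK_COUNT request to the monitor that lives on the root's host rather than to a hard-coded rank 1. The sketch below is not part of the commit and uses hypothetical counters; it shows the kind of comparison this round trip presumably feeds, where cleanup is only broadcast once every submitted task has been reported as executed.

# Illustrative only, no MPI involved: the comparison that regular_cleanup()
# presumably drives with the counter returned by the root host's monitor.
# The numbers are hypothetical.
submitted_task_counter = 120  # tasks submitted by this controller so far
executed_task_counter = 117   # value the root monitor reported back

if submitted_task_counter == executed_task_counter:
    print("broadcast cleanup request to all worker ranks")
else:
    in_flight = submitted_task_counter - executed_task_counter
    print(f"defer cleanup: {in_flight} tasks still in flight")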
10 changes: 8 additions & 2 deletions unidist/core/backends/mpi/core/worker/task_store.py
@@ -268,11 +268,14 @@ async def execute():
             # Monitor the task execution
             # We use a blocking send here because we have to wait for
             # completion of the communication, which is necessary for the pipeline to continue.
+            root_monitor = mpi_state.get_monitor_by_worker_rank(
+                communication.MPIRank.ROOT
+            )
             communication.send_simple_operation(
                 communication.MPIState.get_instance().comm,
                 common.Operation.TASK_DONE,
                 completed_data_ids,
-                communication.MPIRank.MONITOR,
+                root_monitor,
             )
 
         async_task = asyncio.create_task(execute())
@@ -336,11 +339,14 @@ async def execute():
             # Monitor the task execution.
             # We use a blocking send here because we have to wait for
             # completion of the communication, which is necessary for the pipeline to continue.
+            root_monitor = mpi_state.get_monitor_by_worker_rank(
+                communication.MPIRank.ROOT
+            )
             communication.send_simple_operation(
                 communication.MPIState.get_instance().comm,
                 common.Operation.TASK_DONE,
                 completed_data_ids,
-                communication.MPIRank.MONITOR,
+                root_monitor,
             )
 
     def process_task_request(self, request):
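The worker-side change in task_store.py follows the same pattern: the TASK_DONE notification is addressed to the monitor co-located with the root, resolved through get_monitor_by_worker_rank(MPIRank.ROOT) instead of the fixed rank 1, so task-completion accounting still converges on a single monitor. A standalone sketch with hypothetical ranks, not part of the commit:

# Illustrative only: where a worker's TASK_DONE now goes. The topology and
# rank numbers are hypothetical; MPIRank.ROOT == 0, MPIRank.MONITOR == 1.
ROOT, MONITOR = 0, 1
topology = {
    "192.168.0.1": [0, 1, 2, 3],  # root host
    "192.168.0.2": [4, 5, 6, 7],
}

def get_monitor_by_worker_rank(rank):
    for hosted_ranks in topology.values():
        if rank in hosted_ranks:
            return hosted_ranks[MONITOR]
    raise ValueError("Unknown rank of workers")

# Every worker asks for the monitor of the root's host, so the destination is
# the same regardless of where the worker itself runs.
print(get_monitor_by_worker_rank(ROOT))  # 1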
