
PERF-#273: Scheduler with task tracking #274

Open
wants to merge 9 commits into base: master
7 changes: 5 additions & 2 deletions unidist/core/backends/mpi/core/communication.py
@@ -181,7 +181,7 @@ def mpi_send_object(comm, data, dest_rank):
comm.send(data, dest=dest_rank)


def mpi_isend_object(comm, data, dest_rank):
def mpi_isend_object(comm, data, dest_rank, tag=0):
"""
Send Python object to another MPI rank in a non-blocking way.

@@ -193,13 +193,16 @@ def mpi_isend_object(comm, data, dest_rank):
Data to send.
dest_rank : int
Target MPI process to transfer data.
tag : int, default: 0
Message tag used to receive only data with a specific label.
Used when the background thread polls for data carrying that label.

Returns
-------
object
A handle to the MPI_Isend communication result.
"""
return comm.isend(data, dest=dest_rank)
return comm.isend(data, dest=dest_rank, tag=tag)


def mpi_send_buffer(comm, buffer_size, buffer, dest_rank):
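For context, the new `tag` parameter lets the receiving side filter messages by label. A minimal standalone sketch of how a tagged non-blocking send pairs with a tag-filtered probe/receive (assuming mpi4py and two ranks under `mpiexec -n 2`; the tag value and payload are illustrative, not unidist's actual protocol):

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
TASK_DONE_TAG = 1  # illustrative tag value

if rank == 0:
    # Non-blocking send labeled with a tag so the receiver can select it.
    req = comm.isend({"task_completed_rank": 2}, dest=1, tag=TASK_DONE_TAG)
    req.wait()
elif rank == 1:
    # Wait until a message carrying TASK_DONE_TAG is available, then receive it.
    # iprobe() would return immediately; probe() blocks, which keeps the sketch simple.
    comm.probe(source=0, tag=TASK_DONE_TAG)
    data = comm.recv(source=0, tag=TASK_DONE_TAG)
    print("rank 1 received:", data)
```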
8 changes: 4 additions & 4 deletions unidist/core/backends/mpi/core/controller/actor.py
@@ -11,7 +11,7 @@
from unidist.core.backends.mpi.core.controller.garbage_collector import (
garbage_collector,
)
from unidist.core.backends.mpi.core.controller.common import push_data, RoundRobin
from unidist.core.backends.mpi.core.controller.common import push_data, Scheduler


class ActorMethod:
@@ -91,7 +91,7 @@ def __init__(self, cls, *args, owner_rank=None, handler_id=None, **kwargs):
self._args = args
self._kwargs = kwargs
self._owner_rank = (
RoundRobin.get_instance().schedule_rank()
Scheduler.get_instance().schedule_rank()
if owner_rank is None
else owner_rank
)
@@ -103,7 +103,7 @@ def __init__(self, cls, *args, owner_rank=None, handler_id=None, **kwargs):
object_store.put_data_owner(self._handler_id, self._owner_rank)

# reserve a rank for actor execution only
RoundRobin.get_instance().reserve_rank(self._owner_rank)
Scheduler.get_instance().reserve_rank(self._owner_rank)

# submit `ACTOR_CREATE` task to a worker only once
if owner_rank is None and handler_id is None:
Expand Down Expand Up @@ -186,4 +186,4 @@ def __del__(self):
"""
This is defined to release the rank reserved for the actor when it gets out of scope.
"""
RoundRobin.get_instance().release_rank(self._owner_rank)
Scheduler.get_instance().release_rank(self._owner_rank)
67 changes: 64 additions & 3 deletions unidist/core/backends/mpi/core/controller/api.py
@@ -8,7 +8,9 @@
import atexit
import signal
import asyncio
import time
from collections import defaultdict
import threading

try:
import mpi4py
@@ -24,7 +26,7 @@
from unidist.core.backends.mpi.core.controller.common import (
request_worker_data,
push_data,
RoundRobin,
Scheduler,
)
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
@@ -49,6 +51,56 @@
topology = dict()
# The global variable indicates whether the MPI backend has already been initialized
is_mpi_initialized = False
threads = []
BACKOFF = 0.001
exitFlag = False


def _getopt_backoff(options):
backoff = options.get("backoff")
if backoff is None:
backoff = BACKOFF
return float(backoff)


class Backoff:
def __init__(self, seconds=BACKOFF):
self.tval = 0.0
self.tmax = max(float(seconds), 0.0)
self.tmin = self.tmax / (1 << 10)

def reset(self):
self.tval = 0.0

def sleep(self):
time.sleep(self.tval)
self.tval = min(self.tmax, max(self.tmin, self.tval * 2))


class myThread(threading.Thread):
def __init__(self, threadID, name, comm):
threading.Thread.__init__(self, daemon=True)
self.threadID = threadID
self.name = name
self.comm = comm

def run(self):
print("Starting " + self.name)
poll_tasks_completed(self.name, self.comm)
print("Exiting " + self.name)


def poll_tasks_completed(threadName, comm):
global exitFlag
scheduler = Scheduler.get_instance()
backoff = Backoff()
while not exitFlag:
if comm.iprobe(source=communication.MPIRank.MONITOR, tag=1):
task_completed_rank = comm.recv(source=communication.MPIRank.MONITOR, tag=1)
scheduler.decrement_tasks_on_worker(task_completed_rank)
backoff.reset()
else:
backoff.sleep()
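The polling thread above relies on the `Backoff` helper so an idle loop does not spin at full CPU: each empty probe roughly doubles the sleep interval up to the cap, and a received message resets it to zero. A small standalone sketch of that behavior (the class is copied from this diff; the loop and printed values are illustrative):

```python
import time

BACKOFF = 0.001  # default cap in seconds, as in this PR


class Backoff:
    """Exponentially growing sleep interval, capped at `seconds`."""

    def __init__(self, seconds=BACKOFF):
        self.tval = 0.0
        self.tmax = max(float(seconds), 0.0)
        self.tmin = self.tmax / (1 << 10)  # smallest non-zero sleep

    def reset(self):
        self.tval = 0.0

    def sleep(self):
        time.sleep(self.tval)
        self.tval = min(self.tmax, max(self.tmin, self.tval * 2))


backoff = Backoff()
for attempt in range(5):
    # First call sleeps 0 s, then tmin, 2*tmin, 4*tmin, ... never exceeding tmax.
    backoff.sleep()
    print(f"attempt {attempt}: next sleep is {backoff.tval:.6f} s")
backoff.reset()  # a successful probe would reset the interval to zero
```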


def init():
@@ -126,6 +178,10 @@ def init():
mpi_state = communication.MPIState.get_instance(
comm, comm.Get_rank(), comm.Get_size()
)
if rank == 0 and not threads and parent_comm == MPI.COMM_NULL:
thread = myThread(1, "tName", comm)
thread.start()
threads.append(thread)

global topology
if not topology:
@@ -179,7 +235,11 @@ def shutdown():
-----
Sends cancelation operation to all workers and monitor processes.
"""
global exitFlag, threads
exitFlag = True
mpi_state = communication.MPIState.get_instance()
for thread in threads:
thread.join()
# Send shutdown commands to all ranks
for rank_id in range(communication.MPIRank.MONITOR, mpi_state.world_size):
# We use a blocking send here because we have to wait for
@@ -367,8 +427,9 @@ def submit(task, *args, num_returns=1, **kwargs):
# if all the tasks were completed
garbage_collector.regular_cleanup()

dest_rank = RoundRobin.get_instance().schedule_rank()

scheduler = Scheduler.get_instance()
dest_rank = scheduler.schedule_rank()
scheduler.increment_tasks_on_worker(dest_rank)
output_ids = object_store.generate_output_data_id(
dest_rank, garbage_collector, num_returns
)
86 changes: 56 additions & 30 deletions unidist/core/backends/mpi/core/controller/common.py
@@ -4,8 +4,6 @@

"""Common functionality related to `controller`."""

import itertools

from unidist.core.backends.common.data_id import is_data_id
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
@@ -20,38 +18,43 @@
initial_worker_number = 2


class RoundRobin:
class Scheduler:
__instance = None

def __init__(self):
self.reserved_ranks = []
self.rank_to_schedule = itertools.cycle(
(
rank
for rank in range(
initial_worker_number,
communication.MPIState.get_instance().world_size,
)
# check if a rank to schedule is not equal to the rank
# of the current process to not get into recursive scheduling
if rank != communication.MPIState.get_instance().rank
self.task_per_worker = {
k: 0
for k in range(
initial_worker_number, communication.MPIState.get_instance().world_size
)
)
}

self.rank_to_schedule = [
rank
for rank in range(
initial_worker_number,
communication.MPIState.get_instance().world_size,
)
# check if a rank to schedule is not equal to the rank
# of the current process to not get into recursive scheduling
if rank != communication.MPIState.get_instance().rank
]
logger.debug(
f"RoundRobin init for {communication.MPIState.get_instance().rank} rank"
f"Scheduler init for {communication.MPIState.get_instance().rank} rank"
)

@classmethod
def get_instance(cls):
"""
Get instance of ``RoundRobin``.
Get instance of ``Scheduler``.

Returns
-------
RoundRobin
Scheduler
"""
if cls.__instance is None:
cls.__instance = RoundRobin()
cls.__instance = Scheduler()
return cls.__instance

def schedule_rank(self):
@@ -63,16 +66,9 @@ def schedule_rank(self):
int
A rank number.
"""
next_rank = None

# Go rank by rank to find the first one non-reserved
for _ in range(
initial_worker_number, communication.MPIState.get_instance().world_size
):
rank = next(self.rank_to_schedule)
if rank not in self.reserved_ranks:
next_rank = rank
break
next_rank = min(
self.rank_to_schedule, key=self.task_per_worker.get, default=None
)

if next_rank is None:
raise Exception("All ranks blocked")
@@ -91,9 +87,11 @@ def reserve_rank(self, rank):
rank : int
A rank number.
"""
if rank in self.rank_to_schedule:
self.rank_to_schedule.remove(rank)
self.reserved_ranks.append(rank)
logger.debug(
f"RoundRobin reserve rank {rank} for actor "
f"Scheduler reserve rank {rank} for actor "
+ f"on worker with rank {communication.MPIState.get_instance().rank}"
)

@@ -108,12 +106,40 @@ def release_rank(self, rank):
rank : int
A rank number.
"""

self.reserved_ranks.remove(rank)
self.rank_to_schedule.append(rank)
logger.debug(
f"RoundRobin release rank {rank} reserved for actor "
f"Scheduler release rank {rank} reserved for actor "
+ f"on worker with rank {communication.MPIState.get_instance().rank}"
)

def increment_tasks_on_worker(self, rank):
"""
Increment the count of tasks submitted to a worker.

This helps to track the number of tasks submitted to each worker.

Parameters
----------
rank : int
A rank number.
"""
self.task_per_worker[rank] += 1

def decrement_tasks_on_worker(self, rank):
"""
Decrement the count of tasks submitted to a worker.

This helps to track the number of tasks outstanding on each worker.

Parameters
----------
rank : int
A rank number.
"""
self.task_per_worker[rank] -= 1


def request_worker_data(data_id):
"""
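Taken together, the change replaces round-robin dispatch with a least-loaded policy: `schedule_rank` picks the schedulable rank with the fewest outstanding tasks, `submit` increments that counter, and the background thread decrements it when the monitor reports a completed task. A simplified, MPI-free sketch of that bookkeeping (class name, worker ranks, and call sequence are illustrative):

```python
class LeastLoadedScheduler:
    """Toy stand-in for the task-tracking Scheduler above (no MPI, no actor ranks)."""

    def __init__(self, worker_ranks):
        self.task_per_worker = {rank: 0 for rank in worker_ranks}
        self.rank_to_schedule = list(worker_ranks)

    def schedule_rank(self):
        # Pick the schedulable rank with the fewest outstanding tasks.
        rank = min(self.rank_to_schedule, key=self.task_per_worker.get, default=None)
        if rank is None:
            raise RuntimeError("All ranks blocked")
        return rank

    def increment_tasks_on_worker(self, rank):
        self.task_per_worker[rank] += 1

    def decrement_tasks_on_worker(self, rank):
        self.task_per_worker[rank] -= 1


scheduler = LeastLoadedScheduler(worker_ranks=[2, 3, 4])
first = scheduler.schedule_rank()            # rank 2: all counts are 0, min() keeps the first
scheduler.increment_tasks_on_worker(first)   # what submit() does after scheduling
second = scheduler.schedule_rank()           # rank 3: rank 2 now has one outstanding task
scheduler.increment_tasks_on_worker(second)
scheduler.decrement_tasks_on_worker(first)   # what the polling thread does on TASK_DONE
```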
3 changes: 3 additions & 0 deletions unidist/core/backends/mpi/core/monitor.py
@@ -66,6 +66,9 @@ def monitor_loop():
# Proceed the request
if operation_type == common.Operation.TASK_DONE:
task_counter.increment()
communication.mpi_isend_object(
mpi_state.comm, source_rank, communication.MPIRank.ROOT, 1
)
elif operation_type == common.Operation.GET_TASK_COUNT:
# We use a blocking send here because the receiver is waiting for the result.
communication.mpi_send_object(