Skip to content

Commit

Permalink
Adding barriers in all processes so the init function exits only when…
Browse files Browse the repository at this point in the history
… the worker and monitor loops have started

Signed-off-by: arunjose696 <[email protected]>
  • Loading branch information
arunjose696 committed Oct 30, 2023
1 parent 2bc6034 commit 6382d53
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
2 changes: 2 additions & 0 deletions unidist/core/backends/mpi/core/controller/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ def init():
atexit.register(_termination_handler)
signal.signal(signal.SIGTERM, _termination_handler)
signal.signal(signal.SIGINT, _termination_handler)
# Exit the init function in root only after monitor and worker loops have started.
mpi_state.comm.Barrier()
return
elif mpi_state.is_monitor_process():
from unidist.core.backends.mpi.core.monitor.loop import monitor_loop
Expand Down
4 changes: 3 additions & 1 deletion unidist/core/backends/mpi/core/monitor/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def monitor_loop():
The loop exits on special cancelation operation.
``unidist.core.backends.mpi.core.common.Operations`` defines a set of supported operations.
"""
monitor_logger.debug("Monitor loop started")
task_counter = TaskCounter.get_instance()
mpi_state = communication.MPIState.get_instance()
wait_handler = WaitHandler.get_instance()
Expand All @@ -182,7 +183,8 @@ def monitor_loop():
# Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shutdown,
# ``Monitor` sends the shutdown signal to every worker, as well as notifies ``Root`` that
# it can exit the program.

# Barrier in monitor process to check if monitor loop has started
mpi_state.comm.Barrier()
while True:
# Listen receive operation from any source
operation_type, source_rank = communication.mpi_recv_operation(mpi_state.comm)
Expand Down
4 changes: 4 additions & 0 deletions unidist/core/backends/mpi/core/worker/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async def worker_loop():
The loop exits on special cancelation operation.
``unidist.core.backends.mpi.core.common.Operations`` defines a set of supported operations.
"""
w_logger.debug("Worker loop started")
task_store = TaskStore.get_instance()
local_store = LocalObjectStore.get_instance()
request_store = RequestStore.get_instance()
Expand All @@ -94,6 +95,9 @@ async def worker_loop():
# and the worker sends the ready to shutdown signal to ``Monitor``.
# Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shutdown,
# ``Monitor` sends the shutdown signal to every worker so they can exit the loop.

# Barrier in worker process to check if worker loop has started
mpi_state.comm.Barrier()
while True:
# Listen receive operation from any source
operation_type, source_rank = await async_wrap(
Expand Down

0 comments on commit 6382d53

Please sign in to comment.