Skip to content

Commit

Permalink
PERF-#367: Use std::fill to fill service shared buffer with a given v…
Browse files Browse the repository at this point in the history
…alue (#373)

Co-authored-by: Igoshev, Iaroslav <[email protected]>
Signed-off-by: arunjose696 <[email protected]>
  • Loading branch information
arunjose696 and YarShev authored Nov 7, 2023
1 parent 0c6896a commit abe15f5
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 10 deletions.
2 changes: 2 additions & 0 deletions unidist/core/backends/mpi/core/controller/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ def init():
atexit.register(_termination_handler)
signal.signal(signal.SIGTERM, _termination_handler)
signal.signal(signal.SIGINT, _termination_handler)
# Exit the init function in root only after monitor and worker loops have started
mpi_state.global_comm.Barrier()
return
elif mpi_state.is_monitor_process():
from unidist.core.backends.mpi.core.monitor.loop import monitor_loop
Expand Down
16 changes: 15 additions & 1 deletion unidist/core/backends/mpi/core/memory/_memory.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

from libc.stdint cimport uint8_t
from libc.stdint cimport uint8_t, int64_t

cimport memory

Expand All @@ -25,3 +25,17 @@ def parallel_memcopy(const uint8_t[:] src, uint8_t[:] dst, int memcopy_threads):
len(src),
64,
memcopy_threads)

def fill(int64_t[:] buff, int64_t value):
    """
    Fill a given buffer with a given value.

    The buffer is modified in place. An empty buffer is left untouched.

    Parameters
    ----------
    buff : int64_t[:]
        Buffer to fill.
    value : int64_t
        Value to fill.
    """
    cdef Py_ssize_t size = buff.shape[0]
    if size == 0:
        # Taking the address of ``buff[0]`` on an empty view would be out of bounds.
        return
    if buff.strides[0] == <Py_ssize_t>sizeof(int64_t):
        # Densely packed data can be handed to the C++ routine as a raw pointer.
        with nogil:
            memory.fill(&buff[0], size, value)
    else:
        # A strided view is not a dense block of ``size`` elements starting at
        # ``&buff[0]``, so assign through the view to respect its strides.
        buff[:] = value
4 changes: 4 additions & 0 deletions unidist/core/backends/mpi/core/memory/memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ namespace unidist {
return reinterpret_cast<uint8_t *>(value & bits);
}

// Fill the first `size` elements of `buff` with `value`.
// A zero or negative `size` leaves the buffer untouched.
void fill(int64_t *buff, int64_t size, int64_t value){
    // std::fill_n does nothing for non-positive counts, whereas
    // std::fill(buff, buff + size, value) is undefined behavior when size < 0.
    std::fill_n(buff, size, value);
}

void parallel_memcopy(uint8_t *dst,
const uint8_t *src,
int64_t nbytes,
Expand Down
2 changes: 2 additions & 0 deletions unidist/core/backends/mpi/core/memory/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace unidist {
int64_t nbytes,
uintptr_t block_size,
int num_threads);

void fill(int64_t *buff, int64_t size, int64_t value);
} // namespace unidist

#endif // MEMORY_H
3 changes: 3 additions & 0 deletions unidist/core/backends/mpi/core/memory/memory.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ cdef extern from "memory.h" namespace "unidist" nogil:
int64_t nbytes,
uintptr_t block_size,
int num_threads)

void fill(int64_t *buff, int64_t size, int64_t value)

8 changes: 5 additions & 3 deletions unidist/core/backends/mpi/core/monitor/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,14 @@ def monitor_loop():
shared_store = SharedObjectStore.get_instance()
shm_manager = SharedMemoryManager()

workers_ready_to_shutdown = []
shutdown_workers = False
# Barrier to check if monitor process is ready to start the communication loop
mpi_state.global_comm.Barrier()
monitor_logger.debug("Monitor loop started")
# Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shut down,
# ``Monitor`` sends the shutdown signal to every worker and notifies ``Root`` that
# it can exit the program.

workers_ready_to_shutdown = []
shutdown_workers = False
while True:
# Listen receive operation from any source
operation_type, source_rank = communication.mpi_recv_operation(
Expand Down
7 changes: 2 additions & 5 deletions unidist/core/backends/mpi/core/shared_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
import time
import psutil
import weakref
from array import array

from unidist.core.backends.mpi.core._memory import parallel_memcopy
from unidist.core.backends.mpi.core._memory import parallel_memcopy, fill
from unidist.core.backends.mpi.utils import ImmutableDict

try:
Expand Down Expand Up @@ -190,9 +189,7 @@ def _allocate_shared_memory(self):
self.service_shared_buffer = memoryview(service_buffer).cast("l")
# Fill the service buffer with -1 because 0 is a valid value and may be recognized by mistake.
if mpi_state.is_monitor_process():
self.service_shared_buffer[:] = array(
"l", [-1] * len(self.service_shared_buffer)
)
fill(self.service_shared_buffer, -1)

def _parse_data_id(self, data_id):
"""
Expand Down
6 changes: 5 additions & 1 deletion unidist/core/backends/mpi/core/worker/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,16 @@ async def worker_loop():
local_store = LocalObjectStore.get_instance()
request_store = RequestStore.get_instance()
async_operations = AsyncOperations.get_instance()
ready_to_shutdown_posted = False

# Barrier to check if worker process is ready to start the communication loop
mpi_state.global_comm.Barrier()
w_logger.debug("Worker loop started")
# Once the worker receives the cancel signal from ``Root`` rank,
# it starts shutting down. All pending requests and communications are cancelled,
# and the worker sends the ready to shutdown signal to ``Monitor``.
# Once all workers excluding ``Root`` and ``Monitor`` ranks are ready to shut down,
# ``Monitor`` sends the shutdown signal to every worker so they can exit the loop.
ready_to_shutdown_posted = False
while True:
# Listen receive operation from any source
operation_type, source_rank = await async_wrap(
Expand Down

0 comments on commit abe15f5

Please sign in to comment.