Skip to content

Commit

Permalink
using bigmpi again
Browse files Browse the repository at this point in the history
  • Loading branch information
arunjose696 committed Nov 8, 2023
1 parent 3cbeddd commit 42de84e
Showing 1 changed file with 33 additions and 34 deletions.
67 changes: 33 additions & 34 deletions unidist/core/backends/mpi/core/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

# Logger configuration
logger = common.get_logger("communication", "communication.log")
logger1 = common.get_logger("communication1", "communication1.log")
is_logger_header_printed = False


Expand Down Expand Up @@ -465,18 +464,20 @@ def mpi_send_buffer(comm, buffer, dest_rank, data_type=MPI.CHAR, buffer_size=Non
if buffer_size:
comm.send(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT)
else:
buffer_size = len(buffer)
PARTITION_SIZE = 100000
partitions=list(range(0, buffer_size,PARTITION_SIZE))
buffer_size = len(buffer)
block_size = pkl5._bigmpi.blocksize
partitions = list(range(0, buffer_size, block_size))
partitions.append(buffer_size)
#with pkl5._bigmpi() as bigmpi:
if True:
for i,_ in enumerate(partitions):
if i+1<len(partitions):
logger1.debug(f" in send i={i} current={comm.rank}, destination={dest_rank}")
temp=buffer[partitions[i]:partitions[i+1]]
comm.Send([buffer[partitions[i]:partitions[i+1]],data_type], dest=dest_rank, tag=common.MPITag.BUFFER )

with pkl5._bigmpi as bigmpi:
for i, _ in enumerate(partitions):
if i + 1 < len(partitions):
comm.Send(
bigmpi(buffer[partitions[i] : partitions[i + 1]]),
dest=dest_rank,
tag=common.MPITag.BUFFER,
)


def mpi_isend_buffer(comm, buffer_size, buffer, dest_rank):
"""
Send buffer object to another MPI rank in a non-blocking way.
Expand Down Expand Up @@ -505,18 +506,19 @@ def mpi_isend_buffer(comm, buffer_size, buffer, dest_rank):
requests = []
h1 = comm.isend(buffer_size, dest=dest_rank, tag=common.MPITag.OBJECT)
requests.append((h1, None))

PARTITION_SIZE = 100000
partitions=list(range(0, buffer_size,PARTITION_SIZE))
block_size = pkl5._bigmpi.blocksize
partitions = list(range(0, buffer_size, block_size))
partitions.append(buffer_size)
#with pkl5._bigmpi() as bigmpi:
if True:
for i,_ in enumerate(partitions):
if i+1<len(partitions):
temp=buffer[partitions[i]:partitions[i+1]]
h2=comm.Isend([buffer[partitions[i]:partitions[i+1]],MPI.CHAR], dest=dest_rank, tag=common.MPITag.BUFFER )
with pkl5._bigmpi as bigmpi:
for i, _ in enumerate(partitions):
if i + 1 < len(partitions):
h2 = comm.Isend(
bigmpi(buffer[partitions[i] : partitions[i + 1]]),
dest=dest_rank,
tag=common.MPITag.BUFFER,
)
requests.append((h2, buffer))

return requests


Expand Down Expand Up @@ -548,19 +550,16 @@ def mpi_recv_buffer(comm, source_rank, result_buffer=None):
result_buffer = bytearray(buf_size)
else:
buf_size = len(result_buffer)
PARTITION_SIZE = 100000
partitions=list(range(0, buf_size,PARTITION_SIZE))
block_size = pkl5._bigmpi.blocksize
partitions = list(range(0, buf_size, block_size))
partitions.append(buf_size)
#with pkl5._bigmpi as bigmpi:
if True:
logger1.debug(f" in recv {list(partitions)}")
for i,_ in enumerate(partitions):
if i+1<len(partitions):
logger1.debug(f" in recv {i}")
temp=bytearray(partitions[i+1]-partitions[i])
comm.Recv(temp, source=source_rank, tag=common.MPITag.BUFFER )
result_buffer[partitions[i]:partitions[i+1]]=temp

with pkl5._bigmpi as bigmpi:
for i, _ in enumerate(partitions):
if i + 1 < len(partitions):
temp = bytearray(partitions[i + 1] - partitions[i])
comm.Recv(bigmpi(temp), source=source_rank, tag=common.MPITag.BUFFER)
result_buffer[partitions[i] : partitions[i + 1]] = temp

return result_buffer


Expand Down

0 comments on commit 42de84e

Please sign in to comment.