Skip to content

Commit

Permalink
blocking read on unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatsc committed Oct 28, 2019
1 parent d45badc commit 9a22e4b
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void recycle(MemorySegment segment) {
listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
availableMemorySegments.notifyAll();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ class PartitionReaderClient implements Runnable {
private int delayMs;
private final RdmaShuffleClientEndpoint clientEndpoint;
private final PartitionRequestClientHandler clientHandler;
ArrayDeque<ByteBuf> receivedBuffers = new ArrayDeque<>();
private final Map<Long,ByteBuf> receivedBuffers = new HashMap<>();
// ArrayDeque<ByteBuf> receivedBuffers = new ArrayDeque<>();
Map<Long,ByteBuf> inFlight = new HashMap<Long,ByteBuf>();
// private long workRequestId;

Expand All @@ -242,8 +243,9 @@ private int postBuffers(int credit) {
// the credit if there is no backlog
receiveBuffer = (NetworkBuffer) inputChannel.requestBuffer();
if (receiveBuffer != null) {
RdmaSendReceiveUtil.postReceiveReqWithChannelBuf(clientEndpoint,clientEndpoint.workRequestId.incrementAndGet(), receiveBuffer);
receivedBuffers.addLast(receiveBuffer);
long id = clientEndpoint.workRequestId.incrementAndGet();
RdmaSendReceiveUtil.postReceiveReqWithChannelBuf(clientEndpoint,id, receiveBuffer);
receivedBuffers.put(id,receiveBuffer);
} else {
LOG.error("Buffer from the channel is null");
}
Expand Down Expand Up @@ -279,7 +281,7 @@ public void run() {
try {
// TODO: send credit if the available credit has reached zero
// for (int i=0;i<takeEventsCount;i++) {
long startTime = System.nanoTime();
// long startTime = System.nanoTime();
if (availableCredit == 0) {
if (inputChannel.getUnannouncedCredit() > 0) {
int unannouncedCredit = inputChannel.getAndResetUnannouncedCredit();
Expand Down Expand Up @@ -307,12 +309,13 @@ public void run() {
}

IbvWC wc = clientEndpoint.getWcEvents().take();
// LOG.info("took client event with wr_id {} on endpoint {}", wc.getWr_id(), clientEndpoint.getEndpointStr());
if (IbvWC.IbvWcOpcode.valueOf(wc.getOpcode()) == IbvWC.IbvWcOpcode.IBV_WC_RECV) {
if (wc.getStatus() != IbvWC.IbvWcStatus.IBV_WC_SUCCESS.ordinal()) {
LOG.error("Receive posting failed. reposting new receive request");
} else {
// InfiniBand completes requests in FIFO order, so the first posted buffer should be the one filled with the data
ByteBuf receiveBuffer = receivedBuffers.pollFirst();
ByteBuf receiveBuffer = receivedBuffers.get(wc.getWr_id());
availableCredit--;
receiveBuffer.readerIndex();
int segmentSize = ((NetworkBuffer) receiveBuffer).getMemorySegment().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public RdmaClient(NettyConfig rdmaConfig, PartitionRequestClientHandler clientHa
this.rdmaConfig = rdmaConfig;
this.clientHandler = clientHandler;
this.bufferPool = bufferPool;
endpointGroup = new RdmaActiveEndpointGroup<RdmaShuffleClientEndpoint>(1000, false, 2000, 2, 1000);
endpointGroup = new RdmaActiveEndpointGroup<RdmaShuffleClientEndpoint>(1000, true, 2000, 2, 1000);
endpointGroup.init(this);
endpointGroup.getConnParam().setRnr_retry_count((byte)7);
this.networkBufferPool=networkBufferPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class RdmaServer implements RdmaEndpointFactory<RdmaShuffleServerEndpoint
private static final Logger LOG = LoggerFactory.getLogger(RdmaServer.class);
private RdmaActiveEndpointGroup<RdmaShuffleServerEndpoint> endpointGroup;
private final NettyConfig rdmaConfig;
private int workRequestId = 1;
// private int workRequestId = 1;
private RdmaServerEndpoint<RdmaShuffleServerEndpoint> serverEndpoint;
private InetSocketAddress address;
private boolean stopped = false;
Expand Down Expand Up @@ -112,7 +112,7 @@ private void registerMemoryRegions(RdmaServerEndpoint<RdmaShuffleServerEndpoint>
public void start(Map<Long,IbvMr> registerdMRs) throws IOException {
// Create an EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ events to the
// endpoint.dispatchCqEvent() method.
endpointGroup = new RdmaActiveEndpointGroup<RdmaShuffleServerEndpoint>(1000, false, 2000, 2, 100);
endpointGroup = new RdmaActiveEndpointGroup<RdmaShuffleServerEndpoint>(1000, true, 2000, 2, 1000);
endpointGroup.init(this);
endpointGroup.getConnParam().setRnr_retry_count((byte)7);
// create a server endpoint
Expand Down
Loading

0 comments on commit 9a22e4b

Please sign in to comment.