Skip to content

Commit

Permalink
Gigabyte throughput fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatsc committed Oct 27, 2019
1 parent aca5f49 commit d45badc
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;

import static org.apache.flink.util.Preconditions.checkNotNull;

//import scala.collection.mutable.HashMap;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -212,7 +216,8 @@ class PartitionReaderClient implements Runnable {
private final RdmaShuffleClientEndpoint clientEndpoint;
private final PartitionRequestClientHandler clientHandler;
ArrayDeque<ByteBuf> receivedBuffers = new ArrayDeque<>();
private long workRequestId;
Map<Long,ByteBuf> inFlight = new HashMap<Long,ByteBuf>();
// private long workRequestId;

public PartitionReaderClient(final ResultPartitionID partitionId,
final int subpartitionIndex,
Expand All @@ -237,7 +242,7 @@ private int postBuffers(int credit) {
// the credit if there is no backlog
receiveBuffer = (NetworkBuffer) inputChannel.requestBuffer();
if (receiveBuffer != null) {
RdmaSendReceiveUtil.postReceiveReqWithChannelBuf(clientEndpoint, ++workRequestId, receiveBuffer);
RdmaSendReceiveUtil.postReceiveReqWithChannelBuf(clientEndpoint,clientEndpoint.workRequestId.incrementAndGet(), receiveBuffer);
receivedBuffers.addLast(receiveBuffer);
} else {
LOG.error("Buffer from the channel is null");
Expand Down Expand Up @@ -265,7 +270,7 @@ public void run() {
try {
buf = msg.write(clientEndpoint.getNettyBufferpool());
clientEndpoint.getSendBuffer().put(buf.nioBuffer());
RdmaSendReceiveUtil.postSendReq(clientEndpoint, ++workRequestId);
RdmaSendReceiveUtil.postSendReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet());
} catch (Exception ioe) {
LOG.error("Failed to serialize partition request");
return;
Expand All @@ -285,8 +290,12 @@ public void run() {
unannouncedCredit - failed,
inputChannel.getInputChannelId());
availableCredit += unannouncedCredit;
clientEndpoint.getSendBuffer().put(msg.write(clientEndpoint.getNettyBufferpool()).nioBuffer());
RdmaSendReceiveUtil.postSendReq(clientEndpoint, ++workRequestId);
ByteBuf message = msg.write(clientEndpoint.getNettyBufferpool());
// TODO: lurcking bug, if credit posted before sending out the previous credit, we might hold
clientEndpoint.getSendBuffer().put(message.nioBuffer());
long workID = clientEndpoint.workRequestId.incrementAndGet();
inFlight.put(workID,message);
RdmaSendReceiveUtil.postSendReq(clientEndpoint, workID);
} else {
// LOG.info("No credit available on channel {}",availableCredit,inputChannel);
// wait for the credit to be available, otherwise connection stucks in blocking
Expand Down Expand Up @@ -319,7 +328,11 @@ public void run() {
byte ID = receiveBuffer.readByte();
switch (ID) {
case NettyMessage.BufferResponse.ID:
clientHandler.decodeMsg(NettyMessage.BufferResponse.readFrom(receiveBuffer),
NettyMessage.BufferResponse bufferOrEvent =NettyMessage.BufferResponse.readFrom(receiveBuffer);
// LOG.info("Receive complete: " + wc.getWr_id() + "buff address: "+ receiveBuffer.memoryAddress() + " seq:" + bufferOrEvent
// .sequenceNumber + " receiver id " + bufferOrEvent.receiverId + " backlog: " +
// bufferOrEvent.backlog);
clientHandler.decodeMsg(bufferOrEvent,
false, clientEndpoint, inputChannel, finished);
break;
case NettyMessage.CloseRequest.ID:
Expand All @@ -338,6 +351,10 @@ public void run() {
LOG.error("Client: Send failed. reposting new send request request " + clientEndpoint
.getEndpointStr());
}
ByteBuf sendBufNetty =inFlight.get(wc.getWr_id());
if (sendBufNetty!=null){
sendBufNetty.release();
}
clientEndpoint.getSendBuffer().clear();
} else {
LOG.error("failed to match any condition " + wc.getOpcode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ private NettyMessage readPartition( NetworkSequenceViewReader reader) throws
} else {
// This channel was now removed from the available reader queue.
// We re-add it into the queue if it is still available
if (next.moreAvailable()) {
reader.setRegisteredAsAvailable(true);
} else {
reader.setRegisteredAsAvailable(false);
}
// if (next.moreAvailable()) {
// reader.setRegisteredAsAvailable(true);
// } else {
// reader.setRegisteredAsAvailable(false);
// }
NettyMessage.BufferResponse msg = new NettyMessage.BufferResponse(
next.buffer(),
reader.getSequenceNumber(),
Expand Down Expand Up @@ -177,7 +177,7 @@ public void run() {
NettyMessage.AddCredit request = (NettyMessage.AddCredit) clientRequest;
// NetworkSequenceViewReader reader = readers.get(request.receiverId);
if (reader!=null) {
LOG.info("Add credit: credit {} on the reader {}",request.credit,reader);
// LOG.info("Add credit: credit {} on the reader {}",request.credit,reader);
reader.addCredit(request.credit);
}
RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); // post next
Expand All @@ -199,10 +199,11 @@ public void run() {
NettyMessage.BufferResponse response;
synchronized (inFlight) {
long wc_id=wc.getWr_id();
// LOG.info("work request completed {}",wc_id);
response = inFlight.remove(wc_id);
if (response !=null) {
// LOG.info("releasing buffer on send completion for WR {} address {}",wc_id,response.getBuffer().memoryAddress());
response.releaseBuffer();
// LOG.info("releasing buffer on send completion for WR {}",wc_id);
}
}

Expand Down Expand Up @@ -246,14 +247,15 @@ public void run() {
// break;
// }
while (!reader.isReleased()) {
if (reader.isRegisteredAsAvailable()) {
if (reader.isAvailable()) {
if (((SequenceNumberingViewReader) reader).hasCredit()) {
NettyMessage response = readPartition(reader);
if (response == null) {
// False reader available is set, exit the reader here and write to the
// next reader with credit
break;
// break;
// response = new NettyMessage.CloseRequest();
LOG.info("should not be null");
}
((SequenceNumberingViewReader) reader).decrementCredit();
if (response instanceof NettyMessage.BufferResponse) {
Expand All @@ -272,7 +274,7 @@ public void run() {
response.write(bufferPool); // creates the header info
long workRequestId= clientEndpoint.workRequestId.incrementAndGet();
synchronized (inFlight) {
// LOG.info("Add buffer to inFlight "+ workRequestId);
// LOG.info("Add buffer to inFlight: wr {} memory address: {}", workRequestId,((NettyMessage.BufferResponse) response).getBuffer().memoryAddress());
inFlight.put(workRequestId, (NettyMessage.BufferResponse) response);
}
RdmaSendReceiveUtil.postSendReqForBufferResponse(clientEndpoint,workRequestId , (NettyMessage.BufferResponse) response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void dispatchCqEvent(IbvWC wc) throws IOException {
// throw new RuntimeException("*******client got "+ IbvWC.IbvWcOpcode.valueOf(newOpCode) +" event twice in a row. last id = "+old.getWr_id()+", current id "+old.getWr_id()+"***********");
// }
// lastEvent.set(wc.clone());
wcEvents.add(wc);
wcEvents.add(wc.clone());
}

public void init() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void dispatchCqEvent(IbvWC wc) throws IOException {
// throw new RuntimeException("*******server got "+ IbvWC.IbvWcOpcode.valueOf(newOpCode) +" event twice in a row. last id = "+old.getWr_id()+", current id "+old.getWr_id()+"***********");
// }
// lastEvent.set(wc.clone());
wcEvents.add(wc);
wcEvents.add(wc.clone());
}

public void init() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void releaseAllResources() throws IOException {
public void notifyDataAvailable() {
// LOG.debug("Received data available notification "+this);// requestQueue.notifyReaderNonEmpty(this); // TODO (venkat): Might read the data before available
synchronized (this) {
this.setRegisteredAsAvailable(true);
// this.setRegisteredAsAvailable(true);
this.notifyAll();
}
}
Expand All @@ -172,7 +172,7 @@ public String toString() {
"requestLock=" + requestLock +
", receiverId=" + receiverId +
", sequenceNumber=" + sequenceNumber +
", isRegisteredAsAvailable=" + isRegisteredAvailable +
", isAvailable=" + isAvailable() +
", credit="+credit+
'}';
}
Expand Down

0 comments on commit d45badc

Please sign in to comment.