diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 6b607a39bf7ec..69c2df3f65ae3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -292,7 +292,7 @@ public void recycle(MemorySegment segment) { listener = registeredListeners.poll(); if (listener == null) { availableMemorySegments.add(segment); - availableMemorySegments.notify(); + availableMemorySegments.notifyAll(); return; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/PartitionRequestClient.java index 3d6c1d81e886c..50e132b3d059e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/PartitionRequestClient.java @@ -215,7 +215,8 @@ class PartitionReaderClient implements Runnable { private int delayMs; private final RdmaShuffleClientEndpoint clientEndpoint; private final PartitionRequestClientHandler clientHandler; - ArrayDeque receivedBuffers = new ArrayDeque<>(); + private final Map receivedBuffers = new HashMap<>(); +// ArrayDeque receivedBuffers = new ArrayDeque<>(); Map inFlight = new HashMap(); // private long workRequestId; @@ -242,8 +243,9 @@ private int postBuffers(int credit) { // the credit if there is no backlog receiveBuffer = (NetworkBuffer) inputChannel.requestBuffer(); if (receiveBuffer != null) { - RdmaSendReceiveUtil.postReceiveReqWithChannelBuf(clientEndpoint,clientEndpoint.workRequestId.incrementAndGet(), receiveBuffer); - receivedBuffers.addLast(receiveBuffer); + long id = clientEndpoint.workRequestId.incrementAndGet(); + RdmaSendReceiveUtil.postReceiveReqWithChannelBuf(clientEndpoint,id, receiveBuffer); + receivedBuffers.put(id,receiveBuffer); } else { LOG.error("Buffer from the channel is null"); } @@ -279,7 +281,7 @@ public void run() { try { // TODO: send credit if credit is reached zero // for (int i=0;i 0) { int unannouncedCredit = inputChannel.getAndResetUnannouncedCredit(); @@ -307,12 +309,13 @@ public void run() { } IbvWC wc = clientEndpoint.getWcEvents().take(); +// LOG.info("took client event with wr_id {} on endpoint {}", wc.getWr_id(), clientEndpoint.getEndpointStr()); if (IbvWC.IbvWcOpcode.valueOf(wc.getOpcode()) == IbvWC.IbvWcOpcode.IBV_WC_RECV) { if (wc.getStatus() != IbvWC.IbvWcStatus.IBV_WC_SUCCESS.ordinal()) { LOG.error("Receive posting failed. reposting new receive request"); } else { // InfiniBand completes requests in FIFO, so we should have first buffer filled with the data - ByteBuf receiveBuffer = receivedBuffers.pollFirst(); + ByteBuf receiveBuffer = receivedBuffers.get(wc.getWr_id()); availableCredit--; receiveBuffer.readerIndex(); int segmentSize = ((NetworkBuffer) receiveBuffer).getMemorySegment().size(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaClient.java index c2494aea790e8..3aa875ef07908 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaClient.java @@ -54,7 +54,7 @@ public RdmaClient(NettyConfig rdmaConfig, PartitionRequestClientHandler clientHa this.rdmaConfig = rdmaConfig; this.clientHandler = clientHandler; this.bufferPool = bufferPool; - endpointGroup = new RdmaActiveEndpointGroup(1000, false, 2000, 2, 1000); + endpointGroup = new RdmaActiveEndpointGroup(1000, true, 2000, 2, 1000); endpointGroup.init(this); endpointGroup.getConnParam().setRnr_retry_count((byte)7); this.networkBufferPool=networkBufferPool; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServer.java index b620812699b2f..6cfe982204465 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServer.java @@ -53,7 +53,7 @@ public class RdmaServer implements RdmaEndpointFactory endpointGroup; private final NettyConfig rdmaConfig; - private int workRequestId = 1; +// private int workRequestId = 1; private RdmaServerEndpoint serverEndpoint; private InetSocketAddress address; private boolean stopped = false; @@ -112,7 +112,7 @@ private void registerMemoryRegions(RdmaServerEndpoint public void start(Map registerdMRs) throws IOException { // create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the // endpoint.dispatchCqEvent() method. - endpointGroup = new RdmaActiveEndpointGroup(1000, false, 2000, 2, 100); + endpointGroup = new RdmaActiveEndpointGroup(1000, true, 2000, 2, 1000); endpointGroup.init(this); endpointGroup.getConnParam().setRnr_retry_count((byte)7); // create a server endpoint diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServerRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServerRequestHandler.java index 6cfbc6e617a5b..5c78270ffa0ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServerRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaServerRequestHandler.java @@ -38,12 +38,13 @@ public class RdmaServerRequestHandler implements Runnable { public RdmaServerRequestHandler(RdmaServerEndpoint serverEndpoint, ResultPartitionProvider partitionProvider, TaskEventDispatcher - taskEventDispatcher, NettyBufferPool bufferPool, Map registerdMRs) { + taskEventDispatcher, NettyBufferPool bufferPool, Map + registerdMRs) { this.serverEndpoint = serverEndpoint; this.partitionProvider = partitionProvider; this.taskEventDispatcher = taskEventDispatcher; this.bufferPool = bufferPool; - this.registerdMRs= registerdMRs; + this.registerdMRs = registerdMRs; } @Override @@ -75,45 +76,49 @@ private NettyMessage decodeMessageFromBuffer(ByteBuffer clientReq) { return clientMessage; } - private NettyMessage readPartition( NetworkSequenceViewReader reader) throws + private NettyMessage readPartition(NetworkSequenceViewReader reader) throws IOException, InterruptedException { InputChannel.BufferAndAvailability next = null; - next = reader.getNextBuffer(); - if (next == null) { - Throwable cause = reader.getFailureCause(); - if (cause != null) { - NettyMessage.ErrorResponse msg = new NettyMessage.ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - return msg; - }else{ - // False available is set - return null; - } - } else { - // This channel was now removed from the available reader queue. - // We re-add it into the queue if it is still available -// if (next.moreAvailable()) { -// reader.setRegisteredAsAvailable(true); -// } else { -// reader.setRegisteredAsAvailable(false); -// } - NettyMessage.BufferResponse msg = new NettyMessage.BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(), - next.buffersInBacklog(), next.moreAvailable()); + next = reader.getNextBuffer(); + if (next == null) { + Throwable cause = reader.getFailureCause(); + if (cause != null) { + NettyMessage.ErrorResponse msg = new NettyMessage.ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); return msg; + } else { + // False available is set + return null; } + } else { + // This channel was now removed from the available reader queue. + // We re-add it into the queue if it is still available + if (next.moreAvailable()) { + reader.setRegisteredAsAvailable(true); + } else { + reader.setRegisteredAsAvailable(false); + } + NettyMessage.BufferResponse msg = new NettyMessage.BufferResponse( + next.buffer(), + reader.getSequenceNumber(), + reader.getReceiverId(), + next.buffersInBacklog(), next.moreAvailable()); + return msg; + } } private class HandleClientConnection implements Runnable { RdmaShuffleServerEndpoint clientEndpoint; + HandleClientConnection(RdmaShuffleServerEndpoint clientEndpoint) { this.clientEndpoint = clientEndpoint; } - private final ConcurrentHashMap inFlight = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap inFlight = new ConcurrentHashMap<>(); + long lastEventID = -1; + @Override public void run() { try { @@ -125,6 +130,12 @@ public void run() { // we need to do writing in seperate thread, otherwise buffers may not be release on // completion events IbvWC wc = clientEndpoint.getWcEvents().take(); +// if (lastEventID==-1){ +// lastEventID = wc.getWr_id(); +// }else if (lastEventID+1 != wc.getWr_id()){ +// LOG.info("Server: Did not get expected wr_id {} on endpoint {}", wc.getWr_id(), clientEndpoint +// .getEndpointStr()); +// } if (IbvWC.IbvWcOpcode.valueOf(wc.getOpcode()) == IbvWC.IbvWcOpcode.IBV_WC_RECV) { if (wc.getStatus() != IbvWC.IbvWcStatus.IBV_WC_SUCCESS.ordinal()) { LOG.error("Receive posting failed. reposting new receive request"); @@ -135,30 +146,31 @@ public void run() { Class msgClazz = clientRequest.getClass(); if (msgClazz == NettyMessage.CloseRequest.class) { clientClose = true; - LOG.info("closing the endpoint "+getEndpointStr(clientEndpoint)); + LOG.info("closing the endpoint " + getEndpointStr(clientEndpoint)); } else if (msgClazz == NettyMessage.PartitionRequest.class) { // prepare response and post it NettyMessage.PartitionRequest partitionRequest = (NettyMessage.PartitionRequest) clientRequest; reader = new SequenceNumberingViewReader(partitionRequest - .receiverId); + .receiverId, partitionRequest.credit); LOG.info("received partition request: " + partitionRequest.receiverId + " with " + - "initial credit: " + partitionRequest.credit + "at endpoint: " + - getEndpointStr(clientEndpoint)); - reader.addCredit(partitionRequest.credit); + "initial credit: " + partitionRequest.credit + "at endpoint: " + + getEndpointStr(clientEndpoint)); reader.requestSubpartitionView( partitionProvider, partitionRequest.partitionId, partitionRequest.queueIndex); // we need to post receive for next message. for example credit // TODO: We should do executor service here - new Thread(new RDMAWriter(reader,clientEndpoint,inFlight)).start(); - RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); + new Thread(new RDMAWriter(reader, clientEndpoint, inFlight)).start(); + RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId + .incrementAndGet()); // TODO(venkat): do something better here, we should not poll reader } else if (msgClazz == NettyMessage.TaskEventRequest.class) { NettyMessage.TaskEventRequest request = (NettyMessage.TaskEventRequest) clientRequest; LOG.error("Unhandled request type TaskEventRequest"); - RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); // post next + RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId + .incrementAndGet()); // post next // receive // TODO (venkat): Handle it if (!taskEventDispatcher.publish(request.partitionId, request.event)) { @@ -169,23 +181,26 @@ public void run() { NettyMessage.CancelPartitionRequest request = (NettyMessage.CancelPartitionRequest) clientRequest; LOG.error("Unhandled request type CancelPartitionRequest"); - RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); // post next + RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId + .incrementAndGet()); // post next // receive // TODO (venkat): Handle it // outboundQueue.cancel(request.receiverId); } else if (msgClazz == NettyMessage.AddCredit.class) { NettyMessage.AddCredit request = (NettyMessage.AddCredit) clientRequest; // NetworkSequenceViewReader reader = readers.get(request.receiverId); - if (reader!=null) { + if (reader != null) { // LOG.info("Add credit: credit {} on the reader {}",request.credit,reader); reader.addCredit(request.credit); } - RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); // post next + RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId + .incrementAndGet()); // post next // receive // TODO (venkat): Handle it // outboundQueue.addCredit(request.receiverId, request.credit); } else { - RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); // post next + RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, clientEndpoint.workRequestId + .incrementAndGet()); // post next // receive LOG.warn("Received unexpected client request: {}", clientRequest); } @@ -193,16 +208,18 @@ public void run() { clientEndpoint.getReceiveBuffer().clear(); } else if (IbvWC.IbvWcOpcode.valueOf(wc.getOpcode()) == IbvWC.IbvWcOpcode.IBV_WC_SEND) { if (wc.getStatus() != IbvWC.IbvWcStatus.IBV_WC_SUCCESS.ordinal()) { - LOG.error("Server:Send failed. reposting new send request request"+getEndpointStr(clientEndpoint)); + LOG.error("Server:Send failed. reposting new send request request" + getEndpointStr + (clientEndpoint)); // RdmaSendReceiveUtil.postSendReq(clientEndpoint, ++workRequestId); } NettyMessage.BufferResponse response; synchronized (inFlight) { - long wc_id=wc.getWr_id(); -// LOG.info("work request completed {}",wc_id); + long wc_id = wc.getWr_id(); +// LOG.info("send work request completed {}", wc_id); response = inFlight.remove(wc_id); - if (response !=null) { -// LOG.info("releasing buffer on send completion for WR {} address {}",wc_id,response.getBuffer().memoryAddress()); + if (response != null) { +// LOG.info("releasing buffer on send completion for WR {} address {}", wc_id, response +// .getBuffer().memoryAddress()); response.releaseBuffer(); } } @@ -224,82 +241,86 @@ public void run() { } } - private class RDMAWriter implements Runnable{ - private NetworkSequenceViewReader reader; + private class RDMAWriter implements Runnable { + private NetworkSequenceViewReader reader; private RdmaShuffleServerEndpoint clientEndpoint; - private ConcurrentHashMap inFlight; - public RDMAWriter(NetworkSequenceViewReader reader,RdmaShuffleServerEndpoint clientEndpoint,ConcurrentHashMap inFlight){ - this.reader= reader; + private ConcurrentHashMap inFlight; + + public RDMAWriter(NetworkSequenceViewReader reader, RdmaShuffleServerEndpoint clientEndpoint, + ConcurrentHashMap inFlight) { + this.reader = reader; this.clientEndpoint = clientEndpoint; this.inFlight = inFlight; } @Override public void run() { - while (!clientEndpoint.isClosed()) { +// while (!clientEndpoint.isClosed()) { try { -// for (NetworkSequenceViewReader reader : readers.values()) { -//// if (reader.isReleased()){ Should remove reader to exit, currently we are only doing single reader -// // so it should be fine for now. -//// readers.remove() -//// } -// if (reader.isReleased()){ -// break; -// } - while (!reader.isReleased()) { - if (reader.isAvailable()) { - if (((SequenceNumberingViewReader) reader).hasCredit()) { - NettyMessage response = readPartition(reader); - if (response == null) { - // False reader available is set, exit the reader here and write to the - // next reader with credit + while (!reader.isReleased()) { + if (reader.isAvailable()) { + NettyMessage response = readPartition(reader); + if (response == null) { + // False reader available is set, exit the reader here and write to the + // next reader with credit // break; + LOG.info("should not be null: reader {} ", reader); + if (!reader.isReleased()) { + // false positiv, wait for reader to update some data + synchronized (reader) { + reader.wait(); + } + } else { + break; + } // response = new NettyMessage.CloseRequest(); - LOG.info("should not be null"); - } - ((SequenceNumberingViewReader) reader).decrementCredit(); - if (response instanceof NettyMessage.BufferResponse) { - NettyMessage.BufferResponse tmpResp = (NettyMessage.BufferResponse) - response; - } else { - LOG.info("skip: sending error message/close request"); - } + + } + if (response instanceof NettyMessage.BufferResponse) { + NettyMessage.BufferResponse tmpResp = (NettyMessage.BufferResponse) + response; + } else { + LOG.info("skip: sending error message/close request"); + } // clientEndpoint.getSendBuffer().put(response.write(bufferPool).nioBuffer()); // clientEndpoint.getReceiveBuffer().clear(); // RdmaSendReceiveUtil.postReceiveReq(clientEndpoint, ++workRequestId); // post next - // receive + // receive - // hold references of the response until the send completes - if (response instanceof NettyMessage.BufferResponse) { - response.write(bufferPool); // creates the header info - long workRequestId= clientEndpoint.workRequestId.incrementAndGet(); - synchronized (inFlight) { -// LOG.info("Add buffer to inFlight: wr {} memory address: {}", workRequestId,((NettyMessage.BufferResponse) response).getBuffer().memoryAddress()); - inFlight.put(workRequestId, (NettyMessage.BufferResponse) response); - } - RdmaSendReceiveUtil.postSendReqForBufferResponse(clientEndpoint,workRequestId , (NettyMessage.BufferResponse) response); - } else { - clientEndpoint.getSendBuffer().put(response.write(bufferPool).nioBuffer()); - RdmaSendReceiveUtil.postSendReq(clientEndpoint, clientEndpoint.workRequestId.incrementAndGet()); - } - } - }else { - synchronized (reader) { - reader.wait(); - } + // hold references of the response until the send completes + if (response instanceof NettyMessage.BufferResponse) { + response.write(bufferPool); // creates the header info + synchronized (inFlight) { + long workRequestId = clientEndpoint.workRequestId.incrementAndGet(); +// LOG.info("Add buffer to inFlight: wr {} memory address: {}", workRequestId, ( +// (NettyMessage.BufferResponse) response).getBuffer().memoryAddress()); + inFlight.put(workRequestId, (NettyMessage.BufferResponse) response); + RdmaSendReceiveUtil.postSendReqForBufferResponse(clientEndpoint, workRequestId, + (NettyMessage.BufferResponse) response); } + } else { + clientEndpoint.getSendBuffer().put(response.write(bufferPool).nioBuffer()); + RdmaSendReceiveUtil.postSendReq(clientEndpoint, clientEndpoint.workRequestId + .incrementAndGet()); + } + } else { + synchronized (reader) { + reader.wait(); } - }catch(Exception e){ - LOG.error("failed to writing out the data ",e); + } } + } catch (Exception e) { + LOG.error("failed to writing out the data ", e); } } +// } } - private String getEndpointStr(RdmaShuffleServerEndpoint clientEndpoint) throws Exception{ - return "src: " + clientEndpoint.getSrcAddr() + " dst: " + - clientEndpoint.getDstAddr(); + private String getEndpointStr(RdmaShuffleServerEndpoint clientEndpoint) throws Exception { + return "src: " + clientEndpoint.getSrcAddr() + " dst: " + + clientEndpoint.getDstAddr(); } + public void stop() { stopped = true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaShuffleServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaShuffleServerEndpoint.java index 9a09a349cf5ca..92ddf5dd66d7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaShuffleServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/RdmaShuffleServerEndpoint.java @@ -123,15 +123,21 @@ public ByteBuffer getSendBuffer() { // public IbvMr getWholeMR(){ // return wholeMR; // } - +public String getEndpointStr() { + try { + return "src: " + this.getSrcAddr() + " dst: " + + this.getDstAddr(); + }catch (Exception e){ + LOG.error("Failed to get the address on client endpoint"); + } + return ""; +} public ByteBuffer getReceiveBuffer() { return this.receiveBuffer; } public ArrayBlockingQueue getWcEvents() { - synchronized (this) { return wcEvents; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/SequenceNumberingViewReader.java index 6f4df64e8913c..6d5e0395d023b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/rdma/SequenceNumberingViewReader.java @@ -18,12 +18,7 @@ package org.apache.flink.runtime.io.network.rdma; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -34,6 +29,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import java.io.IOException; + /** * Simple wrapper for the subpartition view used in the old network mode. * @@ -41,22 +38,35 @@ * handler about non-emptiness, similar to the {@link LocalInputChannel}. */ class SequenceNumberingViewReader implements BufferAvailabilityListener, NetworkSequenceViewReader { - private static final Logger LOG = LoggerFactory.getLogger(SequenceNumberingViewReader.class); - private final Object requestLock = new Object(); private final InputChannelID receiverId; +// private final PartitionRequestQueue requestQueue; + private volatile ResultSubpartitionView subpartitionView; + /** + * The status indicating whether this reader is already enqueued in the pipeline for transferring + * data or not. + * + *

It is mainly used to avoid repeated registrations but should be accessed by a single + * thread only since there is no synchronisation. + */ + private boolean isRegisteredAsAvailable = false; + + /** The number of available buffers for holding data on the consumer side. */ + private int numCreditsAvailable; + private int sequenceNumber = -1; - private int credit= 0; - private boolean isRegisteredAvailable; - SequenceNumberingViewReader(InputChannelID receiverId) { + SequenceNumberingViewReader( + InputChannelID receiverId, + int initialCredit) { + this.receiverId = receiverId; -// this.requestQueue = requestQueue; + this.numCreditsAvailable = initialCredit; } @Override @@ -82,38 +92,49 @@ public void requestSubpartitionView( } @Override - public void addCredit(int credit) { + public void addCredit(int creditDeltas) { synchronized (this) { - this.credit = credit; + numCreditsAvailable += creditDeltas; + // Reader thread could be waiting for the release on credit this.notifyAll(); } } - public void decrementCredit(){ - this.credit--; - } - public boolean hasCredit(){ - return credit>0 ; - } - @Override public void setRegisteredAsAvailable(boolean isRegisteredAvailable) { - synchronized (this) { - this.isRegisteredAvailable = isRegisteredAvailable; - this.notifyAll(); - } + this.isRegisteredAsAvailable = isRegisteredAvailable; } @Override public boolean isRegisteredAsAvailable() { - synchronized (this) { - return isRegisteredAvailable; - } + return isRegisteredAsAvailable; } + /** + * Returns true only if the next buffer is an event or the reader has both available + * credits and buffers. + */ @Override public boolean isAvailable() { - return subpartitionView.isAvailable(); + // BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)! + return hasBuffersAvailable() && + numCreditsAvailable > 0 ; + } + + /** + * Check whether this reader is available or not (internal use, in sync with + * {@link #isAvailable()}, but slightly faster). + * + *

Returns true only if the next buffer is an event or the reader has both available + * credits and buffers. + * + * @param bufferAndBacklog + * current buffer and backlog including information about the next buffer + */ + private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + // BEWARE: this must be in sync with #isAvailable()! + return bufferAndBacklog.isMoreAvailable() && + numCreditsAvailable > 0; } @Override @@ -126,12 +147,28 @@ public int getSequenceNumber() { return sequenceNumber; } + @VisibleForTesting + int getNumCreditsAvailable() { + return numCreditsAvailable; + } + + @VisibleForTesting + boolean hasBuffersAvailable() { + return subpartitionView.isAvailable(); + } + @Override public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { sequenceNumber++; - return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()); + + if (next.buffer().isBuffer() && --numCreditsAvailable < 0) { + throw new IllegalStateException("no credit available"); + } + + return new BufferAndAvailability( + next.buffer(), isAvailable(next), next.buffersInBacklog()); } else { return null; } @@ -159,21 +196,19 @@ public void releaseAllResources() throws IOException { @Override public void notifyDataAvailable() { -// LOG.debug("Received data available notification "+this);// requestQueue.notifyReaderNonEmpty(this); // TODO (venkat): Might read the data before available - synchronized (this) { -// this.setRegisteredAsAvailable(true); + synchronized (this){ this.notifyAll(); } } @Override public String toString() { - return "SequenceNumberingViewReader{" + + return "CreditBasedSequenceNumberingViewReader{" + "requestLock=" + requestLock + ", receiverId=" + receiverId + ", sequenceNumber=" + sequenceNumber + - ", isAvailable=" + isAvailable() + - ", credit="+credit+ + ", numCreditsAvailable=" + numCreditsAvailable + + ", isRegisteredAsAvailable=" + isRegisteredAsAvailable + '}'; } }