Skip to content

Commit

Permalink
Revert "[FLINK-33668][runtime] Redefine the redistribution logic of t…
Browse files Browse the repository at this point in the history
…he NetworkBufferPool"

This reverts commit 012b893.
  • Loading branch information
TanYuxin-tyx authored and reswqa committed Jun 24, 2024
1 parent eaef742 commit 67d23fc
Show file tree
Hide file tree
Showing 44 changed files with 404 additions and 678 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,8 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
@Override
boolean isDestroyed();

/**
* Returns the number of expected memory segments of this buffer pool, representing a value that
* adequately satisfies the requirements for buffer usage.
*/
int getExpectedNumberOfMemorySegments();

/** Returns the number of guaranteed (minimum number of) memory segments of this buffer pool. */
int getMinNumberOfMemorySegments();
int getNumberOfRequiredMemorySegments();

/**
* Returns the maximum number of memory segments this buffer pool should use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,27 @@ public interface BufferPoolFactory {
* Tries to create a buffer pool, which is guaranteed to provide at least the number of required
* buffers.
*
* <p>The buffer pool is of dynamic size ranges from <tt>minUsedBuffers</tt> to
* <tt>maxUsedBuffers</tt>, with <tt>numExpectedBuffers</tt> serving as the weight.
* <p>The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
*
* @param numExpectedBuffers the number of expected network buffers of this pool
* @param minUsedBuffers minimum number of network buffers in this pool
* @param numRequiredBuffers minimum number of network buffers in this pool
* @param maxUsedBuffers maximum number of network buffers this pool offers
*/
BufferPool createBufferPool(int numExpectedBuffers, int minUsedBuffers, int maxUsedBuffers)
throws IOException;
BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;

/**
* Tries to create a buffer pool with an owner, which is guaranteed to provide at least the
* number of required buffers.
*
* <p>The buffer pool is of dynamic size ranges from <tt>minUsedBuffers</tt> to
* <tt>maxUsedBuffers</tt>, with <tt>numExpectedBuffers</tt> serving as the weight.
* <p>The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
*
* @param numExpectedBuffers the number of expected network buffers of this pool
* @param minUsedBuffers minimum number of network buffers in this pool
* @param numRequiredBuffers minimum number of network buffers in this pool
* @param maxUsedBuffers maximum number of network buffers this pool offers
* @param numSubpartitions number of subpartitions in this pool
* @param maxBuffersPerChannel maximum number of buffers to use for each channel
* @param maxOverdraftBuffersPerGate maximum number of overdraft buffers to use for each gate
*/
BufferPool createBufferPool(
int numExpectedBuffers,
int minUsedBuffers,
int numRequiredBuffers,
int maxUsedBuffers,
int numSubpartitions,
int maxBuffersPerChannel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class LocalBufferPool implements BufferPool {
/** Global network buffer pool to get buffers from. */
private final NetworkBufferPool networkBufferPool;

/** The minimum number of required segments for this pool. */
private final int numberOfRequiredMemorySegments;

/**
* The currently available memory segments. These are segments, which have been requested from
* the network buffer pool and are currently not handed out as Buffer instances.
Expand All @@ -94,18 +97,6 @@ public class LocalBufferPool implements BufferPool {
*/
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

/**
* The number of expected memory segments of this buffer pool.
*
* <p>Usually, the buffers in {@link NetworkBufferPool} do not exactly meet the expectations of
* all {@link LocalBufferPool}s, so typically this value is used as a weight to allocate buffers
* to each {@link LocalBufferPool}.
*/
private final int expectedNumberOfMemorySegments;

/** The number of guaranteed (minimum number of) memory segments of this buffer pool. */
private final int minNumberOfMemorySegments;

/** Maximum number of network buffers to allocate. */
private final int maxNumberOfMemorySegments;

Expand Down Expand Up @@ -151,13 +142,12 @@ public class LocalBufferPool implements BufferPool {
* network buffers being available.
*
* @param networkBufferPool global network buffer pool to get buffers from
* @param minNumberOfMemorySegments minimum number of network buffers
* @param numberOfRequiredMemorySegments minimum number of network buffers
*/
LocalBufferPool(NetworkBufferPool networkBufferPool, int minNumberOfMemorySegments) {
LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
this(
networkBufferPool,
minNumberOfMemorySegments,
minNumberOfMemorySegments,
numberOfRequiredMemorySegments,
Integer.MAX_VALUE,
0,
Integer.MAX_VALUE,
Expand All @@ -169,17 +159,16 @@ public class LocalBufferPool implements BufferPool {
* number of network buffers being available.
*
* @param networkBufferPool global network buffer pool to get buffers from
* @param minNumberOfMemorySegments minimum number of network buffers
* @param numberOfRequiredMemorySegments minimum number of network buffers
* @param maxNumberOfMemorySegments maximum number of network buffers to allocate
*/
LocalBufferPool(
NetworkBufferPool networkBufferPool,
int minNumberOfMemorySegments,
int numberOfRequiredMemorySegments,
int maxNumberOfMemorySegments) {
this(
networkBufferPool,
minNumberOfMemorySegments,
minNumberOfMemorySegments,
numberOfRequiredMemorySegments,
maxNumberOfMemorySegments,
0,
Integer.MAX_VALUE,
Expand All @@ -191,48 +180,38 @@ public class LocalBufferPool implements BufferPool {
* with a minimal and maximal number of network buffers being available.
*
* @param networkBufferPool global network buffer pool to get buffers from
* @param expectedNumberOfMemorySegments expected number of network buffers
* @param minNumberOfMemorySegments minimum number of network buffers
* @param numberOfRequiredMemorySegments minimum number of network buffers
* @param maxNumberOfMemorySegments maximum number of network buffers to allocate
* @param numberOfSubpartitions number of subpartitions
* @param maxBuffersPerChannel maximum number of buffers to use for each channel
* @param maxOverdraftBuffersPerGate maximum number of overdraft buffers to use for each gate
*/
LocalBufferPool(
NetworkBufferPool networkBufferPool,
int expectedNumberOfMemorySegments,
int minNumberOfMemorySegments,
int numberOfRequiredMemorySegments,
int maxNumberOfMemorySegments,
int numberOfSubpartitions,
int maxBuffersPerChannel,
int maxOverdraftBuffersPerGate) {
checkArgument(
minNumberOfMemorySegments > 0,
"Minimum number of memory segments (%s) should be larger than 0.",
minNumberOfMemorySegments);
numberOfRequiredMemorySegments > 0,
"Required number of memory segments (%s) should be larger than 0.",
numberOfRequiredMemorySegments);

checkArgument(
expectedNumberOfMemorySegments >= minNumberOfMemorySegments,
"Minimum number of memory segments (%s) should not be larger than expected (%s).",
minNumberOfMemorySegments,
expectedNumberOfMemorySegments);

checkArgument(
maxNumberOfMemorySegments >= expectedNumberOfMemorySegments,
"Maximum number of memory segments (%s) should not be smaller than expected (%s).",
maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
"Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
maxNumberOfMemorySegments,
expectedNumberOfMemorySegments);
numberOfRequiredMemorySegments);

LOG.debug(
"Using a local buffer pool who has the minimum-expected-maximum number of buffers as {}-{}-{}.",
minNumberOfMemorySegments,
expectedNumberOfMemorySegments,
"Using a local buffer pool with {}-{} buffers",
numberOfRequiredMemorySegments,
maxNumberOfMemorySegments);

this.networkBufferPool = networkBufferPool;
this.expectedNumberOfMemorySegments = expectedNumberOfMemorySegments;
this.currentPoolSize = minNumberOfMemorySegments;
this.minNumberOfMemorySegments = minNumberOfMemorySegments;
this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
this.currentPoolSize = numberOfRequiredMemorySegments;
this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;

if (numberOfSubpartitions > 0) {
Expand Down Expand Up @@ -268,8 +247,8 @@ public class LocalBufferPool implements BufferPool {
@Override
public void reserveSegments(int numberOfSegmentsToReserve) throws IOException {
checkArgument(
numberOfSegmentsToReserve <= minNumberOfMemorySegments,
"Can not reserve more segments than number of minimum segments.");
numberOfSegmentsToReserve <= numberOfRequiredMemorySegments,
"Can not reserve more segments than number of required segments.");

CompletableFuture<?> toNotify = null;
synchronized (availableMemorySegments) {
Expand All @@ -293,13 +272,8 @@ public boolean isDestroyed() {
}

@Override
public int getExpectedNumberOfMemorySegments() {
return expectedNumberOfMemorySegments;
}

@Override
public int getMinNumberOfMemorySegments() {
return minNumberOfMemorySegments;
public int getNumberOfRequiredMemorySegments() {
return numberOfRequiredMemorySegments;
}

@Override
Expand All @@ -312,13 +286,13 @@ public int getMaxNumberOfMemorySegments() {
*
* @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For
* unbounded pools it returns an approximation based upon {@link
* #getExpectedNumberOfMemorySegments()}
* #getNumberOfRequiredMemorySegments()}
*/
public int getEstimatedNumberOfRequestedMemorySegments() {
if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
return maxNumberOfMemorySegments;
} else {
return getExpectedNumberOfMemorySegments();
return getNumberOfRequiredMemorySegments() * 2;
}
}

Expand Down Expand Up @@ -690,9 +664,9 @@ public void setNumBuffers(int numBuffers) {
CompletableFuture<?> toNotify;
synchronized (availableMemorySegments) {
checkArgument(
numBuffers >= minNumberOfMemorySegments,
numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
minNumberOfMemorySegments,
numberOfRequiredMemorySegments,
numBuffers);

currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
Expand Down Expand Up @@ -730,14 +704,12 @@ public CompletableFuture<?> getAvailableFuture() {
public String toString() {
synchronized (availableMemorySegments) {
return String.format(
this.hashCode()
+ "[size: %d, requested: %d, available: %d, expected: %d, min: %d, max: %d, listeners: %d,"
"[size: %d, required: %d, requested: %d, available: %d, max: %d, listeners: %d,"
+ "subpartitions: %d, maxBuffersPerChannel: %d, destroyed: %s]",
currentPoolSize,
numberOfRequiredMemorySegments,
numberOfRequestedMemorySegments,
availableMemorySegments.size(),
expectedNumberOfMemorySegments,
minNumberOfMemorySegments,
maxNumberOfMemorySegments,
registeredListeners.size(),
subpartitionBuffersCount.length,
Expand Down
Loading

0 comments on commit 67d23fc

Please sign in to comment.