Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use BlockingQueue in LazyMemorySegmentPool #119

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static com.alibaba.fluss.utils.Preconditions.checkArgument;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
Expand All @@ -55,7 +59,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private final List<MemorySegment> cachePages;
private final BlockingQueue<MemorySegment> cachePages;

@GuardedBy("lock")
private final Deque<Condition> waiters;
Expand All @@ -79,7 +83,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
"Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:"
+ " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.",
pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024));
this.cachePages = new ArrayList<>();
this.cachePages = new LinkedBlockingQueue<>();
this.pageUsage = 0;
this.maxPages = maxPages;
this.pageSize = pageSize;
Expand Down Expand Up @@ -107,52 +111,44 @@ public static LazyMemorySegmentPool create(Configuration conf) {

@Override
public MemorySegment nextSegment(boolean waiting) {
return inLock(
inLock(
lock,
() -> {
checkClosed();
int freePages = freePages();
if (freePages == 0) {
if (waiting) {
return waitForSegment();
} else {
return null;
}
}

if (cachePages.isEmpty()) {
int numPages = Math.min(freePages, perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}

this.pageUsage++;
return cachePages.remove(this.cachePages.size() - 1);
if (freePages != 0) {
this.pageUsage++;
}
});
return waitForSegment(waiting);
}

private MemorySegment waitForSegment() {
private MemorySegment waitForSegment(boolean waiting) {
Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
while (cachePages.isEmpty()) {
boolean success = moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS);
if (!success) {
throw new BufferExhaustedException(
"Failed to allocate new segment within the configured max blocking time "
+ maxTimeToBlockMs
+ " ms. Total memory: "
+ totalSize()
+ " bytes. Available memory: "
+ freePages() * pageSize
+ " bytes. page size: "
+ pageSize
+ " bytes");
}
checkClosed();
MemorySegment memorySegment = cachePages.poll(waiting ? maxTimeToBlockMs : 0, TimeUnit.MILLISECONDS);
if (Objects.isNull(memorySegment) && waiting) {
throw new BufferExhaustedException(
"Failed to allocate new segment within the configured max blocking time "
+ maxTimeToBlockMs
+ " ms. Total memory: "
+ totalSize()
+ " bytes. Available memory: "
+ freePages() * pageSize
+ " bytes. page size: "
+ pageSize
+ " bytes");
}
return cachePages.remove(cachePages.size() - 1);
checkClosed();
return memorySegment;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new FlussRuntimeException(e);
Expand Down Expand Up @@ -218,6 +214,6 @@ public int queued() {

@VisibleForTesting
public List<MemorySegment> getAllCachePages() {
return cachePages;
return new ArrayList<>(cachePages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;

import static com.alibaba.fluss.utils.function.ThrowingRunnable.unchecked;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -52,6 +53,7 @@ void testNextSegmentWaiter() throws InterruptedException {
CountDownLatch returnAllLatch = asyncReturnAll(source, Arrays.asList(ms1, ms2));
CountDownLatch getNextSegmentLatch = asyncGetNextSegment(source);
assertThat(getNextSegmentLatch.getCount()).isEqualTo(1);
assertThat(source.queued()).isEqualTo(1);
returnAllLatch.countDown();
assertThat(getNextSegmentLatch.await(Long.MAX_VALUE, TimeUnit.SECONDS)).isTrue();
}
Expand All @@ -77,6 +79,28 @@ void testIllegalArgument() {
.hasMessage("Return too more memories.");
}

@Test
void testMultiThreadCallNextSegment() throws InterruptedException {
LazyMemorySegmentPool source = buildLazyMemorySegmentSource(1, 10);
assertThat(source.pageSize()).isEqualTo(10);
assertThat(source.freePages()).isEqualTo(1);

CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
source.nextSegment(true);
countDownLatch.countDown();
}).start();
new Thread(() -> {
source.nextSegment(true);
countDownLatch.countDown();
}).start();

boolean await = countDownLatch.await(2000, TimeUnit.MILLISECONDS);
assertThat(await).isFalse();
assertThat(source.freePages()).isEqualTo(0);
assertThat(source.queued()).isEqualTo(1);
}

private LazyMemorySegmentPool buildLazyMemorySegmentSource(int maxPages, int pageSize) {
return new LazyMemorySegmentPool(maxPages, pageSize, Long.MAX_VALUE);
}
Expand Down