Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-9229: Reducing memory used by HashIdentityMap initialization #3

Open
wants to merge 10 commits into
base: dremio
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class AllocationManager {
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
private final int size;
private final UnsafeDirectLittleEndian underlying;
private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
private final AllocatorLedgerMapWrap map = new AllocatorLedgerMapWrap();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
Expand Down Expand Up @@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
return existingLedger;
}

final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
final BufferLedger ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
Expand All @@ -151,54 +151,44 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
}
}


/**
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* now longer needs to hold
* a reference to particular piece of memory.
* Can only be called when you already hold the writeLock.
*/
private class ReleaseListener {

private final BufferAllocator allocator;

public ReleaseListener(BufferAllocator allocator) {
this.allocator = allocator;
}

/**
* Can only be called when you already hold the writeLock.
*/
public void release() {
allocator.assertOpen();
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
allocator.assertOpen();

final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);
final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);

if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
// AllocatorLedgerMapWrap doe snot support map.values().iterator().next(); in some cases
// use custom method: getNextValue()
//BufferLedger newLedger = map.values().iterator().next();
BufferLedger newLedger = map.getNextValue();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}


}
}

Expand All @@ -220,17 +210,23 @@ public class BufferLedger {
// manage request for retain
// correctly
private final long lCreationTime = System.nanoTime();
private final BaseAllocator allocator;
private final ReleaseListener listener;
final BaseAllocator allocator;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1)
: null;
private volatile long lDestructionTime = 0;

private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
private BufferLedger(BaseAllocator allocator) {
this.allocator = allocator;
this.listener = listener;
}

/**
* Get the allocator for this ledger
* @return allocator
*/
private BaseAllocator getAllocator() {
return allocator;
}

/**
Expand Down Expand Up @@ -339,7 +335,7 @@ public int decrement(int decrement) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
listener.release();
release(this);
}
}

Expand Down Expand Up @@ -463,4 +459,4 @@ boolean isOwningLedger() {

}

}
}
Loading