Skip to content

Commit

Permalink
ARROW-2019: [JAVA] Control the memory allocated for inner vector in LIST
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthteotia committed Jan 25, 2018
1 parent 4a661ed commit 9e828ee
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,39 @@ public void setInitialCapacity(int valueCount) {
offsetAllocationSizeInBytes = (valueCount + 1) * OFFSET_WIDTH;
}

/**
 * Sets the desired value capacity for the vector, sizing the data buffer from
 * the expected density rather than a default per-element estimate. This
 * function doesn't allocate any memory for the vector; it only records the
 * requested sizes of the data, validity and offset buffers.
 *
 * @param valueCount desired number of elements in the vector
 * @param density average number of bytes per variable width element
 * @throws OversizedAllocationException if the data buffer or the offset buffer
 *         would need more than MAX_ALLOCATION_SIZE bytes
 */
public void setInitialCapacity(int valueCount, double density) {
  final long size = (long) (valueCount * density);
  if (size > MAX_ALLOCATION_SIZE) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }
  /* to track the end offset of last data element in vector, we need
   * an additional slot in offset buffer. The size is computed in long
   * arithmetic: a large valueCount combined with a small density passes
   * the data size check above, yet (valueCount + 1) * OFFSET_WIDTH would
   * silently wrap around in int arithmetic.
   */
  final long offsetSize = (valueCount + 1L) * OFFSET_WIDTH;
  if (offsetSize > MAX_ALLOCATION_SIZE) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }
  /* all sizes validated; only now update the vector's allocation state */
  valueAllocationSizeInBytes = (int) size;
  validityAllocationSizeInBytes = getValidityBufferSizeFromCount(valueCount);
  offsetAllocationSizeInBytes = (int) offsetSize;
}

/**
 * Get the density of this vector: the span covered by the offset buffer
 * (end offset of the last element minus start offset of the first element)
 * divided by the number of values.
 * @return average size per element, or 0 if the vector holds no values
 */
public double getDensity() {
  if (valueCount == 0) {
    /* nothing stored, avoid dividing by zero */
    return 0.0D;
  }
  final int firstStart = offsetBuffer.getInt(0);
  final int lastEnd = offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
  return ((double) (lastEnd - firstStart)) / valueCount;
}

/**
* Get the current value capacity for the vector
* @return number of elements that vector can hold.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,38 @@ public void setInitialCapacity(int numRecords) {
}
}

/**
 * Specialized version of setInitialCapacity() for ListVector. Callers use
 * it when they want to explicitly control, and be conservative about, the
 * memory provisioned for the inner data vector, for example when a fixed
 * amount of memory has been reserved for a record batch and sizing the
 * inner vector through setInitialCapacity(numRecords) alone would push the
 * requirement beyond what is actually needed.
 *
 * @param numRecords value count
 * @param density density of ListVector. Density is the average size of
 *                list per position in the List vector. For example, a
 *                density value of 10 implies each position in the list
 *                vector has a list of 10 values.
 *                A density value of 0.1 implies out of 10 positions in
 *                the list vector, 1 position has a list of size 1 and
 *                remaining positions are null (no lists) or empty lists.
 *                This helps in tightly controlling the memory we provision
 *                for inner data vector.
 * @throws OversizedAllocationException if the inner value count reaches
 *         2,000,000,000 or the offset buffer size does not fit in an int
 */
public void setInitialCapacity(int numRecords, double density) {
  /* number of inner values implied by the density; evaluated once */
  final double innerValueCount = numRecords * density;
  if (innerValueCount >= 2_000_000_000) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }
  /* one extra offset slot is needed beyond numRecords. Use long arithmetic:
   * with a small density a huge numRecords passes the check above, yet
   * (numRecords + 1) * OFFSET_WIDTH would silently wrap around as an int.
   */
  final long offsetSize = (numRecords + 1L) * OFFSET_WIDTH;
  if (offsetSize > Integer.MAX_VALUE) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }
  offsetAllocationSizeInBytes = (int) offsetSize;
  vector.setInitialCapacity((int) innerValueCount);
}

@Override
public int getValueCapacity() {
final int offsetValueCapacity = Math.max(getOffsetBufferValueCapacity() - 1, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,7 @@
import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.complex.impl.UnionListReader;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
Expand Down Expand Up @@ -102,6 +97,57 @@ public void initializeChildrenFromFields(List<Field> children) {
addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren());
}

/**
 * Sets the desired value capacity for this vector: records the validity
 * buffer size for the given record count and delegates the remaining
 * sizing to the parent implementation.
 * @param numRecords value count
 */
@Override
public void setInitialCapacity(int numRecords) {
  this.validityAllocationSizeInBytes = getValidityBufferSizeFromCount(numRecords);
  super.setInitialCapacity(numRecords);
}

/**
 * Density-aware variant of setInitialCapacity() for ListVector. It lets
 * callers explicitly control, and stay conservative about, the memory
 * allocated for the inner data vector. This matters when a query runs
 * under memory constraints with a fixed amount reserved for the record
 * batch: sizing every vector with setInitialCapacity(x) alone can still
 * exceed the reservation, because the multiplier applied to the inner
 * vector pushes the requirement beyond what was needed.
 *
 * @param numRecords value count
 * @param density density of ListVector. Density is the average size of
 *                list per position in the List vector. For example, a
 *                density value of 10 implies each position in the list
 *                vector has a list of 10 values.
 *                A density value of 0.1 implies out of 10 positions in
 *                the list vector, 1 position has a list of size 1 and
 *                remaining positions are null (no lists) or empty lists.
 *                This helps in tightly controlling the memory we provision
 *                for inner data vector.
 */
public void setInitialCapacity(int numRecords, double density) {
  /* validity buffer is sized here; offsets and inner vector by the parent */
  this.validityAllocationSizeInBytes = getValidityBufferSizeFromCount(numRecords);
  super.setInitialCapacity(numRecords, density);
}

/**
 * Get the density of this ListVector, computed from the offset buffer as
 * (end offset of the last position - start offset of the first position)
 * divided by the value count.
 * @return density, or 0 when the vector has no values
 */
public double getDensity() {
  if (valueCount == 0) {
    /* empty vector: report zero instead of dividing by zero */
    return 0.0D;
  }
  final int begin = offsetBuffer.getInt(0);
  final int end = offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
  final double innerValueCount = end - begin;
  return innerValueCount / valueCount;
}

@Override
public List<FieldVector> getChildrenFromFields() {
return singletonList(getDataVector());
Expand Down Expand Up @@ -616,7 +662,7 @@ public int getNullCount() {
*/
@Override
public int getValueCapacity() {
// NOTE(review): two consecutive return statements; Java rejects the second
// one as unreachable. This looks like the old and the new line of a diff
// shown together; presumably only the call to
// getValidityAndOffsetValueCapacity() is meant to remain. TODO confirm.
// First form: the smaller of the validity buffer capacity and the parent's value capacity.
return Math.min(getValidityBufferValueCapacity(), super.getValueCapacity());
// Second form: delegates to the private helper declared right after this method.
return getValidityAndOffsetValueCapacity();
}

private int getValidityAndOffsetValueCapacity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public void testCopyFrom() throws Exception {
result = outVector.getObject(2);
resultSet = (ArrayList<Long>) result;
assertEquals(0, resultSet.size());

/* (3+0+0)/3 */
assertEquals(1.0D, inVector.getDensity(), 0);
}
}

Expand Down Expand Up @@ -209,6 +212,9 @@ public void testSetLastSetUsage() throws Exception {
listVector.setLastSet(3);
listVector.setValueCount(10);

/* (3+2+3)/10 */
assertEquals(0.8D, listVector.getDensity(), 0);

index = 0;
offset = offsetBuffer.getInt(index * ListVector.OFFSET_WIDTH);
assertEquals(Integer.toString(0), Integer.toString(offset));
Expand Down Expand Up @@ -709,6 +715,8 @@ public void testGetBufferAddress() throws Exception {
listWriter.bigInt().writeBigInt(300);
listWriter.endList();

listVector.setValueCount(2);

/* check listVector contents */
Object result = listVector.getObject(0);
ArrayList<Long> resultSet = (ArrayList<Long>) result;
Expand Down Expand Up @@ -739,6 +747,9 @@ public void testGetBufferAddress() throws Exception {
assertEquals(2, buffers.size());
assertEquals(bitAddress, buffers.get(0).memoryAddress());
assertEquals(offsetAddress, buffers.get(1).memoryAddress());

/* (3+2)/2 */
assertEquals(2.5, listVector.getDensity(), 0);
}
}

Expand All @@ -753,4 +764,52 @@ public void testConsistentChildName() throws Exception {
assertTrue(emptyVectorStr.contains(ListVector.DATA_VECTOR_NAME));
}
}

@Test
public void testSetInitialCapacity() {
  try (final ListVector listVector = ListVector.empty("", allocator)) {
    listVector.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
    final int numRecords = 512;

    /*
     * No density given, so the default multiplier of 5 is used:
     * 512 * 5 => 2560 values * 4 bytes => 10240 bytes, rounded up to
     * 16KB => 4096 value capacity for the inner vector.
     */
    listVector.setInitialCapacity(numRecords);
    listVector.allocateNew();
    assertEquals(numRecords, listVector.getValueCapacity());
    assertEquals(4096, listVector.getDataVector().getValueCapacity());

    /* density 4: the inner vector gets exactly 4 values per record */
    listVector.setInitialCapacity(numRecords, 4);
    listVector.allocateNew();
    assertEquals(numRecords, listVector.getValueCapacity());
    assertEquals(numRecords * 4, listVector.getDataVector().getValueCapacity());

    /*
     * density 0.1: the inner value capacity passed to the data vector is
     * 512 * 0.1 => 51. For an int vector that is 204 bytes for the data
     * buffer and 7 bytes for the validity buffer. With power of 2
     * allocation these become 256 bytes and 8 bytes, so the value
     * capacity of the inner vector is 64.
     */
    listVector.setInitialCapacity(numRecords, 0.1);
    listVector.allocateNew();
    assertEquals(numRecords, listVector.getValueCapacity());
    assertEquals(64, listVector.getDataVector().getValueCapacity());

    /*
     * density 0.01: the inner value capacity passed to the data vector is
     * 512 * 0.01 => 5. For an int vector that is 20 bytes for the data
     * buffer and 1 byte for the validity buffer. With power of 2
     * allocation these become 32 bytes and 1 byte, so the value
     * capacity of the inner vector is 8.
     */
    listVector.setInitialCapacity(numRecords, 0.01);
    listVector.allocateNew();
    assertEquals(numRecords, listVector.getValueCapacity());
    assertEquals(8, listVector.getDataVector().getValueCapacity());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1925,4 +1925,30 @@ public static void setBytes(int index, byte[] bytes, NullableVarCharVector vecto
vector.offsetBuffer.setInt((index + 1) * vector.OFFSET_WIDTH, currentOffset + bytes.length);
vector.valueBuffer.setBytes(currentOffset, bytes, 0, bytes.length);
}

@Test /* VarCharVector */
public void testSetInitialCapacity() {
  try (final NullableVarCharVector varCharVector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
    final int valueCount = 4096;

    /* no density given: use the default 8 data bytes on average per element */
    varCharVector.setInitialCapacity(valueCount);
    varCharVector.allocateNew();
    assertEquals(valueCount, varCharVector.getValueCapacity());
    assertEquals(valueCount * 8, varCharVector.getDataBuffer().capacity());

    /* density 1: one data byte per element */
    varCharVector.setInitialCapacity(valueCount, 1);
    varCharVector.allocateNew();
    assertEquals(valueCount, varCharVector.getValueCapacity());
    assertEquals(4096, varCharVector.getDataBuffer().capacity());

    /* density 0.1: the test expects a 512 byte data buffer */
    varCharVector.setInitialCapacity(valueCount, 0.1);
    varCharVector.allocateNew();
    assertEquals(valueCount, varCharVector.getValueCapacity());
    assertEquals(512, varCharVector.getDataBuffer().capacity());

    /* density 0.01: the test expects a 64 byte data buffer */
    varCharVector.setInitialCapacity(valueCount, 0.01);
    varCharVector.allocateNew();
    assertEquals(valueCount, varCharVector.getValueCapacity());
    assertEquals(64, varCharVector.getDataBuffer().capacity());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,17 @@ public void testListType() {
vector.setInitialCapacity(512);
vector.allocateNew();

assertEquals(1023, vector.getValueCapacity());
assertEquals(512, vector.getValueCapacity());

try {
vector.getOffsetVector().getAccessor().get(2014);
vector.getOffsetBuffer().getInt(2014 * 4);
Assert.fail("Expected out of bounds exception");
} catch (Exception e) {
// ok
}

vector.reAlloc();
assertEquals(2047, vector.getValueCapacity()); // note: size - 1
assertEquals(1024, vector.getValueCapacity());
assertEquals(0, vector.getOffsetBuffer().getInt(2014 * ListVector.OFFSET_WIDTH));
}
}
Expand Down

0 comments on commit 9e828ee

Please sign in to comment.