diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 70ca1dc32a1b3..d4214475fc6e3 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -74,7 +74,7 @@ public class AllocationManager { private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); private final int size; private final UnsafeDirectLittleEndian underlying; - private final IdentityHashMap map = new IdentityHashMap<>(); + private final AllocatorLedgerMapWrap map = new AllocatorLedgerMapWrap(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock()); private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock()); @@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta return existingLedger; } - final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator)); + final BufferLedger ledger = new BufferLedger(allocator); if (retain) { ledger.inc(); } @@ -151,54 +151,44 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta } } - /** * The way that a particular BufferLedger communicates back to the AllocationManager that it * now longer needs to hold * a reference to particular piece of memory. + * Can only be called when you already hold the writeLock. */ - private class ReleaseListener { - - private final BufferAllocator allocator; - - public ReleaseListener(BufferAllocator allocator) { - this.allocator = allocator; - } - - /** - * Can only be called when you already hold the writeLock. 
- */ - public void release() { - allocator.assertOpen(); + private void release(final BufferLedger ledger) { + final BaseAllocator allocator = ledger.getAllocator(); + allocator.assertOpen(); - final BufferLedger oldLedger = map.remove(allocator); - oldLedger.allocator.dissociateLedger(oldLedger); + final BufferLedger oldLedger = map.remove(allocator); + oldLedger.allocator.dissociateLedger(oldLedger); - if (oldLedger == owningLedger) { - if (map.isEmpty()) { - // no one else owns, lets release. - oldLedger.allocator.releaseBytes(size); - underlying.release(); - amDestructionTime = System.nanoTime(); - owningLedger = null; - } else { - // we need to change the owning allocator. we've been removed so we'll get whatever is - // top of list - BufferLedger newLedger = map.values().iterator().next(); - - // we'll forcefully transfer the ownership and not worry about whether we exceeded the - // limit - // since this consumer can't do anything with this. - oldLedger.transferBalance(newLedger); - } + if (oldLedger == owningLedger) { + if (map.isEmpty()) { + // no one else owns, lets release. + oldLedger.allocator.releaseBytes(size); + underlying.release(); + amDestructionTime = System.nanoTime(); + owningLedger = null; } else { - if (map.isEmpty()) { - throw new IllegalStateException("The final removal of a ledger should be connected to " + - "the owning ledger."); - } + // we need to change the owning allocator. we've been removed so we'll get whatever is + // top of list + // AllocatorLedgerMapWrap does not support map.values().iterator().next(); in some cases + // use custom method: getNextValue() + //BufferLedger newLedger = map.values().iterator().next(); + BufferLedger newLedger = map.getNextValue(); + + // we'll forcefully transfer the ownership and not worry about whether we exceeded the + // limit + // since this consumer can't do anything with this. 
+ oldLedger.transferBalance(newLedger); + } + } else { + if (map.isEmpty()) { + throw new IllegalStateException("The final removal of a ledger should be connected to " + + "the owning ledger."); } - - } } @@ -220,17 +210,23 @@ public class BufferLedger { // manage request for retain // correctly private final long lCreationTime = System.nanoTime(); - private final BaseAllocator allocator; - private final ReleaseListener listener; + final BaseAllocator allocator; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog (BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; private volatile long lDestructionTime = 0; - private BufferLedger(BaseAllocator allocator, ReleaseListener listener) { + private BufferLedger(BaseAllocator allocator) { this.allocator = allocator; - this.listener = listener; + } + + /** + * Get the allocator for this ledger + * @return allocator + */ + private BaseAllocator getAllocator() { + return allocator; } /** @@ -339,7 +335,7 @@ public int decrement(int decrement) { outcome = bufRefCnt.addAndGet(-decrement); if (outcome == 0) { lDestructionTime = System.nanoTime(); - listener.release(); + release(this); } } @@ -463,4 +459,4 @@ boolean isOwningLedger() { } -} \ No newline at end of file +} diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocatorLedgerMapWrap.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorLedgerMapWrap.java new file mode 100644 index 0000000000000..4c72f43c228ed --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocatorLedgerMapWrap.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; + +/** + * Wrapper on top of IdentityHashMap that allows + * to deal with small number of items without going into real HashMap + * For the future - shrink map when size goes significantly down + */ +public class AllocatorLedgerMapWrap implements Map { + + private static int MAX_LINEAR_ELEMENTS = 3; + + private Object [] table; + private IdentityHashMap internalMap; + private int size; + + AllocatorLedgerMapWrap() { + table = new Object[2]; + } + + @Override + public int size() { + return (internalMap != null) ? internalMap.size() : size; + } + + @Override + public boolean isEmpty() { + return (internalMap != null) ? internalMap.isEmpty() : (size == 0) ? 
true : false; + } + + @Override + public boolean containsKey(Object key) { + if (internalMap != null) { + return internalMap.containsKey(key); + } + + Preconditions.checkState(size <= MAX_LINEAR_ELEMENTS); + final Object[] tab = table; + final int tabLength = tab.length; + for (int i = 0; i < tabLength; i++) { + if (tab[i] != null) { + AllocationManager.BufferLedger value = (AllocationManager.BufferLedger) tab[i]; + if (value.allocator == key) { + return true; + } + } + } + return false; + } + + @Override + public boolean containsValue(Object value) { + if (internalMap != null) { + return internalMap.containsValue(value); + } + + Preconditions.checkState(size <= MAX_LINEAR_ELEMENTS); + final Object[] tab = table; + final int tabLength = tab.length; + for (int i = 0; i < tabLength; i++) { + if (tab[i] != null) { + AllocationManager.BufferLedger bufLedger = (AllocationManager.BufferLedger) tab[i]; + if (bufLedger == value) { + return true; + } + } + } + return false; + } + + @Override + public AllocationManager.BufferLedger get(Object key) { + if (internalMap != null) { + return internalMap.get(key); + } + + Preconditions.checkState(size <= MAX_LINEAR_ELEMENTS); + final Object[] tab = table; + final int tabLength = tab.length; + for (int i = 0; i < tabLength; i++) { + if (tab[i] != null) { + AllocationManager.BufferLedger bufLedger = (AllocationManager.BufferLedger) tab[i]; + if (bufLedger.allocator == key) { + return bufLedger; + } + } + } + return null; + } + + @Override + public AllocationManager.BufferLedger put(BaseAllocator key, AllocationManager.BufferLedger value) { + if (internalMap != null) { + AllocationManager.BufferLedger oldValue = internalMap.put(key, value); + size = internalMap.size(); + return oldValue; + } + + final int currentSize = size; + if (currentSize < MAX_LINEAR_ELEMENTS) { + int emptyIndex = -1; + final Object[] tab = table; + final int tabLength = tab.length; + for (int i = 0; i < tabLength; i++) { + if (tab[i] != null) { + 
AllocationManager.BufferLedger bufLedger = (AllocationManager.BufferLedger) tab[i]; + if (bufLedger.allocator == key) { + return bufLedger; + } + } else { + emptyIndex = i; + } + } + // insert at empty index if available + if (emptyIndex == -1) { + // no empty index - increase size + Object[] newTable = new Object[currentSize + 1]; + for (int i = 0; i < tabLength; i++) { + newTable[i] = tab[i]; + tab[i] = null; + } + table = newTable; + emptyIndex = currentSize; + } + if (emptyIndex > -1) { + table[emptyIndex] = value; + size++; + return null; + } + } else { + // we crossed the boundary + internalMap = new IdentityHashMap<>(MAX_LINEAR_ELEMENTS); + for (int i = 0; i < table.length; i++) { + if (table[i] != null) { + AllocationManager.BufferLedger bufLedger = (AllocationManager.BufferLedger) table[i]; + internalMap.put(bufLedger.allocator, bufLedger); + table[i] = null; + } + } + table = null; + AllocationManager.BufferLedger oldValue = internalMap.put(key, value); + size = internalMap.size(); + return oldValue; + } + throw new IllegalStateException("Should not reach here"); + } + + @Override + public AllocationManager.BufferLedger remove(Object key) { + if (internalMap != null) { + AllocationManager.BufferLedger oldValue = internalMap.remove(key); + size = internalMap.size(); + return oldValue; + } + + Preconditions.checkState(size <= MAX_LINEAR_ELEMENTS); + final Object[] tab = table; + final int tabLength = tab.length; + for (int i = 0; i < tabLength; i++) { + if (tab[i] != null) { + AllocationManager.BufferLedger bufLedger = (AllocationManager.BufferLedger) tab[i]; + if (bufLedger.allocator == key) { + tab[i] = null; + size--; + return bufLedger; + } + } + } + return null; + } + + @Override + public void putAll(Map m) { + if (internalMap != null) { + internalMap.putAll(m); + return; + } + throw new UnsupportedOperationException("putAll() method is not supported for map size < " + MAX_LINEAR_ELEMENTS); + } + + @Override + public void clear() { + if (internalMap != 
null) { + internalMap.clear(); + internalMap = null; + } + Object[] tab = table; + for (int i = 0; i < tab.length; i++) { + tab[i] = null; + } + size = 0; + } + + @Override + public Set keySet() { + if (internalMap != null) { + return internalMap.keySet(); + } + throw new UnsupportedOperationException("keySet() method is not supported for map size < " + MAX_LINEAR_ELEMENTS); + } + + public AllocationManager.BufferLedger getNextValue() { + if (internalMap != null) { + return internalMap.values().iterator().next(); + } + Preconditions.checkState(size <= MAX_LINEAR_ELEMENTS); + final Object[] tab = table; + final int tabLength = tab.length; + for (int i = 0; i < tabLength; i++) { + if (tab[i] != null) { + AllocationManager.BufferLedger bufLedger = (AllocationManager.BufferLedger) tab[i]; + return bufLedger; + } + } + return null; + } + + @Override + public Collection values() { + if (internalMap != null) { + return internalMap.values(); + } + + throw new UnsupportedOperationException("values() method is not supported for map size < " + MAX_LINEAR_ELEMENTS); + } + + @Override + public Set> entrySet() { + if (internalMap != null) { + return internalMap.entrySet(); + } + throw new UnsupportedOperationException("entrySet() method is not supported for map size < " + MAX_LINEAR_ELEMENTS); + } +} diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index ddc78f03f0f72..a0c1c37060664 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -133,7 +133,7 @@ private static String createErrorMsg(final BufferAllocator allocator, final int * @param val An integer value. * @return The closest power of two of that value. 
*/ - static int nextPowerOfTwo(int val) { + public static int nextPowerOfTwo(int val) { int highestBit = Integer.highestOneBit(val); if (highestBit == val) { return val; @@ -142,6 +142,21 @@ static int nextPowerOfTwo(int val) { } } + /** + * Rounds up the provided value to the nearest power of two. + * + * @param val A long value. + * @return The closest power of two of that value. + */ + public static long nextPowerOfTwo(long val) { + long highestBit = Long.highestOneBit(val); + if (highestBit == val) { + return val; + } else { + return highestBit << 1; + } + } + public static StringBuilder indent(StringBuilder sb, int indent) { final char[] indentation = new char[indent * 2]; Arrays.fill(indentation, ' '); diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 59b7be87e17be..78f72260c7c92 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -653,6 +653,71 @@ public void multiple() throws Exception { } } + @Test + public void testAllocator_transferSharedNoRelease() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + + boolean allocationFit; + + ArrowBuf arrowBuf2 = arrowBuf1.retain(childAllocator2); + rootAllocator.verify(); + assertNotNull(arrowBuf2); + assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); + + TransferResult result = 
arrowBuf1.transferOwnership(childAllocator3); + allocationFit = result.allocationFit; + final ArrowBuf arrowBuf3 = result.buffer; + assertTrue(allocationFit); + assertEquiv(arrowBuf1, arrowBuf3); + rootAllocator.verify(); + + final BufferAllocator childAllocator4 = rootAllocator.newChildAllocator("transferShared4", 0, MAX_ALLOCATION); + TransferResult result2 = arrowBuf3.transferOwnership(childAllocator4); + allocationFit = result.allocationFit; + final ArrowBuf arrowBuf4 = result2.buffer; + assertTrue(allocationFit); + assertEquiv(arrowBuf3, arrowBuf4); + rootAllocator.verify(); + + // Since childAllocator3 now has childAllocator1's buffer, 1, can close + arrowBuf1.release(); + childAllocator1.close(); + rootAllocator.verify(); + + arrowBuf2.release(); + childAllocator2.close(); + rootAllocator.verify(); + + + final BufferAllocator childAllocator5 = rootAllocator.newChildAllocator("transferShared5", 0, MAX_ALLOCATION); + TransferResult result3 = arrowBuf4.transferOwnership(childAllocator5); + allocationFit = result.allocationFit; + final ArrowBuf arrowBuf5 = result3.buffer; + assertTrue(allocationFit); + assertEquiv(arrowBuf4, arrowBuf5); + rootAllocator.verify(); + + arrowBuf3.release(); + childAllocator3.close(); + rootAllocator.verify(); + + arrowBuf4.release(); + childAllocator4.close(); + rootAllocator.verify(); + + arrowBuf5.release(); + childAllocator5.close(); + rootAllocator.verify(); + + } + } + public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); diff --git a/java/vector/src/main/codegen/templates/FixedValueVectors.java b/java/vector/src/main/codegen/templates/FixedValueVectors.java index 5d92cd232efb3..b5e9f27badbd4 100644 --- a/java/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/java/vector/src/main/codegen/templates/FixedValueVectors.java @@ -186,14 +186,21 @@ private void allocateBytes(final long size) { * 
@throws org.apache.arrow.memory.OutOfMemoryException if it can't allocate the new buffer */ public void reAlloc() { - final long newAllocationSize = allocationSizeInBytes * 2L; - if (newAllocationSize > MAX_ALLOCATION_SIZE) { + long baseSize = allocationSizeInBytes; + final int currentBufferCapacity = data.capacity(); + if (baseSize < (long)currentBufferCapacity) { + baseSize = (long)currentBufferCapacity; + } + long newAllocationSize = baseSize * 2L; + newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize); + + if (newAllocationSize > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached."); } logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", name, allocationSizeInBytes, newAllocationSize); final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize); - newBuf.setBytes(0, data, 0, data.capacity()); + newBuf.setBytes(0, data, 0, currentBufferCapacity); final int halfNewCapacity = newBuf.capacity() / 2; newBuf.setZero(halfNewCapacity, halfNewCapacity); newBuf.writerIndex(data.writerIndex()); diff --git a/java/vector/src/main/codegen/templates/MapWriters.java b/java/vector/src/main/codegen/templates/MapWriters.java index 14cc08d7db0e9..b89f91457e8b2 100644 --- a/java/vector/src/main/codegen/templates/MapWriters.java +++ b/java/vector/src/main/codegen/templates/MapWriters.java @@ -47,6 +47,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter { protected final ${containerClass} container; + private int initialCapacity; private final Map fields = Maps.newHashMap(); public ${mode}MapWriter(${containerClass} container) { <#if mode == "Single"> @@ -55,6 +56,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter { } this.container = container; + this.initialCapacity = 0; for (Field child : container.getField().getChildren()) { MinorType minorType = Types.getMinorTypeForArrowType(child.getType()); switch (minorType) { @@ -101,6 +103,11 @@ public int 
getValueCapacity() { return container.getValueCapacity(); } + public void setInitialCapacity(int initialCapacity) { + this.initialCapacity = initialCapacity; + container.setInitialCapacity(initialCapacity); + } + @Override public boolean isEmptyMap() { return 0 == container.size(); @@ -248,6 +255,9 @@ public void end() { writer = new PromotableWriter(v, container, getNullableMapWriterFactory()); vector = v; if (currentVector == null || currentVector != vector) { + if(this.initialCapacity > 0) { + vector.setInitialCapacity(this.initialCapacity); + } vector.allocateNewSafe(); } writer.setPosition(idx()); diff --git a/java/vector/src/main/codegen/templates/UnionReader.java b/java/vector/src/main/codegen/templates/UnionReader.java index 2246fb36642b8..fd3a766fc2cb4 100644 --- a/java/vector/src/main/codegen/templates/UnionReader.java +++ b/java/vector/src/main/codegen/templates/UnionReader.java @@ -18,6 +18,7 @@ import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Field; <@pp.dropOutputFile /> <@pp.changeOutputFile name="/org/apache/arrow/vector/complex/impl/UnionReader.java" /> @@ -53,6 +54,11 @@ public MinorType getMinorType() { } } + @Override + public Field getField() { + return data.getField(); + } + public boolean isSet(){ return !data.getAccessor().isNull(idx()); } diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index aa9d34d6e2666..eabe42a7c4ce5 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -321,10 +321,8 @@ public void transfer() { @Override public void splitAndTransfer(int startIndex, int length) { - to.allocateNew(); - for (int i = 0; i < length; i++) { - to.copyFromSafe(startIndex + i, i, org.apache.arrow.vector.complex.UnionVector.this); - } + internalMapVectorTransferPair.splitAndTransfer(startIndex, length); + 
typeVectorTransferPair.splitAndTransfer(startIndex, length); to.getMutator().setValueCount(length); } diff --git a/java/vector/src/main/codegen/templates/VariableLengthVectors.java b/java/vector/src/main/codegen/templates/VariableLengthVectors.java index 2ad7d20de2651..16caf26373d4e 100644 --- a/java/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/java/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -352,14 +352,20 @@ public void reset() { } public void reAlloc() { - offsetVector.reAlloc(); - final long newAllocationSize = allocationSizeInBytes*2L; + long baseSize = allocationSizeInBytes; + final int currentBufferCapacity = data.capacity(); + if (baseSize < (long)currentBufferCapacity) { + baseSize = (long)currentBufferCapacity; + } + long newAllocationSize = baseSize * 2L; + newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize); + if (newAllocationSize > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached."); } final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize); - newBuf.setBytes(0, data, 0, data.capacity()); + newBuf.setBytes(0, data, 0, currentBufferCapacity); data.release(); data = newBuf; allocationSizeInBytes = (int)newAllocationSize; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index 82cbd47d75816..6002e9e4e0090 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -15,9 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.arrow.vector; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.holders.BitHolder; @@ -42,7 +44,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe private final Mutator mutator = new Mutator(); int valueCount; - private int allocationSizeInBytes = INITIAL_VALUE_ALLOCATION; + private int allocationSizeInBytes = getSizeFromCount(INITIAL_VALUE_ALLOCATION); private int allocationMonitor = 0; public BitVector(String name, BufferAllocator allocator) { @@ -156,7 +158,7 @@ public boolean allocateNewSafe() { @Override public void reset() { valueCount = 0; - allocationSizeInBytes = INITIAL_VALUE_ALLOCATION; + allocationSizeInBytes = getSizeFromCount(INITIAL_VALUE_ALLOCATION); allocationMonitor = 0; zeroVector(); super.reset(); @@ -190,7 +192,14 @@ private void allocateBytes(final long size) { * Allocate new buffer with double capacity, and copy data into the new buffer. 
Replace vector's buffer with new buffer, and release old one */ public void reAlloc() { - final long newAllocationSize = allocationSizeInBytes * 2L; + long baseSize = allocationSizeInBytes; + final int currentBufferCapacity = data.capacity(); + if (baseSize < (long)currentBufferCapacity) { + baseSize = (long)currentBufferCapacity; + } + long newAllocationSize = baseSize * 2L; + newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize); + if (newAllocationSize > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size"); } @@ -198,7 +207,7 @@ public void reAlloc() { final int curSize = (int)newAllocationSize; final ArrowBuf newBuf = allocator.buffer(curSize); newBuf.setZero(0, newBuf.capacity()); - newBuf.setBytes(0, data, 0, data.capacity()); + newBuf.setBytes(0, data, 0, currentBufferCapacity); data.release(); data = newBuf; allocationSizeInBytes = curSize; @@ -258,40 +267,70 @@ public void transferTo(BitVector target) { public void splitAndTransferTo(int startIndex, int length, BitVector target) { assert startIndex + length <= valueCount; - int firstByte = getByteIndex(startIndex); - int byteSize = getSizeFromCount(length); + int firstByteSource = getByteIndex(startIndex); + int lastByteSource = getByteIndex(valueCount - 1); + int byteSizeTarget = getSizeFromCount(length); int offset = startIndex % 8; - if (offset == 0) { - target.clear(); - // slice - if (target.data != null) { - target.data.release(); - } - target.data = data.slice(firstByte, byteSize); - target.data.retain(1); - } else { - // Copy data - // When the first bit starts from the middle of a byte (offset != 0), copy data from src BitVector. - // Each byte in the target is composed by a part in i-th byte, another part in (i+1)-th byte. - // The last byte copied to target is a bit tricky : - // 1) if length requires partly byte (length % 8 !=0), copy the remaining bits only. 
- // 2) otherwise, copy the last byte in the same way as to the prior bytes. - target.clear(); - target.allocateNew(length); - // TODO maybe do this one word at a time, rather than byte? - for(int i = 0; i < byteSize - 1; i++) { - target.data.setByte(i, (((this.data.getByte(firstByte + i) & 0xFF) >>> offset) + (this.data.getByte(firstByte + i + 1) << (8 - offset)))); + + if (length > 0) { + if (offset == 0) { + target.clear(); + // slice + if (target.data != null) { + target.data.release(); + } + target.data = data.slice(firstByteSource, byteSizeTarget); + target.data.retain(1); } - if (length % 8 != 0) { - target.data.setByte(byteSize - 1, ((this.data.getByte(firstByte + byteSize - 1) & 0xFF) >>> offset)); - } else { - target.data.setByte(byteSize - 1, - (((this.data.getByte(firstByte + byteSize - 1) & 0xFF) >>> offset) + (this.data.getByte(firstByte + byteSize) << (8 - offset)))); + else { + // Copy data + // When the first bit starts from the middle of a byte (offset != 0), copy data from src BitVector. + // Each byte in the target is composed by a part in i-th byte, another part in (i+1)-th byte. + + target.clear(); + target.allocateNew(byteSizeTarget * 8); + + // TODO maybe do this one word at a time, rather than byte? + + for (int i = 0; i < byteSizeTarget - 1; i++) { + byte b1 = getBitsFromCurrentByte(this.data, firstByteSource + i, offset); + byte b2 = getBitsFromNextByte(this.data, firstByteSource + i + 1, offset); + + target.data.setByte(i, (b1 + b2)); + } + + /* Copying the last piece is done in the following manner: + * if the source vector has 1 or more bytes remaining, we copy + * the last piece as a byte formed by shifting data + * from the current byte and the next byte. + * + * if the source vector has no more bytes remaining + * (we are at the last byte), we copy the last piece as a byte + * by shifting data from the current byte. 
+ */ + if((firstByteSource + byteSizeTarget - 1) < lastByteSource) { + byte b1 = getBitsFromCurrentByte(this.data, firstByteSource + byteSizeTarget - 1, offset); + byte b2 = getBitsFromNextByte(this.data, firstByteSource + byteSizeTarget, offset); + + target.data.setByte(byteSizeTarget - 1, b1 + b2); + } + else { + byte b1 = getBitsFromCurrentByte(this.data, firstByteSource + byteSizeTarget - 1, offset); + target.data.setByte(byteSizeTarget - 1, b1); + } } } target.getMutator().setValueCount(length); } + private static byte getBitsFromCurrentByte(ArrowBuf data, int index, int offset) { + return (byte)((data.getByte(index) & 0xFF) >>> offset); + } + + private static byte getBitsFromNextByte(ArrowBuf data, int index, int offset) { + return (byte)((data.getByte(index) << (8 - offset))); + } + private class TransferImpl implements TransferPair { BitVector to; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index 4ab624f3694cb..11e5dbf06dcff 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -38,6 +38,7 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.complex.impl.ComplexCopier; import org.apache.arrow.vector.complex.impl.UnionListReader; @@ -179,7 +180,11 @@ public TransferPair makeTransferPair(ValueVector target) { private class TransferImpl implements TransferPair { ListVector to; - TransferPair pairs[] = new TransferPair[3]; + TransferPair bitsTransferPair; + TransferPair offsetsTransferPair; + TransferPair dataTransferPair; + + TransferPair[] pairs; public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { 
this(new ListVector(name, allocator, fieldType, callBack)); @@ -188,12 +193,13 @@ public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { public TransferImpl(ListVector to) { this.to = to; to.addOrGetVector(vector.getField().getFieldType()); - pairs[0] = offsets.makeTransferPair(to.offsets); - pairs[1] = bits.makeTransferPair(to.bits); + offsetsTransferPair = offsets.makeTransferPair(to.offsets); + bitsTransferPair = bits.makeTransferPair(to.bits); if (to.getDataVector() instanceof ZeroVector) { to.addOrGetVector(vector.getField().getFieldType()); } - pairs[2] = getDataVector().makeTransferPair(to.getDataVector()); + dataTransferPair = getDataVector().makeTransferPair(to.getDataVector()); + pairs = new TransferPair[] { bitsTransferPair, offsetsTransferPair, dataTransferPair }; } @Override @@ -206,10 +212,20 @@ public void transfer() { @Override public void splitAndTransfer(int startIndex, int length) { - to.allocateNew(); - for (int i = 0; i < length; i++) { - copyValueSafe(startIndex + i, i); + UInt4Vector.Accessor offsetVectorAccessor = ListVector.this.offsets.getAccessor(); + final int startPoint = offsetVectorAccessor.get(startIndex); + final int sliceLength = offsetVectorAccessor.get(startIndex + length) - startPoint; + to.clear(); + to.offsets.allocateNew(length + 1); + offsetVectorAccessor = ListVector.this.offsets.getAccessor(); + final UInt4Vector.Mutator targetOffsetVectorMutator = to.offsets.getMutator(); + for (int i = 0; i < length + 1; i++) { + targetOffsetVectorMutator.set(i, offsetVectorAccessor.get(startIndex + i) - startPoint); } + bitsTransferPair.splitAndTransfer(startIndex, length); + dataTransferPair.splitAndTransfer(startPoint, sliceLength); + to.lastSet = length; + to.mutator.setValueCount(length); } @Override diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java index 
7c389e61ae202..067716e8ea290 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java @@ -21,6 +21,7 @@ import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.types.pojo.Field; public class NullableMapReaderImpl extends SingleMapReaderImpl { @@ -31,6 +32,11 @@ public NullableMapReaderImpl(MapVector vector) { this.nullableMapVector = (NullableMapVector)vector; } + @Override + public Field getField() { + return nullableMapVector.getField(); + } + @Override public void copyAsValue(MapWriter writer){ NullableMapWriter impl = (NullableMapWriter) writer; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java index ae17b4bbb10dd..48019093e387f 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java @@ -26,6 +26,7 @@ import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Field; import com.google.common.collect.Maps; @@ -45,6 +46,11 @@ private void setChildrenPosition(int index){ } } + @Override + public Field getField() { + return vector.getField(); + } + @Override public FieldReader reader(String name){ FieldReader reader = fields.get(name); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java index 
6c7c230226ea3..2bd0ca87cd074 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java
@@ -26,6 +26,7 @@
 import org.apache.arrow.vector.complex.writer.FieldWriter;
 import org.apache.arrow.vector.holders.UnionHolder;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
 
 public class UnionListReader extends AbstractFieldReader {
 
@@ -39,6 +40,11 @@ public UnionListReader(ListVector vector) {
     this.offsets = vector.getOffsetVector();
   }
 
+  @Override
+  public Field getField() { // returns whatever vector.getField() reports
+    return vector.getField();
+  }
+
   @Override
   public boolean isSet() {
     return !vector.getAccessor().isNull(idx());
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
index f2343c88e70a5..2e08a622baac5 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
@@ -15,12 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ + package org.apache.arrow.vector; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.util.TransferPair; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -63,4 +70,477 @@ public void testBitVectorCopyFromSafe() { } } + @Test + public void testSplitAndTransfer() throws Exception { + + try (final BitVector sourceVector = new BitVector("bitvector", allocator)) { + final BitVector.Mutator sourceMutator = sourceVector.getMutator(); + final BitVector.Accessor sourceAccessor = sourceVector.getAccessor(); + + sourceVector.allocateNew(40); + + /* populate the bitvector -- 010101010101010101010101..... */ + for (int i = 0; i < 40; i++) { + if ((i & 1) == 1) { + sourceMutator.set(i, 1); + } else { + sourceMutator.set(i, 0); + } + } + + sourceMutator.setValueCount(40); + + /* check the vector output */ + for (int i = 0; i < 40; i++) { + int result = sourceAccessor.get(i); + if ((i & 1) == 1) { + assertEquals(Integer.toString(1), Integer.toString(result)); + } else { + assertEquals(Integer.toString(0), Integer.toString(result)); + } + } + + try (final BitVector toVector = new BitVector("toVector", allocator)) { + final TransferPair transferPair = sourceVector.makeTransferPair(toVector); + final BitVector.Accessor toAccessor = toVector.getAccessor(); + final BitVector.Mutator toMutator = toVector.getMutator(); + + /* + * form test cases such that we cover: + * + * (1) the start index is exactly where a particular byte starts in the source bit vector + * (2) the start index is randomly positioned within a byte in the source bit vector + * (2.1) the length is a multiple of 8 + * (2.2) the length is not a multiple of 8 + */ + final int[][] transferLengths = {{0, 8}, {8, 10}, 
{18, 0}, {18, 8}, {26, 0}, {26, 14}}; + + for (final int[] transferLength : transferLengths) { + final int start = transferLength[0]; + final int length = transferLength[1]; + + transferPair.splitAndTransfer(start, length); + + /* check the toVector output after doing splitAndTransfer */ + for (int i = 0; i < length; i++) { + int actual = toAccessor.get(i); + int expected = sourceAccessor.get(start + i); + assertEquals("different data values not expected --> sourceVector index: " + (start + i) + " toVector index: " + i, + expected, actual); + } + } + } + } + } + + @Test + public void testSplitAndTransfer1() throws Exception { + + try (final BitVector sourceVector = new BitVector("bitvector", allocator)) { + final BitVector.Mutator sourceMutator = sourceVector.getMutator(); + final BitVector.Accessor sourceAccessor = sourceVector.getAccessor(); + + sourceVector.allocateNew(8190); + + /* populate the bitvector */ + for (int i = 0; i < 8190; i++) { + sourceMutator.set(i, 1); + } + + sourceMutator.setValueCount(8190); + + /* check the vector output */ + for (int i = 0; i < 8190; i++) { + int result = sourceAccessor.get(i); + assertEquals(Integer.toString(1), Integer.toString(result)); + } + + try (final BitVector toVector = new BitVector("toVector", allocator)) { + final TransferPair transferPair = sourceVector.makeTransferPair(toVector); + final BitVector.Accessor toAccessor = toVector.getAccessor(); + final BitVector.Mutator toMutator = toVector.getMutator(); + + final int[][] transferLengths = {{0, 4095}, {4095, 4095}}; + + for (final int[] transferLength : transferLengths) { + final int start = transferLength[0]; + final int length = transferLength[1]; + + transferPair.splitAndTransfer(start, length); + + /* check the toVector output after doing splitAndTransfer */ + for (int i = 0; i < length; i++) { + int actual = toAccessor.get(i); + int expected = sourceAccessor.get(start + i); + assertEquals("different data values not expected --> sourceVector index: " + 
(start + i) + " toVector index: " + i, + expected, actual); + } + } + } + } + } + + @Test + public void testSplitAndTransfer2() throws Exception { + + try (final BitVector sourceVector = new BitVector("bitvector", allocator)) { + final BitVector.Mutator sourceMutator = sourceVector.getMutator(); + final BitVector.Accessor sourceAccessor = sourceVector.getAccessor(); + + sourceVector.allocateNew(32); + + /* populate the bitvector */ + for (int i = 0; i < 32; i++) { + if ((i & 1) == 1) { + sourceMutator.set(i, 1); + } else { + sourceMutator.set(i, 0); + } + } + + sourceMutator.setValueCount(32); + + /* check the vector output */ + for (int i = 0; i < 32; i++) { + int result = sourceAccessor.get(i); + if ((i & 1) == 1) { + assertEquals(Integer.toString(1), Integer.toString(result)); + } else { + assertEquals(Integer.toString(0), Integer.toString(result)); + } + } + + try (final BitVector toVector = new BitVector("toVector", allocator)) { + final TransferPair transferPair = sourceVector.makeTransferPair(toVector); + final BitVector.Accessor toAccessor = toVector.getAccessor(); + final BitVector.Mutator toMutator = toVector.getMutator(); + + final int[][] transferLengths = {{5,22}, {5,24}, {5,25}, {5,27}, {0,31}, {5,7}, {2,3}}; + + for (final int[] transferLength : transferLengths) { + final int start = transferLength[0]; + final int length = transferLength[1]; + + transferPair.splitAndTransfer(start, length); + + /* check the toVector output after doing splitAndTransfer */ + for (int i = 0; i < length; i++) { + int actual = toAccessor.get(i); + int expected = sourceAccessor.get(start + i); + assertEquals("different data values not expected --> sourceVector index: " + (start + i) + " toVector index: " + i, + expected, actual); + } + } + } + } + } + + @Test + public void testReallocAfterVectorTransfer1() { + try (final BitVector vector = new BitVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(4096); + int valueCapacity = vector.getValueCapacity(); + 
assertEquals(4096, valueCapacity); + + final BitVector.Mutator mutator = vector.getMutator(); + final BitVector.Accessor accessor = vector.getAccessor(); + + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + mutator.setToOne(i); + } + } + + for (int i = 0; i < valueCapacity; i++) { + int val = accessor.get(i); + if ((i & 1) == 1) { + assertEquals("unexpected cleared bit at index: " + i, 1, val); + } + else { + assertEquals("unexpected set bit at index: " + i, 0, val); + } + } + + /* trigger first realloc */ + mutator.setSafeToOne(valueCapacity); + assertEquals(valueCapacity * 2, vector.getValueCapacity()); + + for (int i = valueCapacity; i < valueCapacity*2; i++) { + if ((i & 1) == 1) { + mutator.setToOne(i); + } + } + + for (int i = 0; i < valueCapacity*2; i++) { + int val = accessor.get(i); + if (((i & 1) == 1) || (i == valueCapacity)) { + assertEquals("unexpected cleared bit at index: " + i, 1, val); + } + else { + assertEquals("unexpected set bit at index: " + i, 0, val); + } + } + + /* trigger second realloc */ + mutator.setSafeToOne(valueCapacity*2); + assertEquals(valueCapacity * 4, vector.getValueCapacity()); + + for (int i = valueCapacity*2; i < valueCapacity*4; i++) { + if ((i & 1) == 1) { + mutator.setToOne(i); + } + } + + for (int i = 0; i < valueCapacity*4; i++) { + int val = accessor.get(i); + if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) { + assertEquals("unexpected cleared bit at index: " + i, 1, val); + } + else { + assertEquals("unexpected set bit at index: " + i, 0, val); + } + } + + /* now transfer the vector */ + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + final BitVector toVector = (BitVector)transferPair.getTo(); + final BitVector.Accessor toAccessor = toVector.getAccessor(); + final BitVector.Mutator toMutator = toVector.getMutator(); + + assertEquals(valueCapacity * 4, toVector.getValueCapacity()); + + /* realloc the toVector */ + 
toMutator.setSafeToOne(valueCapacity * 4); + + for (int i = 0; i < toVector.getValueCapacity(); i++) { + int val = toAccessor.get(i); + if (i <= valueCapacity * 4) { + if (((i & 1) == 1) || (i == valueCapacity) || + (i == valueCapacity*2) || (i == valueCapacity*4)) { + assertEquals("unexpected cleared bit at index: " + i, 1, val); + } + else { + assertEquals("unexpected set bit at index: " + i, 0, val); + } + } + else { + assertEquals("unexpected set bit at index: " + i, 0, val); + } + } + + toVector.close(); + } + } + + @Test + public void testReallocAfterVectorTransfer2() { + try (final NullableBitVector vector = new NullableBitVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(4096); + int valueCapacity = vector.getValueCapacity(); + assertEquals(4096, valueCapacity); + + final NullableBitVector.Mutator mutator = vector.getMutator(); + final NullableBitVector.Accessor accessor = vector.getAccessor(); + + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + mutator.set(i, 1); + } + } + + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i)); + } + else { + assertTrue("unexpected set bit at index: " + i, accessor.isNull(i)); + } + } + + /* trigger first realloc */ + mutator.setSafe(valueCapacity, 1, 1); + assertEquals(valueCapacity * 2, vector.getValueCapacity()); + + for (int i = valueCapacity; i < valueCapacity*2; i++) { + if ((i & 1) == 1) { + mutator.set(i, 1); + } + } + + for (int i = 0; i < valueCapacity*2; i++) { + if (((i & 1) == 1) || (i == valueCapacity)) { + assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i)); + } + else { + assertTrue("unexpected set bit at index: " + i, accessor.isNull(i)); + } + } + + /* trigger second realloc */ + mutator.setSafe(valueCapacity*2, 1, 1); + assertEquals(valueCapacity * 4, vector.getValueCapacity()); + + for (int i = valueCapacity*2; i < valueCapacity*4; i++) { + if ((i & 1) == 1) { + 
mutator.set(i, 1); + } + } + + for (int i = 0; i < valueCapacity*4; i++) { + if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) { + assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i)); + } + else { + assertTrue("unexpected set bit at index: " + i, accessor.isNull(i)); + } + } + + /* now transfer the vector */ + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + final NullableBitVector toVector = (NullableBitVector)transferPair.getTo(); + final NullableBitVector.Accessor toAccessor = toVector.getAccessor(); + final NullableBitVector.Mutator toMutator = toVector.getMutator(); + + assertEquals(valueCapacity * 4, toVector.getValueCapacity()); + + /* realloc the toVector */ + toMutator.setSafe(valueCapacity * 4, 1, 1); + + for (int i = 0; i < toVector.getValueCapacity(); i++) { + if (i <= valueCapacity * 4) { + if (((i & 1) == 1) || (i == valueCapacity) || + (i == valueCapacity*2) || (i == valueCapacity*4)) { + assertFalse("unexpected cleared bit at index: " + i, toAccessor.isNull(i)); + } + else { + assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i)); + } + } + else { + assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i)); + } + } + + toVector.close(); + } + } + + @Test + public void testBitVector() { + // Create a new value vector for 1024 integers + try (final BitVector vector = new BitVector(EMPTY_SCHEMA_PATH, allocator)) { + final BitVector.Mutator m = vector.getMutator(); + vector.allocateNew(1024); + m.setValueCount(1024); + + // Put and set a few values + m.set(0, 1); + m.set(1, 0); + m.set(100, 0); + m.set(1022, 1); + + m.setValueCount(1024); + + final BitVector.Accessor accessor = vector.getAccessor(); + assertEquals(1, accessor.get(0)); + assertEquals(0, accessor.get(1)); + assertEquals(0, accessor.get(100)); + assertEquals(1, accessor.get(1022)); + + assertEquals(1022, accessor.getNullCount()); + + // test setting the same value twice + 
m.set(0, 1); + m.set(0, 1); + m.set(1, 0); + m.set(1, 0); + assertEquals(1, accessor.get(0)); + assertEquals(0, accessor.get(1)); + + // test toggling the values + m.set(0, 0); + m.set(1, 1); + assertEquals(0, accessor.get(0)); + assertEquals(1, accessor.get(1)); + + // should not change + assertEquals(1022, accessor.getNullCount()); + + // Ensure unallocated space returns 0 + assertEquals(0, accessor.get(3)); + + // unset the previously set bits + m.set(1, 0); + m.set(1022, 0); + // this should set all the array to 0 + assertEquals(1024, accessor.getNullCount()); + + // set all the array to 1 + for (int i = 0; i < 1024; ++i) { + assertEquals(1024 - i, accessor.getNullCount()); + m.set(i, 1); + } + + assertEquals(0, accessor.getNullCount()); + + vector.allocateNew(1015); + m.setValueCount(1015); + + // ensure it has been zeroed + assertEquals(1015, accessor.getNullCount()); + + m.set(0, 1); + m.set(1014, 1); // ensure that the last item of the last byte is allocated + + assertEquals(1013, accessor.getNullCount()); + + vector.zeroVector(); + assertEquals(1015, accessor.getNullCount()); + + // set all the array to 1 + for (int i = 0; i < 1015; ++i) { + assertEquals(1015 - i, accessor.getNullCount()); + m.set(i, 1); + } + + assertEquals(0, accessor.getNullCount()); + } + } + + @Test + public void testBitVectorRangeSetAllOnes() { + validateRange(1000, 0, 1000); + validateRange(1000, 0, 1); + validateRange(1000, 1, 2); + validateRange(1000, 5, 6); + validateRange(1000, 5, 10); + validateRange(1000, 5, 150); + validateRange(1000, 5, 27); + for (int i = 0; i < 8; i++) { + for (int j = 0; j < 8; j++) { + validateRange(1000, 10 + i, 27 + j); + validateRange(1000, i, j); + } + } + } + + private void validateRange(int length, int start, int count) { + String desc = "[" + start + ", " + (start + count) + ") "; + try (BitVector bitVector = new BitVector("bits", allocator)) { + bitVector.reset(); + bitVector.allocateNew(length); + bitVector.getMutator().setRangeToOne(start, 
count); + for (int i = 0; i < start; i++) { + Assert.assertEquals(desc + i, 0, bitVector.getAccessor().get(i)); + } + for (int i = start; i < start + count; i++) { + Assert.assertEquals(desc + i, 1, bitVector.getAccessor().get(i)); + } + for (int i = start + count; i < length; i++) { + Assert.assertEquals(desc + i, 0, bitVector.getAccessor().get(i)); + } + } + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java index 08e3700daeebf..46e66582c5b61 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java @@ -46,7 +46,9 @@ public void testTransferFixedWidth() { v1.makeTransferPair(v2).transfer(); assertEquals(0, childAllocator1.getAllocatedMemory()); - assertEquals(5 * 4096, childAllocator2.getAllocatedMemory()); + int expectedBitVector = 512; + int expectedValueVector = 4096*4; + assertEquals(expectedBitVector + expectedValueVector, childAllocator2.getAllocatedMemory()); } @Test @@ -65,7 +67,11 @@ public void testTransferVariableidth() { v1.makeTransferPair(v2).transfer(); assertEquals(0, childAllocator1.getAllocatedMemory()); - int expected = 8*4096 + 4*4096 + 4096; + + int expectedValueVector = 4096*8; + int expectedOffsetVector = 4096*4; + int expectedBitVector = 512; + int expected = expectedBitVector + expectedOffsetVector + expectedValueVector; assertEquals(expected, childAllocator2.getAllocatedMemory()); } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java index 63543b0932908..d87c766b65c5a 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java @@ -23,6 +23,7 @@ import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import java.nio.charset.Charset; import java.util.List; @@ -33,6 +34,7 @@ import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.TransferPair; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -404,6 +406,277 @@ private void validateRange(int length, int start, int count) { } } + @Test /* Float8Vector */ + public void testReallocAfterVectorTransfer1() { + try (final Float8Vector vector = new Float8Vector(EMPTY_SCHEMA_PATH, allocator)) { + final Float8Vector.Mutator mutator = vector.getMutator(); + final Float8Vector.Accessor accessor = vector.getAccessor(); + final int initialDefaultCapacity = 4096; + boolean error = false; + + /* use the default capacity; 4096*8 => 32KB */ + vector.allocateNew(); + + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + double baseValue = 100.375; + + for (int i = 0; i < initialDefaultCapacity; i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + /* the above setSafe calls should not have triggered a realloc as + * we are within the capacity. 
check the vector contents + */ + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + for (int i = 0; i < initialDefaultCapacity; i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity); + assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity()); + + for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 2); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2)); + assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity()); + + for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 4); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* at this point we are working with a 128KB buffer data for this + * vector. 
now let's transfer this vector + */ + + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + + Float8Vector toVector = (Float8Vector)transferPair.getTo(); + + /* now let's realloc the toVector */ + toVector.reAlloc(); + assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity()); + + final Float8Vector.Accessor toAccessor = toVector.getAccessor(); + + for (int i = 0; i < (initialDefaultCapacity * 8); i++) { + double value = toAccessor.get(i); + if (i < (initialDefaultCapacity * 4)) { + assertEquals(baseValue + (double)i, value, 0); + } + else { + assertEquals(0, value, 0); + } + } + + toVector.close(); + } + } + + @Test /* NullableFloat8Vector */ + public void testReallocAfterVectorTransfer2() { + try (final NullableFloat8Vector vector = new NullableFloat8Vector(EMPTY_SCHEMA_PATH, allocator)) { + final NullableFloat8Vector.Mutator mutator = vector.getMutator(); + final NullableFloat8Vector.Accessor accessor = vector.getAccessor(); + final int initialDefaultCapacity = 4096; + boolean error = false; + + vector.allocateNew(initialDefaultCapacity); + + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + double baseValue = 100.375; + + for (int i = 0; i < initialDefaultCapacity; i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + /* the above setSafe calls should not have triggered a realloc as + * we are within the capacity. 
check the vector contents + */ + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + for (int i = 0; i < initialDefaultCapacity; i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity); + assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity()); + + for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 2); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2)); + assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity()); + + for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 4); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* at this point we are working with a 128KB buffer data for this + * vector. 
now let's transfer this vector + */ + + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + + NullableFloat8Vector toVector = (NullableFloat8Vector)transferPair.getTo(); + final NullableFloat8Vector.Accessor toAccessor = toVector.getAccessor(); + + /* check toVector contents before realloc */ + for (int i = 0; i < (initialDefaultCapacity * 4); i++) { + assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i)); + double value = toAccessor.get(i); + assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0); + } + + /* now let's realloc the toVector and check contents again */ + toVector.reAlloc(); + assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity()); + + for (int i = 0; i < (initialDefaultCapacity * 8); i++) { + if (i < (initialDefaultCapacity * 4)) { + assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i)); + double value = toAccessor.get(i); + assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0); + } + else { + assertTrue("unexpected non-null value at index: " + i, toAccessor.isNull(i)); + } + } + + toVector.close(); + } + } + + @Test /* NullableVarCharVector */ + public void testReallocAfterVectorTransfer3() { + try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) { + final NullableVarCharVector.Mutator mutator = vector.getMutator(); + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + + /* 4096 values with 10 byte per record */ + vector.allocateNew(4096 * 10, 4096); + int valueCapacity = vector.getValueCapacity(); + + /* populate the vector */ + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + mutator.set(i, STR1); + } + else { + mutator.set(i, STR2); + } + } + + /* Check the vector output */ + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertArrayEquals(STR1, accessor.get(i)); + } + else { + assertArrayEquals(STR2, 
accessor.get(i)); + } + } + + /* trigger first realloc */ + mutator.setSafe(valueCapacity, STR2, 0, STR2.length); + + /* populate the remaining vector */ + for (int i = valueCapacity; i < vector.getValueCapacity(); i++) { + if ((i & 1) == 1) { + mutator.set(i, STR1); + } + else { + mutator.set(i, STR2); + } + } + + /* Check the vector output */ + valueCapacity = vector.getValueCapacity(); + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertArrayEquals(STR1, accessor.get(i)); + } + else { + assertArrayEquals(STR2, accessor.get(i)); + } + } + + /* trigger second realloc */ + mutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length); + + /* populate the remaining vector */ + for (int i = valueCapacity; i < vector.getValueCapacity(); i++) { + if ((i & 1) == 1) { + mutator.set(i, STR1); + } + else { + mutator.set(i, STR2); + } + } + + /* Check the vector output */ + valueCapacity = vector.getValueCapacity(); + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertArrayEquals(STR1, accessor.get(i)); + } + else { + assertArrayEquals(STR2, accessor.get(i)); + } + } + + /* we are potentially working with 4x the size of vector buffer + * that we initially started with. Now let's transfer the vector. 
+ */ + + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + NullableVarCharVector toVector = (NullableVarCharVector)transferPair.getTo(); + NullableVarCharVector.Mutator toMutator = toVector.getMutator(); + NullableVarCharVector.Accessor toAccessor = toVector.getAccessor(); + + valueCapacity = toVector.getValueCapacity(); + + /* trigger a realloc of this toVector */ + toMutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length); + + toVector.close(); + } + } + @Test public void testReAllocNullableFixedWidthVector() { // Create a new value vector for 1024 integers diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java index da9cb00361c0b..53d4539007995 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java @@ -72,31 +72,6 @@ public void testFixedType() { } } - @Test - public void testVariableLengthType() { - try (final VarCharVector vector = new VarCharVector("", allocator)) { - final VarCharVector.Mutator m = vector.getMutator(); - // note: capacity ends up being - 1 due to offsets vector - vector.setInitialCapacity(511); - vector.allocateNew(); - - assertEquals(511, vector.getValueCapacity()); - - try { - m.set(512, "foo".getBytes(StandardCharsets.UTF_8)); - Assert.fail("Expected out of bounds exception"); - } catch (Exception e) { - // ok - } - - vector.reAlloc(); - assertEquals(1023, vector.getValueCapacity()); - - m.set(512, "foo".getBytes(StandardCharsets.UTF_8)); - assertEquals("foo", new String(vector.getAccessor().get(512), StandardCharsets.UTF_8)); - } - } - @Test public void testNullableType() { try (final NullableVarCharVector vector = new NullableVarCharVector("", allocator)) { @@ -114,7 +89,7 @@ public void testNullableType() { } vector.reAlloc(); - assertEquals(1024, vector.getValueCapacity()); + 
assertEquals(1023, vector.getValueCapacity()); // note: size - 1 for some reason... m.set(512, "foo".getBytes(StandardCharsets.UTF_8)); assertEquals("foo", new String(vector.getAccessor().get(512), StandardCharsets.UTF_8)); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java index 5a9c80dc124a2..57bcfc8622b4f 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java @@ -27,6 +27,10 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.SchemaChangeCallBack; +import org.apache.arrow.vector.NullableFloat8Vector; +import org.apache.arrow.vector.NullableFloat4Vector; +import org.apache.arrow.vector.NullableBigIntVector; +import org.apache.arrow.vector.NullableIntVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; @@ -37,6 +41,11 @@ import org.apache.arrow.vector.complex.impl.UnionListWriter; import org.apache.arrow.vector.complex.impl.UnionReader; import org.apache.arrow.vector.complex.impl.UnionWriter; +import org.apache.arrow.vector.complex.impl.SingleMapWriter; +import org.apache.arrow.vector.complex.reader.IntReader; +import org.apache.arrow.vector.complex.reader.Float8Reader; +import org.apache.arrow.vector.complex.reader.Float4Reader; +import org.apache.arrow.vector.complex.reader.BigIntReader; import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; @@ -833,4 +842,83 @@ public void complexCopierWithList() { innerMap = (JsonStringHashMap) object.get(3); assertEquals(2, 
innerMap.get("a"));
   }
+
+  @Test
+  public void testSingleMapWriter1() {
+    /* initialize a SingleMapWriter with empty MapVector and then lazily
+     * create all vectors with expected initialCapacity.
+     */
+    MapVector parent = MapVector.empty("parent", allocator);
+    SingleMapWriter singleMapWriter = new SingleMapWriter(parent);
+
+    int initialCapacity = 1024;
+    singleMapWriter.setInitialCapacity(initialCapacity); // called before any child writer is requested below
+
+    IntWriter intWriter = singleMapWriter.integer("intField");
+    BigIntWriter bigIntWriter = singleMapWriter.bigInt("bigIntField");
+    Float4Writer float4Writer = singleMapWriter.float4("float4Field");
+    Float8Writer float8Writer = singleMapWriter.float8("float8Field");
+    ListWriter listWriter = singleMapWriter.list("listField");
+
+    int intValue = 100;
+    long bigIntValue = 10000;
+    float float4Value = 100.5f;
+    double float8Value = 100.375;
+
+    for (int i = 0; i < initialCapacity; i++) { // one entry per position: four scalar fields plus a 4-element list
+      singleMapWriter.start();
+
+      intWriter.writeInt(intValue + i);
+      bigIntWriter.writeBigInt(bigIntValue + (long)i);
+      float4Writer.writeFloat4(float4Value + (float)i);
+      float8Writer.writeFloat8(float8Value + (double)i);
+
+      listWriter.setPosition(i);
+      listWriter.startList();
+      listWriter.integer().writeInt(intValue + i);
+      listWriter.integer().writeInt(intValue + i + 1);
+      listWriter.integer().writeInt(intValue + i + 2);
+      listWriter.integer().writeInt(intValue + i + 3);
+      listWriter.endList();
+
+      singleMapWriter.end();
+    }
+
+    NullableIntVector intVector = (NullableIntVector)parent.getChild("intField");
+    NullableBigIntVector bigIntVector = (NullableBigIntVector)parent.getChild("bigIntField");
+    NullableFloat4Vector float4Vector = (NullableFloat4Vector)parent.getChild("float4Field");
+    NullableFloat8Vector float8Vector = (NullableFloat8Vector)parent.getChild("float8Field");
+
+    assertEquals(initialCapacity, singleMapWriter.getValueCapacity()); // writer and each scalar child are expected to still report initialCapacity
+    assertEquals(initialCapacity, intVector.getValueCapacity());
+    assertEquals(initialCapacity, bigIntVector.getValueCapacity());
+    assertEquals(initialCapacity, float4Vector.getValueCapacity());
+    assertEquals(initialCapacity, float8Vector.getValueCapacity());
+
+    MapReader singleMapReader = new SingleMapReaderImpl(parent);
+
+    IntReader intReader = singleMapReader.reader("intField");
+    BigIntReader bigIntReader = singleMapReader.reader("bigIntField");
+    Float4Reader float4Reader = singleMapReader.reader("float4Field");
+    Float8Reader float8Reader = singleMapReader.reader("float8Field");
+    UnionListReader listReader = (UnionListReader)singleMapReader.reader("listField");
+
+    for (int i = 0; i < initialCapacity; i++) { // read back every position and compare with what the write loop stored
+      intReader.setPosition(i);
+      bigIntReader.setPosition(i);
+      float4Reader.setPosition(i);
+      float8Reader.setPosition(i);
+      listReader.setPosition(i);
+
+      assertEquals(intValue + i, intReader.readInteger().intValue());
+      assertEquals(bigIntValue + (long)i, bigIntReader.readLong().longValue());
+      assertEquals(float4Value + (float)i, float4Reader.readFloat().floatValue(), 0);
+      assertEquals(float8Value + (double)i, float8Reader.readDouble().doubleValue(), 0);
+
+      for (int j = 0; j < 4; j++) { // the write loop stored intValue + i + 0..3 in each list
+        listReader.next(); // next() is called before each element is read
+        assertEquals(intValue + i + j, listReader.reader().readInteger().intValue());
+      }
+    }
+  }
 }