Skip to content

Commit

Permalink
Add bulk append methods to BlockBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Nov 19, 2023
1 parent 9ef7983 commit 1714e7d
Show file tree
Hide file tree
Showing 36 changed files with 2,842 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,35 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;

import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

class TypedPositionsAppender
implements PositionsAppender
{
private static final int INSTANCE_SIZE = instanceSize(TypedPositionsAppender.class);

private final Type type;
private BlockBuilder blockBuilder;

TypedPositionsAppender(Type type, int expectedPositions)
{
this.type = requireNonNull(type, "type is null");
this.blockBuilder = type.createBlockBuilder(null, expectedPositions);
}

@Override
public void append(IntArrayList positions, ValueBlock source)
public void append(IntArrayList positions, ValueBlock block)
{
int[] positionArray = positions.elements();
for (int i = 0; i < positions.size(); i++) {
type.appendTo(source, positionArray[i], blockBuilder);
}
blockBuilder.appendPositions(block, positions.elements(), 0, positions.size());
}

@Override
public void appendRle(ValueBlock block, int rlePositionCount)
public void appendRle(ValueBlock block, int count)
{
for (int i = 0; i < rlePositionCount; i++) {
type.appendTo(block, 0, blockBuilder);
}
blockBuilder.appendRepeated(block, 0, count);
}

@Override
public void append(int position, ValueBlock source)
public void append(int position, ValueBlock block)
{
type.appendTo(source, position, blockBuilder);
blockBuilder.append(block, position);
}

@Override
Expand Down
27 changes: 27 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,33 @@
<code>java.method.removed</code>
<old>method &lt;E extends java.lang.Throwable&gt; void io.trino.spi.block.VariableWidthBlockBuilder::buildEntry(io.trino.spi.block.VariableWidthBlockBuilder.VariableWidthEntryBuilder&lt;E&gt;) throws E</old>
</item>
<item>
<ignore>true</ignore>
<code>java.method.addedToInterface</code>
<new>method void io.trino.spi.block.BlockBuilder::append(io.trino.spi.block.ValueBlock, int)</new>
</item>
<item>
<code>java.method.addedToInterface</code>
<new>method void io.trino.spi.block.BlockBuilder::appendPositions(io.trino.spi.block.ValueBlock, int[], int, int)</new>
</item>
<item>
<code>java.method.addedToInterface</code>
<new>method void io.trino.spi.block.BlockBuilder::appendRange(io.trino.spi.block.ValueBlock, int, int)</new>
</item>
<item>
<code>java.method.addedToInterface</code>
<new>method void io.trino.spi.block.BlockBuilder::appendRepeated(io.trino.spi.block.ValueBlock, int, int)</new>
</item>
<item>
<code>java.method.returnTypeChangedCovariantly</code>
<old>method io.trino.spi.block.BlockBuilder io.trino.spi.block.ShortArrayBlockBuilder::appendNull()</old>
<new>method io.trino.spi.block.ShortArrayBlockBuilder io.trino.spi.block.ShortArrayBlockBuilder::appendNull()</new>
</item>
<item>
<code>java.method.returnTypeChangedCovariantly</code>
<old>method io.trino.spi.block.BlockBuilder io.trino.spi.block.ShortArrayBlockBuilder::writeShort(short)</old>
<new>method io.trino.spi.block.ShortArrayBlockBuilder io.trino.spi.block.ShortArrayBlockBuilder::writeShort(short)</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ int[] getOffsets()
return offsets;
}

boolean[] getRawValueIsNull()
{
return valueIsNull;
}

int getOffsetBase()
{
return arrayOffset;
Expand Down
131 changes: 114 additions & 17 deletions core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.spi.block.ArrayBlock.createArrayBlockInternal;
import static io.trino.spi.block.BlockUtil.appendRawBlockRange;
import static io.trino.spi.block.BlockUtil.calculateNewArraySize;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

public class ArrayBlockBuilder
implements BlockBuilder
{
private static final int INSTANCE_SIZE = instanceSize(ArrayBlockBuilder.class);
private static final int SIZE_IN_BYTES_PER_POSITION = Integer.BYTES + Byte.BYTES;

private int positionCount;

Expand All @@ -39,7 +42,7 @@ public class ArrayBlockBuilder
private int[] offsets = new int[1];
private boolean[] valueIsNull = new boolean[0];
private boolean hasNullValue;
private boolean hasNonNullRow;
private boolean hasNonNullValue;

private final BlockBuilder values;
private boolean currentEntryOpened;
Expand Down Expand Up @@ -82,7 +85,7 @@ private ArrayBlockBuilder(@Nullable BlockBuilderStatus blockBuilderStatus, Block
this.values = requireNonNull(values, "values is null");
this.initialEntryCount = max(expectedEntries, 1);

updateDataSize();
updateRetainedSize();
}

@Override
Expand All @@ -94,7 +97,7 @@ public int getPositionCount()
@Override
public long getSizeInBytes()
{
return values.getSizeInBytes() + ((Integer.BYTES + Byte.BYTES) * (long) positionCount);
return values.getSizeInBytes() + (SIZE_IN_BYTES_PER_POSITION * (long) positionCount);
}

@Override
Expand All @@ -116,6 +119,97 @@ public <E extends Throwable> void buildEntry(ArrayValueBuilder<E> builder)
currentEntryOpened = false;
}

@Override
public void append(ValueBlock block, int position)
{
if (currentEntryOpened) {
throw new IllegalStateException("Current entry must be closed before a null can be written");
}

ArrayBlock arrayBlock = (ArrayBlock) block;
if (block.isNull(position)) {
entryAdded(true);
return;
}

int offsetBase = arrayBlock.getOffsetBase();
int[] offsets = arrayBlock.getOffsets();
int startOffset = offsets[offsetBase + position];
int length = offsets[offsetBase + position + 1] - startOffset;

appendRawBlockRange(arrayBlock.getRawElementBlock(), startOffset, length, values);
entryAdded(false);
}

@Override
public void appendRepeated(ValueBlock block, int position, int count)
{
if (currentEntryOpened) {
throw new IllegalStateException("Current entry must be closed before a null can be written");
}
for (int i = 0; i < count; i++) {
append(block, position);
}
}

@Override
public void appendRange(ValueBlock block, int offset, int length)
{
if (currentEntryOpened) {
throw new IllegalStateException("Current entry must be closed before a null can be written");
}

ensureCapacity(positionCount + length);

ArrayBlock arrayBlock = (ArrayBlock) block;

int rawOffsetBase = arrayBlock.getOffsetBase();
int[] rawOffsets = arrayBlock.getOffsets();
int startOffset = rawOffsets[rawOffsetBase + offset];
int endOffset = rawOffsets[rawOffsetBase + offset + length];

appendRawBlockRange(arrayBlock.getRawElementBlock(), startOffset, endOffset - startOffset, values);

// update offsets for copied data
for (int i = 0; i < length; i++) {
int entrySize = rawOffsets[rawOffsetBase + offset + i + 1] - rawOffsets[rawOffsetBase + offset + i];
offsets[positionCount + i + 1] = offsets[positionCount + i] + entrySize;
}

boolean[] rawValueIsNull = arrayBlock.getRawValueIsNull();
if (rawValueIsNull != null) {
for (int i = 0; i < length; i++) {
if (rawValueIsNull[rawOffsetBase + offset + i]) {
valueIsNull[positionCount + i] = true;
hasNullValue = true;
}
else {
hasNonNullValue = true;
}
}
}
else {
hasNonNullValue = true;
}

positionCount += length;

if (blockBuilderStatus != null) {
blockBuilderStatus.addBytes(length * SIZE_IN_BYTES_PER_POSITION);
}
}

@Override
public void appendPositions(ValueBlock block, int[] positions, int offset, int length)
{
if (currentEntryOpened) {
throw new IllegalStateException("Current entry must be closed before a null can be written");
}
for (int i = 0; i < length; i++) {
append(block, positions[offset + i]);
}
}

@Override
public BlockBuilder appendNull()
{
Expand All @@ -129,37 +223,41 @@ public BlockBuilder appendNull()

private void entryAdded(boolean isNull)
{
if (valueIsNull.length <= positionCount) {
growCapacity();
}
ensureCapacity(positionCount + 1);

offsets[positionCount + 1] = values.getPositionCount();
valueIsNull[positionCount] = isNull;
hasNullValue |= isNull;
hasNonNullRow |= !isNull;
hasNonNullValue |= !isNull;
positionCount++;

if (blockBuilderStatus != null) {
blockBuilderStatus.addBytes(Integer.BYTES + Byte.BYTES);
blockBuilderStatus.addBytes(SIZE_IN_BYTES_PER_POSITION);
}
}

private void growCapacity()
private void ensureCapacity(int capacity)
{
if (valueIsNull.length >= capacity) {
return;
}

int newSize;
if (initialized) {
newSize = BlockUtil.calculateNewArraySize(valueIsNull.length);
newSize = calculateNewArraySize(capacity);
}
else {
newSize = initialEntryCount;
initialized = true;
}
newSize = max(newSize, capacity);

valueIsNull = Arrays.copyOf(valueIsNull, newSize);
offsets = Arrays.copyOf(offsets, newSize + 1);
updateDataSize();
updateRetainedSize();
}

private void updateDataSize()
private void updateRetainedSize()
{
retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(offsets);
if (blockBuilderStatus != null) {
Expand All @@ -173,7 +271,7 @@ public Block build()
if (currentEntryOpened) {
throw new IllegalStateException("Current entry must be closed before the block can be built");
}
if (!hasNonNullRow) {
if (!hasNonNullValue) {
return nullRle(positionCount);
}
return buildValueBlock();
Expand All @@ -197,10 +295,9 @@ public BlockBuilder newBlockBuilderLike(int expectedEntries, BlockBuilderStatus
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("ArrayBlockBuilder{");
sb.append("positionCount=").append(getPositionCount());
sb.append('}');
return sb.toString();
return "ArrayBlockBuilder{" +
"positionCount=" + getPositionCount() +
'}';
}

private Block nullRle(int positionCount)
Expand Down
20 changes: 20 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/block/BlockBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ public interface BlockBuilder
*/
long getRetainedSizeInBytes();

/**
* Append the specified value.
*/
void append(ValueBlock block, int position);

/**
* Append the specified value multiple times.
*/
void appendRepeated(ValueBlock block, int position, int count);

/**
* Append the values in the specified range.
*/
void appendRange(ValueBlock block, int offset, int length);

/**
* Append the values at the specified positions.
*/
void appendPositions(ValueBlock block, int[] positions, int offset, int length);

/**
* Appends a null value to the block.
*/
Expand Down
17 changes: 17 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/block/BlockUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,21 @@ else if (buffer.length < capacity) {

return buffer;
}

static void appendRawBlockRange(Block rawBlock, int offset, int length, BlockBuilder blockBuilder)
{
rawBlock = rawBlock.getLoadedBlock();
if (rawBlock instanceof RunLengthEncodedBlock rleBlock) {
blockBuilder.appendRepeated(rleBlock.getValue(), 0, length);
}
else if (rawBlock instanceof DictionaryBlock dictionaryBlock) {
blockBuilder.appendPositions(dictionaryBlock.getDictionary(), dictionaryBlock.getRawIds(), offset, length);
}
else if (rawBlock instanceof ValueBlock valueBlock) {
blockBuilder.appendRange(valueBlock, offset, length);
}
else {
throw new IllegalArgumentException("Unsupported block type " + rawBlock.getClass().getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,19 @@ Slice getValuesSlice()
{
return Slices.wrappedBuffer(values, arrayOffset, positionCount);
}

int getRawValuesOffset()
{
return arrayOffset;
}

boolean[] getRawValueIsNull()
{
return valueIsNull;
}

byte[] getRawValues()
{
return values;
}
}
Loading

0 comments on commit 1714e7d

Please sign in to comment.