From b51e407b256a469fb25bd37369d69728c776f3d1 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Tue, 1 Aug 2023 16:34:26 -0700 Subject: [PATCH] Convert PositionAppender to use BlockBuilder bulk append methods --- .../output/BytePositionsAppender.java | 212 ----------- .../output/Fixed12PositionsAppender.java | 229 ------------ .../output/Int128PositionsAppender.java | 227 ------------ .../operator/output/IntPositionsAppender.java | 212 ----------- .../output/LongPositionsAppender.java | 212 ----------- .../output/PositionsAppenderFactory.java | 37 +- .../output/PositionsAppenderUtil.java | 14 +- .../output/ShortPositionsAppender.java | 212 ----------- .../output/SlicePositionsAppender.java | 333 ------------------ .../output/TypedPositionsAppender.java | 5 + .../output/TestSlicePositionsAppender.java | 122 ------- 11 files changed, 16 insertions(+), 1799 deletions(-) delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/Fixed12PositionsAppender.java delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java delete mode 100644 core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java delete mode 100644 core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java diff --git a/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java deleted file mode 100644 index 8599de929e4b..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.trino.spi.block.Block; -import io.trino.spi.block.ByteArrayBlock; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.max; - -public class BytePositionsAppender - implements PositionsAppender -{ - private static final int INSTANCE_SIZE = instanceSize(BytePositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new ByteArrayBlock(1, Optional.of(new boolean[] {true}), new byte[1]); - - private boolean initialized; - private int initialEntryCount; - - private int positionCount; - private boolean hasNullValue; - private boolean hasNonNullValue; - - // it is assumed that these arrays are the same length - private boolean[] valueIsNull = new boolean[0]; - private byte[] values = new byte[0]; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public BytePositionsAppender(int expectedEntries) - { - this.initialEntryCount = max(expectedEntries, 1); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof ByteArrayBlock, "Block must be instance of %s", ByteArrayBlock.class); - - if (positions.isEmpty()) { - return; - } - int[] positionArray = positions.elements(); - int positionsSize = positions.size(); - ensureCapacity(positionCount + positionsSize); - - if (block.mayHaveNull()) { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - boolean isNull = block.isNull(position); - int positionIndex = positionCount + i; - if (isNull) { - valueIsNull[positionIndex] = true; - hasNullValue = true; - } - else { - values[positionIndex] = block.getByte(position, 0); - hasNonNullValue = true; - } - } - positionCount += positionsSize; - } - else { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - values[positionCount + i] = block.getByte(position, 0); - } - positionCount += positionsSize; - hasNonNullValue = true; - } - - updateSize(positionsSize); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof ByteArrayBlock, "Block must be instance of %s", ByteArrayBlock.class); - - if (rlePositionCount == 0) { - return; - } - int sourcePosition = 0; - ensureCapacity(positionCount + rlePositionCount); - if (block.isNull(sourcePosition)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - hasNullValue = true; - } - else { - byte value = block.getByte(sourcePosition, 0); - Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); - hasNonNullValue = true; - } - positionCount += rlePositionCount; - - updateSize(rlePositionCount); - } - - @Override - public void append(int sourcePosition, ValueBlock source) - { - checkArgument(source instanceof ByteArrayBlock, "Block must be instance of %s", ByteArrayBlock.class); - - ensureCapacity(positionCount + 1); - if (source.isNull(sourcePosition)) { - valueIsNull[positionCount] = true; - hasNullValue = true; - } - else { - values[positionCount] = source.getByte(sourcePosition, 0); - hasNonNullValue = true; - } - positionCount++; - - updateSize(1); - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new ByteArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialized = false; - valueIsNull = new boolean[0]; - values = new byte[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private void ensureCapacity(int capacity) - { - if (values.length >= capacity) { - return; - } - - int newSize; - if (initialized) { - newSize = calculateNewArraySize(values.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - values = Arrays.copyOf(values, newSize); - updateRetainedSize(); - } - - private void updateSize(long positionsSize) - { - sizeInBytes += ByteArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/Fixed12PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/Fixed12PositionsAppender.java deleted file mode 100644 index 685db0271293..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/Fixed12PositionsAppender.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.trino.spi.block.Block; -import io.trino.spi.block.Fixed12Block; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.SIZE_OF_INT; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.max; - -public class Fixed12PositionsAppender - implements PositionsAppender -{ - private static final int INSTANCE_SIZE = instanceSize(Fixed12PositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new Fixed12Block(1, Optional.of(new boolean[] {true}), new int[3]); - - private boolean initialized; - private int initialEntryCount; - - private int positionCount; - private boolean hasNullValue; - private boolean hasNonNullValue; - - // it is assumed that these arrays are the same length - private boolean[] valueIsNull = new boolean[0]; - private int[] values = new int[0]; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public Fixed12PositionsAppender(int expectedEntries) - { - this.initialEntryCount = max(expectedEntries, 1); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof Fixed12Block, "Block must be instance of %s", Fixed12Block.class); - - if (positions.isEmpty()) { - return; - } - int[] positionArray = positions.elements(); - int positionsSize = positions.size(); - ensureCapacity(positionCount + positionsSize); - - if (block.mayHaveNull()) { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - boolean isNull = block.isNull(position); - if (isNull) { - valueIsNull[positionCount + i] = true; - hasNullValue = true; - } - else { - int valuesIndex = (positionCount + i) * 3; - values[valuesIndex] = block.getInt(position, 0); - values[valuesIndex + 1] = block.getInt(position, SIZE_OF_INT); - values[valuesIndex + 2] = block.getInt(position, SIZE_OF_INT + SIZE_OF_INT); - hasNonNullValue = true; - } - } - positionCount += positionsSize; - } - else { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - int valuesIndex = (positionCount + i) * 3; - values[valuesIndex] = block.getInt(position, 0); - values[valuesIndex + 1] = block.getInt(position, SIZE_OF_INT); - values[valuesIndex + 2] = block.getInt(position, SIZE_OF_INT + SIZE_OF_INT); - } - positionCount += positionsSize; - hasNonNullValue = true; - } - - updateSize(positionsSize); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof Fixed12Block, "Block must be instance of %s", Fixed12Block.class); - - if (rlePositionCount == 0) { - return; - } - int sourcePosition = 0; - ensureCapacity(positionCount + rlePositionCount); - if (block.isNull(sourcePosition)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - hasNullValue = true; - } - else { - int valueHigh = block.getInt(sourcePosition, 0); - int valueMid = block.getInt(sourcePosition, SIZE_OF_INT); - int valueLow = block.getInt(sourcePosition, SIZE_OF_INT + SIZE_OF_INT); - int positionIndex = positionCount * 3; - for (int i = 0; i < rlePositionCount; i++) { - values[positionIndex] = valueHigh; - values[positionIndex + 1] = valueMid; - values[positionIndex + 2] = valueLow; - positionIndex += 3; - } - hasNonNullValue = true; - } - positionCount += rlePositionCount; - - updateSize(rlePositionCount); - } - - @Override - public void append(int sourcePosition, ValueBlock source) - { - checkArgument(source instanceof Fixed12Block, "Block must be instance of %s", Fixed12Block.class); - - ensureCapacity(positionCount + 1); - if (source.isNull(sourcePosition)) { - valueIsNull[positionCount] = true; - hasNullValue = true; - } - else { - int positionIndex = positionCount * 3; - values[positionIndex] = source.getInt(sourcePosition, 0); - values[positionIndex + 1] = source.getInt(sourcePosition, SIZE_OF_INT); - values[positionIndex + 2] = source.getInt(sourcePosition, SIZE_OF_INT + SIZE_OF_INT); - hasNonNullValue = true; - } - positionCount++; - - updateSize(1); - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new Fixed12Block(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialized = false; - valueIsNull = new boolean[0]; - values = new int[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private void ensureCapacity(int capacity) - { - if (valueIsNull.length >= capacity) { - return; - } - - int newSize; - if (initialized) { - newSize = calculateNewArraySize(valueIsNull.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - values = Arrays.copyOf(values, newSize * 3); - updateRetainedSize(); - } - - private void updateSize(long positionsSize) - { - sizeInBytes += Fixed12Block.SIZE_IN_BYTES_PER_POSITION * positionsSize; - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java deleted file mode 100644 index 251a7f25eff4..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.trino.spi.block.Block; -import io.trino.spi.block.Int128ArrayBlock; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.SIZE_OF_LONG; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.max; - -public class Int128PositionsAppender - implements PositionsAppender -{ - private static final int INSTANCE_SIZE = instanceSize(Int128PositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new Int128ArrayBlock(1, Optional.of(new boolean[] {true}), new long[2]); - - private boolean initialized; - private int initialEntryCount; - - private int positionCount; - private boolean hasNullValue; - private boolean hasNonNullValue; - - // it is assumed that these arrays are the same length - private boolean[] valueIsNull = new boolean[0]; - private long[] values = new long[0]; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public Int128PositionsAppender(int expectedEntries) - { - this.initialEntryCount = max(expectedEntries, 1); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof Int128ArrayBlock, "Block must be instance of %s", Int128ArrayBlock.class); - - if (positions.isEmpty()) { - return; - } - int[] positionArray = positions.elements(); - int positionsSize = positions.size(); - ensureCapacity(positionCount + positionsSize); - - if (block.mayHaveNull()) { - int positionIndex = positionCount * 2; - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - boolean isNull = block.isNull(position); - - if (isNull) { - valueIsNull[positionCount + i] = true; - hasNullValue = true; - } - else { - values[positionIndex] = block.getLong(position, 0); - values[positionIndex + 1] = block.getLong(position, SIZE_OF_LONG); - hasNonNullValue = true; - } - positionIndex += 2; - } - positionCount += positionsSize; - } - else { - int positionIndex = positionCount * 2; - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - values[positionIndex] = block.getLong(position, 0); - values[positionIndex + 1] = block.getLong(position, SIZE_OF_LONG); - positionIndex += 2; - } - positionCount += positionsSize; - hasNonNullValue = true; - } - - updateSize(positionsSize); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof Int128ArrayBlock, "Block must be instance of %s", Int128ArrayBlock.class); - - if (rlePositionCount == 0) { - return; - } - int sourcePosition = 0; - ensureCapacity(positionCount + rlePositionCount); - if (block.isNull(sourcePosition)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - hasNullValue = true; - } - else { - long valueHigh = block.getLong(sourcePosition, 0); - long valueLow = block.getLong(sourcePosition, SIZE_OF_LONG); - int positionIndex = positionCount * 2; - for (int i = 0; i < rlePositionCount; i++) { - values[positionIndex] = valueHigh; - values[positionIndex + 1] = valueLow; - positionIndex += 2; - } - hasNonNullValue = true; - } - positionCount += rlePositionCount; - - updateSize(rlePositionCount); - } - - @Override - public void append(int sourcePosition, ValueBlock source) - { - checkArgument(source instanceof Int128ArrayBlock, "Block must be instance of %s", Int128ArrayBlock.class); - - ensureCapacity(positionCount + 1); - if (source.isNull(sourcePosition)) { - valueIsNull[positionCount] = true; - hasNullValue = true; - } - else { - int positionIndex = positionCount * 2; - values[positionIndex] = source.getLong(sourcePosition, 0); - values[positionIndex + 1] = source.getLong(sourcePosition, SIZE_OF_LONG); - hasNonNullValue = true; - } - positionCount++; - - updateSize(1); - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new Int128ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialized = false; - valueIsNull = new boolean[0]; - values = new long[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private void ensureCapacity(int capacity) - { - if (valueIsNull.length >= capacity) { - return; - } - - int newSize; - if (initialized) { - newSize = calculateNewArraySize(valueIsNull.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - values = Arrays.copyOf(values, newSize * 2); - updateRetainedSize(); - } - - private void updateSize(long positionsSize) - { - sizeInBytes += Int128ArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java deleted file mode 100644 index bcb7d73b046d..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.trino.spi.block.Block; -import io.trino.spi.block.IntArrayBlock; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.max; - -public class IntPositionsAppender - implements PositionsAppender -{ - private static final int INSTANCE_SIZE = instanceSize(IntPositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new IntArrayBlock(1, Optional.of(new boolean[] {true}), new int[1]); - - private boolean initialized; - private int initialEntryCount; - - private int positionCount; - private boolean hasNullValue; - private boolean hasNonNullValue; - - // it is assumed that these arrays are the same length - private boolean[] valueIsNull = new boolean[0]; - private int[] values = new int[0]; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public IntPositionsAppender(int expectedEntries) - { - this.initialEntryCount = max(expectedEntries, 1); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof IntArrayBlock, "Block must be instance of %s", IntArrayBlock.class); - - if (positions.isEmpty()) { - return; - } - int[] positionArray = positions.elements(); - int positionsSize = positions.size(); - ensureCapacity(positionCount + positionsSize); - - if (block.mayHaveNull()) { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - boolean isNull = block.isNull(position); - int positionIndex = positionCount + i; - if (isNull) { - valueIsNull[positionIndex] = true; - hasNullValue = true; - } - else { - values[positionIndex] = block.getInt(position, 0); - hasNonNullValue = true; - } - } - positionCount += positionsSize; - } - else { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - values[positionCount + i] = block.getInt(position, 0); - } - positionCount += positionsSize; - hasNonNullValue = true; - } - - updateSize(positionsSize); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof IntArrayBlock, "Block must be instance of %s", IntArrayBlock.class); - - if (rlePositionCount == 0) { - return; - } - int sourcePosition = 0; - ensureCapacity(positionCount + rlePositionCount); - if (block.isNull(sourcePosition)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - hasNullValue = true; - } - else { - int value = block.getInt(sourcePosition, 0); - Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); - hasNonNullValue = true; - } - positionCount += rlePositionCount; - - updateSize(rlePositionCount); - } - - @Override - public void append(int sourcePosition, ValueBlock source) - { - checkArgument(source instanceof IntArrayBlock, "Block must be instance of %s", IntArrayBlock.class); - - ensureCapacity(positionCount + 1); - if (source.isNull(sourcePosition)) { - valueIsNull[positionCount] = true; - hasNullValue = true; - } - else { - values[positionCount] = source.getInt(sourcePosition, 0); - hasNonNullValue = true; - } - positionCount++; - - updateSize(1); - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new IntArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialized = false; - valueIsNull = new boolean[0]; - values = new int[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private void ensureCapacity(int capacity) - { - if (values.length >= capacity) { - return; - } - - int newSize; - if (initialized) { - newSize = calculateNewArraySize(values.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - values = Arrays.copyOf(values, newSize); - updateRetainedSize(); - } - - private void updateSize(long positionsSize) - { - sizeInBytes += IntArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java deleted file mode 100644 index 6fc555f02a01..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.trino.spi.block.Block; -import io.trino.spi.block.LongArrayBlock; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.max; - -public class LongPositionsAppender - implements PositionsAppender -{ - private static final int INSTANCE_SIZE = instanceSize(LongPositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new LongArrayBlock(1, Optional.of(new boolean[] {true}), new long[1]); - - private boolean initialized; - private int initialEntryCount; - - private int positionCount; - private boolean hasNullValue; - private boolean hasNonNullValue; - - // it is assumed that these arrays are the same length - private boolean[] valueIsNull = new boolean[0]; - private long[] values = new long[0]; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public LongPositionsAppender(int expectedEntries) - { - this.initialEntryCount = max(expectedEntries, 1); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof LongArrayBlock, "Block must be instance of %s", LongArrayBlock.class); - - if (positions.isEmpty()) { - return; - } - int[] positionArray = positions.elements(); - int positionsSize = positions.size(); - ensureCapacity(positionCount + positionsSize); - - if (block.mayHaveNull()) { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - int positionIndex = positionCount + i; - boolean isNull = block.isNull(position); - if (isNull) { - valueIsNull[positionIndex] = true; - hasNullValue = true; - } - else { - values[positionIndex] = block.getLong(position, 0); - hasNonNullValue = true; - } - } - positionCount += positionsSize; - } - else { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - values[positionCount + i] = block.getLong(position, 0); - } - positionCount += positionsSize; - hasNonNullValue = true; - } - - updateSize(positionsSize); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof LongArrayBlock, "Block must be instance of %s", LongArrayBlock.class); - - if (rlePositionCount == 0) { - return; - } - int sourcePosition = 0; - ensureCapacity(positionCount + rlePositionCount); - if (block.isNull(sourcePosition)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - hasNullValue = true; - } - else { - long value = block.getLong(sourcePosition, 0); - Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); - hasNonNullValue = true; - } - positionCount += rlePositionCount; - - updateSize(rlePositionCount); - } - - @Override - public void append(int sourcePosition, ValueBlock source) - { - checkArgument(source instanceof LongArrayBlock, "Block must be instance of %s", LongArrayBlock.class); - - ensureCapacity(positionCount + 1); - if (source.isNull(sourcePosition)) { - valueIsNull[positionCount] = true; - hasNullValue = true; - } - else { - values[positionCount] = source.getLong(sourcePosition, 0); - hasNonNullValue = true; - } - positionCount++; - - updateSize(1); - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new LongArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialized = false; - valueIsNull = new boolean[0]; - values = new long[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private void ensureCapacity(int capacity) - { - if (values.length >= capacity) { - return; - } - - int newSize; - if (initialized) { - newSize = calculateNewArraySize(values.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - values = Arrays.copyOf(values, newSize); - updateRetainedSize(); - } - - private void updateSize(long positionsSize) - { - sizeInBytes += LongArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java index 34eab30e020e..e972baad6d22 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java @@ -13,14 +13,9 @@ */ package io.trino.operator.output; -import io.trino.spi.block.ByteArrayBlock; -import io.trino.spi.block.Fixed12Block; -import io.trino.spi.block.Int128ArrayBlock; -import io.trino.spi.block.IntArrayBlock; -import io.trino.spi.block.LongArrayBlock; import io.trino.spi.block.RowBlock; -import io.trino.spi.block.ShortArrayBlock; import io.trino.spi.block.VariableWidthBlock; +import io.trino.spi.block.VariableWidthBlockBuilder; import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.type.BlockTypeOperators; @@ -28,11 +23,14 @@ import java.util.Optional; +import static io.trino.operator.output.PositionsAppenderUtil.MAX_ARRAY_SIZE; +import static java.lang.Math.min; import static java.util.Objects.requireNonNull; public class PositionsAppenderFactory { private final BlockTypeOperators blockTypeOperators; + private static final int EXPECTED_VARIABLE_WIDTH_BYTES_PER_ENTRY = 32; public PositionsAppenderFactory(BlockTypeOperators blockTypeOperators) { @@ -50,30 +48,15 @@ public UnnestingPositionsAppender create(Type type, int expectedPositions, long private PositionsAppender createPrimitiveAppender(Type type, int expectedPositions, long maxPageSizeInBytes) { - if (type.getValueBlockType() == ByteArrayBlock.class) { - return new BytePositionsAppender(expectedPositions); - } - if (type.getValueBlockType() == ShortArrayBlock.class) { - return new ShortPositionsAppender(expectedPositions); - } - if (type.getValueBlockType() == IntArrayBlock.class) { - return new IntPositionsAppender(expectedPositions); - } - if (type.getValueBlockType() == LongArrayBlock.class) { - return new LongPositionsAppender(expectedPositions); - } - if (type.getValueBlockType() == Fixed12Block.class) { - return new Fixed12PositionsAppender(expectedPositions); - } - if (type.getValueBlockType() == Int128ArrayBlock.class) { - return new Int128PositionsAppender(expectedPositions); - } - if (type.getValueBlockType() == VariableWidthBlock.class) { - return new SlicePositionsAppender(expectedPositions, maxPageSizeInBytes); - } if (type.getValueBlockType() == RowBlock.class) { return RowPositionsAppender.createRowAppender(this, (RowType) type, expectedPositions, maxPageSizeInBytes); } + if (type.getValueBlockType() == VariableWidthBlock.class) { + // it is guaranteed Math.min will not overflow; safe to cast + int expectedBytes = (int) min((long) expectedPositions * EXPECTED_VARIABLE_WIDTH_BYTES_PER_ENTRY, maxPageSizeInBytes); + expectedBytes = min(expectedBytes, MAX_ARRAY_SIZE); + return new TypedPositionsAppender(new VariableWidthBlockBuilder(null, expectedPositions, expectedBytes)); + } return new TypedPositionsAppender(type, expectedPositions); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderUtil.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderUtil.java index 0d1d6b642096..b6efce94d2f1 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderUtil.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderUtil.java @@ -24,9 +24,7 @@ final class PositionsAppenderUtil // See java.util.ArrayList for an explanation static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; - private PositionsAppenderUtil() - { - } + private PositionsAppenderUtil() {} // Copied from io.trino.spi.block.BlockUtil#calculateNewArraySize static int calculateNewArraySize(int currentSize) @@ -61,14 +59,4 @@ else if (newSize > MAX_ARRAY_SIZE) { } return (int) newSize; } - - // Copied from io.trino.spi.block.BlockUtil#calculateBlockResetBytes - static int calculateBlockResetBytes(int currentBytes) - { - long newBytes = (long) ceil(currentBytes * BLOCK_RESET_SKEW); - if (newBytes > MAX_ARRAY_SIZE) { - return MAX_ARRAY_SIZE; - } - return (int) newBytes; - } } diff --git a/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java deleted file mode 100644 index 16739ae1ea04..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.trino.spi.block.Block; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ShortArrayBlock; -import io.trino.spi.block.ValueBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.max; - -public class ShortPositionsAppender - implements PositionsAppender -{ - private static final int INSTANCE_SIZE = instanceSize(ShortPositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new ShortArrayBlock(1, Optional.of(new boolean[] {true}), new short[1]); - - private boolean initialized; - private int initialEntryCount; - - private int positionCount; - private boolean hasNullValue; - private boolean hasNonNullValue; - - // it is assumed that these arrays are the same length - private boolean[] valueIsNull = new boolean[0]; - private short[] values = new short[0]; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public ShortPositionsAppender(int expectedEntries) - { - this.initialEntryCount = max(expectedEntries, 1); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof ShortArrayBlock, "Block must be instance of %s", ShortArrayBlock.class); - - if (positions.isEmpty()) { - return; - } - int[] positionArray = positions.elements(); - int positionsSize = positions.size(); - ensureCapacity(positionCount + positionsSize); - - if (block.mayHaveNull()) { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - boolean isNull = block.isNull(position); - int positionIndex = positionCount + i; - if (isNull) { - valueIsNull[positionIndex] = true; - hasNullValue = true; - } - else { - values[positionIndex] = block.getShort(position, 0); - hasNonNullValue = true; - } - } - positionCount += positionsSize; - } - else { - for (int i = 0; i < positionsSize; i++) { - int position = positionArray[i]; - values[positionCount + i] = block.getShort(position, 0); - } - positionCount += positionsSize; - hasNonNullValue = true; - } - - updateSize(positionsSize); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof ShortArrayBlock, "Block must be instance of %s", ShortArrayBlock.class); - - if (rlePositionCount == 0) { - return; - } - int sourcePosition = 0; - ensureCapacity(positionCount + rlePositionCount); - if (block.isNull(sourcePosition)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - hasNullValue = true; - } - else { - short value = block.getShort(sourcePosition, 0); - Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); - hasNonNullValue = true; - } - positionCount += rlePositionCount; - - updateSize(rlePositionCount); - } - - @Override - public void append(int sourcePosition, ValueBlock source) - { - checkArgument(source instanceof ShortArrayBlock, "Block must be instance of %s", ShortArrayBlock.class); - - ensureCapacity(positionCount + 1); - if (source.isNull(sourcePosition)) { - valueIsNull[positionCount] = true; - hasNullValue = true; - } - else { - values[positionCount] = source.getShort(sourcePosition, 0); - hasNonNullValue = true; - } - positionCount++; - - updateSize(1); - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new ShortArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialized = false; - valueIsNull = new boolean[0]; - values = new short[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private void ensureCapacity(int capacity) - { - if (values.length >= capacity) { - return; - } - - int newSize; - if (initialized) { - newSize = calculateNewArraySize(values.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - values = Arrays.copyOf(values, newSize); - updateRetainedSize(); - } - - private void updateSize(long positionsSize) - { - sizeInBytes += ShortArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java deleted file mode 100644 index 204cf679520c..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import com.google.common.annotations.VisibleForTesting; -import io.airlift.slice.Slice; -import io.airlift.slice.Slices; -import io.trino.spi.block.Block; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import io.trino.spi.block.VariableWidthBlock; -import it.unimi.dsi.fastutil.ints.IntArrayList; - -import java.util.Arrays; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.SizeOf.SIZE_OF_BYTE; -import static io.airlift.slice.SizeOf.SIZE_OF_INT; -import static io.airlift.slice.SizeOf.instanceSize; -import static io.airlift.slice.SizeOf.sizeOf; -import static io.airlift.slice.Slices.EMPTY_SLICE; -import static io.trino.operator.output.PositionsAppenderUtil.MAX_ARRAY_SIZE; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetBytes; -import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; -import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; -import static java.lang.Math.min; -import static java.lang.Math.toIntExact; - -public class SlicePositionsAppender - implements PositionsAppender -{ - private static final int EXPECTED_BYTES_PER_ENTRY = 32; - private static final int INSTANCE_SIZE = instanceSize(SlicePositionsAppender.class); - private static final Block NULL_VALUE_BLOCK = new VariableWidthBlock(1, EMPTY_SLICE, new int[] {0, 0}, Optional.of(new boolean[] {true})); - - private boolean initialized; - private int initialEntryCount; - private int initialBytesSize; - - private byte[] bytes = new byte[0]; - - private boolean hasNullValue; - private boolean hasNonNullValue; - // it is assumed that the offset array is one position longer than the valueIsNull array - private boolean[] valueIsNull = new boolean[0]; - private int[] offsets = new int[1]; - - private int positionCount; - - private long retainedSizeInBytes; - private long sizeInBytes; - - public SlicePositionsAppender(int expectedEntries, long maxPageSizeInBytes) - { - this(expectedEntries, getExpectedBytes(maxPageSizeInBytes, expectedEntries)); - } - - public SlicePositionsAppender(int expectedEntries, int expectedBytes) - { - initialEntryCount = expectedEntries; - initialBytesSize = min(expectedBytes, MAX_ARRAY_SIZE); - - updateRetainedSize(); - } - - @Override - public void append(IntArrayList positions, ValueBlock block) - { - checkArgument(block instanceof VariableWidthBlock, "Block must be instance of %s", VariableWidthBlock.class); - - if (positions.isEmpty()) { - return; - } - ensurePositionCapacity(positionCount + positions.size()); - VariableWidthBlock variableWidthBlock = (VariableWidthBlock) block; - int newByteCount = 0; - int[] lengths = new int[positions.size()]; - int[] sourceOffsets = new int[positions.size()]; - int[] positionArray = positions.elements(); - - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - int length = variableWidthBlock.getSliceLength(position); - lengths[i] = length; - sourceOffsets[i] = variableWidthBlock.getRawSliceOffset(position); - newByteCount += length; - boolean isNull = block.isNull(position); - valueIsNull[positionCount + i] = isNull; - offsets[positionCount + i + 1] = offsets[positionCount + i] + length; - hasNullValue |= isNull; - hasNonNullValue |= !isNull; - } - } - else { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - int length = variableWidthBlock.getSliceLength(position); - lengths[i] = length; - sourceOffsets[i] = variableWidthBlock.getRawSliceOffset(position); - newByteCount += length; - offsets[positionCount + i + 1] = offsets[positionCount + i] + length; - } - hasNonNullValue = true; - } - copyBytes(variableWidthBlock.getRawSlice(), lengths, sourceOffsets, positions.size(), newByteCount); - } - - @Override - public void appendRle(ValueBlock block, int rlePositionCount) - { - checkArgument(block instanceof VariableWidthBlock, "Block must be instance of %s", VariableWidthBlock.class); - - if (rlePositionCount == 0) { - return; - } - ensurePositionCapacity(positionCount + rlePositionCount); - if (block.isNull(0)) { - Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); - Arrays.fill(offsets, positionCount + 1, positionCount + rlePositionCount + 1, getCurrentOffset()); - positionCount += rlePositionCount; - - hasNullValue = true; - updateSize(rlePositionCount, 0); - } - else { - hasNonNullValue = true; - duplicateBytes(block.getSlice(0, 0, block.getSliceLength(0)), rlePositionCount); - } - } - - @Override - public void append(int position, ValueBlock source) - { - checkArgument(source instanceof VariableWidthBlock, "Block must be instance of %s but is %s".formatted(VariableWidthBlock.class, source.getClass())); - - ensurePositionCapacity(positionCount + 1); - if (source.isNull(position)) { - valueIsNull[positionCount] = true; - offsets[positionCount + 1] = getCurrentOffset(); - positionCount++; - - hasNullValue = true; - updateSize(1, 0); - } - else { - hasNonNullValue = true; - int currentOffset = getCurrentOffset(); - int sliceLength = source.getSliceLength(position); - Slice slice = source.getSlice(position, 0, sliceLength); - - ensureExtraBytesCapacity(sliceLength); - - slice.getBytes(0, bytes, currentOffset, sliceLength); - - offsets[positionCount + 1] = currentOffset + sliceLength; - - positionCount++; - updateSize(1, sliceLength); - } - } - - @Override - public Block build() - { - Block result; - if (hasNonNullValue) { - result = new VariableWidthBlock( - positionCount, - Slices.wrappedBuffer(bytes, 0, getCurrentOffset()), - offsets, - hasNullValue ? Optional.of(valueIsNull) : Optional.empty()); - } - else { - result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount); - } - reset(); - return result; - } - - @Override - public long getRetainedSizeInBytes() - { - return retainedSizeInBytes; - } - - @Override - public long getSizeInBytes() - { - return sizeInBytes; - } - - private void copyBytes(Slice rawSlice, int[] lengths, int[] sourceOffsets, int count, int newByteCount) - { - ensureExtraBytesCapacity(newByteCount); - - byte[] base = rawSlice.byteArray(); - int byteArrayOffset = rawSlice.byteArrayOffset(); - for (int i = 0; i < count; i++) { - System.arraycopy(base, byteArrayOffset + sourceOffsets[i], bytes, offsets[positionCount + i], lengths[i]); - } - - positionCount += count; - updateSize(count, newByteCount); - } - - /** - * Copy all bytes from {@code slice} to {@code count} consecutive positions in the {@link #bytes} array. - */ - private void duplicateBytes(Slice slice, int count) - { - int length = slice.length(); - int newByteCount = toIntExact((long) count * length); - int startOffset = getCurrentOffset(); - ensureExtraBytesCapacity(newByteCount); - - duplicateBytes(slice, bytes, startOffset, count); - - int currentStartOffset = startOffset + length; - for (int i = 0; i < count; i++) { - offsets[positionCount + i + 1] = currentStartOffset; - currentStartOffset += length; - } - - positionCount += count; - updateSize(count, newByteCount); - } - - /** - * Copy {@code length} bytes from {@code slice}, starting at offset {@code sourceOffset} to {@code count} consecutive positions in the {@link #bytes} array. - */ - @VisibleForTesting - static void duplicateBytes(Slice slice, byte[] bytes, int startOffset, int count) - { - int length = slice.length(); - if (length == 0) { - // nothing to copy - return; - } - // copy slice to the first position - slice.getBytes(0, bytes, startOffset, length); - int totalDuplicatedBytes = count * length; - int duplicatedBytes = length; - // copy every byte copied so far, doubling the number of bytes copied on evey iteration - while (duplicatedBytes * 2 <= totalDuplicatedBytes) { - System.arraycopy(bytes, startOffset, bytes, startOffset + duplicatedBytes, duplicatedBytes); - duplicatedBytes = duplicatedBytes * 2; - } - // copy the leftover - System.arraycopy(bytes, startOffset, bytes, startOffset + duplicatedBytes, totalDuplicatedBytes - duplicatedBytes); - } - - @Override - public void reset() - { - initialEntryCount = calculateBlockResetSize(positionCount); - initialBytesSize = calculateBlockResetBytes(getCurrentOffset()); - initialized = false; - valueIsNull = new boolean[0]; - offsets = new int[1]; - bytes = new byte[0]; - positionCount = 0; - sizeInBytes = 0; - hasNonNullValue = false; - hasNullValue = false; - updateRetainedSize(); - } - - private int getCurrentOffset() - { - return offsets[positionCount]; - } - - private void updateSize(long positionsSize, int bytesWritten) - { - sizeInBytes += (SIZE_OF_BYTE + SIZE_OF_INT) * positionsSize + bytesWritten; - } - - private void ensureExtraBytesCapacity(int extraBytesCapacity) - { - int totalBytesCapacity = getCurrentOffset() + extraBytesCapacity; - if (bytes.length < totalBytesCapacity) { - int newBytesLength = Math.max(bytes.length, initialBytesSize); - if (totalBytesCapacity > newBytesLength) { - newBytesLength = Math.max(totalBytesCapacity, calculateNewArraySize(newBytesLength)); - } - bytes = Arrays.copyOf(bytes, newBytesLength); - updateRetainedSize(); - } - } - - private void ensurePositionCapacity(int capacity) - { - if (valueIsNull.length < capacity) { - int newSize; - if (initialized) { - newSize = calculateNewArraySize(valueIsNull.length); - } - else { - newSize = initialEntryCount; - initialized = true; - } - newSize = Math.max(newSize, capacity); - - valueIsNull = Arrays.copyOf(valueIsNull, newSize); - offsets = Arrays.copyOf(offsets, newSize + 1); - updateRetainedSize(); - } - } - - private void updateRetainedSize() - { - retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(offsets) + sizeOf(bytes); - } - - private static int getExpectedBytes(long maxPageSizeInBytes, int expectedPositions) - { - // it is guaranteed Math.min will not overflow; safe to cast - return (int) min((long) expectedPositions * EXPECTED_BYTES_PER_ENTRY, maxPageSizeInBytes); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java index eb26ff3241b5..141c26811ca6 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java @@ -33,6 +33,11 @@ class TypedPositionsAppender this.blockBuilder = type.createBlockBuilder(null, expectedPositions); } + public TypedPositionsAppender(BlockBuilder blockBuilder) + { + this.blockBuilder = blockBuilder; + } + @Override public void append(IntArrayList positions, ValueBlock block) { diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java deleted file mode 100644 index c90ffe5234b0..000000000000 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator.output; - -import io.airlift.slice.Slice; -import io.airlift.slice.Slices; -import io.trino.spi.block.Block; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.ValueBlock; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; - -import static io.trino.block.BlockAssertions.assertBlockEquals; -import static io.trino.block.BlockAssertions.createStringsBlock; -import static io.trino.operator.output.SlicePositionsAppender.duplicateBytes; -import static io.trino.spi.type.VarcharType.VARCHAR; -import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; - -public class TestSlicePositionsAppender -{ - @Test - public void testAppendEmptySliceRle() - { - // test SlicePositionAppender.appendRle with empty value (Slice with length 0) - PositionsAppender positionsAppender = new SlicePositionsAppender(1, 100); - ValueBlock value = createStringsBlock(""); - positionsAppender.appendRle(value, 10); - - Block actualBlock = positionsAppender.build(); - - assertBlockEquals(VARCHAR, actualBlock, RunLengthEncodedBlock.create(value, 10)); - } - - @Test - public void testDuplicateZeroLength() - { - Slice slice = Slices.wrappedBuffer(); - byte[] target = new byte[] {-1}; - duplicateBytes(slice, target, 0, 100); - assertArrayEquals(new byte[] {-1}, target); - } - - @Test - public void testDuplicate1Byte() - { - Slice slice = Slices.wrappedBuffer(new byte[] {2}); - byte[] target = new byte[5]; - Arrays.fill(target, (byte) -1); - duplicateBytes(slice, target, 3, 2); - assertArrayEquals(new byte[] {-1, -1, -1, 2, 2}, target); - } - - @Test - public void testDuplicate2Bytes() - { - Slice slice = Slices.wrappedBuffer(new byte[] {1, 2}); - byte[] target = new byte[8]; - Arrays.fill(target, (byte) -1); - duplicateBytes(slice, target, 1, 3); - assertArrayEquals(new byte[] {-1, 1, 2, 1, 2, 1, 2, -1}, target); - } - - @Test - public void testDuplicate1Time() - { - Slice slice = Slices.wrappedBuffer(new byte[] {1, 2}); - byte[] target = new byte[8]; - Arrays.fill(target, (byte) -1); - - duplicateBytes(slice, target, 1, 1); - - assertArrayEquals(new byte[] {-1, 1, 2, -1, -1, -1, -1, -1}, target); - } - - @Test - public void testDuplicateMultipleBytesOffNumberOfTimes() - { - Slice slice = Slices.wrappedBuffer(new byte[] {5, 3, 1}); - byte[] target = new byte[17]; - Arrays.fill(target, (byte) -1); - - duplicateBytes(slice, target, 1, 5); - - assertArrayEquals(new byte[] {-1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, -1}, target); - } - - @Test - public void testDuplicateMultipleBytesEvenNumberOfTimes() - { - Slice slice = Slices.wrappedBuffer(new byte[] {5, 3, 1}); - byte[] target = new byte[20]; - Arrays.fill(target, (byte) -1); - - duplicateBytes(slice, target, 1, 6); - - assertArrayEquals(new byte[] {-1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, -1}, target); - } - - @Test - public void testDuplicateMultipleBytesPowerOfTwoNumberOfTimes() - { - Slice slice = Slices.wrappedBuffer(new byte[] {5, 3, 1}); - byte[] target = new byte[14]; - Arrays.fill(target, (byte) -1); - - duplicateBytes(slice, target, 1, 4); - - assertArrayEquals(new byte[] {-1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, -1}, target); - } -}