Skip to content

Commit

Permalink
[GLUTEN-7205] [VL] Optimize row to column for scalar type (#7206)
Browse files Browse the repository at this point in the history
Optimize current deserialize data one by one to batch deserialization, the performance will improve more than 3 times.
  • Loading branch information
jinchengchenghh authored Sep 12, 2024
1 parent 2cbf72d commit ce09169
Show file tree
Hide file tree
Showing 2 changed files with 262 additions and 1 deletion.
260 changes: 260 additions & 0 deletions cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,239 @@

using namespace facebook::velox;
namespace gluten {
namespace {

inline int64_t calculateBitSetWidthInBytes(int32_t numFields) {
return ((numFields + 63) / 64) * 8;
}

inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) {
return nullBitsetWidthInBytes + 8L * index;
}

inline bool isNull(uint8_t* buffer_address, int32_t index) {
int64_t mask = 1L << (index & 0x3f); // mod 64 and shift
int64_t wordOffset = (index >> 6) * 8;
int64_t value = *((int64_t*)(buffer_address + wordOffset));
return (value & mask) != 0;
}

int32_t getTotalStringSize(
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress) {
size_t size = 0;
for (auto pos = 0; pos < numRows; pos++) {
if (isNull(memoryAddress + offsets[pos], columnIdx)) {
continue;
}

int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
int32_t length = static_cast<int32_t>(offsetAndSize);
if (!StringView::isInline(length)) {
size += length;
}
}
return size;
}

template <TypeKind Kind>
VectorPtr createFlatVector(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
using T = typename TypeTraits<Kind>::NativeType;
auto typeWidth = sizeof(T);
auto column = BaseVector::create<FlatVector<T>>(type, numRows, pool);
auto rawValues = column->template mutableRawValues<uint8_t>();
auto shift = __builtin_ctz((uint32_t)typeWidth);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
const uint8_t* srcptr = (memoryAddress + offsets[pos] + fieldOffset);
uint8_t* destptr = rawValues + (pos << shift);
memcpy(destptr, srcptr, typeWidth);
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::HUGEINT>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<int128_t>>(type, numRows, pool);
auto rawValues = column->mutableRawValues<uint8_t>();
auto typeWidth = sizeof(int128_t);
auto shift = __builtin_ctz((uint32_t)typeWidth);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
uint8_t* destptr = rawValues + (pos << shift);
int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
int32_t length = static_cast<int32_t>(offsetAndSize);
int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
uint8_t bytesValue[length];
memcpy(bytesValue, memoryAddress + offsets[pos] + wordoffset, length);
uint8_t bytesValue2[16]{};
for (int k = length - 1; k >= 0; k--) {
bytesValue2[length - 1 - k] = bytesValue[k];
}
if (int8_t(bytesValue[0]) < 0) {
memset(bytesValue2 + length, 255, 16 - length);
}
memcpy(destptr, bytesValue2, typeWidth);
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::BOOLEAN>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<bool>>(type, numRows, pool);
auto rawValues = column->mutableRawValues<uint64_t>();
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
bool value = *(bool*)(memoryAddress + offsets[pos] + fieldOffset);
bits::setBit(rawValues, pos, value);
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::TIMESTAMP>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<Timestamp>>(type, numRows, pool);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
int64_t value = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
column->set(pos, Timestamp::fromMicros(value));
} else {
column->setNull(pos, true);
}
}
return column;
}

VectorPtr createFlatVectorStringView(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
auto column = BaseVector::create<FlatVector<StringView>>(type, numRows, pool);
auto size = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets, memoryAddress);
char* rawBuffer = column->getRawStringBufferWithSpace(size, true);
for (auto pos = 0; pos < numRows; pos++) {
if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
int32_t length = static_cast<int32_t>(offsetAndSize);
int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
auto valueSrcPtr = memoryAddress + offsets[pos] + wordoffset;
if (StringView::isInline(length)) {
column->set(pos, StringView(reinterpret_cast<char*>(valueSrcPtr), length));
} else {
memcpy(rawBuffer, valueSrcPtr, length);
column->setNoCopy(pos, StringView(rawBuffer, length));
rawBuffer += length;
}
} else {
column->setNull(pos, true);
}
}
return column;
}

template <>
VectorPtr createFlatVector<TypeKind::VARCHAR>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool);
}

template <>
VectorPtr createFlatVector<TypeKind::VARBINARY>(
const TypePtr& type,
int32_t columnIdx,
int32_t numRows,
int64_t fieldOffset,
std::vector<int64_t>& offsets,
uint8_t* memoryAddress,
memory::MemoryPool* pool) {
return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool);
}

template <>
VectorPtr createFlatVector<TypeKind::UNKNOWN>(
const TypePtr& /*type*/,
int32_t /*columnIdx*/,
int32_t numRows,
int64_t /*fieldOffset*/,
std::vector<int64_t>& /*offsets*/,
uint8_t* /*memoryAddress*/,
memory::MemoryPool* pool) {
auto nulls = allocateNulls(numRows, pool, bits::kNull);
return std::make_shared<FlatVector<UnknownValue>>(
pool,
UNKNOWN(),
nulls,
numRows,
nullptr, // values
std::vector<BufferPtr>{}); // stringBuffers
}

bool supporteType(const RowTypePtr rowType) {
for (auto i = 0; i < rowType->size(); i++) {
auto kind = rowType->childAt(i)->kind();
switch (kind) {
case TypeKind::ARRAY:
case TypeKind::MAP:
case TypeKind::ROW:
return false;
default:
break;
}
}
return true;
}

} // namespace
VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(
struct ArrowSchema* cSchema,
std::shared_ptr<memory::MemoryPool> memoryPool)
Expand All @@ -32,6 +265,9 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(

std::shared_ptr<ColumnarBatch>
VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) {
if (supporteType(asRowType(rowType_))) {
return convertPrimitive(numRows, rowLength, memoryAddress);
}
std::vector<std::optional<std::string_view>> data;
int64_t offset = 0;
for (auto i = 0; i < numRows; i++) {
Expand All @@ -41,4 +277,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_
auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_, pool_.get());
return std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<RowVector>(vp));
}

std::shared_ptr<ColumnarBatch>
VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) {
auto numFields = rowType_->size();
int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields);
std::vector<int64_t> offsets;
offsets.resize(numRows);
for (auto i = 1; i < numRows; i++) {
offsets[i] = offsets[i - 1] + rowLength[i - 1];
}

std::vector<VectorPtr> columns;
columns.resize(numFields);

for (auto i = 0; i < numFields; i++) {
auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i);
auto& type = rowType_->childAt(i);
columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
createFlatVector, type->kind(), type, i, numRows, fieldOffset, offsets, memoryAddress, pool_.get());
}

auto rowVector = std::make_shared<RowVector>(pool_.get(), rowType_, BufferPtr(nullptr), numRows, std::move(columns));
return std::make_shared<VeloxColumnarBatch>(rowVector);
}
} // namespace gluten
3 changes: 2 additions & 1 deletion cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class VeloxRowToColumnarConverter final : public RowToColumnarConverter {

std::shared_ptr<ColumnarBatch> convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress);

protected:
private:
std::shared_ptr<ColumnarBatch> convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress);
facebook::velox::TypePtr rowType_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
};
Expand Down

0 comments on commit ce09169

Please sign in to comment.