Skip to content

Commit

Permalink
Convert Arrow REE arrays to Velox Vectors (facebookincubator#8777)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#8777

Allowing conversion of REE arrow arrays to Velox Vectors. If the REE
array has a single (or zero) runs, it is converted to a ConstanVector. In case
it has multiple runs, it is converted to a DictionaryVector to prevent the data
within the array from being flattened.

Reviewed By: Yuhta

Differential Revision: D53879037

fbshipit-source-id: 0f3178bfeb884376b45bc86cfa66104458779094
  • Loading branch information
pedroerp authored and facebook-github-bot committed Feb 23, 2024
1 parent f951ddc commit 0d02c8f
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 4 deletions.
67 changes: 67 additions & 0 deletions velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,13 @@ TypePtr importFromArrowImpl(
return ROW(std::move(childNames), std::move(childTypes));
}

// Run-end-encoding (REE).
case 'r':
VELOX_CHECK_EQ(arrowSchema.n_children, 2);
VELOX_CHECK_NOT_NULL(arrowSchema.children[1]);
// The Velox type is the type of the `values` child.
return importFromArrow(*arrowSchema.children[1]);

default:
break;
}
Expand Down Expand Up @@ -1328,6 +1335,58 @@ VectorPtr createDictionaryVector(
std::move(wrapped));
}

VectorPtr createVectorFromReeArray(
memory::MemoryPool* pool,
const ArrowSchema& arrowSchema,
const ArrowArray& arrowArray,
bool isViewer) {
VELOX_CHECK_EQ(arrowArray.n_children, 2);
VELOX_CHECK_EQ(arrowSchema.n_children, 2);

// REE cannot have top level nulls.
VELOX_CHECK_EQ(arrowArray.null_count, 0);

auto values = importFromArrowImpl(
*arrowSchema.children[1], *arrowArray.children[1], pool, isViewer);

const auto& runEndSchema = *arrowSchema.children[0];
auto runEndType = importFromArrowImpl(runEndSchema.format, runEndSchema);
VELOX_CHECK_EQ(
runEndType->kind(),
TypeKind::INTEGER,
"Only int32 run lengths are supported for REE arrow conversion.");

// If there is more than one run, we turn it into a dictionary.
if (values->size() > 1) {
const auto& runsArray = *arrowArray.children[0];
VELOX_CHECK_EQ(runsArray.n_buffers, 2);

// REE runs cannot be null.
VELOX_CHECK_EQ(runsArray.null_count, 0);

const auto* runsBuffer = static_cast<const int32_t*>(runsArray.buffers[1]);
VELOX_CHECK_NOT_NULL(runsBuffer);

auto indices = allocateIndices(arrowArray.length, pool);
auto rawIndices = indices->asMutable<vector_size_t>();

size_t cursor = 0;
for (size_t i = 0; i < runsArray.length; ++i) {
while (cursor < runsBuffer[i]) {
rawIndices[cursor++] = i;
}
}
return BaseVector::wrapInDictionary(
nullptr, indices, arrowArray.length, values);
}
// Otherwise (single or zero runs), turn it into a constant.
else if (values->size() == 1) {
return BaseVector::wrapInConstant(arrowArray.length, 0, values);
} else {
return BaseVector::createNullConstant(values->type(), 0, pool);
}
}

VectorPtr createTimestampVector(
memory::MemoryPool* pool,
const TypePtr& type,
Expand Down Expand Up @@ -1358,6 +1417,10 @@ VectorPtr createTimestampVector(
optionalNullCount(nullCount));
}

bool isREE(const ArrowSchema& arrowSchema) {
return arrowSchema.format[0] == '+' && arrowSchema.format[1] == 'r';
}

VectorPtr importFromArrowImpl(
ArrowSchema& arrowSchema,
ArrowArray& arrowArray,
Expand Down Expand Up @@ -1405,6 +1468,10 @@ VectorPtr importFromArrowImpl(
wrapInBufferView);
}

if (isREE(arrowSchema)) {
return createVectorFromReeArray(pool, arrowSchema, arrowArray, isViewer);
}

// String data types (VARCHAR and VARBINARY).
if (type->isVarchar() || type->isVarbinary()) {
VELOX_USER_CHECK_EQ(
Expand Down
142 changes: 138 additions & 4 deletions velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1292,14 +1292,12 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {
}

private:
template <typename F>
void testArrowRoundTrip(const arrow::Array& array, F validateVector) {
void toVeloxVector(const arrow::Array& array, VectorPtr& outVector) {
ArrowSchema schema;
ArrowArray data;
ASSERT_OK(arrow::ExportType(*array.type(), &schema));
ASSERT_OK(arrow::ExportArray(array, &data));
auto vec = importFromArrow(schema, data, pool_.get());
validateVector(*vec);
outVector = importFromArrow(schema, data, pool_.get());
if (isViewer()) {
// These release calls just decrease the refcount; there are still
// references to keep the data alive from the original Arrow array.
Expand All @@ -1309,6 +1307,16 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {
EXPECT_FALSE(schema.release);
EXPECT_FALSE(data.release);
}
}

template <typename F>
void testArrowRoundTrip(const arrow::Array& array, F validateVector) {
VectorPtr vec;
toVeloxVector(array, vec);
validateVector(*vec);

ArrowSchema schema;
ArrowArray data;
velox::exportToArrow(vec, schema);
velox::exportToArrow(vec, data, pool_.get());
ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema));
Expand Down Expand Up @@ -1373,6 +1381,124 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {
});
}

void testImportREE() {
testImportREENoRuns();
testImportREESingleRun();
testImportREEMultipleRuns();
}

void testImportREENoRuns() {
auto pool = arrow::default_memory_pool();

arrow::RunEndEncodedBuilder reeInt32(
pool,
std::make_shared<arrow::Int32Builder>(pool),
std::make_shared<arrow::Int32Builder>(pool),
run_end_encoded(arrow::int32(), arrow::int32()));
ASSERT_OK_AND_ASSIGN(auto array, reeInt32.Finish());

VectorPtr vector;
toVeloxVector(*array, vector);

ASSERT_EQ(*vector->type(), *INTEGER());
EXPECT_EQ(vector->encoding(), VectorEncoding::Simple::CONSTANT);
EXPECT_EQ(vector->size(), 0);
}

void testImportREESingleRun() {
auto pool = arrow::default_memory_pool();

// Simple integer column.
arrow::RunEndEncodedBuilder reeInt32(
pool,
std::make_shared<arrow::Int32Builder>(pool),
std::make_shared<arrow::Int32Builder>(pool),
run_end_encoded(arrow::int32(), arrow::int32()));
ASSERT_OK(reeInt32.AppendScalar(*arrow::MakeScalar<int32_t>(123), 20));
ASSERT_OK_AND_ASSIGN(auto array, reeInt32.Finish());

validateImportREESingleRun(*array, INTEGER(), 20);

// String column.
arrow::RunEndEncodedBuilder reeString(
pool,
std::make_shared<arrow::Int32Builder>(pool),
std::make_shared<arrow::StringBuilder>(pool),
run_end_encoded(arrow::int32(), arrow::utf8()));
ASSERT_OK(reeString.AppendScalar(*arrow::MakeScalar("bla"), 199));
ASSERT_OK_AND_ASSIGN(array, reeString.Finish());

validateImportREESingleRun(*array, VARCHAR(), 199);

// Array/List.
auto valuesBuilder = std::make_shared<arrow::FloatBuilder>(pool);
auto listBuilder =
std::make_shared<arrow::ListBuilder>(pool, valuesBuilder);
ASSERT_OK(listBuilder->Append());
ASSERT_OK(valuesBuilder->Append(1.1));
ASSERT_OK(valuesBuilder->Append(2.2));
ASSERT_OK(valuesBuilder->Append(3.3));
ASSERT_OK(valuesBuilder->Append(4.4));
ASSERT_OK_AND_ASSIGN(auto listArray, listBuilder->Finish());

auto runEndBuilder = std::make_shared<arrow::Int32Builder>(pool);
ASSERT_OK(runEndBuilder->Append(123));
ASSERT_OK_AND_ASSIGN(auto runEndsArray, runEndBuilder->Finish());

// Create a list containing [1.1, 2.2, 3.3, 4.4] and repeat it 123 times.
ASSERT_OK_AND_ASSIGN(
array, arrow::RunEndEncodedArray::Make(123, runEndsArray, listArray));
validateImportREESingleRun(*array, ARRAY(REAL()), 123);
}

void validateImportREESingleRun(
const arrow::Array& array,
const TypePtr& expectedType,
size_t expectedSize) {
testArrowRoundTrip(array, [&](const BaseVector& vec) {
ASSERT_EQ(*vec.type(), *expectedType);
EXPECT_EQ(vec.encoding(), VectorEncoding::Simple::CONSTANT);
EXPECT_EQ(vec.size(), expectedSize);
});
}

void testImportREEMultipleRuns() {
auto pool = arrow::default_memory_pool();

// Simple integer column.
arrow::RunEndEncodedBuilder reeInt32(
pool,
std::make_shared<arrow::Int32Builder>(pool),
std::make_shared<arrow::Int32Builder>(pool),
run_end_encoded(arrow::int32(), arrow::int32()));
ASSERT_OK(reeInt32.AppendScalar(*arrow::MakeScalar<int32_t>(123), 20));
ASSERT_OK(reeInt32.AppendScalar(*arrow::MakeScalar<int32_t>(321), 2));
ASSERT_OK(reeInt32.AppendNulls(10));
ASSERT_OK(reeInt32.AppendScalar(*arrow::MakeScalar<int32_t>(50), 30));
ASSERT_OK_AND_ASSIGN(auto array, reeInt32.Finish());

VectorPtr vector;
toVeloxVector(*array, vector);

ASSERT_EQ(*vector->type(), *INTEGER());
EXPECT_EQ(vector->encoding(), VectorEncoding::Simple::DICTIONARY);
EXPECT_EQ(vector->size(), 62);

DecodedVector decoded(*vector);
EXPECT_TRUE(decoded.mayHaveNulls());
EXPECT_FALSE(decoded.isNullAt(0));
EXPECT_EQ(decoded.valueAt<int32_t>(0), 123);
EXPECT_EQ(decoded.valueAt<int32_t>(1), 123);
EXPECT_EQ(decoded.valueAt<int32_t>(19), 123);
EXPECT_EQ(decoded.valueAt<int32_t>(20), 321);
EXPECT_EQ(decoded.valueAt<int32_t>(21), 321);
EXPECT_TRUE(decoded.isNullAt(22));
EXPECT_TRUE(decoded.isNullAt(31));
EXPECT_EQ(decoded.valueAt<int32_t>(32), 50);
EXPECT_EQ(decoded.valueAt<int32_t>(33), 50);
EXPECT_EQ(decoded.valueAt<int32_t>(61), 50);
}

void testImportFailures() {
ArrowSchema arrowSchema;
ArrowArray arrowArray;
Expand Down Expand Up @@ -1477,6 +1603,10 @@ TEST_F(ArrowBridgeArrayImportAsViewerTest, dictionary) {
testImportDictionary();
}

TEST_F(ArrowBridgeArrayImportAsViewerTest, ree) {
testImportREE();
}

TEST_F(ArrowBridgeArrayImportAsViewerTest, failures) {
testImportFailures();
}
Expand Down Expand Up @@ -1524,6 +1654,10 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, dictionary) {
testImportDictionary();
}

TEST_F(ArrowBridgeArrayImportAsOwnerTest, ree) {
testImportREE();
}

TEST_F(ArrowBridgeArrayImportAsOwnerTest, failures) {
testImportFailures();
}
Expand Down
21 changes: 21 additions & 0 deletions velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ class ArrowBridgeSchemaImportTest : public ArrowBridgeSchemaExportTest {
return type;
}

TypePtr testSchemaReeImport(const char* valuesFmt) {
auto reeSchema = makeArrowSchema("+r");
auto runsSchema = makeArrowSchema("i");
auto valuesSchema = makeArrowSchema(valuesFmt);

std::vector<ArrowSchema*> schemas{&runsSchema, &valuesSchema};
reeSchema.n_children = 2;
reeSchema.children = schemas.data();

auto type = importFromArrow(reeSchema);
reeSchema.release(&reeSchema);
return type;
}

ArrowSchema makeComplexArrowSchema(
std::vector<ArrowSchema>& schemas,
std::vector<ArrowSchema*>& schemaPtrs,
Expand Down Expand Up @@ -569,5 +583,12 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) {
{"col1", "col2"})));
}

TEST_F(ArrowBridgeSchemaImportTest, reeTypeTest) {
// Ensure REE just returns the type of the inner `values` child.
EXPECT_EQ(DOUBLE(), testSchemaReeImport("g"));
EXPECT_EQ(INTEGER(), testSchemaReeImport("i"));
EXPECT_EQ(BIGINT(), testSchemaReeImport("l"));
}

} // namespace
} // namespace facebook::velox::test

0 comments on commit 0d02c8f

Please sign in to comment.