Skip to content

Commit

Permalink
Combine multiple layers of dictionaries when writing flatmap column (#…
Browse files Browse the repository at this point in the history
…114)

Summary:
Pull Request resolved: #114

The current writer cannot handle dictionary around flat map column.  Fix this case by push the dictionary to flat map values, so they can be potentially encoded with `ArrayWithOffsets`.

Reviewed By: xiaoxmeng, HuamengJiang

Differential Revision: D66992657
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 12, 2024
1 parent cc6724a commit 5c1e610
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 19 deletions.
8 changes: 4 additions & 4 deletions CMake/abseil.cmake → CMake/Findabseil.cmake
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
include_guard(GLOBAL)

# TODO: these variables are named VELOX_* because we are piggy-backing on
# Velox's resolve dependency module for now. We should change and have
# our own in the future.
# TODO: these variables are named VELOX_* because we are piggy-backing on
# Velox's resolve dependency module for now. We should change and have our own
# in the future.
set(VELOX_ABSEIL_VERSION 20240116.0)
set(VELOX_ABSEIL_BUILD_SHA256_CHECKSUM
"338420448b140f0dfd1a1ea3c3ce71b3bc172071f24f4d9a57d59b45037da440")
set(VELOX_ABSEIL_SOURCE_URL
"https://github.com/abseil/abseil-cpp/archive/refs/tags/${VELOX_ABSEIL_VERSION}.tar.gz")

resolve_dependency_url(ABSEIL)
velox_resolve_dependency_url(ABSEIL)

message(STATUS "Building abseil from source")

Expand Down
24 changes: 12 additions & 12 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,25 @@ include(CTest) # include after project() but before add_subdirectory()
# generated .cpp/.h files), but adding this for convenience for now.
find_package(FlatBuffers REQUIRED)

set_source(gtest)
resolve_dependency(gtest)
velox_set_source(gtest)
velox_resolve_dependency(gtest)

set_source(glog)
resolve_dependency(glog)
velox_set_source(glog)
velox_resolve_dependency(glog)

set_source(gflags)
resolve_dependency(gflags COMPONENTS shared)
velox_set_source(gflags)
velox_resolve_dependency(gflags COMPONENTS shared)

set(BOOST_INCLUDE_LIBRARIES algorithm context filesystem program_options)

set_source(Boost)
resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
velox_set_source(Boost)
velox_resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})

set_source(folly)
resolve_dependency(folly)
velox_set_source(folly)
velox_resolve_dependency(folly)

set_source(abseil)
resolve_dependency(abseil)
velox_set_source(abseil)
velox_resolve_dependency(abseil)

# Use xxhash and xsimd from Velox for now.
include_directories(.)
Expand Down
10 changes: 8 additions & 2 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,14 @@ class FlatMapFieldWriter : public FieldWriter {
folly::Executor* executor = nullptr) override {
// Check if the vector received is already flattened
const auto isFlatMap = vector->type()->kind() == velox::TypeKind::ROW;
isFlatMap ? ingestFlattenedMap(vector, ranges)
: ingestMap(vector, ranges, executor);
if (isFlatMap) {
ingestFlattenedMap(
velox::RowVector::pushDictionaryToRowVectorLeaves(
velox::BaseVector::loadedVectorShared(vector)),
ranges);
} else {
ingestMap(vector, ranges, executor);
}
}

FlatMapPassthroughValueFieldWriter& createPassthroughValueFieldWriter(
Expand Down
58 changes: 58 additions & 0 deletions dwio/nimble/velox/tests/VeloxWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,64 @@ TEST_F(VeloxWriterTests, EncodingLayoutSchemaEvolutionExpandingRow) {
// that no captured encoding was used.
}

TEST_F(VeloxWriterTests, CombineMultipleLayersOfDictionaries) {
using namespace facebook::velox;
test::VectorMaker vectorMaker{leafPool_.get()};
auto wrapInDictionary = [&](const std::vector<vector_size_t>& indices,
const VectorPtr& values) {
auto buf =
AlignedBuffer::allocate<vector_size_t>(indices.size(), leafPool_.get());
memcpy(
buf->asMutable<vector_size_t>(),
indices.data(),
sizeof(vector_size_t) * indices.size());
return BaseVector::wrapInDictionary(nullptr, buf, indices.size(), values);
};
auto vector = vectorMaker.rowVector({
wrapInDictionary(
{0, 0, 1, 1},
vectorMaker.rowVector({
wrapInDictionary(
{0, 0}, vectorMaker.arrayVector<int64_t>({{1, 2, 3}})),
})),
});
nimble::VeloxWriterOptions options;
options.flatMapColumns = {"c0"};
options.dictionaryArrayColumns = {"c0"};
std::string file;
auto writeFile = std::make_unique<InMemoryWriteFile>(&file);
nimble::VeloxWriter writer(
*rootPool_,
ROW({"c0"}, {MAP(VARCHAR(), ARRAY(BIGINT()))}),
std::move(writeFile),
std::move(options));
writer.write(vector);
writer.close();
InMemoryReadFile readFile(file);
nimble::VeloxReadParams params;
params.readFlatMapFieldAsStruct = {"c0"};
params.flatMapFeatureSelector["c0"].features = {"c0"};
nimble::VeloxReader reader(*leafPool_, &readFile, nullptr, std::move(params));
VectorPtr result;
ASSERT_TRUE(reader.next(4, result));
ASSERT_EQ(result->size(), 4);
auto* c0 = result->asChecked<RowVector>()->childAt(0)->asChecked<RowVector>();
auto& dict = c0->childAt(0);
ASSERT_EQ(dict->encoding(), VectorEncoding::Simple::DICTIONARY);
ASSERT_EQ(dict->size(), 4);
auto* indices = dict->wrapInfo()->as<vector_size_t>();
for (int i = 0; i < 4; ++i) {
ASSERT_EQ(indices[i], 0);
}
auto* values = dict->valueVector()->asChecked<ArrayVector>();
ASSERT_EQ(values->size(), 1);
auto* elements = values->elements()->asChecked<SimpleVector<int64_t>>();
ASSERT_EQ(values->sizeAt(0), 3);
for (int i = 0; i < 3; ++i) {
ASSERT_EQ(elements->valueAt(i + values->offsetAt(0)), 1 + i);
}
}

#define ASSERT_CHUNK_COUNT(count, chunked) \
for (auto __i = 0; __i < count; ++__i) { \
ASSERT_TRUE(chunked.hasNext()); \
Expand Down
2 changes: 1 addition & 1 deletion velox
Submodule velox updated 1626 files

0 comments on commit 5c1e610

Please sign in to comment.