Skip to content

Commit

Permalink
feat(connector): Support reading Iceberg split with equality deletes
Browse files Browse the repository at this point in the history
This commit introduces EqualityDeleteFileReader, which is used to read
Iceberg splits with equality delete files.

Co-authored-by: Naveen Kumar Mahadevuni <[email protected]>
  • Loading branch information
yingsu00 and Naveen Kumar Mahadevuni committed Nov 27, 2024
1 parent ebd597a commit e9662e6
Show file tree
Hide file tree
Showing 25 changed files with 1,270 additions and 97 deletions.
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ bool applyPartitionFilter(
VELOX_FAIL(
"Bad type {} for partition value: {}", type->kind(), partitionValue);
}
return true;
}

} // namespace
Expand Down
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
ioStats_,
fileHandleFactory_,
executor_,
scanSpec_);
scanSpec_,
remainingFilterExprSet_,
expressionEvaluator_,
totalRemainingFilterTime_);
}

std::unique_ptr<HivePartitionFunction> HiveDataSource::setupBucketConversion() {
Expand Down Expand Up @@ -315,8 +318,10 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
}

splitReader_ = createSplitReader();

// Split reader subclasses may need to use the reader options in prepareSplit
// so we initialize it beforehand.

splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
}
Expand Down Expand Up @@ -583,6 +588,7 @@ std::shared_ptr<wave::WaveDataSource> HiveDataSource::toWaveDataSource() {
void HiveDataSource::registerWaveDelegateHook(WaveDelegateHookFunction hook) {
waveDelegateHook_ = hook;
}

std::shared_ptr<wave::WaveDataSource> toWaveDataSource();

} // namespace facebook::velox::connector::hive
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class HiveDataSource : public DataSource {
subfields_;
SubfieldFilters filters_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
Expand Down
15 changes: 13 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ std::unique_ptr<SplitReader> SplitReader::create(
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec) {
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime) {
// Create the SplitReader based on hiveSplit->customSplitInfo["table_format"]
if (hiveSplit->customSplitInfo.count("table_format") > 0 &&
hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") {
Expand All @@ -89,7 +92,10 @@ std::unique_ptr<SplitReader> SplitReader::create(
ioStats,
fileHandleFactory,
executor,
scanSpec);
scanSpec,
remainingFilterExprSet,
expressionEvaluator,
totalRemainingFilterTime);
} else {
return std::unique_ptr<SplitReader>(new SplitReader(
hiveSplit,
Expand Down Expand Up @@ -179,6 +185,11 @@ void SplitReader::resetSplit() {
hiveSplit_.reset();
}

std::shared_ptr<const dwio::common::TypeWithId> SplitReader::baseFileSchema() {
VELOX_CHECK_NOT_NULL(baseReader_.get());
return baseReader_->typeWithId();
}

int64_t SplitReader::estimatedRowSize() const {
if (!baseRowReader_) {
return DataSource::kUnknownRowSize;
Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ class SplitReader {
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec);
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime);

virtual ~SplitReader() = default;

Expand All @@ -87,6 +90,8 @@ class SplitReader {

void resetSplit();

std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema();

int64_t estimatedRowSize() const;

void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const;
Expand Down
10 changes: 8 additions & 2 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
IcebergSplit.cpp PositionalDeleteFileReader.cpp)
velox_add_library(
velox_hive_iceberg_splitreader
EqualityDeleteFileReader.cpp
FilterUtil.cpp
IcebergDeleteFile.cpp
IcebergSplitReader.cpp
IcebergSplit.cpp
PositionalDeleteFileReader.cpp)

velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
Folly::folly)
Expand Down
227 changes: 227 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/iceberg/FilterUtil.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/common/TypeUtils.h"

using namespace facebook::velox::common;
using namespace facebook::velox::core;
using namespace facebook::velox::exec;

namespace facebook::velox::connector::hive::iceberg {

static constexpr const int kMaxBatchRows = 10'000;

EqualityDeleteFileReader::EqualityDeleteFileReader(
const IcebergDeleteFile& deleteFile,
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<const HiveConfig>& hiveConfig,
std::shared_ptr<io::IoStatistics> ioStats,
dwio::common::RuntimeStatistics& runtimeStats,
const std::string& connectorId)
: deleteFile_(deleteFile),
baseFileSchema_(baseFileSchema),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
deleteSplit_(nullptr),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
pool_(connectorQueryCtx->memoryPool()),
ioStats_(ioStats),
deleteRowReader_(nullptr) {
VELOX_CHECK_EQ(deleteFile_.content, FileContent::kEqualityDeletes);

if (deleteFile_.recordCount == 0) {
return;
}

std::unordered_set<int32_t> equalityFieldIds(
deleteFile_.equalityFieldIds.begin(), deleteFile_.equalityFieldIds.end());
auto deleteFieldSelector = [&equalityFieldIds](size_t index) {
return equalityFieldIds.find(static_cast<int32_t>(index)) !=
equalityFieldIds.end();
};
auto deleteFileSchema = dwio::common::typeutils::buildSelectedType(
baseFileSchema_, deleteFieldSelector);

rowType_ = std::static_pointer_cast<const RowType>(deleteFileSchema->type());

// TODO: push down filter if previous delete file contains this one. E.g.
// previous equality delete file has a=1, and this file also contains
// columns a, then a!=1 can be pushed as a filter when reading this delete
// file.

auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
scanSpec->addAllChildFields(rowType_->asRow());

deleteSplit_ = std::make_shared<HiveConnectorSplit>(
connectorId,
deleteFile_.filePath,
deleteFile_.fileFormat,
0,
deleteFile_.fileSizeInBytes);

// Create the Reader and RowReader

dwio::common::ReaderOptions deleteReaderOpts(pool_);
configureReaderOptions(
deleteReaderOpts,
hiveConfig_,
connectorQueryCtx_,
rowType_,
deleteSplit_);

auto deleteFileHandleCachePtr =
fileHandleFactory_->generate(deleteFile_.filePath);
auto deleteFileInput = createBufferedInput(
*deleteFileHandleCachePtr,
deleteReaderOpts,
connectorQueryCtx_,
ioStats_,
executor_);

auto deleteReader =
dwio::common::getReaderFactory(deleteReaderOpts.fileFormat())
->createReader(std::move(deleteFileInput), deleteReaderOpts);

dwio::common::RowReaderOptions deleteRowReaderOpts;
configureRowReaderOptions(
{},
scanSpec,
nullptr,
rowType_,
deleteSplit_,
hiveConfig_,
connectorQueryCtx_->sessionProperties(),
deleteRowReaderOpts);

deleteRowReader_.reset();
deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}

void EqualityDeleteFileReader::readDeleteValues(
SubfieldFilters& subfieldFilters,
std::vector<core::TypedExprPtr>& expressionInputs) {
VELOX_CHECK(deleteRowReader_);
VELOX_CHECK(deleteSplit_);

if (!deleteValuesOutput_) {
deleteValuesOutput_ = BaseVector::create(rowType_, 0, pool_);
}

// TODO:: verfiy if the field is a sub-field. Velox currently doesn't support
// pushing down filters to sub-fields
if (rowType_->size() == 1) {
// Construct the IN list filter that can be pushed down to the base file
// readers, then update the baseFileScanSpec.
readSingleColumnDeleteValues(subfieldFilters);
} else {
readMultipleColumnDeleteValues(expressionInputs);
}

deleteSplit_.reset();
}

void EqualityDeleteFileReader::readSingleColumnDeleteValues(
SubfieldFilters& subfieldFilters) {
std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto vector =
std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);
auto name = rowType_->nameOf(0);

auto typeKind = vector->type()->kind();
VELOX_CHECK(
typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
"Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
name,
typeKind);

auto notExistsFilter =
createNotExistsFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
filter = filter->mergeWith(notExistsFilter.get());
}

if (filter->kind() != FilterKind::kAlwaysTrue) {
if (subfieldFilters.find(common::Subfield(rowType_->nameOf(0))) !=
subfieldFilters.end()) {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] =
subfieldFilters[common::Subfield(rowType_->nameOf(0))]->mergeWith(
filter.get());
} else {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] =
std::move(filter);
}
}
}

void EqualityDeleteFileReader::readMultipleColumnDeleteValues(
std::vector<core::TypedExprPtr>& expressionInputs) {
auto numDeleteFields = rowType_->size();
VELOX_CHECK_GT(
numDeleteFields,
0,
"Iceberg equality delete file should have at least one field.");

// TODO: logical expression simplifications
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);
auto numDeletedValues = rowVector->childAt(0)->size();

for (int i = 0; i < numDeletedValues; i++) {
std::vector<core::TypedExprPtr> disconjunctInputs;

for (int j = 0; j < numDeleteFields; j++) {
auto type = rowType_->childAt(j);
auto name = rowType_->nameOf(j);
auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j));

std::vector<core::TypedExprPtr> isNotEqualInputs;
isNotEqualInputs.push_back(
std::make_shared<FieldAccessTypedExpr>(type, name));
isNotEqualInputs.push_back(std::make_shared<ConstantTypedExpr>(value));
auto isNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), isNotEqualInputs, "neq");

disconjunctInputs.push_back(isNotEqualExpr);
}

auto disconjunctNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), disconjunctInputs, "or");
expressionInputs.push_back(disconjunctNotEqualExpr);
}
}
}

} // namespace facebook::velox::connector::hive::iceberg
Loading

0 comments on commit e9662e6

Please sign in to comment.