Support reading Iceberg tables with equality delete files
yingsu00 committed Mar 16, 2024
1 parent 9fb0c58 commit b25fc0b
Showing 21 changed files with 1,039 additions and 62 deletions.
1 change: 1 addition & 0 deletions fizz
Submodule fizz added at 666553
12 changes: 12 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -313,6 +313,18 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) {
}
}

folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
toSubfieldsMap(const RowTypePtr& rowType) {
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
subfieldsMap;
for (int i = 0; i < rowType->size(); i++) {
const std::string& name = rowType->nameOf(i);
common::Subfield subfield(name);
subfieldsMap[name].push_back(&subfield);
}
return subfieldsMap;
}

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
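Note that the new toSubfieldsMap() above stores the address of the Subfield that is local to each loop iteration, so the pointers kept in the returned map dangle once the iteration ends. A minimal sketch of a variant that keeps the Subfield objects alive in caller-owned storage; the helper name and the storage out-parameter are illustrative additions, not part of this commit:

folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
toSubfieldsMapOwned(
    const RowTypePtr& rowType,
    std::vector<std::unique_ptr<common::Subfield>>& storage) {
  folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
      subfieldsMap;
  for (int i = 0; i < rowType->size(); i++) {
    const std::string& name = rowType->nameOf(i);
    // Keep each Subfield alive beyond this call so the pointers stored in
    // the map stay valid for makeScanSpec and other consumers.
    storage.push_back(std::make_unique<common::Subfield>(name));
    subfieldsMap[name].push_back(storage.back().get());
  }
  return subfieldsMap;
}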
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
@@ -47,6 +47,9 @@ void checkColumnNameLowerCase(

void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr);

folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
toSubfieldsMap(const RowTypePtr& rowType);

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
16 changes: 12 additions & 4 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -172,6 +172,7 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
&partitionKeys_,
fileHandleFactory_,
executor_,
expressionEvaluator_,
connectorQueryCtx_,
hiveConfig_,
ioStats_);
@@ -191,10 +192,12 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
}

splitReader_ = createSplitReader();

// Split reader subclasses may need to use the reader options in prepareSplit,
// so we initialize them beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
splitReader_->prepareSplit(
metadataFilter_, remainingFilterExprSet_, runtimeStats_);
}

std::optional<RowVectorPtr> HiveDataSource::next(
@@ -351,11 +354,16 @@ int64_t HiveDataSource::estimatedRowSize() {
vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
auto filterStartMicros = getCurrentTimeMicro();
filterRows_.resize(output_->size());

expressionEvaluator_->evaluate(
remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_);
auto res = exec::processFilterResults(
filterResult_, filterRows_, filterEvalCtx_, pool_);
auto filterResult = std::static_pointer_cast<RowVector>(filterResult_);

vector_size_t res;
for (int i = 0; i < filterResult->childrenSize(); i++) {
res = exec::processFilterResults(
filterResult->childAt(i), filterRows_, filterEvalCtx_, pool_);
}

totalRemainingFilterTime_.fetch_add(
(getCurrentTimeMicro() - filterStartMicros) * 1000,
std::memory_order_relaxed);
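With equality delete files, remainingFilterExprSet_ can now hold more than one expression: the query's own remaining filter plus conjuncts contributed by the equality delete files, and a row should survive only when every expression evaluates to true. Below is a minimal, self-contained sketch of that conjunctive combination over per-expression boolean results; the function, its inputs, and the choice to treat NULL as false are assumptions for illustration, not code from this commit.

// Combine several BOOLEAN filter results: a row stays selected only if it
// passes every expression.
SelectivityVector combineFilterResults(
    const std::vector<VectorPtr>& results,
    const SelectivityVector& rows) {
  SelectivityVector passing = rows;
  for (const auto& result : results) {
    DecodedVector decoded(*result, rows);
    rows.applyToSelected([&](vector_size_t row) {
      // A NULL filter result is treated as "does not pass", mirroring SQL
      // filter semantics.
      if (decoded.isNullAt(row) || !decoded.valueAt<bool>(row)) {
        passing.setValid(row, false);
      }
    });
  }
  passing.updateBounds();
  return passing;
}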
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
@@ -124,7 +124,7 @@ class HiveDataSource : public DataSource {
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
infoColumns_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
11 changes: 11 additions & 0 deletions velox/connectors/hive/SplitReader.cpp
@@ -69,6 +69,7 @@ std::unique_ptr<SplitReader> SplitReader::create(
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics>& ioStats) {
@@ -83,6 +84,7 @@ std::unique_ptr<SplitReader> SplitReader::create(
partitionKeys,
fileHandleFactory,
executor,
expressionEvaluator,
connectorQueryCtx,
hiveConfig,
ioStats);
@@ -95,6 +97,7 @@ std::unique_ptr<SplitReader> SplitReader::create(
partitionKeys,
fileHandleFactory,
executor,
expressionEvaluator,
connectorQueryCtx,
hiveConfig,
ioStats);
@@ -111,6 +114,7 @@ SplitReader::SplitReader(
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics>& ioStats)
@@ -122,6 +126,7 @@ SplitReader::SplitReader(
pool_(connectorQueryCtx->memoryPool()),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
expressionEvaluator_(expressionEvaluator),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
ioStats_(ioStats),
@@ -140,6 +145,7 @@ void SplitReader::configureReaderOptions(

void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
dwio::common::RuntimeStatistics& runtimeStats) {
createReader();

@@ -173,6 +179,11 @@ void SplitReader::resetSplit() {
hiveSplit_.reset();
}

std::shared_ptr<const dwio::common::TypeWithId> SplitReader::baseFileSchema() {
VELOX_CHECK_NOT_NULL(baseReader_.get());
return baseReader_->typeWithId();
}

int64_t SplitReader::estimatedRowSize() const {
if (!baseRowReader_) {
return DataSource::kUnknownRowSize;
6 changes: 6 additions & 0 deletions velox/connectors/hive/SplitReader.h
@@ -64,6 +64,7 @@ class SplitReader {
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics>& ioStats);
@@ -78,6 +79,7 @@
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics>& ioStats);
@@ -92,6 +94,7 @@
/// files or log files, and add column adaptations for metadata columns
virtual void prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
dwio::common::RuntimeStatistics& runtimeStats);

virtual uint64_t next(uint64_t size, VectorPtr& output);
@@ -102,6 +105,8 @@

void resetSplit();

std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema();

int64_t estimatedRowSize() const;

void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const;
@@ -139,6 +144,7 @@ class SplitReader {
std::unique_ptr<dwio::common::RowReader> baseRowReader_;
FileHandleFactory* const fileHandleFactory_;
folly::Executor* const executor_;
core::ExpressionEvaluator* expressionEvaluator_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const std::shared_ptr<HiveConfig> hiveConfig_;
std::shared_ptr<io::IoStatistics> ioStats_;
5 changes: 3 additions & 2 deletions velox/connectors/hive/iceberg/CMakeLists.txt
@@ -13,8 +13,9 @@
# limitations under the License.

add_library(
velox_hive_iceberg_splitreader IcebergSplitReader.cpp IcebergSplit.cpp
PositionalDeleteFileReader.cpp)
velox_hive_iceberg_splitreader
IcebergSplitReader.cpp IcebergSplit.cpp PositionalDeleteFileReader.cpp
EqualityDeleteFileReader.cpp FilterUtil.cpp)

target_link_libraries(velox_hive_iceberg_splitreader velox_connector
Folly::folly)
192 changes: 192 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp
@@ -0,0 +1,192 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/connectors/hive/iceberg/FilterUtil.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/common/TypeUtils.h"

using namespace facebook::velox::common;

namespace facebook::velox::connector::hive::iceberg {

static constexpr int kMaxBatchRows = 10'000;

EqualityDeleteFileReader::EqualityDeleteFileReader(
const IcebergDeleteFile& deleteFile,
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig> hiveConfig,
std::shared_ptr<io::IoStatistics> ioStats,
dwio::common::RuntimeStatistics& runtimeStats,
const std::string& connectorId)
: deleteFile_(deleteFile),
baseFileSchema_(baseFileSchema),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
expressionEvaluator_(expressionEvaluator),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
pool_(connectorQueryCtx->memoryPool()),
ioStats_(ioStats),
deleteSplit_(nullptr),
deleteRowReader_(nullptr) {
VELOX_CHECK(deleteFile_.content == FileContent::kEqualityDeletes);

if (deleteFile_.recordCount == 0) {
return;
}

std::unordered_set<int32_t> equalityFieldIds(
deleteFile_.equalityFieldIds.begin(), deleteFile_.equalityFieldIds.end());
auto deleteFieldSelector = [&equalityFieldIds](size_t index) {
return equalityFieldIds.find(static_cast<int32_t>(index)) !=
equalityFieldIds.end();
};
auto deleteFileSchema = dwio::common::typeutils::buildSelectedType(
baseFileSchema_, deleteFieldSelector);

rowType_ = std::static_pointer_cast<const RowType>(deleteFileSchema->type());

// TODO: push down the filter if a previous delete file contains this one.
// E.g. if a previous equality delete file has a = 1 and this file also
// contains column a, then a != 1 can be pushed down as a filter when
// reading this delete file.

auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
scanSpec->addAllChildFields(rowType_->asRow());

deleteSplit_ = std::make_shared<HiveConnectorSplit>(
connectorId,
deleteFile_.filePath,
deleteFile_.fileFormat,
0,
deleteFile_.fileSizeInBytes);

// Create the Reader and RowReader

dwio::common::ReaderOptions deleteReaderOpts(pool_);
configureReaderOptions(
deleteReaderOpts,
hiveConfig_,
connectorQueryCtx_->sessionProperties(),
rowType_,
deleteSplit_);

auto deleteFileHandle =
fileHandleFactory_->generate(deleteFile_.filePath).second;
auto deleteFileInput = createBufferedInput(
*deleteFileHandle,
deleteReaderOpts,
connectorQueryCtx_,
ioStats_,
executor_);

auto deleteReader =
dwio::common::getReaderFactory(deleteReaderOpts.getFileFormat())
->createReader(std::move(deleteFileInput), deleteReaderOpts);

dwio::common::RowReaderOptions deleteRowReaderOpts;
configureRowReaderOptions(
deleteRowReaderOpts, {}, scanSpec, nullptr, rowType_, deleteSplit_);

deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}

void EqualityDeleteFileReader::readDeleteValues(
SubfieldFilters& subfieldFilters,
std::unique_ptr<exec::ExprSet>& exprSet) {
VELOX_CHECK(deleteRowReader_);
VELOX_CHECK(deleteSplit_);

if (!deleteValuesOutput_) {
deleteValuesOutput_ = BaseVector::create(rowType_, 0, pool_);
}

// TODO: verify whether the field is a sub-field. Velox currently doesn't
// support pushing down filters to sub-fields.
if (rowType_->size() == 1) {
// Construct the IN list filter that can be pushed down to the base file
// readers, then update the baseFileScanSpec.
readSingleColumnDeleteValues(subfieldFilters);
} else {
readMultipleColumnDeleteValues(exprSet);
}

deleteSplit_.reset();
}

void EqualityDeleteFileReader::readSingleColumnDeleteValues(
SubfieldFilters& subfieldFilters) {
std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto vector =
std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);
auto name = rowType_->nameOf(0);

auto typeKind = vector->type()->kind();
VELOX_CHECK(
typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
"Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
name,
typeKind);

// TODO: handle NULLs
auto notInFilter =
createNotInFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
filter = filter->mergeWith(notInFilter.get());
}

if (filter->kind() != FilterKind::kAlwaysTrue) {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] = std::move(filter);
}
}

void EqualityDeleteFileReader::readMultipleColumnDeleteValues(
std::unique_ptr<exec::ExprSet>& exprSet) {

while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);

// TODO: logical expression simplifications
auto deleteExprSet =
createEqualityDeleteExprSet(rowVector, expressionEvaluator_);
VELOX_CHECK_EQ(deleteExprSet->size(), 1);

exprSet->addExpr(deleteExprSet->expr(0));
}
}

} // namespace facebook::velox::connector::hive::iceberg
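For reference, the two code paths above are consumed differently: the single-column case yields a negated IN-list (NOT IN) filter that goes into the SubfieldFilters map for pushdown into the base file's scan spec, while the multi-column case yields a NOT(c1 = v1 AND c2 = v2 ...) predicate per delete row that is appended to the remaining filter. FilterUtil.cpp is not shown in this excerpt, so the sketch below only illustrates one plausible way such a predicate could be built and compiled, assuming Presto-style function names (eq, and, not) and the evaluator's compile() API; it is not the commit's createEqualityDeleteExprSet implementation.

// Build NOT(c1 = v1 AND c2 = v2 ...) for a single delete row and compile it
// into an executable ExprSet. 'deleteValues' holds one constant per equality
// delete column, in the same order as rowType.
std::unique_ptr<exec::ExprSet> makeRowDeletePredicate(
    const RowTypePtr& rowType,
    const std::vector<variant>& deleteValues,
    core::ExpressionEvaluator* evaluator) {
  std::vector<core::TypedExprPtr> conjuncts;
  for (int i = 0; i < rowType->size(); i++) {
    auto field = std::make_shared<core::FieldAccessTypedExpr>(
        rowType->childAt(i), rowType->nameOf(i));
    auto constant = std::make_shared<core::ConstantTypedExpr>(
        rowType->childAt(i), deleteValues[i]);
    conjuncts.push_back(std::make_shared<core::CallTypedExpr>(
        BOOLEAN(), std::vector<core::TypedExprPtr>{field, constant}, "eq"));
  }

  core::TypedExprPtr conjunction = conjuncts[0];
  if (conjuncts.size() > 1) {
    conjunction =
        std::make_shared<core::CallTypedExpr>(BOOLEAN(), conjuncts, "and");
  }
  auto negated = std::make_shared<core::CallTypedExpr>(
      BOOLEAN(), std::vector<core::TypedExprPtr>{conjunction}, "not");

  // compile() turns the typed expression tree into an executable ExprSet.
  return evaluator->compile(negated);
}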