Commit

Support reading Iceberg tables with equality delete files

yingsu00 committed Feb 13, 2024
1 parent 860c8c1 commit 73d15e6
Showing 23 changed files with 1,194 additions and 184 deletions.
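For context: in Iceberg, an equality delete file stores rows of column values, and a data row is treated as deleted when it matches a delete row on all of the delete file's equality columns. The standalone C++ sketch below illustrates only that semantics, using a hypothetical (id, name) row layout with deletes keyed on id; it is not the Velox implementation added by this commit, which, judging from the diff, instead threads an expression evaluator and filter expressions into the split reader.

#include <cstdint>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Drop every data row whose id appears in an equality delete file keyed on
// id. Real delete files may key on several columns; a row is deleted only
// when all keyed columns match.
std::vector<std::pair<int64_t, std::string>> applyEqualityDeletes(
    const std::vector<std::pair<int64_t, std::string>>& dataRows,
    const std::vector<int64_t>& deletedIds) {
  const std::unordered_set<int64_t> deleted(
      deletedIds.begin(), deletedIds.end());
  std::vector<std::pair<int64_t, std::string>> remaining;
  for (const auto& row : dataRows) {
    if (deleted.count(row.first) == 0) {
      remaining.push_back(row); // Not matched by any delete row: keep it.
    }
  }
  return remaining;
}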
12 changes: 12 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -300,6 +300,18 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) {
}
}

folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
toSubfieldsMap(
    const RowTypePtr& rowType,
    std::vector<std::unique_ptr<common::Subfield>>& subfields) {
  folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
      subfieldsMap;
  for (size_t i = 0; i < rowType->size(); ++i) {
    const std::string& name = rowType->nameOf(i);
    // Give each Subfield stable storage owned by the caller; the original
    // version pushed the address of a loop-local Subfield, leaving dangling
    // pointers in the returned map.
    subfields.push_back(std::make_unique<common::Subfield>(name));
    subfieldsMap[name].push_back(subfields.back().get());
  }
  return subfieldsMap;
}
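A hypothetical call site, matching the ownership contract above (ROW, BIGINT, and VARCHAR are Velox's type factories; the surrounding code is illustrative, not from this commit):

  std::vector<std::unique_ptr<common::Subfield>> ownedSubfields;
  auto rowType = ROW({"c0", "c1"}, {BIGINT(), VARCHAR()});
  auto subfieldsMap = toSubfieldsMap(rowType, ownedSubfields);
  // subfieldsMap["c0"] holds one pointer into ownedSubfields selecting the
  // whole top-level column "c0"; keep ownedSubfields alive for as long as
  // the map is used.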

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
@@ -44,6 +44,9 @@ void checkColumnNameLowerCase(const SubfieldFilters& filters);

void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr);

// Builds a map from top-level column name to the Subfield(s) selecting that
// column. Ownership of the created Subfield objects is handed to 'subfields',
// which the caller must keep alive for as long as the returned map is used.
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
toSubfieldsMap(
    const RowTypePtr& rowType,
    std::vector<std::unique_ptr<common::Subfield>>& subfields);

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
@@ -224,6 +224,7 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
&partitionKeys_,
fileHandleFactory_,
executor_,
expressionEvaluator_,
connectorQueryCtx_,
hiveConfig_,
ioStats_);
@@ -242,10 +243,12 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
splitReader_.reset();
}
splitReader_ = createSplitReader();

// Split reader subclasses may need the reader options in prepareSplit(), so
// we initialize them beforehand.
splitReader_->configureReaderOptions();
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
splitReader_->prepareSplit(
metadataFilter_, remainingFilterExprSet_, runtimeStats_);
}

std::optional<RowVectorPtr> HiveDataSource::next(
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
@@ -127,7 +127,7 @@ class HiveDataSource : public DataSource {
// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
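  // (Changed from unique_ptr: prepareSplit() now receives this ExprSet, and a
  // subclass such as the Iceberg split reader presumably retains it to
  // evaluate equality-delete filters. That motivation is an inference, not
  // stated in the diff.)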
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
246 changes: 138 additions & 108 deletions velox/connectors/hive/SplitReader.cpp
@@ -37,6 +37,7 @@ std::unique_ptr<SplitReader> SplitReader::create(
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics>& ioStats) {
@@ -50,6 +51,7 @@ std::unique_ptr<SplitReader> SplitReader::create(
partitionKeys,
fileHandleFactory,
executor,
expressionEvaluator,
connectorQueryCtx,
hiveConfig,
ioStats);
@@ -62,6 +64,7 @@ std::unique_ptr<SplitReader> SplitReader::create(
partitionKeys,
fileHandleFactory,
executor,
expressionEvaluator,
connectorQueryCtx,
hiveConfig,
ioStats);
@@ -78,6 +81,7 @@ SplitReader::SplitReader(
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
core::ExpressionEvaluator* expressionEvaluator,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig,
const std::shared_ptr<io::IoStatistics>& ioStats)
@@ -89,6 +93,7 @@ SplitReader::SplitReader(
pool_(connectorQueryCtx->memoryPool()),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
expressionEvaluator_(expressionEvaluator),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
ioStats_(ioStats),
@@ -105,120 +110,16 @@ void SplitReader::configureReaderOptions() {

void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
dwio::common::RuntimeStatistics& runtimeStats) {
VELOX_CHECK_NE(
baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);

std::shared_ptr<FileHandle> fileHandle;
try {
fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
} catch (VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound.c_str() &&
hiveConfig_->ignoreMissingFiles(
connectorQueryCtx_->sessionProperties())) {
emptySplit_ = true;
return;
} else {
throw;
}
}
  // New entries are added to the CacheTTLController, if one was created,
  // whenever new file handles are generated. The creator of the
  // CacheTTLController must ensure a size-control strategy is in place, such
  // as removing aged-out entries.
if (auto* cacheTTLController = cache::CacheTTLController::getInstance()) {
cacheTTLController->addOpenFileInfo(fileHandle->uuid.id());
}
auto baseFileInput = createBufferedInput(
*fileHandle, baseReaderOpts_, connectorQueryCtx_, ioStats_, executor_);

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);
createReader();

// Note that this doesn't apply to Hudi tables.
emptySplit_ = false;
if (baseReader_->numberOfRows() == 0) {
emptySplit_ = true;
return;
}

// Check filters and see if the whole split can be skipped. Note that this
// doesn't apply to Hudi tables.
if (!testFilters(
scanSpec_.get(),
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
partitionKeys_)) {
if (testEmptySplit(runtimeStats)) {
emptySplit_ = true;
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
return;
}

auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());

configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
metadataFilter,
ROW(std::vector<std::string>(fileType->names()), std::move(columnTypes)),
hiveSplit_);
  // NOTE: we first reset the finished 'baseRowReader_' from the previous split
  // before setting up the next one, to avoid doubling the peak memory usage.
baseRowReader_.reset();
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
// Keep track of schema types for columns in file, used by ColumnSelector.
std::vector<TypePtr> columnTypes = fileType->children();

auto& childrenSpecs = scanSpec_->children();
for (size_t i = 0; i < childrenSpecs.size(); ++i) {
auto* childSpec = childrenSpecs[i].get();
const std::string& fieldName = childSpec->fieldName();

auto iter = hiveSplit_->partitionKeys.find(fieldName);
if (iter != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, iter->second);
} else if (fieldName == kPath) {
setConstantValue(
childSpec, VARCHAR(), velox::variant(hiveSplit_->filePath));
} else if (fieldName == kBucket) {
if (hiveSplit_->tableBucketNumber.has_value()) {
setConstantValue(
childSpec,
INTEGER(),
velox::variant(hiveSplit_->tableBucketNumber.value()));
}
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
if (outputTypeIdx.has_value()) {
// We know the fieldName exists in the file, make the type at that
// position match what we expect in the output.
columnTypes[fileTypeIdx.value()] =
readerOutputType_->childAt(*outputTypeIdx);
}
}
}
}

scanSpec_->resetCachedValues(false);

return columnTypes;
createRowReader(metadataFilter);
}
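Because this rendered diff interleaves removed and added lines without +/- markers, the new prepareSplit() is hard to read above. Reassembled from the visible hunks, it reduces to roughly the following; this is a paraphrase for readability, not authoritative source, and anything hidden in collapsed regions is omitted:

void SplitReader::prepareSplit(
    std::shared_ptr<common::MetadataFilter> metadataFilter,
    std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
    dwio::common::RuntimeStatistics& runtimeStats) {
  createReader();

  if (testEmptySplit(runtimeStats)) {
    emptySplit_ = true;
    return;
  }

  createRowReader(metadataFilter);
  // remainingFilterExprSet is not consumed in the base class here; presumably
  // subclasses (e.g. the Iceberg split reader) use it.
}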

uint64_t SplitReader::next(int64_t size, VectorPtr& output) {
@@ -239,6 +140,11 @@ void SplitReader::resetSplit() {
hiveSplit_.reset();
}

std::shared_ptr<const dwio::common::TypeWithId> SplitReader::baseFileSchema() {
VELOX_CHECK_NOT_NULL(baseReader_.get());
return baseReader_->typeWithId();
}
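A hypothetical call site for the new accessor (illustrative names; the diff does not show its callers, though an Iceberg split reader would plausibly use it to resolve delete-file columns against the base file):

  // Requires that createReader() has already produced baseReader_.
  auto fileSchema = splitReader->baseFileSchema();
  // TypeWithId pairs each node of the file's type tree with a node id.
  LOG(INFO) << fileSchema->type()->toString();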

int64_t SplitReader::estimatedRowSize() const {
if (!baseRowReader_) {
return DataSource::kUnknownRowSize;
@@ -341,6 +247,130 @@ std::string SplitReader::toString() const {
static_cast<const void*>(baseRowReader_.get()));
}

void SplitReader::createReader() {
VELOX_CHECK_NE(
baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);

std::shared_ptr<FileHandle> fileHandle;
try {
fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
} catch (VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound.c_str() &&
hiveConfig_->ignoreMissingFiles(
connectorQueryCtx_->sessionProperties())) {
emptySplit_ = true;
return;
} else {
throw;
}
}

  // New entries are added to the CacheTTLController, if one was created,
  // whenever new file handles are generated. The creator of the
  // CacheTTLController must ensure a size-control strategy is in place, such
  // as removing aged-out entries.
if (auto* cacheTTLController = cache::CacheTTLController::getInstance()) {
cacheTTLController->addOpenFileInfo(fileHandle->uuid.id());
}
auto baseFileInput = createBufferedInput(
*fileHandle, baseReaderOpts_, connectorQueryCtx_, ioStats_, executor_);

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);
}

void SplitReader::createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter) {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());

configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
metadataFilter,
ROW(std::vector<std::string>(fileType->names()), std::move(columnTypes)),
hiveSplit_);
  // NOTE: we first reset the finished 'baseRowReader_' from the previous split
  // before setting up the next one, to avoid doubling the peak memory usage.
baseRowReader_.reset();
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

bool SplitReader::testEmptySplit(
dwio::common::RuntimeStatistics& runtimeStats) {
  // Note that this check doesn't apply to Hudi tables.
if (baseReader_->numberOfRows() == 0) {
return true;
}

// Check filters and see if the whole split can be skipped. Note that this
// doesn't apply to Hudi tables.
if (!testFilters(
scanSpec_.get(),
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
partitionKeys_)) {
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
return true;
}

return false;
}

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
// Keep track of schema types for columns in file, used by ColumnSelector.
std::vector<TypePtr> columnTypes = fileType->children();

auto& childrenSpecs = scanSpec_->children();
for (size_t i = 0; i < childrenSpecs.size(); ++i) {
auto* childSpec = childrenSpecs[i].get();
const std::string& fieldName = childSpec->fieldName();

auto iter = hiveSplit_->partitionKeys.find(fieldName);
if (iter != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, iter->second);
} else if (fieldName == kPath) {
setConstantValue(
childSpec, VARCHAR(), velox::variant(hiveSplit_->filePath));
} else if (fieldName == kBucket) {
if (hiveSplit_->tableBucketNumber.has_value()) {
setConstantValue(
childSpec,
INTEGER(),
velox::variant(hiveSplit_->tableBucketNumber.value()));
}
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
if (outputTypeIdx.has_value()) {
// We know the fieldName exists in the file, make the type at that
// position match what we expect in the output.
columnTypes[fileTypeIdx.value()] =
readerOutputType_->childAt(*outputTypeIdx);
}
}
}
}

scanSpec_->resetCachedValues(false);

return columnTypes;
}
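The missing-column branch above is the schema-evolution path: a column that exists in the table schema but not in the data file is materialized as a null constant. A standalone sketch of that rule under assumed names (not the Velox API):

#include <algorithm>
#include <string>
#include <vector>

struct ColumnPlan {
  std::string name;
  bool readFromFile; // false: fill with a null constant of the table type.
};

// Decide, per table column, whether it can be read from the file or must be
// synthesized as null because the file predates the column.
std::vector<ColumnPlan> planColumns(
    const std::vector<std::string>& fileColumns,
    const std::vector<std::string>& tableColumns) {
  std::vector<ColumnPlan> plan;
  plan.reserve(tableColumns.size());
  for (const auto& name : tableColumns) {
    const bool inFile =
        std::find(fileColumns.begin(), fileColumns.end(), name) !=
        fileColumns.end();
    plan.push_back({name, inFile});
  }
  return plan;
}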

} // namespace facebook::velox::connector::hive

template <>