Skip to content

Commit

Permalink
Fix the output type of complex type vector (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored and zhejiangxiaomai committed Jul 14, 2023
1 parent 5ff5303 commit 97e308e
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 10 deletions.
14 changes: 12 additions & 2 deletions velox/dwio/common/SelectiveRepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,19 @@ void SelectiveListColumnReader::getValues(RowSet rows, VectorPtr* result) {
}
*result = std::make_shared<ArrayVector>(
&memoryPool_,
requestedType_->type,
outputType_ ? outputType_ : requestedType_->type,
anyNulls_ ? resultNulls_ : nullptr,
rows.size(),
offsets_,
sizes_,
elements);
}

void SelectiveListColumnReader::setOutputType(
const std::shared_ptr<const ArrayType>& outputType) {
outputType_ = outputType;
}

SelectiveMapColumnReader::SelectiveMapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
Expand Down Expand Up @@ -279,7 +284,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
}
*result = std::make_shared<MapVector>(
&memoryPool_,
requestedType_->type,
outputType_ ? outputType_ : requestedType_->type,
anyNulls_ ? resultNulls_ : nullptr,
rows.size(),
offsets_,
Expand All @@ -288,4 +293,9 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
values);
}

void SelectiveMapColumnReader::setOutputType(
const std::shared_ptr<const MapType>& outputType) {
outputType_ = outputType;
}

} // namespace facebook::velox::dwio::common
11 changes: 11 additions & 0 deletions velox/dwio/common/SelectiveRepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {
void getValues(RowSet rows, VectorPtr* FOLLY_NULLABLE result) override;

protected:
void setOutputType(const std::shared_ptr<const ArrayType>& outputType);

std::unique_ptr<SelectiveColumnReader> child_;
const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;

private:
std::shared_ptr<const ArrayType> outputType_ = nullptr;
};

class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {
Expand Down Expand Up @@ -138,6 +143,12 @@ class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {
std::unique_ptr<SelectiveColumnReader> keyReader_;
std::unique_ptr<SelectiveColumnReader> elementReader_;
const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;

protected:
void setOutputType(const std::shared_ptr<const MapType>& outputType);

private:
std::shared_ptr<const MapType> outputType_ = nullptr;
};

} // namespace facebook::velox::dwio::common
63 changes: 55 additions & 8 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ void fillRowVectorChildren(
}
}
}

} // namespace

void SelectiveStructColumnReaderBase::getValues(
Expand All @@ -228,16 +229,14 @@ void SelectiveStructColumnReaderBase::getValues(
VELOX_CHECK(
result->get()->type()->isRow(),
"Struct reader expects a result of type ROW.");
auto& rowType = result->get()->type()->asRow();
if (!result->unique() || result->get()->isLazy()) {
checkOutputType(outputType_, asRowType(requestedType_->type));
const auto& outDataType = outputType_ ? outputType_ : result->get()->type();
auto& rowType = outDataType->asRow();
if (outputType_ || !result->unique() || result->get()->isLazy()) {
std::vector<VectorPtr> children(rowType.size());
fillRowVectorChildren(*result->get()->pool(), rowType, children);
*result = std::make_unique<RowVector>(
result->get()->pool(),
result->get()->type(),
nullptr,
0,
std::move(children));
result->get()->pool(), outDataType, nullptr, 0, std::move(children));
}
auto* resultRow = static_cast<RowVector*>(result->get());
resultRow->resize(rows.size());
Expand Down Expand Up @@ -277,7 +276,7 @@ void SelectiveStructColumnReaderBase::getValues(
}
resultRow->childAt(channel) = std::make_shared<LazyVector>(
&memoryPool_,
resultRow->type()->childAt(channel),
outDataType->childAt(channel),
rows.size(),
std::make_unique<ColumnLoader>(this, children_[index], numReads_));
} else {
Expand All @@ -287,4 +286,52 @@ void SelectiveStructColumnReaderBase::getValues(
}
}

void SelectiveStructColumnReaderBase::setOutputType(
const RowTypePtr& outputType) {
outputType_ = outputType;
}

/**
* Check the output type against requested type on compatibility.
* @param outputType: the output type from user.
* @param requestedType: the type from Parquet.
*/
void SelectiveStructColumnReaderBase::checkOutputType(
const RowTypePtr& outputType,
const RowTypePtr& requestedType) {
if (outputType == nullptr) {
return;
}
VELOX_CHECK_NOT_NULL(requestedType);
for (int i = 0; i < outputType->size(); ++i) {
if (!requestedType->containsChild(outputType->nameOf(i)))
continue;

bool isPartitionColumn = false;
for (const auto& childSpec : scanSpec_->children()) {
if (childSpec->fieldName() == outputType->nameOf(i) &&
childSpec->isConstant()) {
isPartitionColumn = true;
break;
}
}
// Skip the type check for partition column because requested type does not
// contain it.
if (isPartitionColumn)
continue;

const auto& childOutputType = outputType->childAt(i);
const auto& childRequestedType =
requestedType->findChild(outputType->nameOf(i));
if (auto rowTypePtr = asRowType(childOutputType)) {
VELOX_CHECK_NOT_NULL(asRowType(childRequestedType));
checkOutputType(
asRowType(childOutputType), asRowType(childRequestedType));
continue;
}
VELOX_CHECK(BaseVector::compatibleKind(
childOutputType->kind(), childRequestedType->kind()));
}
}

} // namespace facebook::velox::dwio::common
9 changes: 9 additions & 0 deletions velox/dwio/common/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
return hasMutation_;
}

void setOutputType(const RowTypePtr& outputType);

const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;

std::vector<SelectiveColumnReader*> children_;
Expand All @@ -141,6 +143,13 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
// and query. Set at construction, which takes place on first
// use. If no ExceptionContext is in effect, this is "".
const std::string debugString_;

private:
void checkOutputType(
const RowTypePtr& outputType,
const RowTypePtr& requestedType);

RowTypePtr outputType_ = nullptr;
};

struct SelectiveStructColumnReader : SelectiveStructColumnReaderBase {
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/reader/RepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ MapColumnReader::MapColumnReader(
scanSpec) {
const std::shared_ptr<const MapType>& mapTypePtr =
std::dynamic_pointer_cast<const MapType>(colType);
setOutputType(mapTypePtr);
auto& keyChildType = requestedType->childAt(0);
auto& elementChildType = requestedType->childAt(1);
keyReader_ = ParquetColumnReader::build(
Expand Down Expand Up @@ -249,6 +250,7 @@ ListColumnReader::ListColumnReader(
auto& childType = requestedType->childAt(0);
const std::shared_ptr<const ArrayType>& arrayTypePtr =
std::dynamic_pointer_cast<const ArrayType>(colType);
setOutputType(arrayTypePtr);
child_ = ParquetColumnReader::build(
childType,
params,
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/parquet/reader/StructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ StructColumnReader::StructColumnReader(
rowTypePtr = asRowType(colType);
VELOX_CHECK_NOT_NULL(rowTypePtr);
}
setOutputType(rowTypePtr);

auto& childSpecs = scanSpec_->children();
if (rowTypePtr && !caseSensitive) {
Expand Down

0 comments on commit 97e308e

Please sign in to comment.