Skip to content

Commit

Permalink
Fix an issue where splits could not be skipped in a special case
Browse files Browse the repository at this point in the history
In some rare cases, tables created by other engines put the
partition key columns in the data file too, and fail to write ColumnStats
for such columns. In such cases, a split from a partition with NULL
partition keys failed to be skipped, causing wrong results. This is
because HiveConnectorUtils::testFilters() assumed the partition keys are
not in the data file, and therefore was unable to apply the filter on the
partition key. This commit fixes the issue by also checking the partition
key list even when the partition columns are in the data file.
  • Loading branch information
yingsu00 committed Apr 2, 2024
1 parent 06b48eb commit 929b685
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 14 deletions.
14 changes: 10 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,18 +582,24 @@ bool testFilters(
for (const auto& child : scanSpec->children()) {
if (child->filter()) {
const auto& name = child->fieldName();
if (!rowType->containsChild(name)) {
// If missing column is partition key.
auto iter = partitionKey.find(name);
auto iter = partitionKey.find(name);
// The partition key columns are written in the data file for
// Iceberg tables, so we need to test both cases.
if (!rowType->containsChild(name) || iter != partitionKey.end()) {
if (iter != partitionKey.end() && iter->second.has_value()) {
// This is a non-null partition key
return applyPartitionFilter(
(*partitionKeysHandle)[name]->dataType()->kind(),
iter->second.value(),
child->filter());
}
// Column is missing. Most likely due to schema evolution.
// Column is missing, most likely due to schema evolution. Or it's a
// partition key but the partition value is NULL.
if (child->filter()->isDeterministic() &&
!child->filter()->testNull()) {
VLOG(1) << "Skipping " << filePath
<< " because the filter testNull() failed for column "
<< child->fieldName();
return false;
}
} else {
Expand Down
97 changes: 90 additions & 7 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ class TableScanTest : public virtual HiveConnectorTestBase {
}

// Creates 'count' RowVectors with 'rowsPerVector' rows each. Uses 'rowType'
// when provided, otherwise falls back to the fixture's default rowType_.
// 'isNullAt' optionally marks which rows should be null.
std::vector<RowVectorPtr> makeVectors(
    int32_t count,
    int32_t rowsPerVector,
    const RowTypePtr& rowType = nullptr,
    std::function<bool(vector_size_t /*index*/)> isNullAt = nullptr) {
  // Bind by reference to avoid copying the shared_ptr; move the
  // std::function instead of copying it when forwarding.
  const auto& inputs = rowType ? rowType : rowType_;
  return HiveConnectorTestBase::makeVectors(
      inputs, count, rowsPerVector, std::move(isNullAt));
}

std::vector<RowVectorPtr> makeNullableVectors(
int32_t count,
int32_t rowsPerVector,
const RowTypePtr& rowType = nullptr) {
Expand Down Expand Up @@ -154,7 +164,8 @@ class TableScanTest : public virtual HiveConnectorTestBase {
void testPartitionedTableImpl(
const std::string& filePath,
const TypePtr& partitionType,
const std::optional<std::string>& partitionValue) {
const std::optional<std::string>& partitionValue,
bool isPartitionColumnInFile = false) {
auto split = HiveConnectorSplitBuilder(filePath)
.partitionKey("pkey", partitionValue)
.build();
Expand All @@ -174,8 +185,11 @@ class TableScanTest : public virtual HiveConnectorTestBase {

std::string partitionValueStr =
partitionValue.has_value() ? "'" + *partitionValue + "'" : "null";
assertQuery(
op, split, fmt::format("SELECT {}, * FROM tmp", partitionValueStr));

std::string duckdbSql = isPartitionColumnInFile
? "SELECT * FROM tmp"
: fmt::format("SELECT {}, * FROM tmp", partitionValueStr);
assertQuery(op, split, duckdbSql);

outputType = ROW({"c0", "pkey", "c1"}, {BIGINT(), partitionType, DOUBLE()});
op = PlanBuilder()
Expand Down Expand Up @@ -213,12 +227,60 @@ class TableScanTest : public virtual HiveConnectorTestBase {
op, split, fmt::format("SELECT {} FROM tmp", partitionValueStr));
}

void testPartitionedTable(
// Scans a partitioned table applying a filter on the partition key 'pkey'.
// 'fileSchema' is the schema of the data file, which may itself contain the
// partition key column. First verifies a filter that matches the partition
// (no rows dropped), then a filter that excludes it (split skipped).
void testPartitionedTableWithFilterOnPartitionKey(
    const RowTypePtr& fileSchema,
    const std::string& filePath,
    const TypePtr& partitionType,
    const std::optional<std::string>& partitionValue) {
  // The filter on the partition key cannot eliminate the partition, therefore
  // the split should NOT be skipped and all rows in it should be selected.
  auto split = HiveConnectorSplitBuilder(filePath)
                   .partitionKey("pkey", partitionValue)
                   .build();
  auto outputType = ROW({"c0", "c1"}, {BIGINT(), DOUBLE()});
  ColumnHandleMap assignments = {
      {"pkey", partitionKey("pkey", partitionType)},
      {"c0", regularColumn("c0", BIGINT())},
      {"c1", regularColumn("c1", DOUBLE())}};
  std::string filter = partitionValue.has_value()
      ? "pkey = " + partitionValue.value()
      : "pkey IS NULL";
  auto op = PlanBuilder()
                .startTableScan()
                .dataColumns(fileSchema)
                .outputType(outputType)
                .assignments(assignments)
                .subfieldFilter(filter)
                .endTableScan()
                .planNode();

  // Plain string literal: fmt::format with no placeholders is unnecessary.
  assertQuery(op, split, "SELECT c0, c1 FROM tmp");

  // The split should be skipped because the partition key does not pass
  // the filter.
  filter = partitionValue.has_value() ? "pkey <> " + partitionValue.value()
                                      : "pkey IS NOT NULL";
  op = PlanBuilder()
           .startTableScan()
           .dataColumns(fileSchema)
           .outputType(outputType)
           .assignments(assignments)
           .subfieldFilter(filter)
           .endTableScan()
           .planNode();
  assertQuery(op, split, "SELECT c0, c1 FROM tmp WHERE 1 = 0");
}

// Runs the partitioned-table scan tests twice: once with the given
// 'partitionValue' and once with a NULL partition value.
// 'fileSchema' is not used by testPartitionedTableImpl(); it is accepted so
// call sites can pass the file schema uniformly alongside related helpers.
void testPartitionedTable(
    const std::string& filePath,
    const TypePtr& partitionType,
    const std::optional<std::string>& partitionValue,
    [[maybe_unused]] const RowTypePtr& fileSchema = nullptr,
    bool isPartitionColumnInFile = false) {
  testPartitionedTableImpl(
      filePath, partitionType, partitionValue, isPartitionColumnInFile);
  // Also exercise the NULL partition value path.
  testPartitionedTableImpl(
      filePath, partitionType, std::nullopt, isPartitionColumnInFile);
}

RowTypePtr rowType_{
Expand Down Expand Up @@ -1678,7 +1740,28 @@ TEST_F(TableScanTest, partitionedTableDateKey) {
auto filePath = TempFilePath::create();
writeToFile(filePath->path, vectors);
createDuckDbTable(vectors);
testPartitionedTable(filePath->path, DATE(), "2023-10-27");
testPartitionedTable(filePath->path, DATE(), "2023-10-27", rowType);
}

// Partition key was written as a real column in the data file, and the value is
// NULL. The column does not have column statistics
// Partition key was written as a real column in the data file, and its values
// are all NULL, so the column carries no column statistics. The scan must
// still produce correct results, and filters on the partition key must still
// be able to skip the split.
TEST_F(TableScanTest, partitionedTablePartitionKeyInFile) {
  auto fileSchema = ROW({"c0", "c1", "pkey"}, {BIGINT(), DOUBLE(), BIGINT()});
  auto filePath = TempFilePath::create();
  // Create vectors whose values are all null.
  auto vectors =
      makeVectors(10, 1'000, fileSchema, [](vector_size_t) { return true; });
  writeToFile(
      filePath->path,
      vectors,
      std::make_shared<facebook::velox::dwrf::Config>(),
      false);
  createDuckDbTable(vectors);
  testPartitionedTable(
      filePath->path, BIGINT(), std::nullopt, fileSchema, true);
  testPartitionedTableWithFilterOnPartitionKey(
      fileSchema, filePath->path, BIGINT(), std::nullopt);
}

std::vector<StringView> toStringViews(const std::vector<std::string>& values) {
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ void HiveConnectorTestBase::writeToFile(
std::vector<RowVectorPtr> HiveConnectorTestBase::makeVectors(
const RowTypePtr& rowType,
int32_t numVectors,
int32_t rowsPerVector) {
int32_t rowsPerVector,
std::function<bool(vector_size_t /*index*/)> isNullAt) {
std::vector<RowVectorPtr> vectors;
for (int32_t i = 0; i < numVectors; ++i) {
auto vector = std::dynamic_pointer_cast<RowVector>(
velox::test::BatchMaker::createBatch(rowType, rowsPerVector, *pool_));
velox::test::BatchMaker::createBatch(
rowType, rowsPerVector, *pool_, isNullAt));
vectors.push_back(vector);
}
return vectors;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/utils/HiveConnectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class HiveConnectorTestBase : public OperatorTestBase {
std::vector<RowVectorPtr> makeVectors(
const RowTypePtr& rowType,
int32_t numVectors,
int32_t rowsPerVector);
int32_t rowsPerVector,
std::function<bool(vector_size_t /*index*/)> isNullAt = nullptr);

using OperatorTestBase::assertQuery;

Expand Down

0 comments on commit 929b685

Please sign in to comment.